Skip to content

Architecture

Technical architecture of DataMgmt Node.

System Overview

┌─────────────────────────────────────────────────────────────────────────────┐
│                              DataMgmt Node                                   │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│  ┌─────────────────────────────────────────────────────────────────────┐   │
│  │                         API Layer                                    │   │
│  │  ┌──────────────────────┐    ┌──────────────────────┐              │   │
│  │  │    Internal API      │    │    External API      │              │   │
│  │  │    (Port 8080)       │    │    (Port 8081)       │              │   │
│  │  │  • Health check      │    │  • Share data        │              │   │
│  │  │  • Token balance     │    │  • Get data          │              │   │
│  │  │  • Token transfer    │    │  • Verify compliance │              │   │
│  │  │  • Token management  │    │  • Network stats     │              │   │
│  │  └──────────────────────┘    └──────────────────────┘              │   │
│  │                    ↓                      ↓                         │   │
│  │  ┌──────────────────────────────────────────────────────────────┐  │   │
│  │  │                    Middleware Layer                           │  │   │
│  │  │  • Rate Limiting  • Error Handling  • Request Validation     │  │   │
│  │  └──────────────────────────────────────────────────────────────┘  │   │
│  └─────────────────────────────────────────────────────────────────────┘   │
│                                     ↓                                       │
│  ┌─────────────────────────────────────────────────────────────────────┐   │
│  │                         Service Layer                                │   │
│  │  ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────────┐   │   │
│  │  │   Data     │ │   Token    │ │  Payment   │ │   Compliance   │   │   │
│  │  │  Manager   │ │  Manager   │ │ Processor  │ │    Manager     │   │   │
│  │  └────────────┘ └────────────┘ └────────────┘ └────────────────┘   │   │
│  │  ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────────┐   │   │
│  │  │    Key     │ │   Auth     │ │  Plugin    │ │                │   │   │
│  │  │  Manager   │ │  Module    │ │  Manager   │ │                │   │   │
│  │  └────────────┘ └────────────┘ └────────────┘ └────────────────┘   │   │
│  └─────────────────────────────────────────────────────────────────────┘   │
│                                     ↓                                       │
│  ┌─────────────────────────────────────────────────────────────────────┐   │
│  │                        Infrastructure Layer                          │   │
│  │  ┌────────────────┐  ┌────────────────┐  ┌────────────────────┐    │   │
│  │  │   P2P Network  │  │   Blockchain   │  │      Storage       │    │   │
│  │  │   (Kademlia)   │  │   Interface    │  │  (LevelDB/RocksDB) │    │   │
│  │  └────────────────┘  └────────────────┘  └────────────────────┘    │   │
│  └─────────────────────────────────────────────────────────────────────┘   │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

Component Details

API Layer

Internal API (api/internal_api.py)

Handles node management operations:

class InternalAPI:
    """Internal API for node management operations (port 8080)."""

    def __init__(self, node):
        self.node = node
        self.rate_limiter = create_internal_rate_limiter()

    async def start(self):
        app = aiohttp.web.Application(middlewares=[...])
        app.router.add_get('/health', self.health_check)
        app.router.add_get('/balance/{address}', self.get_balance)
        app.router.add_post('/transfer', self.transfer)
        app.router.add_get('/tokens', self.list_tokens)
        app.router.add_post('/tokens', self.add_token)

External API (api/external_api.py)

Handles data sharing operations:

class ExternalAPI:
    """External API for data sharing operations (port 8081)."""

    async def start(self):
        app = aiohttp.web.Application(middlewares=[...])
        app.router.add_get('/health', self.health_check)
        app.router.add_post('/share_data', self.share_data)
        app.router.add_get('/data/{data_hash}', self.get_data)
        app.router.add_get('/verify_data/{data_hash}', self.verify_data)
        app.router.add_get('/compliance_history', self.get_compliance_history)
        app.router.add_get('/network/stats', self.get_network_stats)
        app.router.add_get('/network/peers', self.get_peers)

Service Layer

DataManager (services/data_manager.py)

Local data storage using LevelDB or RocksDB:

class DataManager:
    def __init__(self, db_path):
        self.db = self._init_database()  # LevelDB or RocksDB

    def store_data(self, key, value): ...
    def get_data(self, key): ...
    def delete_data(self, key): ...

TokenManager (services/token_manager.py)

ERC-20 token management:

class TokenManager:
    def __init__(self, blockchain_interface, native_token_address):
        self.blockchain = blockchain_interface
        self.supported_tokens = {native_token_address}

    def get_balance(self, address, token): ...
    def transfer_tokens(self, from_addr, to_addr, amount, token): ...
    def add_supported_token(self, address, abi): ...

KeyManager (services/key_manager.py)

Secure encryption key management:

class KeyManager:
    def __init__(self, keys_dir, master_password=None):
        self._master_password = master_password or os.getenv('KEY_MASTER_PASSWORD')
        self._keys: dict[int, bytes] = {}

    def initialize(self) -> Fernet: ...
    def rotate_key(self) -> int: ...
    def get_cipher(self, version: int) -> Fernet: ...

ComplianceManager (services/compliance_manager.py)

Blockchain compliance recording:

class ComplianceManager:
    def record_compliance_event(self, event_type, event_data): ...
    def verify_compliance(self, event_type, event_hash): ...
    def get_compliance_history(self, filters=None): ...

Infrastructure Layer

P2PNetwork (network/p2p_network.py)

Kademlia-based peer-to-peer network:

class P2PNetwork:
    def __init__(self, node, port, initial_peers, data_dir):
        self.dht_server = None
        self.known_peers: Dict[str, PeerInfo] = {}

    async def start(self): ...
    async def stop(self): ...
    async def send_data(self, data_hash, data): ...
    async def get_data(self, data_hash): ...

BlockchainInterface (blockchain/evm_blockchain_interface.py)

EVM blockchain interaction:

class EVMBlockchainInterface:
    def __init__(self, rpc_url, private_key):
        self.w3 = Web3(Web3.HTTPProvider(rpc_url))
        self.account = Account.from_key(private_key)

    def connect(self): ...
    def send_transaction(self, tx_data): ...
    def call_contract(self, contract_address, abi, function, *args): ...

Data Flow

Share Data Flow

Client                 External API           Node Core              P2P Network
   │                        │                     │                       │
   │── POST /share_data ──>│                     │                       │
   │                        │── validate ──────>│                       │
   │                        │                     │── encrypt ─────────>│
   │                        │                     │                       │
   │                        │                     │── store locally ───>│
   │                        │                     │                       │
   │                        │                     │── distribute ──────>│
   │                        │                     │                       │
   │                        │                     │── record compliance ─>│
   │                        │                     │                       │
   │<── tx_hash ───────────│<── response ───────│                       │

Retrieve Data Flow

Client                 External API           Node Core              P2P Network
   │                        │                     │                       │
   │── GET /data/{hash} ──>│                     │                       │
   │                        │── validate ──────>│                       │
   │                        │                     │── check local ─────>│
   │                        │                     │                       │
   │                        │                     │   [if not found]     │
   │                        │                     │── query DHT ───────>│
   │                        │                     │                       │
   │                        │                     │<── data ────────────│
   │                        │                     │                       │
   │<── data ──────────────│<── response ───────│                       │

Configuration

NodeConfig

class NodeConfig:
    blockchain_type: str      # 'evm'
    blockchain_url: str       # RPC endpoint
    private_key: str          # Signing key
    native_token_address: str # Token contract
    db_path: str              # LevelDB path
    sqlite_db_path: str       # SQLite path
    p2p_port: int             # P2P listening port
    plugin_dir: str           # Plugin directory
    node_id: str              # Node identifier
    node_signature: str       # Node signature
    initial_peers: list       # Bootstrap peers
    data_dir: str             # Data directory

    def validate(self): ...   # Validate configuration

Configuration Validation

def validate(self):
    errors = []

    if self.blockchain_type not in ['evm']:
        errors.append("Unsupported blockchain_type")

    if not self.blockchain_url:
        errors.append("blockchain_url is required")

    if not (1 <= self.p2p_port <= 65535):
        errors.append("p2p_port out of range")

    if errors:
        raise ConfigurationError(errors)

Error Handling

Middleware Pattern

@aiohttp.web.middleware
async def error_middleware(self, request, handler):
    try:
        return await handler(request)
    except ValidationError as e:
        return json_response({'error': e.message}, status=422)
    except aiohttp.web.HTTPException:
        raise
    except Exception as e:
        logger.exception(f"Error: {e}")
        return json_response({'error': 'Internal error'}, status=500)

Validation Errors

class ValidationError(Exception):
    def __init__(self, message: str, field: str = None):
        self.message = message
        self.field = field

Plugin System

Plugin Interface

class BasePlugin:
    def __init__(self, node):
        self.node = node

    def initialize(self):
        """Called when plugin is loaded."""
        pass

    def shutdown(self):
        """Called when node shuts down."""
        pass

Plugin Loading

class PluginManager:
    def load_plugins(self):
        for filename in os.listdir(self.plugin_dir):
            if filename.endswith('.py'):
                module = __import__(f'plugins.{module_name}')
                plugin_class = getattr(module, f'{name}Plugin')
                plugin = plugin_class(self.node)
                plugin.initialize()

Testing

Test Structure

tests/
├── test_authorisation.py
├── test_compliance_manager.py
├── test_data_manager.py
├── test_p2p_network.py
├── test_payment_processor.py
├── test_token_manager.py
├── test_api_validation.py
├── test_rate_limiter.py
├── test_config.py
└── test_key_manager.py

Test Patterns

class TestDataManager:
    @pytest.fixture
    def temp_db_path(self, tmp_path):
        return str(tmp_path / "testdb")

    def test_store_and_get_data(self, temp_db_path):
        dm = DataManager(temp_db_path)
        dm.store_data("key1", "value1")
        assert dm.get_data("key1") == "value1"

Next Steps