Plugin Development
DataMgmt Node supports a plugin system for extending functionality. This guide covers creating, installing, and managing plugins.
Overview
Plugins can:
- Add custom data processing logic
- Integrate with external services
- Implement custom compliance rules
- Extend API functionality
- Add custom event handlers
Plugin Structure
Plugins are Python modules placed in the plugins/ directory. Each plugin follows a naming convention:
plugins/
├── __init__.py
├── audit_plugin.py # Class: AuditPlugin
├── backup_plugin.py # Class: BackupPlugin
└── notify_plugin.py # Class: NotifyPlugin
Naming Convention
| File Name | Expected Class Name |
|---|---|
example_plugin.py |
Example_pluginPlugin |
audit_plugin.py |
Audit_pluginPlugin |
custom_plugin.py |
Custom_pluginPlugin |
Note
The class name is derived from the filename: capitalize the module name and append Plugin.
Creating a Plugin
Basic Plugin Template
# plugins/example_plugin.py
import logging
logger = logging.getLogger(__name__)
class Example_pluginPlugin:
"""Example plugin for DataMgmt Node.
This plugin demonstrates the basic structure and available hooks.
"""
def __init__(self, node):
"""Initialize the plugin.
Args:
node: The Node instance, providing access to all services.
"""
self.node = node
self.name = "Example Plugin"
self.version = "1.0.0"
def initialize(self):
"""Called when the plugin is loaded.
Use this to:
- Set up resources
- Register event handlers
- Initialize connections
"""
logger.info(f"{self.name} v{self.version} initialized")
# Example: Subscribe to events
self.node.event_bus.subscribe(
EventType.DATA_SHARED,
self.on_data_shared
)
def shutdown(self):
"""Called when the node is stopping.
Use this to:
- Clean up resources
- Close connections
- Save state
"""
logger.info(f"{self.name} shutting down")
async def on_data_shared(self, event):
"""Handle data shared events."""
data_hash = event.data.get('data_hash')
logger.info(f"Data shared: {data_hash[:16]}...")
Accessing Node Services
Plugins have full access to the node instance:
class My_pluginPlugin:
def __init__(self, node):
self.node = node
def initialize(self):
# Access blockchain interface
balance = self.node.blockchain_interface.get_balance(address)
# Access data manager
data = self.node.data_manager.get_data(hash)
# Access token manager
tokens = self.node.token_manager.supported_tokens
# Access P2P network
peers = self.node.p2p_network.get_connected_peers()
# Access compliance manager
history = self.node.compliance_manager.get_compliance_history()
# Encrypt/decrypt data
encrypted = self.node.encrypt_data("secret")
decrypted = self.node.decrypt_data(encrypted)
# Publish events to dashboard
await self.node.event_bus.publish(Event(
type=EventType.COMPLIANCE_EVENT,
data={'custom': 'data'}
))
Plugin Examples
Audit Logger Plugin
Log all data operations to a file:
# plugins/audit_plugin.py
import json
import logging
from datetime import datetime
from pathlib import Path
from dashboard.event_bus import EventType
logger = logging.getLogger(__name__)
class Audit_pluginPlugin:
"""Audit logging plugin for compliance tracking."""
def __init__(self, node):
self.node = node
self.audit_file = Path(node.config.data_dir) / 'audit.log'
def initialize(self):
# Subscribe to relevant events
events = [
EventType.DATA_SHARED,
EventType.DATA_RECEIVED,
EventType.TRANSFER_COMPLETED,
EventType.TRANSFER_FAILED,
]
for event_type in events:
self.node.event_bus.subscribe(event_type, self._log_event)
logger.info(f"Audit plugin initialized, logging to {self.audit_file}")
def shutdown(self):
self._log_entry("SHUTDOWN", {"node_id": self.node.config.node_id})
async def _log_event(self, event):
"""Log event to audit file."""
self._log_entry(event.type.value, event.data)
def _log_entry(self, event_type: str, data: dict):
"""Write audit log entry."""
entry = {
'timestamp': datetime.utcnow().isoformat(),
'node_id': self.node.config.node_id,
'event': event_type,
'data': data
}
with open(self.audit_file, 'a') as f:
f.write(json.dumps(entry) + '\n')
Webhook Notification Plugin
Send notifications to external services:
# plugins/webhook_plugin.py
import aiohttp
import logging
from dashboard.event_bus import EventType
logger = logging.getLogger(__name__)
class Webhook_pluginPlugin:
"""Send webhook notifications for important events."""
WEBHOOK_URL = "https://hooks.example.com/notify"
def __init__(self, node):
self.node = node
self.session = None
def initialize(self):
self.session = aiohttp.ClientSession()
# Subscribe to important events
self.node.event_bus.subscribe(
EventType.TRANSFER_COMPLETED,
self._notify
)
self.node.event_bus.subscribe(
EventType.ERROR,
self._notify
)
logger.info("Webhook plugin initialized")
def shutdown(self):
if self.session:
# Note: In real code, use async cleanup
pass
async def _notify(self, event):
"""Send webhook notification."""
try:
payload = {
'event': event.type.value,
'data': event.data,
'timestamp': event.timestamp,
'node_id': self.node.config.node_id
}
async with self.session.post(
self.WEBHOOK_URL,
json=payload,
timeout=aiohttp.ClientTimeout(total=5)
) as response:
if response.status != 200:
logger.warning(f"Webhook failed: {response.status}")
except Exception as e:
logger.error(f"Webhook error: {e}")
Data Validation Plugin
Add custom data validation before sharing:
# plugins/validation_plugin.py
import json
import logging
import re
logger = logging.getLogger(__name__)
class Validation_pluginPlugin:
"""Validate data before sharing."""
# Patterns to block (PII, secrets, etc.)
BLOCKED_PATTERNS = [
r'\b\d{3}-\d{2}-\d{4}\b', # SSN
r'\b\d{16}\b', # Credit card
r'password\s*[:=]\s*\S+', # Passwords
r'api[_-]?key\s*[:=]\s*\S+', # API keys
]
def __init__(self, node):
self.node = node
self._original_share = None
def initialize(self):
# Wrap the share_data method
self._original_share = self.node.share_data
self.node.share_data = self._validated_share
logger.info("Validation plugin initialized")
def shutdown(self):
# Restore original method
if self._original_share:
self.node.share_data = self._original_share
async def _validated_share(self, data, recipient, **kwargs):
"""Validate data before sharing."""
# Check for blocked patterns
data_str = str(data)
for pattern in self.BLOCKED_PATTERNS:
if re.search(pattern, data_str, re.IGNORECASE):
raise ValueError(
f"Data contains blocked pattern: {pattern}"
)
# Check data size
if len(data_str) > 1024 * 1024: # 1MB
raise ValueError("Data exceeds maximum size (1MB)")
# Call original method
return await self._original_share(data, recipient, **kwargs)
Configuration
Plugin Directory
Set the plugin directory in your .env:
Or use an absolute path:
Plugin Configuration Files
Plugins can read their own configuration:
# plugins/configurable_plugin.py
import json
from pathlib import Path
class Configurable_pluginPlugin:
def __init__(self, node):
self.node = node
self.config = self._load_config()
def _load_config(self):
config_file = Path(self.node.config.plugin_dir) / 'configurable.json'
if config_file.exists():
with open(config_file) as f:
return json.load(f)
return {'enabled': True} # Defaults
Plugin Lifecycle
sequenceDiagram
participant Node
participant PluginManager
participant Plugin
Node->>PluginManager: load_plugins()
PluginManager->>Plugin: __init__(node)
PluginManager->>Plugin: initialize()
Note over Plugin: Plugin is active
Node->>PluginManager: shutdown_plugins()
PluginManager->>Plugin: shutdown()
Note over Plugin: Plugin is stopped
Lifecycle Methods
| Method | When Called | Purpose |
|---|---|---|
__init__(node) |
Plugin instantiation | Store node reference, set up instance |
initialize() |
After construction | Start services, subscribe to events |
shutdown() |
Node stopping | Clean up resources, save state |
Best Practices
Error Handling
Always handle errors gracefully:
def initialize(self):
try:
self._setup_resources()
except Exception as e:
logger.error(f"Plugin init failed: {e}")
# Don't raise - allow node to continue
Async Operations
Use async for I/O operations:
async def on_event(self, event):
async with aiohttp.ClientSession() as session:
await session.post(url, json=event.data)
Resource Cleanup
Always clean up in shutdown:
def shutdown(self):
if self.connection:
self.connection.close()
if self.file_handle:
self.file_handle.close()
Logging
Use module-level loggers:
import logging
logger = logging.getLogger(__name__)
# Use appropriate levels
logger.debug("Detailed info")
logger.info("Normal operation")
logger.warning("Unexpected but handled")
logger.error("Errors needing attention")
Debugging Plugins
Enable Debug Logging
Check Plugin Loading
# In Python console
from datamgmtnode.services.plugin_manager import PluginManager
pm = PluginManager(node, './plugins')
pm.load_plugins()
print(pm.plugins) # See loaded plugins
Common Issues
| Issue | Cause | Solution |
|---|---|---|
| Plugin not loading | Wrong class name | Check naming convention |
| Import error | Missing dependency | Install required packages |
| Plugin crashes node | Unhandled exception | Add try/except in initialize() |
| Events not received | Wrong subscription | Verify EventType enum value |
See Also
- Architecture - System architecture
- API Reference - API documentation
- Testing - Testing plugins