WebSocket Reference¶
dgbit-api exposes two WebSocket endpoints. They are currently scaffolds: the routes accept connections and echo client messages back, but the NNG event bus is not yet wired into them. Treat the documented event payloads below as the planned envelope shape, not as something the API will emit unprompted today.
Connection¶
Event Stream¶
The route is implemented in dgbit_api.api.routes.websocket_events. Today it sends {"type": "ping", "received": <client message>} back for every message received and registers the connection in a ConnectionManager so that future event-bus integration can broadcast to all subscribers.
Job-Specific Stream¶
The route emits a single {"type": "subscribed", "job_uuid": "...", "message": "..."} on connect and replies to client messages with {"type": "ack", "job_uuid": "..."}.
Event Format¶
All events follow this format:
Planned Event Types¶
The EventPublisher / EventSubscriber classes in dgbit_services.events define a generic NNG pub/sub layer. The event-type strings below are the conventions used elsewhere in the codebase (notably the route docstring lists job.created, job.started, job.completed, job.failed, and trade.executed). Wire them into the WebSocket layer once the bridge is implemented.
Job Events¶
job.created¶
Emitted when a new job is created.
{
"type": "job.created",
"timestamp": "2024-01-15T10:30:00Z",
"payload": {
"job_id": "550e8400-e29b-41d4-a716-446655440000",
"job_type": "backtest",
"status": "pending"
}
}
job.started¶
Emitted when a job starts processing.
{
"type": "job.started",
"timestamp": "2024-01-15T10:30:01Z",
"payload": {
"job_id": "550e8400-e29b-41d4-a716-446655440000",
"job_type": "backtest",
"status": "running"
}
}
job.completed¶
Emitted when a job completes successfully.
{
"type": "job.completed",
"timestamp": "2024-01-15T10:30:45Z",
"payload": {
"job_id": "550e8400-e29b-41d4-a716-446655440000",
"job_type": "backtest",
"status": "completed",
"result": {
"total_trades": 15,
"win_rate": 0.60,
"total_return": 0.12
}
}
}
job.failed¶
Emitted when a job fails.
{
"type": "job.failed",
"timestamp": "2024-01-15T10:30:45Z",
"payload": {
"job_id": "550e8400-e29b-41d4-a716-446655440000",
"job_type": "backtest",
"status": "failed",
"error": "Insufficient data for backtest"
}
}
Trade Events¶
trade.entered¶
Emitted when a position is opened.
{
"type": "trade.entered",
"timestamp": "2024-01-15T10:30:00Z",
"payload": {
"symbol": "BTCUSDT",
"side": "long",
"quantity": 0.001,
"entry_price": 42150.00,
"take_profit": 42993.00,
"stop_loss": 41728.50,
"strategy": "wavelet_reversal"
}
}
trade.exited¶
Emitted when a position is closed.
{
"type": "trade.exited",
"timestamp": "2024-01-15T11:15:00Z",
"payload": {
"symbol": "BTCUSDT",
"side": "long",
"quantity": 0.001,
"entry_price": 42150.00,
"exit_price": 42993.00,
"exit_type": "take_profit",
"pnl": 0.843,
"pnl_pct": 2.0,
"duration_minutes": 45
}
}
Signal Events¶
signal.generated¶
Emitted when a strategy generates a signal.
{
"type": "signal.generated",
"timestamp": "2024-01-15T10:30:00Z",
"payload": {
"strategy": "wavelet_reversal",
"symbol": "BTCUSDT",
"signal_value": 0.82,
"direction": "long",
"confidence": "high"
}
}
Client Examples¶
JavaScript/Browser¶
const ws = new WebSocket('ws://localhost:8000/api/ws/events');
ws.onopen = () => {
console.log('Connected to event stream');
};
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
console.log(`Event: ${data.type}`, data.payload);
switch (data.type) {
case 'job.completed':
handleJobComplete(data.payload);
break;
case 'trade.entered':
handleTradeEnter(data.payload);
break;
case 'trade.exited':
handleTradeExit(data.payload);
break;
}
};
ws.onerror = (error) => {
console.error('WebSocket error:', error);
};
ws.onclose = () => {
console.log('Disconnected from event stream');
// Implement reconnection logic
};
function handleJobComplete(payload) {
console.log(`Job ${payload.job_id} completed`);
console.log(`Win rate: ${(payload.result.win_rate * 100).toFixed(1)}%`);
}
Python¶
import asyncio
import websockets
import json
async def event_listener():
uri = "ws://localhost:8000/api/ws/events"
async with websockets.connect(uri) as ws:
print("Connected to event stream")
async for message in ws:
event = json.loads(message)
event_type = event['type']
payload = event['payload']
print(f"Event: {event_type}")
if event_type == 'job.completed':
print(f" Job {payload['job_id']} completed")
print(f" Win rate: {payload['result']['win_rate']:.1%}")
elif event_type == 'trade.entered':
print(f" Entered {payload['side']} {payload['symbol']}")
print(f" Entry: {payload['entry_price']}")
elif event_type == 'trade.exited':
print(f" Exited {payload['symbol']}")
print(f" PnL: {payload['pnl_pct']:.2%}")
asyncio.run(event_listener())
Python with Reconnection¶
import asyncio
import websockets
import json
from typing import Callable
class EventClient:
def __init__(self, url: str, handlers: dict[str, Callable] = None):
self.url = url
self.handlers = handlers or {}
self.ws = None
self.running = False
async def connect(self):
self.running = True
while self.running:
try:
async with websockets.connect(self.url) as ws:
self.ws = ws
print("Connected")
await self._listen()
except websockets.ConnectionClosed:
print("Connection closed, reconnecting...")
await asyncio.sleep(5)
except Exception as e:
print(f"Error: {e}, reconnecting...")
await asyncio.sleep(5)
async def _listen(self):
async for message in self.ws:
event = json.loads(message)
handler = self.handlers.get(event['type'])
if handler:
await handler(event['payload'])
async def stop(self):
self.running = False
if self.ws:
await self.ws.close()
# Usage
async def on_trade_entered(payload):
print(f"Trade entered: {payload}")
client = EventClient(
"ws://localhost:8000/api/ws/events",
handlers={
'trade.entered': on_trade_entered,
}
)
asyncio.run(client.connect())
Heartbeat¶
The server sends periodic ping frames to keep the connection alive. Most WebSocket clients handle this automatically.
If implementing a custom client, respond to ping frames with pong frames.
Error Handling¶
Connection Errors¶
If the WebSocket connection fails:
- Wait 5 seconds
- Attempt to reconnect
- Use exponential backoff for repeated failures
Message Errors¶
Invalid messages are logged server-side. The connection remains open.
Best Practices¶
- Always handle reconnection - Connections can drop
- Process events asynchronously - Don't block the message loop
- Filter events - Only process events you care about
- Implement heartbeat monitoring - Detect stale connections