# Building Custom Venues
Add support for new trading venues by implementing the BaseVenue interface.
## The BaseVenue Interface
import uuid
from typing import Any, Callable, Dict, List, Optional

from polybot.venues.base import (
    BaseVenue, Ticker, Market, Order, OrderResult,
    Position, Balance
)
from polybot.venues.types import (
    VenueType, VenueCapabilities, MarketType,
    OrderSide, OrderStatus, OrderType
)
class MyVenue(BaseVenue):
    """Custom venue implementation.

    Template adapter that plugs a venue-specific API client
    (``MyVenueClient``) into the ``BaseVenue`` interface. Every public
    coroutine delegates to ``self._client``, which is only created by
    :meth:`connect`.

    NOTE(review): ``MyVenueClient`` is not imported in this example;
    import your own client class next to the polybot imports.
    """

    # Must be the member you add to the VenueType enum and use as the key
    # in VENUE_REGISTRY.
    venue_type = VenueType.MY_VENUE

    def __init__(self, settings=None):
        """Forward ``settings`` to ``BaseVenue`` and start without a client."""
        super().__init__(settings)
        self._client = None

    def get_capabilities(self) -> VenueCapabilities:
        """Declare what this venue supports."""
        return VenueCapabilities(
            market_types=[MarketType.BINARY, MarketType.SCALAR],
            supports_limit_orders=True,
            supports_market_orders=True,
            supports_websocket=True,
            supports_margin=False,
            max_leverage=1.0,
        )

    # === Lifecycle ===

    async def connect(self) -> None:
        """Create the API client from settings and authenticate it."""
        self._client = MyVenueClient(
            api_key=self._settings.my_venue.api_key,
            api_secret=self._settings.my_venue.api_secret,
        )
        await self._client.authenticate()
        # Only flagged as connected once authentication has succeeded.
        self._connected = True

    async def disconnect(self) -> None:
        """Close the client, if any, and mark the venue as disconnected.

        The state is reset even when ``close()`` raises, so a failed
        shutdown never leaves the venue reporting itself as connected.
        """
        try:
            if self._client:
                await self._client.close()
        finally:
            self._client = None
            self._connected = False

    # === Market Data ===

    async def get_markets(
        self,
        market_type: Optional[MarketType] = None
    ) -> List[Market]:
        """Fetch available markets, optionally filtered by ``market_type``.

        Args:
            market_type: When given, only markets of this type are returned.

        Returns:
            The parsed markets reported by the client.
        """
        raw_markets = await self._client.get_markets()
        markets = [self._parse_market(m) for m in raw_markets]
        # Explicit None check: the filter is skipped only when no type
        # was requested.
        if market_type is not None:
            markets = [m for m in markets if m.market_type == market_type]
        return markets

    async def get_ticker(self, symbol: str) -> Ticker:
        """Get current prices for a symbol.

        ``bid`` and ``ask`` are required keys of the client response;
        ``last`` and ``volume`` are optional and default to ``None``.
        """
        data = await self._client.get_ticker(symbol)
        return Ticker(
            symbol=symbol,
            venue=self.venue_type,
            bid=data["bid"],
            ask=data["ask"],
            last=data.get("last"),
            volume_24h=data.get("volume"),
        )

    async def subscribe_prices(
        self,
        symbols: List[str],
        callback: Callable[[Ticker], None],
    ) -> None:
        """Subscribe to real-time price updates.

        Args:
            symbols: Symbols to subscribe to.
            callback: Called synchronously with a ``Ticker`` for every
                message the client delivers.
        """
        async def on_message(data):
            # Convert the raw message before handing it to the caller.
            ticker = self._parse_ticker(data)
            callback(ticker)

        await self._client.ws_subscribe(symbols, on_message)

    # === Trading ===

    async def place_order(self, order: Order) -> OrderResult:
        """Submit an order.

        In shadow mode the order is simulated and never reaches the
        client. Client errors are not raised: they are returned as an
        ``OrderResult`` with ``success=False`` and the error text.
        """
        # Shadow mode simulation
        if self.shadow_mode:
            return self._simulate_order(order)

        try:
            result = await self._client.place_order(
                symbol=order.symbol,
                side=order.side.value,
                order_type=order.order_type.value,
                size=order.size,
                price=order.price,
            )
            return OrderResult(
                success=True,
                order_id=result["order_id"],
                status=OrderStatus.PENDING,
                venue=self.venue_type,
            )
        except Exception as e:
            # Deliberately broad: any failure becomes a failed result
            # instead of propagating to the caller.
            return OrderResult(
                success=False,
                error=str(e),
                venue=self.venue_type,
            )

    async def cancel_order(
        self,
        order_id: str,
        symbol: Optional[str] = None
    ) -> bool:
        """Cancel an open order.

        Returns:
            ``True`` when the client call succeeds, ``False`` on any error.

        ``symbol`` is accepted for interface compatibility but is not
        passed to the client.
        """
        try:
            await self._client.cancel_order(order_id)
            return True
        except Exception:
            # Best effort: the failure reason is not surfaced here.
            return False

    async def get_order(
        self,
        order_id: str,
        symbol: Optional[str] = None
    ) -> Optional[Dict[str, Any]]:
        """Get order details as returned by the client (``symbol`` is unused)."""
        return await self._client.get_order(order_id)

    async def get_open_orders(
        self,
        symbol: Optional[str] = None
    ) -> List[Dict[str, Any]]:
        """Get all open orders, passing ``symbol`` through to the client."""
        return await self._client.get_open_orders(symbol)

    # === Positions & Account ===

    async def get_positions(self) -> List[Position]:
        """Get all open positions."""
        raw_positions = await self._client.get_positions()
        return [self._parse_position(p) for p in raw_positions]

    async def get_balance(
        self,
        currency: Optional[str] = None
    ) -> Balance:
        """Get account balance.

        ``currency`` only labels the result (default ``"USD"``); it is not
        sent to the client.
        NOTE(review): confirm the client reports a single-currency balance.
        """
        data = await self._client.get_balance()
        return Balance(
            venue=self.venue_type,
            currency=currency or "USD",
            total=data["total"],
            available=data["available"],
            locked=data.get("locked", 0),
        )

    # === Helper Methods ===

    def _parse_market(self, raw: dict) -> Market:
        """Convert raw API response to Market.

        NOTE(review): every market is tagged ``MarketType.BINARY``, so
        filtering ``get_markets`` by another type returns nothing; map the
        type from ``raw`` if your venue exposes it.
        """
        return Market(
            symbol=raw["symbol"],
            venue=self.venue_type,
            market_type=MarketType.BINARY,
            description=raw.get("description"),
            is_active=raw.get("active", True),
        )

    def _parse_ticker(self, raw: dict) -> Ticker:
        """Convert raw WebSocket message to Ticker."""
        return Ticker(
            symbol=raw["symbol"],
            venue=self.venue_type,
            bid=raw["bid"],
            ask=raw["ask"],
        )

    def _parse_position(self, raw: dict) -> Position:
        """Convert raw position to Position.

        The sign of ``raw["size"]`` selects the side and the absolute
        value is stored; a size of zero is reported as ``"short"``.
        """
        return Position(
            symbol=raw["symbol"],
            venue=self.venue_type,
            side="long" if raw["size"] > 0 else "short",
            size=abs(raw["size"]),
            entry_price=raw["entry_price"],
        )

    def _simulate_order(self, order: Order) -> OrderResult:
        """Simulate order for shadow mode.

        The order is reported as fully filled at its own price under a
        random ``shadow_``-prefixed id.
        """
        return OrderResult(
            success=True,
            order_id=f"shadow_{uuid.uuid4().hex[:8]}",
            status=OrderStatus.FILLED,
            filled_size=order.size,
            filled_price=order.price,
            venue=self.venue_type,
        )
## Required Methods
| Method | Description |
|---|---|
| `get_capabilities()` | Return venue capabilities |
| `connect()` | Initialize connections |
| `disconnect()` | Clean up |
| `get_markets()` | List available markets |
| `get_ticker()` | Get current prices |
| `subscribe_prices()` | Real-time price stream |
| `place_order()` | Submit order |
| `cancel_order()` | Cancel order |
| `get_order()` | Get order details |
| `get_open_orders()` | List open orders |
| `get_positions()` | List positions |
| `get_balance()` | Get account balance |
## Configuration
Add venue configuration:
# config.py
class MyVenueConfig(BaseSettings):
    """Connection settings for the custom venue.

    Fields are populated from ``MY_VENUE_``-prefixed variables, with
    ``.env`` configured as the env file. Every field has a default, so
    the model can be built with nothing set.
    """

    # Credentials; empty unless supplied.
    api_key: str = ""
    api_secret: str = ""

    # API endpoint and the environment to run against.
    base_url: str = "https://api.myvenue.com"
    environment: str = "sandbox"

    model_config = SettingsConfigDict(env_file=".env", env_prefix="MY_VENUE_")
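Add to Settings: expose the new config on the main `Settings` object as a `my_venue` attribute, because the venue reads `self._settings.my_venue.api_key` and `self._settings.my_venue.api_secret` in `connect()`.
## Registration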
Add to venue types:
# venues/types.py
class VenueType(Enum):
    """Enumerates trading venues by their string identifier."""

    POLYMARKET = "polymarket"
    KALSHI = "kalshi"
    BINANCE = "binance"

    # Add your venue
    MY_VENUE = "my_venue"
Register in venues module:
# venues/__init__.py
from polybot.venues.my_venue import MyVenue
# Maps each VenueType member to the venue class that implements it.
# NOTE(review): presumably consulted when a venue is instantiated by type;
# confirm against the code that reads this registry.
VENUE_REGISTRY = {
    VenueType.MY_VENUE: MyVenue,
    # ...
}
## Testing
import pytest
from polybot.venues.my_venue import MyVenue
@pytest.fixture
async def venue():
    """Yield a connected ``MyVenue`` and disconnect it after the test.

    NOTE(review): async fixtures and tests need an asyncio-aware pytest
    plugin (e.g. pytest-asyncio in auto mode); confirm the project's
    pytest configuration.
    """
    connected_venue = MyVenue()
    await connected_venue.connect()
    yield connected_venue
    # Teardown: runs once the test using this fixture has finished.
    await connected_venue.disconnect()
async def test_get_markets(venue):
    """The venue lists at least one market, each tagged with its venue type."""
    # Imported locally: the module-level imports of this test file only
    # bring in pytest and MyVenue, so VenueType would otherwise be undefined.
    from polybot.venues.types import VenueType

    markets = await venue.get_markets()
    assert len(markets) > 0
    assert all(m.venue == VenueType.MY_VENUE for m in markets)
async def test_get_ticker(venue):
    """The BTC-USD quote has a positive bid and an ask at or above the bid."""
    quote = await venue.get_ticker("BTC-USD")
    best_bid, best_ask = quote.bid, quote.ask
    assert best_bid > 0
    assert best_ask >= best_bid
async def test_shadow_order(venue):
    """In shadow mode an order succeeds and gets a shadow order id."""
    # Imported locally: the module-level imports of this test file only
    # bring in pytest and MyVenue, so these names would otherwise be
    # undefined.
    from polybot.venues.base import Order
    from polybot.venues.types import OrderSide, OrderType

    venue.set_shadow_mode(True)
    order = Order(
        symbol="BTC-USD",
        side=OrderSide.BUY,
        order_type=OrderType.LIMIT,
        size=1.0,
        price=50000.0,
    )
    result = await venue.place_order(order)
    assert result.success
    # Simulated orders are identified by the "shadow" marker in their id.
    assert "shadow" in result.order_id
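## Best Practices
- **Handle rate limits:** implement retries with backoff.
- **Support shadow mode:** simulate orders so the integration can be tested without placing real ones.
- **Parse errors carefully:** map venue-specific errors to the standard types.
- **Log extensively:** detailed logs make integration issues easier to debug.
- **Test with sandbox:** validate against the sandbox environment before going to production.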