Cross-Chain DeFi Examples¶
Examples of using Tesseract for DeFi operations across rollups.
Overview¶
Tesseract enables atomic operations across multiple rollups, perfect for DeFi applications like:
- Cross-chain swaps
- Arbitrage execution
- Liquidity rebalancing
- Cross-chain lending
Cross-Chain Swap¶
Execute an atomic swap between two rollups:
import os
import time
from dataclasses import dataclass
@dataclass
class SwapOrder:
"""Cross-chain swap order."""
source_chain: str
target_chain: str
source_token: str
target_token: str
amount: int
min_receive: int
def execute_cross_chain_swap(order: SwapOrder):
"""Execute atomic cross-chain swap using Tesseract."""
print(f"Executing cross-chain swap:")
print(f" {order.source_chain} -> {order.target_chain}")
print(f" {order.amount} {order.source_token} -> {order.target_token}")
# Generate swap transaction ID
swap_id = os.urandom(32)
# Encode swap payload
payload = encode_swap_payload(order)
# Step 1: Lock on source chain
lock_tx_id = os.urandom(32)
source_contract.functions.buffer_transaction(
lock_tx_id,
order.source_chain,
order.target_chain,
payload,
b'\x00' * 32, # No dependency
int(time.time()) + 120 # 2 minute window
).transact({'from': operator.address})
print(f" Locked on source: {lock_tx_id.hex()[:16]}...")
# Step 2: Release on target (depends on lock)
release_tx_id = os.urandom(32)
target_contract.functions.buffer_transaction(
release_tx_id,
order.target_chain,
order.source_chain,
payload,
lock_tx_id, # Depends on lock
int(time.time()) + 120
).transact({'from': operator.address})
print(f" Release queued: {release_tx_id.hex()[:16]}...")
# Step 3: Wait and resolve
time.sleep(5)
# Resolve lock first
source_contract.functions.resolve_dependency(lock_tx_id).transact({
'from': operator.address
})
# Then resolve release
target_contract.functions.resolve_dependency(release_tx_id).transact({
'from': operator.address
})
print(f" Swap resolved!")
return {
'swap_id': swap_id.hex(),
'lock_tx': lock_tx_id.hex(),
'release_tx': release_tx_id.hex()
}
def encode_swap_payload(order: SwapOrder) -> bytes:
"""Encode swap parameters into payload."""
# Simple encoding for example
return (
f"{order.source_token}:{order.target_token}:"
f"{order.amount}:{order.min_receive}"
).encode()
# Usage
order = SwapOrder(
source_chain="0x1111111111111111111111111111111111111111",
target_chain="0x2222222222222222222222222222222222222222",
source_token="ETH",
target_token="USDC",
amount=1_000_000_000_000_000_000, # 1 ETH in wei
min_receive=3000_000_000 # 3000 USDC (6 decimals)
)
result = execute_cross_chain_swap(order)
Arbitrage Execution¶
Execute atomic arbitrage across multiple DEXes:
@dataclass
class ArbitrageOpportunity:
"""Arbitrage opportunity details."""
buy_chain: str
sell_chain: str
token: str
buy_price: float
sell_price: float
amount: int
expected_profit: float
def execute_arbitrage(opp: ArbitrageOpportunity):
"""Execute atomic arbitrage."""
print(f"Executing arbitrage:")
print(f" Buy on {opp.buy_chain} at {opp.buy_price}")
print(f" Sell on {opp.sell_chain} at {opp.sell_price}")
print(f" Expected profit: {opp.expected_profit}")
execution_time = int(time.time()) + 30
# Step 1: Buy transaction
buy_tx_id = os.urandom(32)
buy_payload = encode_buy_order(opp)
buy_contract.functions.buffer_transaction(
buy_tx_id,
opp.buy_chain,
opp.sell_chain,
buy_payload,
b'\x00' * 32,
execution_time
).transact({'from': operator.address})
# Step 2: Sell transaction (depends on buy)
sell_tx_id = os.urandom(32)
sell_payload = encode_sell_order(opp)
sell_contract.functions.buffer_transaction(
sell_tx_id,
opp.sell_chain,
opp.buy_chain,
sell_payload,
buy_tx_id, # Must buy before selling
execution_time
).transact({'from': operator.address})
print(f" Buy TX: {buy_tx_id.hex()[:16]}...")
print(f" Sell TX: {sell_tx_id.hex()[:16]}...")
# Wait for execution time
time.sleep(32)
# Execute atomically
buy_contract.functions.resolve_dependency(buy_tx_id).transact({
'from': operator.address
})
sell_contract.functions.resolve_dependency(sell_tx_id).transact({
'from': operator.address
})
# Verify both ready
buy_ready = buy_contract.functions.is_transaction_ready(buy_tx_id).call()
sell_ready = sell_contract.functions.is_transaction_ready(sell_tx_id).call()
if buy_ready and sell_ready:
print(" Arbitrage executed successfully!")
buy_contract.functions.mark_executed(buy_tx_id).transact({
'from': operator.address
})
sell_contract.functions.mark_executed(sell_tx_id).transact({
'from': operator.address
})
return True
else:
print(" Arbitrage failed - rolling back")
return False
def encode_buy_order(opp: ArbitrageOpportunity) -> bytes:
return f"BUY:{opp.token}:{opp.amount}:{opp.buy_price}".encode()
def encode_sell_order(opp: ArbitrageOpportunity) -> bytes:
return f"SELL:{opp.token}:{opp.amount}:{opp.sell_price}".encode()
Liquidity Rebalancing¶
Rebalance liquidity across multiple pools:
@dataclass
class RebalanceConfig:
"""Liquidity rebalancing configuration."""
pools: list # List of pool addresses
target_ratios: dict # {pool: target_ratio}
token: str
total_liquidity: int
def rebalance_liquidity(config: RebalanceConfig):
"""Rebalance liquidity across pools atomically."""
print(f"Rebalancing liquidity for {config.token}:")
# Calculate required transfers
transfers = calculate_transfers(config)
if not transfers:
print(" No rebalancing needed")
return
# Create dependent transactions for each transfer
transactions = []
previous_tx = b'\x00' * 32
for i, transfer in enumerate(transfers):
tx_id = os.urandom(32)
payload = encode_transfer(transfer)
contract.functions.buffer_transaction(
tx_id,
transfer['from_pool'],
transfer['to_pool'],
payload,
previous_tx, # Chain transfers
int(time.time()) + 60
).transact({'from': operator.address})
transactions.append({
'tx_id': tx_id,
'from': transfer['from_pool'],
'to': transfer['to_pool'],
'amount': transfer['amount']
})
print(f" Transfer {i+1}: {transfer['amount']} from {transfer['from_pool'][:10]}...")
previous_tx = tx_id
# Resolve all transfers in order
time.sleep(62)
for tx in transactions:
contract.functions.resolve_dependency(tx['tx_id']).transact({
'from': operator.address
})
print(f" Rebalancing complete!")
return transactions
def calculate_transfers(config: RebalanceConfig) -> list:
"""Calculate required transfers for rebalancing."""
# Simplified calculation
transfers = []
# ... calculation logic
return transfers
def encode_transfer(transfer: dict) -> bytes:
return f"TRANSFER:{transfer['amount']}".encode()
Cross-Chain Lending¶
Coordinate a cross-chain lending position:
@dataclass
class LendingPosition:
"""Cross-chain lending position."""
collateral_chain: str
borrow_chain: str
collateral_token: str
collateral_amount: int
borrow_token: str
borrow_amount: int
def open_cross_chain_position(position: LendingPosition):
"""Open atomic cross-chain lending position."""
print(f"Opening cross-chain lending position:")
print(f" Collateral: {position.collateral_amount} {position.collateral_token}")
print(f" Borrow: {position.borrow_amount} {position.borrow_token}")
execution_time = int(time.time()) + 60
# Step 1: Deposit collateral
deposit_tx_id = os.urandom(32)
deposit_payload = encode_deposit(position)
collateral_contract.functions.buffer_transaction(
deposit_tx_id,
position.collateral_chain,
position.borrow_chain,
deposit_payload,
b'\x00' * 32,
execution_time
).transact({'from': operator.address})
print(f" Deposit TX: {deposit_tx_id.hex()[:16]}...")
# Step 2: Borrow (depends on collateral deposit)
borrow_tx_id = os.urandom(32)
borrow_payload = encode_borrow(position)
borrow_contract.functions.buffer_transaction(
borrow_tx_id,
position.borrow_chain,
position.collateral_chain,
borrow_payload,
deposit_tx_id, # Must deposit collateral first
execution_time
).transact({'from': operator.address})
print(f" Borrow TX: {borrow_tx_id.hex()[:16]}...")
# Execute atomically
time.sleep(62)
collateral_contract.functions.resolve_dependency(deposit_tx_id).transact({
'from': operator.address
})
borrow_contract.functions.resolve_dependency(borrow_tx_id).transact({
'from': operator.address
})
# Verify
deposit_ready = collateral_contract.functions.is_transaction_ready(
deposit_tx_id
).call()
borrow_ready = borrow_contract.functions.is_transaction_ready(
borrow_tx_id
).call()
if deposit_ready and borrow_ready:
# Execute both
collateral_contract.functions.mark_executed(deposit_tx_id).transact({
'from': operator.address
})
borrow_contract.functions.mark_executed(borrow_tx_id).transact({
'from': operator.address
})
print(" Position opened successfully!")
return {
'deposit_tx': deposit_tx_id.hex(),
'borrow_tx': borrow_tx_id.hex()
}
else:
print(" Failed to open position")
return None
def encode_deposit(position: LendingPosition) -> bytes:
return f"DEPOSIT:{position.collateral_token}:{position.collateral_amount}".encode()
def encode_borrow(position: LendingPosition) -> bytes:
return f"BORROW:{position.borrow_token}:{position.borrow_amount}".encode()
Multi-Chain Governance¶
Execute governance decisions across multiple chains:
def execute_governance_proposal(proposal_id: str, target_chains: list):
"""Execute governance proposal across all target chains."""
print(f"Executing proposal {proposal_id} across {len(target_chains)} chains")
execution_time = int(time.time()) + 300 # 5 minute coordination window
transactions = []
previous_tx = b'\x00' * 32
# Create chained execution across all chains
for i, chain in enumerate(target_chains):
tx_id = os.urandom(32)
payload = encode_governance_action(proposal_id, chain)
contracts[chain].functions.buffer_transaction(
tx_id,
chain,
target_chains[(i + 1) % len(target_chains)],
payload,
previous_tx,
execution_time
).transact({'from': operator.address})
transactions.append({
'chain': chain,
'tx_id': tx_id
})
previous_tx = tx_id
print(f" {chain}: {tx_id.hex()[:16]}...")
# Wait and resolve all
time.sleep(302)
for tx in transactions:
contracts[tx['chain']].functions.resolve_dependency(
tx['tx_id']
).transact({'from': operator.address})
# Verify all ready
all_ready = all(
contracts[tx['chain']].functions.is_transaction_ready(tx['tx_id']).call()
for tx in transactions
)
if all_ready:
# Execute all
for tx in transactions:
contracts[tx['chain']].functions.mark_executed(
tx['tx_id']
).transact({'from': operator.address})
print(" Governance proposal executed on all chains!")
else:
print(" Proposal execution failed")
def encode_governance_action(proposal_id: str, chain: str) -> bytes:
return f"GOVERNANCE:{proposal_id}:{chain}".encode()
Best Practices for DeFi¶
- Set appropriate coordination windows - Allow for network latency
- Validate prices before execution - Check for slippage
- Implement circuit breakers - Stop on unusual conditions
- Monitor gas prices - Ensure profitability
- Handle partial failures - Plan rollback strategies
Next Steps¶
- Dependency Chains - Complex dependencies
- Basic Usage - Simple examples
- Security Model - Security considerations