Multi-Agent Trading Architecture: Production-Ready Automated Trading
Series: The Intelligent Assistant (Article 8 of 13)
Author: Bert Frichot
Category: Trading
Audience: Quant Developers, AI Engineers
Difficulty: Advanced
Reading Time: 28 minutes
Word Count: ~6,500
Abstract
How we designed a multi-agent system for automated trading with clear agent boundaries, finite state machines for trade lifecycle management, circuit breakers for API reliability, and distributed tracing for audit trails. This article covers the architecture patterns that enable reliable, observable, and safe automated trading across Alpaca and Schwab brokers.
1. Why Multi-Agent for Trading
1.1 The Complexity Problem
Automated trading involves:
- Real-time market data (prices, volume)
- Order execution (entries, exits, modifications)
- Strategy logic (signals, timing)
- Risk management (sizing, limits)
- Position tracking (P&L, stops)
Monolithic approach: One big script doing everything.
- Hard to test
- Hard to extend
- Hard to debug
- Single failure kills everything
1.2 The Multi-Agent Solution
Each concern becomes an independent agent with:
- Single responsibility: Do one thing well
- Clear boundaries: Defined inputs and outputs
- Loose coupling: Communicate via messages
- Independent failure: One agent down doesn't kill others
┌─────────────────────────────────────────────────────────┐
│ TRADING AGENTS │
├─────────────────────────────────────────────────────────┤
│ │
│ MarketDataAgent ─→ StrategyAgent ─→ RiskAgent │
│ │ │ │ │
│ │ ↓ ↓ │
│ └───────→ OrderExecutionAgent ←──┘ │
│ │ │
│ ↓ │
│ PositionManagementAgent │
│ │
└─────────────────────────────────────────────────────────┘
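To make "communicate via messages" and "independent failure" concrete, here is a minimal sketch using an in-process stand-in for the bus. `InMemoryBus`, `broken_logger`, and `risk_agent` are hypothetical names for illustration only; the article's actual bus is Redis-based (Section 4), and the channel names are borrowed from the later walkthrough.

```python
from collections import defaultdict
from typing import Callable, DefaultDict, List


class InMemoryBus:
    """Hypothetical in-process stand-in for the message bus (not the article's RedisBus)."""

    def __init__(self) -> None:
        self._handlers: DefaultDict[str, List[Callable[[dict], None]]] = defaultdict(list)

    def subscribe(self, channel: str, handler: Callable[[dict], None]) -> None:
        self._handlers[channel].append(handler)

    def publish(self, channel: str, message: dict) -> None:
        # Isolate handlers: one failing subscriber must not stop the others.
        for handler in self._handlers[channel]:
            try:
                handler(message)
            except Exception as exc:
                print(f"handler for {channel} failed: {exc!r}")


bus = InMemoryBus()
approved: List[dict] = []


def broken_logger(message: dict) -> None:
    # Simulates an agent that is down.
    raise RuntimeError("logger is down")


def risk_agent(message: dict) -> None:
    # Approves and forwards; the publisher never calls this function directly.
    bus.publish("risk.trade.approved", {"symbol": message["symbol"], "quantity": 5})


bus.subscribe("strategy.entry.signal", broken_logger)
bus.subscribe("strategy.entry.signal", risk_agent)
bus.subscribe("risk.trade.approved", approved.append)

bus.publish("strategy.entry.signal", {"symbol": "AAPL", "entry_price": 175.50})
```

The publisher knows only the channel name, so the failing subscriber does not prevent the risk handler from receiving the signal.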
2. Agent Definitions
2.1 MarketDataAgent
Responsibility: Real-time quotes, historical bars, market status
from typing import List

# Quote, Bar, and MarketStatus are domain types not shown here.
class MarketDataAgent:
    """Fetches and streams market data."""

    def get_quote(self, symbol: str) -> Quote:
        """Get latest price for symbol."""
        pass

    def get_bars(self, symbol: str, timeframe: str) -> List[Bar]:
        """Get historical OHLCV data."""
        pass

    def check_market_status(self) -> MarketStatus:
        """Is market open, closed, pre-market, after-hours?"""
        pass
Message Inputs:
- market_data.get_quote - Fetch current price
- market_data.get_bars - Fetch historical data
- market_data.check_status - Market open/closed?
Message Outputs:
- market_data.quote_updated - New price available
- market_data.bars_received - Historical data ready
- market_data.market_status - Current market state
2.2 StrategyAgent
Responsibility: Signal generation, entry/exit logic
from typing import List, Optional

# Signal and PositionAction are domain types not shown here.
class StrategyAgent:
    """Generates trading signals based on strategy rules."""

    def scan_candidates(self) -> List[str]:
        """Find symbols meeting initial criteria."""
        pass

    def evaluate_setup(self, symbol: str) -> Optional[Signal]:
        """Check if entry criteria met."""
        pass

    def manage_position(self, symbol: str) -> PositionAction:
        """Monitor open position for exit conditions."""
        pass
State Machine: Trade Lifecycle (see Section 3)
2.3 RiskManagementAgent
Responsibility: Position sizing, portfolio limits, drawdown management
class RiskManagementAgent:
    """Enforces risk rules and approves trades."""

    def calculate_size(self, symbol: str, entry: float) -> int:
        """Determine position size (max $1000)."""
        pass

    def check_limits(self, symbol: str, quantity: int) -> bool:
        """Validate against portfolio rules."""
        pass

    def approve_trade(self, signal: Signal) -> Approval:
        """Final risk approval."""
        pass
Rules:
- Max position size: $1,000
- Max portfolio exposure: 20% per position
- Min cash reserve: 50%
- Max drawdown: 20%
- Stop loss required: 5%
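As a rough illustration of how the sizing and limit rules could be applied, here is a minimal sketch. The `RiskLimits` dataclass and the standalone functions are hypothetical, and reading the 20% and 50% figures as fractions of total portfolio value is an assumption; the article does not spell out the denominators.

```python
import math
from dataclasses import dataclass


@dataclass(frozen=True)
class RiskLimits:
    max_position_usd: float = 1_000.0   # "Max position size: $1,000"
    max_exposure_pct: float = 0.20      # assumed: fraction of portfolio value per position
    min_cash_reserve_pct: float = 0.50  # assumed: fraction of portfolio value kept in cash


def calculate_size(entry_price: float, limits: RiskLimits = RiskLimits()) -> int:
    """Largest whole-share quantity whose cost stays within the position cap."""
    if entry_price <= 0:
        raise ValueError("entry_price must be positive")
    return math.floor(limits.max_position_usd / entry_price)


def check_limits(
    quantity: int,
    entry_price: float,
    portfolio_value: float,
    cash: float,
    limits: RiskLimits = RiskLimits(),
) -> bool:
    """Return True only if the order passes every rule."""
    cost = quantity * entry_price
    within_cap = cost <= limits.max_position_usd
    within_exposure = cost <= limits.max_exposure_pct * portfolio_value
    keeps_reserve = (cash - cost) >= limits.min_cash_reserve_pct * portfolio_value
    return quantity > 0 and within_cap and within_exposure and keeps_reserve


qty = calculate_size(175.50)                      # floor(1000 / 175.50)
ok = check_limits(qty, 175.50, 10_000.0, 6_000.0)
```

Flooring rather than rounding keeps the position cost at or below the cap. The drawdown and stop-loss rules are left out because they depend on account history and order construction.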
2.4 OrderExecutionAgent
Responsibility: Order submission, fills, cancellations
class OrderExecutionAgent:
    """Manages order lifecycle with broker."""

    def submit_order(self, order: Order) -> OrderId:
        """Place order with broker."""
        pass

    def cancel_order(self, order_id: OrderId) -> bool:
        """Cancel pending order."""
        pass

    def get_order_status(self, order_id: OrderId) -> OrderStatus:
        """Check order state."""
        pass
Circuit Breakers: Yes (for Schwab and Alpaca APIs)
2.5 PositionManagementAgent
Responsibility: Position tracking, stop loss management, P&L
class PositionManagementAgent:
    """Tracks positions and manages protective orders."""

    def track_position(self, fill: Fill) -> Position:
        """Add new position from fill."""
        pass

    def set_stop_loss(self, symbol: str, stop_price: float) -> OrderId:
        """Place stop loss order."""
        pass

    def calculate_pnl(self, symbol: str) -> float:
        """Current unrealized P&L."""
        pass
3. State Machines
3.1 Trade Lifecycle State Machine
┌─────────────────────────────────────────────────────────┐
│ TRADE LIFECYCLE STATES │
├─────────────────────────────────────────────────────────┤
│ │
│ SCANNING │
│ │ strategy.candidate_found │
│ ↓ │
│ SETUP_DETECTED │
│ │ risk.trade_approved │
│ ↓ │
│ ENTRY_APPROVED │
│ │ order.submit │
│ ↓ │
│ ORDER_PLACED │
│ │ order.filled │
│ ↓ │
│ FILLED │
│ │ position.stop_set │
│ ↓ │
│ HOLDING │
│ │ position.target_hit OR position.stop_hit │
│ ↓ │
│ EXIT_TRIGGERED │
│ │ order.submit (exit) │
│ ↓ │
│ ORDER_CLOSING │
│ │ order.filled (exit) │
│ ↓ │
│ CLOSED │
│ │
└─────────────────────────────────────────────────────────┘
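3.2 Implementation
from datetime import datetime, timezone


class InvalidTransitionError(Exception):
    """Raised when a requested state change is not in VALID_TRANSITIONS."""


class TradeStateMachine:
    """Manages trade lifecycle with strict transition rules."""

    VALID_TRANSITIONS = {
        "SCANNING": ["SETUP_DETECTED"],
        "SETUP_DETECTED": ["ENTRY_APPROVED", "SCANNING"],
        "ENTRY_APPROVED": ["ORDER_PLACED"],
        "ORDER_PLACED": ["FILLED", "CANCELLED"],
        "FILLED": ["HOLDING"],
        "HOLDING": ["EXIT_TRIGGERED"],
        "EXIT_TRIGGERED": ["ORDER_CLOSING"],
        "ORDER_CLOSING": ["CLOSED"],
        "CLOSED": [],  # Terminal state
        # CANCELLED is reachable from ORDER_PLACED, so it needs its own entry;
        # without one, any transition attempted from CANCELLED raises KeyError.
        "CANCELLED": [],  # Terminal state
    }

    def __init__(self, symbol: str):
        self.symbol = symbol
        self.state = "SCANNING"
        self.history = []

    def transition(self, new_state: str, trigger: str):
        """Transition to new state with validation."""
        if new_state not in self.VALID_TRANSITIONS[self.state]:
            raise InvalidTransitionError(
                f"Cannot transition from {self.state} to {new_state}"
            )
        # Record the change before applying it, with a timezone-aware timestamp
        # so audit entries are unambiguous across hosts.
        self.history.append({
            "from": self.state,
            "to": new_state,
            "trigger": trigger,
            "timestamp": datetime.now(timezone.utc)
        })
        self.state = new_state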
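One possible variation, sketched here as an alternative rather than as the article's implementation, keys the transition table by an `Enum` so that a misspelled state name fails at lookup time instead of silently creating a new string. `TradeState`, `TRANSITIONS`, and `advance` are hypothetical names, and treating `CANCELLED` as terminal is an assumption.

```python
from enum import Enum
from typing import Dict, FrozenSet


class TradeState(Enum):
    SCANNING = "SCANNING"
    SETUP_DETECTED = "SETUP_DETECTED"
    ENTRY_APPROVED = "ENTRY_APPROVED"
    ORDER_PLACED = "ORDER_PLACED"
    FILLED = "FILLED"
    HOLDING = "HOLDING"
    EXIT_TRIGGERED = "EXIT_TRIGGERED"
    ORDER_CLOSING = "ORDER_CLOSING"
    CLOSED = "CLOSED"
    CANCELLED = "CANCELLED"


S = TradeState

# Same edges as the lifecycle diagram; every state has an entry, including terminals.
TRANSITIONS: Dict[TradeState, FrozenSet[TradeState]] = {
    S.SCANNING: frozenset({S.SETUP_DETECTED}),
    S.SETUP_DETECTED: frozenset({S.ENTRY_APPROVED, S.SCANNING}),
    S.ENTRY_APPROVED: frozenset({S.ORDER_PLACED}),
    S.ORDER_PLACED: frozenset({S.FILLED, S.CANCELLED}),
    S.FILLED: frozenset({S.HOLDING}),
    S.HOLDING: frozenset({S.EXIT_TRIGGERED}),
    S.EXIT_TRIGGERED: frozenset({S.ORDER_CLOSING}),
    S.ORDER_CLOSING: frozenset({S.CLOSED}),
    S.CLOSED: frozenset(),
    S.CANCELLED: frozenset(),  # assumed terminal
}


def advance(current: TradeState, target: TradeState) -> TradeState:
    """Return target if the edge exists, otherwise raise ValueError."""
    if target not in TRANSITIONS[current]:
        raise ValueError(f"Cannot transition from {current.name} to {target.name}")
    return target


HAPPY_PATH = [
    S.SETUP_DETECTED, S.ENTRY_APPROVED, S.ORDER_PLACED, S.FILLED,
    S.HOLDING, S.EXIT_TRIGGERED, S.ORDER_CLOSING, S.CLOSED,
]

state = S.SCANNING
for step in HAPPY_PATH:
    state = advance(state, step)
```

A check such as `set(TRANSITIONS) == set(TradeState)` in a unit test would catch a state that was added to the enum but left out of the table.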
3.3 Why State Machines Matter
Without state machine:
# Bug: Can fill before placing order
if order_filled:
    update_position()  # What if order wasn't placed?
With state machine:
# Error caught immediately
try:
    trade_sm.transition("FILLED", "order.filled")
except InvalidTransitionError:
    # Cannot FILL from SCANNING state
    log_error("Tried to fill non-existent order")
4. Message Bus Architecture
4.1 Redis Pub/Sub
Agents communicate via Redis message bus:
import json
from typing import Callable

import redis


class RedisBus:
    """Event-driven message bus using Redis Pub/Sub."""

    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis = redis.from_url(redis_url)
        self.pubsub = self.redis.pubsub()

    def publish(self, channel: str, message: dict):
        """Publish message to channel."""
        self.redis.publish(channel, json.dumps(message))

    def subscribe(self, channel: str, callback: Callable):
        """Subscribe to channel with callback."""
        # Callbacks fire only while messages are being pumped, e.g. via
        # self.pubsub.run_in_thread() or repeated self.pubsub.get_message() calls.
        self.pubsub.subscribe(**{channel: callback})
4.2 Message Format
{
  "event": "strategy.entry.signal",
  "timestamp": "2025-10-28T14:30:00Z",
  "trace_id": "trade-123-abc",
  "data": {
    "symbol": "AAPL",
    "entry_price": 175.50,
    "stop_price": 166.72,
    "target_price": 201.82,
    "quantity": 5,
    "risk_reward_ratio": 3.0
  },
  "metadata": {
    "source": "StrategyAgent",
    "correlation_id": "scan-456"
  }
}
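A small helper can keep every published message in this envelope shape. The sketch below is illustrative: `make_event` is a hypothetical function, and generating a `trace_id` from a UUID when none is supplied is an assumption, not something the article specifies.

```python
import json
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, Optional


def make_event(
    event: str,
    data: Dict[str, Any],
    source: str,
    trace_id: Optional[str] = None,
    correlation_id: Optional[str] = None,
) -> str:
    """Serialize a message using the envelope fields shown above."""
    envelope = {
        "event": event,
        # UTC, second precision, trailing "Z" to match the example timestamp format.
        "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        # Assumed fallback: mint a new trace id if the caller does not pass one.
        "trace_id": trace_id or f"trade-{uuid.uuid4().hex[:12]}",
        "data": data,
        "metadata": {"source": source, "correlation_id": correlation_id},
    }
    return json.dumps(envelope)


payload = make_event(
    "strategy.entry.signal",
    {"symbol": "AAPL", "entry_price": 175.50, "stop_price": 166.72, "quantity": 5},
    source="StrategyAgent",
    trace_id="trade-123-abc",
    correlation_id="scan-456",
)
```

Passing the incoming `trace_id` through to every downstream message is what lets a single trade be followed across agents.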
4.3 Event Naming Convention
<agent>.<action>.<status>
Examples:
- market_data.quote.updated
- order.submit.success
- strategy.entry.signal
- risk.limit.exceeded
- position.stop.hit
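The convention can be enforced at publish time with a small validator. This is a sketch under assumptions: `parse_event_name` and `EventName` are hypothetical, and restricting each segment to lowercase letters, digits, and underscores is inferred from the examples rather than stated by the article.

```python
import re
from typing import NamedTuple

# Three dot-separated segments: <agent>.<action>.<status>
_EVENT_NAME = re.compile(
    r"([a-z][a-z0-9_]*)\.([a-z][a-z0-9_]*)\.([a-z][a-z0-9_]*)"
)


class EventName(NamedTuple):
    agent: str
    action: str
    status: str


def parse_event_name(name: str) -> EventName:
    """Split a conforming event name into its parts, or raise ValueError."""
    match = _EVENT_NAME.fullmatch(name)
    if match is None:
        raise ValueError(f"Event name does not match <agent>.<action>.<status>: {name!r}")
    return EventName(*match.groups())


parsed = parse_event_name("market_data.quote.updated")
```

A strict check like this would reject two-segment names such as `order.filled`, so it only makes sense once every publisher follows the three-part form.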
5. Circuit Breakers
5.1 Why Circuit Breakers
APIs fail. Without protection:
1. Repeated failures slow down everything
2. Rate limits hit, account blocked
3. Cascading failures across agents
Circuit breaker pattern: After N failures, stop trying for T seconds.
5.2 Implementation
import time


class CircuitBreakerOpenError(Exception):
    """Raised when a call is rejected because the breaker is OPEN."""


class CircuitBreaker:
    """Fail-fast protection for external APIs."""

    def __init__(self, failure_threshold=3, timeout=60):
        self.state = "CLOSED"  # Normal operation
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.last_failure_time = None

    def call(self, func, *args, **kwargs):
        """Execute function with circuit breaker protection."""
        # If OPEN, fail fast until the timeout has elapsed
        if self.state == "OPEN":
            if time.time() - self.last_failure_time > self.timeout:
                self.state = "HALF_OPEN"  # Allow one trial call through
            else:
                raise CircuitBreakerOpenError("API unavailable")
        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failure_count += 1
            self.last_failure_time = time.time()
            # A failed HALF_OPEN trial re-opens here too, because the count
            # is still at or above the threshold.
            if self.failure_count >= self.failure_threshold:
                self.state = "OPEN"
            raise
        # Any success closes the breaker and clears the count, so only
        # consecutive failures can trip it.
        self.state = "CLOSED"
        self.failure_count = 0
        return result
5.3 Circuit Breaker States
┌──────────────────────────────────────────────────────┐
│ CIRCUIT BREAKER STATES │
├──────────────────────────────────────────────────────┤
│ │
│ CLOSED ──[3 failures]──→ OPEN │
│ ↑ │ │
│ │ │ [timeout expires] │
│ │ ↓ │
│ └──[success]──────── HALF_OPEN │
│ │ │
│ │ [failure] │
│ └──→ OPEN │
│ │
└──────────────────────────────────────────────────────┘
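The three states are easier to exercise when the clock is injectable, so the timeout can be simulated without sleeping. The sketch below is a standalone illustration of the diagram, not the article's class: `SketchBreaker`, `BreakerOpenError`, and the `clock` parameter are hypothetical names.

```python
import time
from typing import Any, Callable


class BreakerOpenError(RuntimeError):
    """Raised when the breaker rejects a call without attempting it."""


class SketchBreaker:
    def __init__(self, failure_threshold: int = 3, timeout: float = 60.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.state = "CLOSED"
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self._clock = clock
        self._failures = 0
        self._opened_at = 0.0

    def call(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        if self.state == "OPEN":
            if self._clock() - self._opened_at >= self.timeout:
                self.state = "HALF_OPEN"   # timeout expired: allow one trial call
            else:
                raise BreakerOpenError("circuit open; failing fast")
        try:
            result = func(*args, **kwargs)
        except Exception:
            self._failures += 1
            # A failed trial call re-opens immediately; otherwise wait for the threshold.
            if self.state == "HALF_OPEN" or self._failures >= self.failure_threshold:
                self.state = "OPEN"
                self._opened_at = self._clock()
            raise
        self.state = "CLOSED"
        self._failures = 0
        return result


now = [0.0]                                # fake clock, advanced manually
breaker = SketchBreaker(clock=lambda: now[0])


def flaky() -> None:
    raise TimeoutError("broker timed out")


for _ in range(3):
    try:
        breaker.call(flaky)
    except TimeoutError:
        pass
state_after_failures = breaker.state       # three failures: OPEN

now[0] = 61.0                              # past the 60-second timeout
recovered = breaker.call(lambda: "ok")     # trial call succeeds: CLOSED
```

Note that the move to HALF_OPEN happens lazily, on the first call after the timeout, not on a timer.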
5.4 Broker-Specific Breakers
import logging
import time

logger = logging.getLogger(__name__)


# TokenExpiredError is the exception raised on an expired Schwab token;
# its definition is not shown here.
class SchwabCircuitBreaker(CircuitBreaker):
    """Special handling for Schwab token expiration."""

    def call(self, func, *args, **kwargs):
        try:
            return super().call(func, *args, **kwargs)
        except TokenExpiredError:
            # Token expiration = immediate OPEN, regardless of failure count
            self.state = "OPEN"
            self.last_failure_time = time.time()
            logger.error(
                "Schwab token expired. Run: "
                "uv run python3 tools/schwab_setup.py"
            )
            raise
6. Full Trade Execution Flow
6.1 Step-by-Step Walkthrough
1. StrategyAgent scans for candidates
├─ Publishes: strategy.scan.started
├─ Calls: MarketDataAgent.get_quotes([symbols])
├─ Evaluates: MA20, RSI, volume for each symbol
└─ Publishes: strategy.candidate_found (symbol=AAPL)
2. StrategyAgent evaluates setup
├─ Calculates: entry=$175.50, stop=$166.72, target=$201.82
├─ Validates: risk/reward > 2.5
└─ Publishes: strategy.entry.signal (AAPL @ $175.50)
3. RiskManagementAgent approves trade
├─ Subscribes to: strategy.entry.signal
├─ Calculates: position size (max $1000 / $175.50 = 5 shares)
├─ Checks: portfolio limit ($877.50 < $1000 ✓)
├─ Checks: cash reserve (buying power > $877.50 ✓)
└─ Publishes: risk.trade.approved (AAPL, qty=5)
4. OrderExecutionAgent places entry order
├─ Subscribes to: risk.trade.approved
├─ Creates: limit order (AAPL, qty=5, price=$175.50)
├─ Calls: AlpacaAPI.submit_order() [with circuit breaker]
└─ Publishes: order.submitted (order_id=abc123)
5. OrderExecutionAgent receives fill
├─ Monitors: order status
├─ Detects: order status = FILLED
└─ Publishes: order.filled (AAPL, fill_price=$175.50)
6. PositionManagementAgent tracks position
├─ Subscribes to: order.filled
├─ Creates: position record (AAPL, qty=5, entry=$175.50)
└─ Publishes: position.opened (AAPL)
7. PositionManagementAgent sets stop loss
├─ Creates: stop order (AAPL, qty=5, stop=$166.72)
├─ Calls: AlpacaAPI.submit_order(type=STOP)
└─ Publishes: position.stop.set (AAPL, stop=$166.72)
8. StrategyAgent monitors position
├─ Subscribes to: market_data.quote.updated (AAPL)
├─ Checks: current price vs target ($201.82)
└─ Updates: trade state machine (HOLDING)
9. PositionManagementAgent detects target hit
├─ Detects: current price >= $201.82
├─ Publishes: position.target.hit (AAPL)
└─ Updates: trade state machine (EXIT_TRIGGERED)
10. OrderExecutionAgent places exit order
├─ Subscribes to: position.target.hit
├─ Creates: sell order (AAPL, qty=5, price=$201.82)
└─ Publishes: order.submitted (exit)
11. Trade closes
├─ Exit order fills
├─ P&L calculated: ($201.82 - $175.50) × 5 = $131.60
└─ Publishes: position.closed (AAPL, pnl=$131.60)
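The figures in the walkthrough can be checked with exact decimal arithmetic. This is only a verification sketch of the numbers quoted above; using `Decimal` instead of `float` for prices is a choice made here, not something the article prescribes.

```python
from decimal import Decimal

entry = Decimal("175.50")
stop = Decimal("166.72")
target = Decimal("201.82")
quantity = 5

risk_per_share = entry - stop                      # 8.78
reward_per_share = target - entry                  # 26.32
risk_reward = reward_per_share / risk_per_share    # just under 3, above the 2.5 minimum

position_cost = entry * quantity                   # 877.50, under the $1,000 cap
pnl_at_target = reward_per_share * quantity        # 131.60
loss_at_stop = risk_per_share * quantity           # 43.90

print(f"R:R = {risk_reward.quantize(Decimal('0.01'))}")
print(f"cost = ${position_cost}, P&L at target = ${pnl_at_target}, loss at stop = ${loss_at_stop}")
```

The ratio works out to slightly under 3, which the message format example reports as 3.0 after rounding.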
7. Distributed Tracing
7.1 Why Tracing
When a trade fails, you need to know:
- Which agent failed?
- What was the input?
- How long did each step take?
- What was the error?
7.2 Trace Structure
Trade Execution Trace (trade-123-abc):
├─ strategy.scan_candidates [500ms]
│ ├─ market_data.get_quotes [200ms]
│ └─ strategy.evaluate_criteria [300ms]
├─ strategy.evaluate_setup [100ms]
├─ risk.approve_trade [50ms]
│ ├─ risk.calculate_size [20ms]
│ └─ risk.check_limits [30ms]
├─ order.submit [300ms]
│ └─ alpaca_api.place_order [280ms]
├─ position.track [50ms]
├─ position.set_stop [200ms]
│ └─ alpaca_api.place_stop_order [180ms]
└─ Total: 1200ms
7.3 OpenTelemetry Integration
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider

# Register an SDK provider once at startup. Attach a span processor and
# exporter here; without one, spans are created but never exported.
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)


class OrderExecutionAgent:
    def submit_order(self, order: Order) -> OrderId:
        with tracer.start_as_current_span("order.submit") as span:
            span.set_attribute("order.symbol", order.symbol)
            span.set_attribute("order.quantity", order.quantity)
            span.set_attribute("order.type", order.type)
            try:
                result = self.broker.submit(order)
                span.set_attribute("order.id", result.id)
                # Return the ID to match the declared OrderId return type
                return result.id
            except Exception as e:
                span.set_attribute("error.type", type(e).__name__)
                span.set_attribute("error.message", str(e))
                raise
7.4 Trace Attributes
| Attribute | Example | Purpose |
|---|---|---|
| trade.symbol | AAPL | Stock being traded |
| trade.entry_price | 175.50 | Entry price |
| trade.quantity | 5 | Share count |
| order.id | abc123 | Broker order ID |
| circuit_breaker.state | CLOSED | Breaker status |
| error.type | InsufficientFunds | Exception type |
8. Error Handling
8.1 Circuit Breaker Scenario
1. MarketDataAgent calls AlpacaAPI.get_quote()
2. Request times out (network issue)
3. AlpacaCircuitBreaker.failure_count: 1 → 2 → 3
4. On 3rd failure: Circuit breaker OPENS
5. All subsequent requests fail immediately
6. After 60 seconds: Circuit breaker enters HALF_OPEN
7. Next request succeeds → Circuit breaker CLOSES
8.2 State Machine Invalid Transition
1. StrategyAgent generates entry signal
2. Bug: OrderExecutionAgent misses order.submit step
3. PositionManagementAgent receives stale order.filled event
4. TradeStateMachine.transition("FILLED") called
5. Current state: ENTRY_APPROVED (not ORDER_PLACED)
6. Raises: InvalidTransitionError
7. Trade aborted, no position opened
8.3 Market Closed
1. PositionManagementAgent detects target hit
2. OrderExecutionAgent tries to place exit order
3. MarketDataAgent.check_status() returns CLOSED
4. Exit order rejected
5. Agent waits for market open
6. On market open: Retry exit order
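The wait-and-retry behaviour in this scenario can be sketched as a small queue that holds exit orders while the market is closed. `DeferredExitQueue` and `ExitOrder` are hypothetical, and the two callables stand in for the market status check and order submission; none of these names come from the article's codebase.

```python
from collections import deque
from dataclasses import dataclass
from typing import Callable, Deque, List


@dataclass(frozen=True)
class ExitOrder:
    symbol: str
    quantity: int
    limit_price: float


class DeferredExitQueue:
    """Holds exit orders while the market is closed and flushes them on open."""

    def __init__(self, is_market_open: Callable[[], bool],
                 submit: Callable[[ExitOrder], None]) -> None:
        self._is_market_open = is_market_open
        self._submit = submit
        self._pending: Deque[ExitOrder] = deque()

    def request_exit(self, order: ExitOrder) -> bool:
        """Submit now if the market is open; otherwise queue. True means submitted."""
        if self._is_market_open():
            self._submit(order)
            return True
        self._pending.append(order)
        return False

    def on_market_open(self) -> int:
        """Flush queued orders in FIFO order and return how many were sent."""
        sent = 0
        while self._pending and self._is_market_open():
            self._submit(self._pending.popleft())
            sent += 1
        return sent


market_open = [False]                       # toggled to simulate the open
submitted: List[ExitOrder] = []
queue = DeferredExitQueue(lambda: market_open[0], submitted.append)

sent_now = queue.request_exit(ExitOrder("AAPL", 5, 201.82))   # closed: queued
market_open[0] = True
flushed = queue.on_market_open()                              # open: flushed
```

In practice the queued price may be stale by the next open, so re-checking the exit condition before resubmitting is worth considering.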
9. Infrastructure
9.1 Docker Compose
version: '3.8'
services:
  redis:
    image: redis:alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
  jaeger:
    image: jaegertracing/all-in-one:latest
    ports:
      - "16686:16686" # Jaeger UI
      - "6831:6831/udp"
    healthcheck:
      test: ["CMD", "wget", "-q", "--spider", "http://localhost:14269/"]
volumes:
  redis_data:
9.2 Directory Structure
trading_agents/
├── __init__.py
├── base_agent.py # Base class with tracing
├── market_data_agent.py
├── order_execution_agent.py
├── strategy_agent.py
├── risk_management_agent.py
├── position_management_agent.py
├── state_machines/
│ ├── trade_state_machine.py
│ └── order_state_machine.py
├── circuit_breakers/
│ ├── base_breaker.py
│ ├── schwab_breaker.py
│ └── alpaca_breaker.py
├── message_bus/
│ ├── redis_bus.py
│ └── message.py
└── tracing/
└── tracer.py
10. Testing Strategy
10.1 Unit Tests
from unittest.mock import MagicMock

import pytest


def test_valid_transition_scanning_to_setup():
    sm = TradeStateMachine("AAPL")
    assert sm.state == "SCANNING"
    sm.transition("SETUP_DETECTED", "candidate_found")
    assert sm.state == "SETUP_DETECTED"


def test_invalid_transition_scanning_to_filled():
    sm = TradeStateMachine("AAPL")
    with pytest.raises(InvalidTransitionError):
        sm.transition("FILLED", "order_filled")


def test_circuit_breaker_opens_after_3_failures():
    breaker = CircuitBreaker()
    mock_api = MagicMock(side_effect=TimeoutError)
    # Each call re-raises the underlying error while the breaker counts it
    for _ in range(3):
        with pytest.raises(TimeoutError):
            breaker.call(mock_api)
    assert breaker.state == "OPEN"
10.2 Integration Tests
import asyncio

import pytest


@pytest.mark.asyncio
async def test_full_trade_lifecycle():
    """Test complete trade from scan to close."""
    # Setup agents with mock broker
    mock_broker = MockBroker()
    agents = setup_agents(mock_broker)
    # Trigger scan
    await agents.strategy.scan_candidates()
    # Wait for async processing
    await asyncio.sleep(2)
    # Verify order placed
    assert len(mock_broker.orders) == 1
    # Simulate fill
    mock_broker.fill_order(mock_broker.orders[0].id)
    await asyncio.sleep(1)
    # Verify position tracked
    assert "AAPL" in agents.position.positions
    # Verify stop loss set
    stop_orders = [o for o in mock_broker.orders if o.type == "STOP"]
    assert len(stop_orders) == 1
11. Lessons Learned
11.1 State Machines Prevent Bugs
Without state machines, we had bugs where:
- Orders were marked filled before being placed
- Positions were closed without having been opened
- Duplicate stop losses were set
Lesson: Explicit state transitions catch logic errors early.
11.2 Circuit Breakers Are Essential
An early version had no circuit breakers. The results:
- We hit rate limits repeatedly
- Schwab blocked API access
- The system slowed to a crawl
Lesson: Fail fast with circuit breakers, not slow retries.
11.3 Tracing Is Not Optional
When debugging production issues, the same questions keep coming up:
- "Why did this trade fail?"
- "How long did order submission take?"
- "Which agent errored?"
Lesson: Instrument everything. Traces save hours of debugging.
11.4 Paper Trade First
Every strategy runs for 24-48 hours in a paper account before going live. This:
- Validates state machine transitions
- Tests circuit breaker behavior
- Catches edge cases in market hours
Lesson: Paper trading catches bugs that unit tests miss.
Conclusion
The multi-agent trading architecture provides:
- Clear boundaries: Each agent has one responsibility
- State machines: Trade lifecycle is explicit and validated
- Circuit breakers: API failures handled gracefully
- Message bus: Loose coupling via Redis pub/sub
- Distributed tracing: Full audit trail for debugging
The key insight: Trading systems need production patterns. State machines, circuit breakers, and tracing aren't optional; they're what separate a system that merely works from one that is ready for production.
Series Navigation:
- Previous: Article 7 - Self-Maintaining Automation
- Current: Article 8 - Multi-Agent Trading Architecture
- Next: Article 9 - Client Intelligence Pipeline
- Unlocks: Articles 11, 12