Trading Multi-Agent System - Architecture Design¶
Date: 2025-10-28
Phase: 1 of 3 (Trading Multi-Agent → Isolation → IT Raven Enterprise)
Status: Implementation In Progress
Overview¶
Production-ready multi-agent system for automated trading with:
- State machines for trade lifecycle management
- Circuit breakers for API reliability
- Distributed tracing for audit trail
- Event-driven architecture for loose coupling
- Agent boundaries for single responsibility
Agent Boundaries¶
1. MarketDataAgent¶
Responsibility: Real-time quotes, historical bars, market status
Capabilities:
- Get latest quote for symbol
- Get historical bars (1min, 5min, 1hour, 1day)
- Subscribe to real-time price streams
- Check market open/closed status
Message Inputs:
- market_data.get_quote - Fetch current price
- market_data.get_bars - Fetch historical data
- market_data.subscribe - Start price stream
- market_data.check_status - Market open/closed?
Message Outputs:
- market_data.quote_updated - New price available
- market_data.bars_received - Historical data ready
- market_data.stream_tick - Real-time price update
- market_data.market_status - Open/closed/pre-market/after-hours
State: Stateless (pure data fetcher)
Circuit Breaker: Yes (Alpaca API)
- Failure threshold: 3
- Timeout: 60 seconds
- Half-open retry: 1 request
Rate Limiting: 200 req/min (Alpaca limit)
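The 200 req/min budget could be enforced on the client side before a request ever reaches the circuit breaker. The following is a minimal sliding-window sketch, not the project's limiter: the class name `SlidingWindowRateLimiter`, its `try_acquire` method, and the injectable `clock` parameter are all illustrative assumptions.

```python
import time
from collections import deque
from typing import Callable, Deque


class SlidingWindowRateLimiter:
    """Allow at most `max_calls` calls in any rolling `period` seconds."""

    def __init__(self, max_calls: int, period: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.max_calls = max_calls
        self.period = period
        self.clock = clock  # injectable so tests do not need to sleep
        self._calls: Deque[float] = deque()

    def try_acquire(self) -> bool:
        """Record a call and return True if it fits in the window, else False."""
        now = self.clock()
        # Drop timestamps that have aged out of the rolling window.
        while self._calls and now - self._calls[0] >= self.period:
            self._calls.popleft()
        if len(self._calls) < self.max_calls:
            self._calls.append(now)
            return True
        return False


# 200 requests per minute, matching the Alpaca limit quoted above.
alpaca_limiter = SlidingWindowRateLimiter(max_calls=200, period=60.0)
```

A caller would check `alpaca_limiter.try_acquire()` before each request and delay or drop the request when it returns `False`, so that local throttling happens before the API starts returning 429 responses.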
2. OrderExecutionAgent¶
Responsibility: Order submission, fills, cancellations, modifications
Capabilities:
- Submit market/limit/stop orders
- Cancel pending orders
- Modify order prices
- Track order status (submitted → filled/rejected/cancelled)
Message Inputs:
- order.submit - Place new order
- order.cancel - Cancel pending order
- order.modify - Change price/quantity
- order.get_status - Check order state
Message Outputs:
- order.submitted - Order sent to broker
- order.filled - Order executed
- order.partially_filled - Partial execution
- order.rejected - Order rejected by broker
- order.cancelled - Order cancelled
State: Maintains order tracking (order_id → order_status)
Circuit Breakers:
- Schwab API: 3 failures → 60s timeout
- Alpaca API: 3 failures → 60s timeout
Rate Limiting:
- Schwab: 120 req/min
- Alpaca: 200 req/min
3. StrategyAgent¶
Responsibility: Signal generation, entry/exit logic, trend following rules
Capabilities:
- Scan for entry candidates
- Evaluate setups (MA20, RSI, volume)
- Generate entry signals
- Manage position (trailing stops, profit targets)
- Decide when to exit
Message Inputs:
- strategy.scan_candidates - Find potential entries
- strategy.evaluate_setup - Check if criteria met
- strategy.manage_position - Monitor open trade
- strategy.should_exit - Check exit conditions
Message Outputs:
- strategy.candidate_found - Potential entry detected
- strategy.entry_signal - Criteria met, ready to enter
- strategy.hold_signal - Continue holding position
- strategy.exit_signal - Time to exit (target/stop/time)
State Machine: Trade Lifecycle (see below)
States:
- SCANNING - Looking for setups
- SETUP_DETECTED - Found candidate, evaluating
- ENTRY_APPROVED - Ready to place order
- HOLDING - Position open, monitoring
- EXIT_TRIGGERED - Exit condition met
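The setup evaluation (price above MA20, RSI between 50 and 70, volume above average, as listed under the trade lifecycle transitions) might be sketched as follows. Several details are assumptions not stated in this document: the RSI uses a 14-period window with simple averages of gains and losses, the volume average uses a 20-bar window, and the function names are hypothetical.

```python
from statistics import fmean
from typing import Sequence


def sma(values: Sequence[float], window: int) -> float:
    """Simple moving average of the last `window` values."""
    if len(values) < window:
        raise ValueError(f"need at least {window} values, got {len(values)}")
    return fmean(values[-window:])


def rsi(closes: Sequence[float], period: int = 14) -> float:
    """RSI from simple averages of gains and losses over the last `period` changes."""
    if len(closes) < period + 1:
        raise ValueError(f"need at least {period + 1} closes, got {len(closes)}")
    recent = closes[-(period + 1):]
    changes = [b - a for a, b in zip(recent, recent[1:])]
    avg_gain = fmean([max(c, 0.0) for c in changes])
    avg_loss = fmean([max(-c, 0.0) for c in changes])
    if avg_loss == 0:
        return 100.0  # no down moves in the window
    rs = avg_gain / avg_loss
    return 100.0 - 100.0 / (1.0 + rs)


def is_entry_setup(closes: Sequence[float], volumes: Sequence[float]) -> bool:
    """True when all three entry criteria hold on the latest bar."""
    price_above_ma = closes[-1] > sma(closes, 20)
    rsi_in_band = 50.0 <= rsi(closes) <= 70.0
    volume_above_avg = volumes[-1] > sma(volumes, 20)
    return price_above_ma and rsi_in_band and volume_above_avg
```

Other RSI variants (for example Wilder's smoothing) would give somewhat different values, so the band thresholds should be validated against whichever variant the strategy actually uses.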
4. RiskManagementAgent¶
Responsibility: Position sizing, portfolio limits, drawdown management
Capabilities:
- Calculate position size (max $1000)
- Check portfolio limits (max 20% per position)
- Enforce cash reserve (min 50%)
- Track drawdown (max 20%)
- Approve/reject trades based on risk
Message Inputs:
- risk.calculate_size - Determine position size
- risk.check_limits - Validate against rules
- risk.assess_portfolio - Check overall risk
- risk.approve_trade - Final risk approval
Message Outputs:
- risk.size_calculated - Position size determined
- risk.limit_exceeded - Trade rejected (too risky)
- risk.trade_approved - Risk checks passed
- risk.drawdown_alert - Drawdown threshold reached
State: Portfolio-level risk metrics
Rules:
- Max position size: $1000
- Max portfolio exposure: 20% per position
- Min cash reserve: 50%
- Max drawdown: 20%
- Stop loss required: 5% below entry for longs, 5% above entry for shorts
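As a rough illustration of how the first three rules could combine into a share count, the sketch below takes the tightest of the three dollar budgets and rounds down to whole shares. The names `RiskLimits` and `position_size` are hypothetical, and the reading of "min cash reserve 50%" as "cash must stay at or above 50% of portfolio value after the purchase" is an assumption.

```python
from dataclasses import dataclass
from math import floor


@dataclass(frozen=True)
class RiskLimits:
    max_position_usd: float = 1000.0    # max position size
    max_position_pct: float = 0.20      # max share of portfolio per position
    min_cash_reserve_pct: float = 0.50  # cash that must remain untouched


def position_size(price: float, portfolio_value: float, cash: float,
                  limits: RiskLimits = RiskLimits()) -> int:
    """Largest whole-share quantity that satisfies all three limits (0 if none)."""
    if price <= 0:
        raise ValueError("price must be positive")
    # Dollars that can be spent while keeping the cash reserve intact (assumed meaning).
    spendable = cash - limits.min_cash_reserve_pct * portfolio_value
    budget = min(limits.max_position_usd,
                 limits.max_position_pct * portfolio_value,
                 spendable)
    return max(0, floor(budget / price))
```

With a $10,000 portfolio holding $8,000 in cash and a $175.50 share price, the $1000 cap is the binding limit and the result is 5 shares, matching the AAPL example used elsewhere in this document.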
5. PositionManagementAgent¶
Responsibility: Position tracking, stop loss management, P&L calculation
Capabilities:
- Track all open positions
- Set/update stop losses
- Calculate unrealized P&L
- Monitor for stop/target hits
- Handle short positions correctly
Message Inputs:
- position.track - Add new position
- position.set_stop - Place stop loss order
- position.update_stop - Modify stop (trail higher)
- position.calculate_pnl - Get current P&L
- position.check_exit - Stop/target hit?
Message Outputs:
- position.opened - New position tracked
- position.stop_set - Stop loss placed
- position.stop_hit - Stop loss triggered
- position.target_hit - Profit target reached
- position.pnl_updated - P&L recalculated
State: Position inventory (symbol → position_data)
Special Handling: Short positions (BUY stops ABOVE entry)
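The long/short asymmetry can be made explicit in a small helper. This is a sketch under assumptions: the name `exit_levels` is hypothetical, the 15% target for shorts mirrors the +15% long target used in the coordination example, and prices are truncated to the cent because that reproduces the $166.72 and $201.82 figures quoted in this document.

```python
from decimal import Decimal, ROUND_DOWN
from typing import NamedTuple

CENT = Decimal("0.01")


class ExitLevels(NamedTuple):
    stop_price: Decimal
    target_price: Decimal
    stop_order_side: str  # side of the protective stop order


def exit_levels(entry_price: str, side: str,
                stop_pct: str = "0.05", target_pct: str = "0.15") -> ExitLevels:
    """Compute stop and target prices for a long or short position."""
    entry = Decimal(entry_price)
    stop_frac, target_frac = Decimal(stop_pct), Decimal(target_pct)
    if side == "long":
        stop = entry * (1 - stop_frac)      # SELL stop BELOW entry
        target = entry * (1 + target_frac)
        stop_side = "SELL"
    elif side == "short":
        stop = entry * (1 + stop_frac)      # BUY stop ABOVE entry
        target = entry * (1 - target_frac)
        stop_side = "BUY"
    else:
        raise ValueError(f"unknown side: {side!r}")
    return ExitLevels(stop.quantize(CENT, rounding=ROUND_DOWN),
                      target.quantize(CENT, rounding=ROUND_DOWN),
                      stop_side)
```

Using `Decimal` rather than `float` keeps the cent rounding deterministic; the rounding direction itself is a policy choice that a broker's tick-size rules may override.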
Message Bus Architecture¶
Technology: Redis Pub/Sub
Event Naming Convention: <agent>.<action>.<status>
Examples:
- market_data.quote.updated
- order.submit.success
- strategy.entry.signal
- risk.limit.exceeded
- position.stop.hit
Message Format:
{
  "event": "strategy.entry.signal",
  "timestamp": "2025-10-28T14:30:00Z",
  "trace_id": "trade-123-abc",
  "data": {
    "symbol": "AAPL",
    "entry_price": 175.50,
    "stop_price": 166.72,
    "target_price": 201.82,
    "quantity": 5,
    "risk_reward_ratio": 3.0,
    "strategy": "trend_following_3_to_1"
  },
  "metadata": {
    "source": "StrategyAgent",
    "correlation_id": "scan-456",
    "priority": "high"
  }
}
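A message envelope with this shape could be modelled as a small dataclass that serializes to and from JSON. The sketch below is illustrative only: the `Message` class, its method names, and the choice to reject payloads with missing required fields are assumptions, not the contents of `message.py`.

```python
import json
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict

REQUIRED_FIELDS = {"event", "timestamp", "trace_id", "data"}


def _utc_now() -> str:
    """Current UTC time in the same ISO 8601 form as the example above."""
    return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


@dataclass
class Message:
    event: str
    trace_id: str
    data: Dict[str, Any]
    metadata: Dict[str, Any] = field(default_factory=dict)
    timestamp: str = field(default_factory=_utc_now)

    def to_json(self) -> str:
        """Serialize to the wire format published on the bus."""
        return json.dumps(asdict(self))

    @classmethod
    def from_json(cls, raw: str) -> "Message":
        """Parse a wire message, rejecting payloads that lack required fields."""
        payload = json.loads(raw)
        missing = REQUIRED_FIELDS - payload.keys()
        if missing:
            raise ValueError(f"message missing fields: {sorted(missing)}")
        return cls(**payload)
```

Validating on receipt means a malformed event fails at the bus boundary with a clear error instead of surfacing later inside an agent handler.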
State Machines¶
Trade Lifecycle State Machine¶
States:
SCANNING - Looking for entry setups
SETUP_DETECTED - Found candidate, evaluating criteria
ENTRY_APPROVED - Risk approved, ready to place order
ORDER_PLACED - Entry order submitted to broker
FILLED - Entry order executed
HOLDING - Position open, stop loss set
EXIT_TRIGGERED - Exit condition met (target/stop/time)
ORDER_CLOSING - Exit order submitted
CLOSED - Position closed, trade complete
Transitions:
SCANNING → SETUP_DETECTED
Trigger: strategy.candidate_found
Condition: Price > MA20, RSI 50-70, Volume > avg
SETUP_DETECTED → ENTRY_APPROVED
Trigger: risk.trade_approved
Condition: Position size < $1000, portfolio < 20%, cash > 50%
ENTRY_APPROVED → ORDER_PLACED
Trigger: order.submit
Condition: Market open
ORDER_PLACED → FILLED
Trigger: order.filled
Condition: Broker confirms execution
FILLED → HOLDING
Trigger: position.stop_set
Condition: Stop loss order placed successfully
HOLDING → EXIT_TRIGGERED
Trigger: position.target_hit OR position.stop_hit OR strategy.exit_signal
Condition: Exit criteria met
EXIT_TRIGGERED → ORDER_CLOSING
Trigger: order.submit (exit)
Condition: Market open
ORDER_CLOSING → CLOSED
Trigger: order.filled (exit)
Condition: Broker confirms execution
Invalid Transitions (raise exception):
- SCANNING → ORDER_PLACED (must evaluate setup first)
- SETUP_DETECTED → FILLED (must place order first)
- ORDER_PLACED → CLOSED (must fill and hold first)
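One way to enforce these rules is a lookup table keyed by (current state, trigger), so that anything not listed is rejected. The sketch below is illustrative: the `handle` method and `TRANSITIONS` table are assumed names and differ from the per-event methods (`detect_setup`, `filled`) used in the unit tests later in this document, and the guard conditions (market open, risk limits) are deliberately left out.

```python
from enum import Enum
from typing import Dict, List, Tuple


class TradeState(str, Enum):
    SCANNING = "SCANNING"
    SETUP_DETECTED = "SETUP_DETECTED"
    ENTRY_APPROVED = "ENTRY_APPROVED"
    ORDER_PLACED = "ORDER_PLACED"
    FILLED = "FILLED"
    HOLDING = "HOLDING"
    EXIT_TRIGGERED = "EXIT_TRIGGERED"
    ORDER_CLOSING = "ORDER_CLOSING"
    CLOSED = "CLOSED"


class InvalidTransitionError(Exception):
    """Raised when a trigger is not allowed in the current state."""


S = TradeState
# (current state, trigger event) -> next state, copied from the list above.
TRANSITIONS: Dict[Tuple[TradeState, str], TradeState] = {
    (S.SCANNING, "strategy.candidate_found"): S.SETUP_DETECTED,
    (S.SETUP_DETECTED, "risk.trade_approved"): S.ENTRY_APPROVED,
    (S.ENTRY_APPROVED, "order.submit"): S.ORDER_PLACED,
    (S.ORDER_PLACED, "order.filled"): S.FILLED,
    (S.FILLED, "position.stop_set"): S.HOLDING,
    (S.HOLDING, "position.target_hit"): S.EXIT_TRIGGERED,
    (S.HOLDING, "position.stop_hit"): S.EXIT_TRIGGERED,
    (S.HOLDING, "strategy.exit_signal"): S.EXIT_TRIGGERED,
    (S.EXIT_TRIGGERED, "order.submit"): S.ORDER_CLOSING,
    (S.ORDER_CLOSING, "order.filled"): S.CLOSED,
}


class TradeStateMachine:
    def __init__(self, symbol: str) -> None:
        self.symbol = symbol
        self.state = TradeState.SCANNING
        self.history: List[TradeState] = [self.state]

    def handle(self, trigger: str) -> TradeState:
        """Apply a trigger, or raise without changing state if it is not allowed."""
        try:
            nxt = TRANSITIONS[(self.state, trigger)]
        except KeyError:
            raise InvalidTransitionError(
                f"{self.symbol}: {trigger!r} not allowed in {self.state.value}"
            ) from None
        self.state = nxt
        self.history.append(nxt)
        return nxt
```

Because the same trigger name (`order.submit`, `order.filled`) appears for both entry and exit, keying on the current state as well as the trigger is what disambiguates the two.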
Order State Machine¶
States:
DRAFT - Order created, not submitted
SUBMITTED - Sent to broker
ACCEPTED - Broker accepted order
PARTIALLY_FILLED - Some shares executed
FILLED - Fully executed
CANCELLED - User/broker cancelled
REJECTED - Broker rejected order
EXPIRED - Time-in-force expired
Transitions:
DRAFT → SUBMITTED
Trigger: order.submit
SUBMITTED → ACCEPTED
Trigger: broker confirmation
ACCEPTED → PARTIALLY_FILLED
Trigger: partial fill event
PARTIALLY_FILLED → FILLED
Trigger: remaining shares filled
ACCEPTED → CANCELLED
Trigger: order.cancel OR market closed
SUBMITTED → REJECTED
Trigger: broker rejection (insufficient funds, invalid symbol, etc)
Circuit Breaker Implementation¶
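Pattern: Fail-fast with a fixed 60-second cooldown, followed by a single half-open trial request (the breakers shown here use a constant timeout, not exponential backoff)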
SchwabCircuitBreaker¶
import logging
import time

logger = logging.getLogger(__name__)

# CircuitBreakerOpenError, TokenExpiredError and RateLimitError are assumed to be
# defined elsewhere in the project; their module is not shown in this document.


class SchwabCircuitBreaker:
    """
    Circuit breaker for Schwab API with special token handling.

    States:
        CLOSED - Normal operation
        OPEN - API unavailable, reject requests immediately
        HALF_OPEN - Testing if API recovered

    Thresholds:
        - Failure count: 3
        - Timeout: 60 seconds
        - Half-open retry: 1 request
    """

    def __init__(self):
        self.state = "CLOSED"
        self.failure_count = 0
        self.failure_threshold = 3
        self.timeout = 60  # seconds
        self.last_failure_time = None
        self.half_open_success = False

    def call(self, func, *args, **kwargs):
        # While OPEN, reject immediately until the timeout has elapsed,
        # then let one trial request through (HALF_OPEN).
        if self.state == "OPEN":
            if time.time() - self.last_failure_time > self.timeout:
                self.state = "HALF_OPEN"
                logger.info("Schwab circuit breaker entering HALF_OPEN")
            else:
                raise CircuitBreakerOpenError("Schwab API unavailable")

        try:
            result = func(*args, **kwargs)
            if self.state == "HALF_OPEN":
                self.state = "CLOSED"
                logger.info("Schwab circuit breaker recovered, now CLOSED")
            # Any success clears the count, so only consecutive failures open the circuit.
            self.failure_count = 0
            return result
        except TokenExpiredError:
            # Retrying cannot fix an expired token, so open immediately.
            # Note: the breaker still moves to HALF_OPEN after the timeout.
            logger.error("Schwab token expired. Run: uv run python3 tools/schwab_setup.py")
            self.state = "OPEN"
            self.last_failure_time = time.time()
            raise
        except RateLimitError:
            self.failure_count += 1
            self.last_failure_time = time.time()
            if self.failure_count >= self.failure_threshold:
                self.state = "OPEN"
                logger.error("Schwab circuit breaker OPENED (rate limit)")
            raise
        except Exception as e:
            self.failure_count += 1
            self.last_failure_time = time.time()
            if self.failure_count >= self.failure_threshold:
                self.state = "OPEN"
                logger.error(f"Schwab circuit breaker OPENED: {e}")
            raise
AlpacaCircuitBreaker¶
import logging
import time

logger = logging.getLogger(__name__)

# CircuitBreakerOpenError is assumed to be defined elsewhere in the project.


class AlpacaCircuitBreaker:
    """
    Circuit breaker for Alpaca API (paper + live accounts).

    Thresholds:
        - Failure count: 3
        - Timeout: 60 seconds
        - Rate limit: 200 req/min
    """

    def __init__(self, account_type="paper"):
        self.account_type = account_type
        self.state = "CLOSED"
        self.failure_count = 0
        self.failure_threshold = 3
        self.timeout = 60  # seconds
        self.last_failure_time = None

    def call(self, func, *args, **kwargs):
        # While OPEN, reject immediately until the timeout has elapsed,
        # then let one trial request through (HALF_OPEN).
        if self.state == "OPEN":
            if time.time() - self.last_failure_time > self.timeout:
                self.state = "HALF_OPEN"
                logger.info(f"Alpaca {self.account_type} circuit breaker entering HALF_OPEN")
            else:
                raise CircuitBreakerOpenError(f"Alpaca {self.account_type} API unavailable")

        try:
            result = func(*args, **kwargs)
            if self.state == "HALF_OPEN":
                self.state = "CLOSED"
                logger.info(f"Alpaca {self.account_type} circuit breaker recovered")
            # Any success clears the count, so only consecutive failures open the circuit.
            self.failure_count = 0
            return result
        except Exception as e:
            self.failure_count += 1
            self.last_failure_time = time.time()
            if self.failure_count >= self.failure_threshold:
                self.state = "OPEN"
                logger.error(f"Alpaca {self.account_type} circuit breaker OPENED: {e}")
            raise
Distributed Tracing¶
Technology: OpenTelemetry + Jaeger
Trace Structure:
Trade Execution Trace (trade-123-abc):
├─ strategy.scan_candidates [500ms]
│ ├─ market_data.get_quotes [200ms]
│ └─ strategy.evaluate_criteria [300ms]
├─ strategy.evaluate_setup [100ms]
├─ risk.approve_trade [50ms]
│ ├─ risk.calculate_size [20ms]
│ └─ risk.check_limits [30ms]
├─ order.submit [300ms]
│ └─ alpaca_api.place_order [280ms]
├─ position.track [50ms]
├─ position.set_stop [200ms]
│ └─ alpaca_api.place_stop_order [180ms]
└─ Total: 1200ms
Instrumentation Points:
- Agent method entry/exit
- External API calls (Schwab, Alpaca)
- State machine transitions
- Circuit breaker state changes
- Message bus publish/subscribe
Trace Attributes:
- trade.symbol: Stock symbol
- trade.entry_price: Entry price
- trade.quantity: Share count
- trade.strategy: Strategy name
- order.id: Order ID
- order.status: Order status
- circuit_breaker.state: CLOSED/OPEN/HALF_OPEN
- error.type: Exception type (if failed)
Directory Structure¶
/Users/bertfrichot/mem-agent-mcp/
├── trading_agents/ # NEW: Multi-agent trading system
│ ├── __init__.py
│ ├── base_agent.py # Base agent class with tracing
│ ├── market_data_agent.py # MarketDataAgent
│ ├── order_execution_agent.py # OrderExecutionAgent
│ ├── strategy_agent.py # StrategyAgent
│ ├── risk_management_agent.py # RiskManagementAgent
│ ├── position_management_agent.py # PositionManagementAgent
│ ├── state_machines/
│ │ ├── __init__.py
│ │ ├── trade_state_machine.py # Trade lifecycle
│ │ └── order_state_machine.py # Order lifecycle
│ ├── circuit_breakers/
│ │ ├── __init__.py
│ │ ├── base_breaker.py # Base circuit breaker
│ │ ├── schwab_breaker.py # Schwab API breaker
│ │ └── alpaca_breaker.py # Alpaca API breaker
│ ├── message_bus/
│ │ ├── __init__.py
│ │ ├── redis_bus.py # Redis pub/sub
│ │ └── message.py # Message schema
│ ├── tracing/
│ │ ├── __init__.py
│ │ └── tracer.py # OpenTelemetry setup
│ └── config.py # Agent configuration
├── tests/
│ └── trading_agents/ # NEW: Agent tests
│ ├── test_market_data_agent.py
│ ├── test_order_execution_agent.py
│ ├── test_strategy_agent.py
│ ├── test_risk_management_agent.py
│ ├── test_position_management_agent.py
│ ├── test_trade_state_machine.py
│ ├── test_circuit_breakers.py
│ └── test_integration.py # Full trade lifecycle
├── docker-compose.yml # Redis + Jaeger
└── docs/architecture/
└── trading-multi-agent-design.md # This file
Infrastructure Setup¶
Docker Compose¶
# docker-compose.yml
version: '3.8'
services:
  redis:
    # Image tag is a Redis version, not the port number.
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    command: redis-server --appendonly yes
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 3s
      retries: 3
  jaeger:
    image: jaegertracing/all-in-one:latest
    environment:
      # Current Jaeger releases use HOST_PORT; the older HTTP_PORT variable was removed.
      - COLLECTOR_ZIPKIN_HOST_PORT=:9411
    ports:
      - "5775:5775/udp"
      - "6831:6831/udp"
      - "6832:6832/udp"
      - "5778:5778"
      - "16686:16686" # Jaeger UI
      - "14268:14268"
      - "14250:14250"
      - "9411:9411"
    healthcheck:
      test: ["CMD", "wget", "-q", "--spider", "http://localhost:14269/"]
      interval: 10s
      timeout: 3s
      retries: 3
volumes:
  redis_data:
Starting Infrastructure¶
# Start Redis + Jaeger
docker-compose up -d
# Verify
docker-compose ps
# View logs
docker-compose logs -f
# Access Jaeger UI
open http://localhost:16686
# Test Redis
redis-cli ping
# Should return: PONG
Agent Coordination Flow¶
Example: Full Trade Execution¶
1. StrategyAgent scans for candidates
├─ Publishes: strategy.scan.started
├─ Calls: MarketDataAgent.get_quotes([symbols])
├─ Evaluates: MA20, RSI, volume for each symbol
└─ Publishes: strategy.candidate_found (symbol=AAPL)
2. StrategyAgent evaluates setup
├─ Publishes: strategy.evaluate.started (symbol=AAPL)
├─ Calculates: entry, stop, target prices
├─ Validates: risk/reward > 2.5
└─ Publishes: strategy.entry.signal (AAPL @ $175.50)
3. RiskManagementAgent approves trade
├─ Subscribes to: strategy.entry.signal
├─ Calculates: position size (max $1000 / $175.50 = 5 shares)
├─ Checks: portfolio limit (5 × $175.50 = $877.50 < $1000 ✓)
├─ Checks: cash reserve (buying power > $877.50 ✓)
└─ Publishes: risk.trade.approved (AAPL, qty=5)
4. OrderExecutionAgent places entry order
├─ Subscribes to: risk.trade.approved
├─ Creates: limit order (AAPL, qty=5, price=$175.50)
├─ Calls: AlpacaAPI.submit_order() [with circuit breaker]
└─ Publishes: order.submitted (order_id=abc123)
5. OrderExecutionAgent receives fill
├─ Monitors: order status via polling or webhook
├─ Detects: order status = FILLED
├─ Publishes: order.filled (AAPL, qty=5, fill_price=$175.50)
└─ Updates: order state machine (SUBMITTED → FILLED)
6. PositionManagementAgent tracks position
├─ Subscribes to: order.filled
├─ Creates: position record (AAPL, qty=5, entry=$175.50)
├─ Calculates: stop=$166.72 (-5%), target=$201.82 (+15%)
└─ Publishes: position.opened (AAPL)
7. PositionManagementAgent sets stop loss
├─ Creates: stop order (AAPL, qty=5, stop=$166.72)
├─ Calls: AlpacaAPI.submit_order(type=STOP) [with circuit breaker]
└─ Publishes: position.stop.set (AAPL, stop=$166.72)
8. StrategyAgent monitors position
├─ Subscribes to: market_data.quote.updated (AAPL)
├─ Checks: current price vs target ($201.82)
├─ Updates: trade state machine (HOLDING)
└─ Waits for: target hit OR stop hit
9. PositionManagementAgent detects target hit
├─ Monitors: real-time quotes
├─ Detects: current price >= $201.82
├─ Publishes: position.target.hit (AAPL)
└─ Updates: trade state machine (HOLDING → EXIT_TRIGGERED)
10. OrderExecutionAgent places exit order
├─ Subscribes to: position.target.hit
├─ Creates: limit order (AAPL, qty=5, price=$201.82, side=SELL)
├─ Calls: AlpacaAPI.submit_order() [with circuit breaker]
└─ Publishes: order.submitted (exit order_id=xyz789)
11. OrderExecutionAgent receives exit fill
├─ Monitors: exit order status
├─ Detects: order status = FILLED
├─ Publishes: order.filled (AAPL exit, fill_price=$201.82)
└─ Updates: order state machine (SUBMITTED → FILLED)
12. PositionManagementAgent closes trade
├─ Subscribes to: order.filled (exit)
├─ Calculates: P&L ($201.82 - $175.50) × 5 = $131.60
├─ Updates: trade state machine (ORDER_CLOSING → CLOSED)
├─ Publishes: position.closed (AAPL, pnl=$131.60, reason=TARGET_HIT)
└─ Cancels: stop loss order (no longer needed)
13. StrategyAgent logs trade
├─ Subscribes to: position.closed
├─ Records: trade in journal
├─ Updates: strategy metrics (win rate, avg R/R)
└─ Publishes: strategy.trade.logged (AAPL)
Total execution time: ~2-5 seconds (synchronous steps)
Total hold time: Hours to days (async monitoring)
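The subscribe/publish chaining in steps 2 to 4 can be illustrated with an in-process stand-in for the message bus. This is a sketch, not the Redis implementation: `InMemoryBus` and `wire_entry_flow` are hypothetical names, delivery here is synchronous (Redis pub/sub is not), and the risk check is reduced to the $1000 position cap.

```python
from collections import defaultdict
from typing import Callable, DefaultDict, Dict, List

Handler = Callable[[Dict], None]


class InMemoryBus:
    """Synchronous publish/subscribe stand-in for the Redis bus."""

    def __init__(self) -> None:
        self._subscribers: DefaultDict[str, List[Handler]] = defaultdict(list)
        self.log: List[str] = []  # event names in publish order

    def subscribe(self, event: str, handler: Handler) -> None:
        self._subscribers[event].append(handler)

    def publish(self, event: str, payload: Dict) -> None:
        self.log.append(event)
        for handler in list(self._subscribers[event]):
            handler(payload)


def wire_entry_flow(bus: InMemoryBus, max_position_usd: float = 1000.0) -> None:
    """Connect the risk and order handlers for the entry half of the flow."""

    def on_entry_signal(signal: Dict) -> None:
        # RiskManagementAgent: size the position, approve only if at least 1 share.
        quantity = int(max_position_usd // signal["entry_price"])
        if quantity > 0:
            bus.publish("risk.trade.approved", {**signal, "quantity": quantity})

    def on_trade_approved(trade: Dict) -> None:
        # OrderExecutionAgent: a real agent would call the broker here.
        bus.publish("order.submitted", {**trade, "order_id": "abc123"})

    bus.subscribe("strategy.entry.signal", on_entry_signal)
    bus.subscribe("risk.trade.approved", on_trade_approved)
```

Publishing `strategy.entry.signal` for AAPL at $175.50 then produces `risk.trade.approved` with a quantity of 5 followed by `order.submitted`, which mirrors the order of steps 2 to 4 without requiring a running Redis instance.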
Error Handling¶
Circuit Breaker Scenarios¶
Scenario 1: Alpaca API Down
1. MarketDataAgent calls AlpacaAPI.get_quote()
2. Request fails (timeout, 500 error, network issue)
3. AlpacaCircuitBreaker.failure_count increments (1 → 2 → 3)
4. On 3rd failure: Circuit breaker opens
5. All subsequent requests fail immediately with CircuitBreakerOpenError
6. After 60 seconds: Circuit breaker enters HALF_OPEN
7. Next request succeeds → Circuit breaker closes
8. OR next request fails → Circuit breaker reopens for another 60s
Scenario 2: Schwab Token Expired
1. MarketDataAgent calls SchwabAPI.get_quote()
2. API returns 401 Unauthorized (token expired)
3. SchwabCircuitBreaker detects TokenExpiredError
4. Circuit breaker immediately opens (don't retry expired tokens)
5. Log error: "Schwab token expired. Run: uv run python3 tools/schwab_setup.py"
6. All agents stop using Schwab API
7. Manual intervention required (refresh token)
8. After token refresh: Circuit breaker can be manually reset
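Steps 7 and 8 imply a breaker that stays open until someone resets it, which a timeout-only breaker does not do on its own. A minimal sketch of that behaviour follows; `LatchingBreaker`, its `latched` flag, and `reset()` are hypothetical names, and the exception classes are local stand-ins for the project's own.

```python
import time
from typing import Any, Callable, Optional


class CircuitBreakerOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""


class TokenExpiredError(Exception):
    """Stand-in for the error raised on an expired Schwab token."""


class LatchingBreaker:
    """Breaker that stays OPEN after a token expiry until reset() is called."""

    def __init__(self, failure_threshold: int = 3, timeout: float = 60.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.state = "CLOSED"
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.clock = clock
        self.opened_at: Optional[float] = None
        self.latched = False  # True means only reset() can close the circuit

    def call(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        if self.state == "OPEN":
            cooled_down = self.clock() - self.opened_at > self.timeout
            if self.latched or not cooled_down:
                raise CircuitBreakerOpenError("API unavailable")
            self.state = "HALF_OPEN"
        try:
            result = func(*args, **kwargs)
        except TokenExpiredError:
            self._open(latched=True)  # waiting will not fix an expired token
            raise
        except Exception:
            self.failure_count += 1
            if self.state == "HALF_OPEN" or self.failure_count >= self.failure_threshold:
                self._open(latched=False)
            raise
        self.state = "CLOSED"
        self.failure_count = 0
        return result

    def _open(self, latched: bool) -> None:
        self.state = "OPEN"
        self.opened_at = self.clock()
        self.latched = latched

    def reset(self) -> None:
        """Manually close the circuit, e.g. after the token has been refreshed."""
        self.state = "CLOSED"
        self.failure_count = 0
        self.opened_at = None
        self.latched = False
```

The latch separates failures that time can heal (outages, rate limits) from those that need an operator, so an expired token never triggers a pointless half-open retry every 60 seconds.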
Scenario 3: Rate Limit Hit
1. Multiple agents call AlpacaAPI rapidly
2. API returns 429 Too Many Requests
3. AlpacaCircuitBreaker detects RateLimitError
4. Circuit breaker opens after 3 rate limit errors
5. Agents wait 60 seconds before retrying
6. During wait: All requests fail fast (no API calls)
7. After timeout: Circuit breaker enters HALF_OPEN
8. Rate limit should be cleared by now
9. Next request succeeds → Circuit breaker closes
State Machine Invalid Transitions¶
Scenario 4: Try to Fill Before Placing Order
1. StrategyAgent generates entry signal
2. Bug: OrderExecutionAgent misses order.submit step
3. PositionManagementAgent receives order.filled event (stale/duplicate)
4. TradeStateMachine.filled() called while state = ENTRY_APPROVED
5. Raises: InvalidTransitionError("Cannot transition from ENTRY_APPROVED to FILLED")
6. Error logged with trace_id for debugging
7. Trade aborted, no position opened
Scenario 5: Market Closed During Exit
1. PositionManagementAgent detects target hit
2. OrderExecutionAgent tries to place exit order
3. MarketDataAgent.check_status() returns CLOSED
4. Exit order rejected (cannot trade when market closed)
5. OrderStateMachine remains in DRAFT (not submitted)
6. Agent waits for market open
7. On market open: Retry exit order submission
Distributed Trace Error Analysis¶
Scenario 6: Order Rejected by Broker
Trace: trade-123-abc (FAILED)
├─ strategy.scan_candidates [✓ 500ms]
├─ strategy.evaluate_setup [✓ 100ms]
├─ risk.approve_trade [✓ 50ms]
├─ order.submit [✗ 300ms] ERROR: InsufficientFunds
│ └─ alpaca_api.place_order [280ms]
│ └─ HTTP 403: "Insufficient buying power"
└─ Total: 950ms (FAILED)
Trace attributes:
- error.type: InsufficientFunds
- error.message: "Insufficient buying power"
- order.symbol: AAPL
- order.quantity: 5
- order.estimated_cost: $877.50
- account.buying_power: $500.00 ← ROOT CAUSE
Resolution:
- RiskManagementAgent needs to check buying_power BEFORE approving trade
- Add check: required_capital < account.buying_power
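The proposed check is small enough to sketch directly. The function name `check_buying_power` and the (approved, reason) return shape are assumptions for illustration; the comparison follows the strict inequality stated above.

```python
from typing import Tuple


def check_buying_power(quantity: int, price: float,
                       buying_power: float) -> Tuple[bool, str]:
    """Return (approved, reason) for a proposed entry order."""
    required_capital = quantity * price
    # Approve only when required capital is strictly below buying power.
    if not required_capital < buying_power:
        return False, (f"required ${required_capital:.2f} is not below "
                       f"buying power ${buying_power:.2f}")
    return True, "ok"
```

For the failed trade in the trace (5 shares at $175.50 against $500.00 of buying power) this rejects the trade inside the risk step, before any order reaches the broker.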
Testing Strategy¶
Unit Tests¶
# tests/trading_agents/test_trade_state_machine.py
import time
from unittest.mock import MagicMock

import pytest

# TradeStateMachine, InvalidTransitionError, AlpacaCircuitBreaker and
# CircuitBreakerOpenError are assumed to be imported from the trading_agents package.


def test_valid_transition_scanning_to_setup():
    sm = TradeStateMachine("AAPL")
    assert sm.state == "SCANNING"
    sm.detect_setup(entry=175.50, stop=166.72, target=201.82)
    assert sm.state == "SETUP_DETECTED"
    assert sm.entry_price == 175.50


def test_invalid_transition_scanning_to_filled():
    sm = TradeStateMachine("AAPL")
    with pytest.raises(InvalidTransitionError):
        sm.filled(fill_price=175.50)  # Can't fill without placing order


def test_circuit_breaker_opens_after_3_failures():
    breaker = AlpacaCircuitBreaker()
    mock_api = MagicMock(side_effect=TimeoutError)
    for _ in range(3):
        with pytest.raises(TimeoutError):
            breaker.call(mock_api)
    assert breaker.state == "OPEN"
    # Once open, calls are rejected without reaching the API.
    with pytest.raises(CircuitBreakerOpenError):
        breaker.call(mock_api)


def test_circuit_breaker_half_open_after_timeout():
    breaker = AlpacaCircuitBreaker()
    breaker.state = "OPEN"
    breaker.last_failure_time = time.time() - 61  # 61 seconds ago
    mock_api = MagicMock(return_value="success")
    result = breaker.call(mock_api)
    assert result == "success"
    assert breaker.state == "CLOSED"
Integration Tests¶
# tests/trading_agents/test_integration.py
import asyncio

import pytest

# The agents, RedisBus and MockAlpacaAPI are assumed to be imported from the
# trading_agents package and the test helpers.


@pytest.mark.asyncio
async def test_full_trade_lifecycle():
    """
    Integration test: Scan → Setup → Entry → Holding → Exit
    Uses real agents with mocked broker APIs
    """
    # Setup
    redis_bus = RedisBus()
    mock_alpaca = MockAlpacaAPI()
    market_agent = MarketDataAgent(redis_bus, mock_alpaca)
    strategy_agent = StrategyAgent(redis_bus)
    risk_agent = RiskManagementAgent(redis_bus)
    order_agent = OrderExecutionAgent(redis_bus, mock_alpaca)
    position_agent = PositionManagementAgent(redis_bus, mock_alpaca)

    # Start agents
    await asyncio.gather(
        market_agent.start(),
        strategy_agent.start(),
        risk_agent.start(),
        order_agent.start(),
        position_agent.start(),
    )

    # Trigger scan
    candidates = await strategy_agent.scan_candidates()
    assert len(candidates) > 0

    # Verify setup detection
    await asyncio.sleep(1)  # Wait for async processing
    assert strategy_agent.state == "SETUP_DETECTED"

    # Verify risk approval
    await asyncio.sleep(1)
    assert risk_agent.last_decision == "APPROVED"

    # Verify order placed
    await asyncio.sleep(1)
    assert len(mock_alpaca.orders) == 1
    assert mock_alpaca.orders[0].symbol == "AAPL"

    # Simulate fill
    mock_alpaca.fill_order(mock_alpaca.orders[0].id, price=175.50)
    await asyncio.sleep(1)

    # Verify position opened
    assert len(position_agent.positions) == 1
    assert position_agent.positions["AAPL"].entry_price == 175.50

    # Verify stop loss set
    await asyncio.sleep(1)
    stop_orders = [o for o in mock_alpaca.orders if o.type == "STOP"]
    assert len(stop_orders) == 1
    assert stop_orders[0].stop_price == 166.72

    # Simulate target hit
    mock_alpaca.update_quote("AAPL", price=201.82)
    await asyncio.sleep(2)  # Wait for monitoring loop

    # Verify exit order placed
    exit_orders = [o for o in mock_alpaca.orders if o.side == "SELL"]
    assert len(exit_orders) >= 1

    # Simulate exit fill
    mock_alpaca.fill_order(exit_orders[0].id, price=201.82)
    await asyncio.sleep(1)

    # Verify trade closed
    assert position_agent.positions["AAPL"].state == "CLOSED"
    assert position_agent.positions["AAPL"].pnl > 0

    # Cleanup
    await asyncio.gather(
        market_agent.stop(),
        strategy_agent.stop(),
        risk_agent.stop(),
        order_agent.stop(),
        position_agent.stop(),
    )
Deployment Checklist¶
Infrastructure¶
- [ ] Docker Compose running (Redis + Jaeger)
- [ ] Redis accessible (redis-cli ping)
- [ ] Jaeger UI accessible (http://localhost:16686)
Configuration¶
- [ ] .env files present (trading-command-center/.env)
- [ ] Alpaca API keys configured
- [ ] Schwab API keys configured
- [ ] Schwab OAuth tokens fresh (<7 days)
Agents¶
- [ ] All 5 agents implemented
- [ ] State machines tested (unit tests pass)
- [ ] Circuit breakers tested (unit tests pass)
- [ ] Integration tests pass (full trade lifecycle)
Observability¶
- [ ] OpenTelemetry instrumentation added
- [ ] Traces visible in Jaeger UI
- [ ] Error traces include stack traces
- [ ] Performance metrics captured
Safety¶
- [ ] Paper trading only (ALPACA_PAPER_API_KEY used)
- [ ] Position size limits enforced ($1000 max)
- [ ] Stop losses required (5% for longs)
- [ ] Portfolio limits checked (20% max per position)
- [ ] Cash reserve maintained (50% minimum)
Agentic Parallelism Patterns¶
Competitive Ensemble Pattern (Pattern 6)¶
Status: 🔴 Not Implemented (MEDIUM PRIORITY)
Reference: docs/architecture/AGENTIC_PARALLELISM_PATTERNS.md
The Competitive Ensemble pattern addresses a fundamental limitation of single-strategy trading: each strategy has blind spots. By running multiple diverse strategy agents in parallel and selecting the best signal, we reduce the risk of any single strategy's failure mode.
Current Flow (Single Strategy):
Proposed Flow (Competitive Ensemble):
Market Data
├─→ [Parallel] TrendFollowingAgent
│ Signal: BUY AAPL @ $175, confidence 0.72
├─→ [Parallel] MeanReversionAgent
│ Signal: HOLD (no signal)
├─→ [Parallel] MomentumAgent
│ Signal: BUY AAPL @ $175, confidence 0.85
└─→ SignalAggregator (Judge)
- Consensus: 2/3 agents agree on BUY
- Best confidence: 0.85 (Momentum)
- Final: BUY AAPL with position size weighted by consensus
Implementation for Trading System:
# trading_agents/strategy_ensemble.py
import logging
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, replace
from typing import List, Optional, Protocol

logger = logging.getLogger(__name__)


@dataclass
class TradingSignal:
    action: str  # BUY, SELL, HOLD
    symbol: str
    price: float
    confidence: float
    strategy: str
    reasoning: str


class BaseStrategy(Protocol):
    """Interface assumed for each strategy; the real base class is not shown here."""

    name: str

    def evaluate(self, symbol: str, market_data: dict) -> Optional[TradingSignal]: ...


class StrategyEnsemble:
    """Competitive ensemble of diverse trading strategies."""

    def __init__(self, strategies: List[BaseStrategy]):
        self.strategies = strategies
        self.executor = ThreadPoolExecutor(max_workers=len(strategies))

    def get_signal(self, symbol: str, market_data: dict) -> Optional[TradingSignal]:
        """Run all strategies in parallel, aggregate signals."""
        # Launch parallel strategy evaluation
        futures = {
            strategy.name: self.executor.submit(strategy.evaluate, symbol, market_data)
            for strategy in self.strategies
        }

        # Collect signals; a failing or slow strategy is logged and skipped
        signals: List[TradingSignal] = []
        for name, future in futures.items():
            try:
                signal = future.result(timeout=5.0)
                if signal and signal.action != "HOLD":
                    signals.append(signal)
            except Exception as e:
                logger.warning(f"Strategy {name} failed: {e}")

        if not signals:
            return None  # No actionable signals

        # Aggregate using voting + confidence weighting
        return self._aggregate_signals(signals)

    def _aggregate_signals(self, signals: List[TradingSignal]) -> TradingSignal:
        """Select best signal using consensus + confidence."""
        # Group by action
        buy_signals = [s for s in signals if s.action == "BUY"]
        sell_signals = [s for s in signals if s.action == "SELL"]

        # Consensus voting
        if len(buy_signals) > len(sell_signals):
            candidates = buy_signals
        elif len(sell_signals) > len(buy_signals):
            candidates = sell_signals
        else:
            # Tie: pick highest confidence
            candidates = signals

        # Select highest confidence from winning group
        best = max(candidates, key=lambda s: s.confidence)

        # Scale confidence by the share of actionable signals that agree with the
        # chosen action (0.5 on a BUY/SELL tie, rather than 1.0).
        agreeing = sum(1 for s in signals if s.action == best.action)
        consensus_ratio = agreeing / len(signals)
        # Return a copy so the strategy's original signal is not mutated.
        return replace(best, confidence=best.confidence * consensus_ratio)
Diverse Strategy Set:
| Strategy | Strength | Weakness | Diversification Role |
|---|---|---|---|
| TrendFollowing | Strong in trends | Whipsaws in ranging | Captures extended moves |
| MeanReversion | Strong in ranges | Killed by trends | Catches overextensions |
| Momentum | Fast signals | False breakouts | Early entry timing |
| Volatility | Risk-adjusted | Slow signals | Position sizing |
Integration Points:
1. trading_agents/strategy_agent.py - Replace single strategy with ensemble
2. trading_agents/config.py - Define strategy weights and confidence thresholds
3. Redis message bus - Add strategy.ensemble.signal event type
Expected Impact:
- Win rate improvement: 10-20% (consensus filtering)
- Drawdown reduction: 15-25% (no single strategy failure)
- Signal quality: Higher confidence signals only
Redundant Execution Pattern (Pattern 9)¶
Status: 🟡 Partially Implemented (Circuit Breakers)
Reference: docs/architecture/AGENTIC_PARALLELISM_PATTERNS.md
Our existing circuit breaker implementation provides fail-fast behavior, but Pattern 9 extends this with parallel redundant execution for critical operations.
Current Flow (Circuit Breaker Only):
Proposed Flow (Redundant Execution):
Critical Order Request
├─→ [Parallel] Alpaca Primary
└─→ [Parallel] Alpaca Backup (different endpoint)
→ First successful response wins
→ Second request cancelled
→ If both fail, circuit opens
Use Cases for Redundant Execution:
1. Stop Loss Orders - Critical for risk management
2. Position Exit - Must execute even if primary API slow
3. Emergency Liquidation - All-or-nothing reliability
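The "first successful response wins" rule in the proposed flow might look like the following with `asyncio`. This is a sketch under assumptions: `first_success` is a hypothetical helper, and the callables stand in for the primary and backup Alpaca requests.

```python
import asyncio
from typing import Awaitable, Callable, Optional, Sequence, TypeVar

T = TypeVar("T")


async def first_success(calls: Sequence[Callable[[], Awaitable[T]]]) -> T:
    """Run all calls concurrently and return the first successful result.

    Remaining tasks are cancelled once a winner is found. If every call
    fails, the last exception observed is re-raised.
    """
    if not calls:
        raise ValueError("at least one call is required")
    pending = {asyncio.ensure_future(call()) for call in calls}
    last_error: Optional[BaseException] = None
    try:
        while pending:
            done, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
            winner = None
            for task in done:
                error = task.exception()  # also marks the exception as retrieved
                if error is None:
                    winner = task
                else:
                    last_error = error
            if winner is not None:
                return winner.result()
        assert last_error is not None
        raise last_error
    finally:
        # Cancel whatever is still in flight (the "second request cancelled" step).
        for task in pending:
            task.cancel()
```

Cancelling the local task does not undo a request the broker has already accepted, so redundant order submission would need some form of idempotency (for example a shared client-side order identifier) to avoid filling the same order twice. The caller would treat the re-raised exception as the "both fail" case and record a failure on the circuit breaker.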
Assembly Line Pattern (Pattern 7) for Email Correlation¶
Status: 🔴 Not Implemented (LOW PRIORITY)
Reference: docs/architecture/AGENTIC_PARALLELISM_PATTERNS.md
While not directly trading-related, the Assembly Line pattern applies to our email-to-transcript correlation pipeline, which feeds client intelligence into trading decisions.
Current Flow (Sequential):
Email → Parse → Extract Entities → Correlate → Store
        100ms   200ms              300ms       100ms
Total: 700ms per email (sequential)
Proposed Flow (Pipeline Parallelism):
Stage 1 (Parse):     E1 → E2 → E3 → E4 → ...
Stage 2 (Extract):        E1 → E2 → E3 → E4 → ...
Stage 3 (Correlate):           E1 → E2 → E3 → E4 → ...
Stage 4 (Store):                    E1 → E2 → E3 → E4 → ...
Throughput: with one worker per stage, 1 email every 300ms once the pipeline is full (bounded by the slowest stage, Correlate), about 2.3x the sequential rate of 1 email every 700ms
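The throughput of a pipeline like this follows from two numbers: per-item latency is the sum of the stage times, while the steady-state interval between completed items is the slowest stage time. The helper below is an illustrative calculation, assuming one worker per stage and no queueing overhead; `pipeline_stats` is a hypothetical name.

```python
from typing import Dict, Union


def pipeline_stats(stage_ms: Dict[str, float]) -> Dict[str, Union[float, str]]:
    """Latency, steady-state interval and speedup for a linear pipeline."""
    latency = sum(stage_ms.values())          # time for one item end to end
    interval = max(stage_ms.values())         # slowest stage gates the output rate
    bottleneck = max(stage_ms, key=lambda name: stage_ms[name])
    return {
        "latency_ms": latency,
        "interval_ms": interval,
        "speedup": latency / interval,        # versus fully sequential processing
        "bottleneck": bottleneck,
    }


# Stage times from the sequential flow above.
stats = pipeline_stats({"parse": 100, "extract": 200, "correlate": 300, "store": 100})
```

For the stage times listed, latency stays at 700 ms, the interval is 300 ms, and the speedup is roughly 2.3x. Going faster than that would require shortening the Correlate stage or running several Correlate workers in parallel.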
Next Steps¶
Phase 1a: Foundation (This Week)¶
- Set up Docker Compose (Redis + Jaeger)
- Implement base agent class with tracing
- Implement message bus (Redis pub/sub)
- Implement circuit breakers (base, Schwab, Alpaca)
- Implement state machines (trade lifecycle, order lifecycle)
Phase 1b: Agents (Next Week)¶
- Implement MarketDataAgent
- Implement OrderExecutionAgent
- Implement StrategyAgent
- Implement RiskManagementAgent
- Implement PositionManagementAgent
Phase 1c: Testing (Week After)¶
- Unit tests for all agents
- Unit tests for state machines
- Unit tests for circuit breakers
- Integration test (full trade lifecycle)
- Load test (multiple concurrent trades)
Phase 1d: Validation (30 Days)¶
- Run 30 trades in paper account
- Verify state machines work correctly
- Verify circuit breakers prevent API abuse
- Analyze traces in Jaeger
- Measure performance and reliability
Status: Design Complete, Ready for Implementation
Next: Implement Docker Compose + Redis + Jaeger infrastructure
Timeline: 3-4 weeks for full implementation