Trading Multi-Agent System - Architecture Design

Date: 2025-10-28
Phase: 1 of 3 (Trading Multi-Agent → Isolation → IT Raven Enterprise)
Status: Implementation In Progress


Overview

Production-ready multi-agent system for automated trading with:
- State machines for trade lifecycle management
- Circuit breakers for API reliability
- Distributed tracing for audit trail
- Event-driven architecture for loose coupling
- Agent boundaries for single responsibility


Agent Boundaries

1. MarketDataAgent

Responsibility: Real-time quotes, historical bars, market status

Capabilities:
- Get latest quote for symbol
- Get historical bars (1min, 5min, 1hour, 1day)
- Subscribe to real-time price streams
- Check market open/closed status

Message Inputs:
- market_data.get_quote - Fetch current price
- market_data.get_bars - Fetch historical data
- market_data.subscribe - Start price stream
- market_data.check_status - Market open/closed?

Message Outputs:
- market_data.quote_updated - New price available
- market_data.bars_received - Historical data ready
- market_data.stream_tick - Real-time price update
- market_data.market_status - Open/closed/pre-market/after-hours

State: Stateless (pure data fetcher)

Circuit Breaker: Yes (Alpaca API)
- Failure threshold: 3
- Timeout: 60 seconds
- Half-open retry: 1 request

Rate Limiting: 200 req/min (Alpaca limit)
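
The 200 req/min limit could be enforced client-side before a request ever reaches the circuit breaker. The following is a minimal sketch of a sliding-window limiter, not part of the current design; the class name `SlidingWindowRateLimiter`, the `try_acquire` method, and the injectable `clock` parameter are all hypothetical.

```python
import time
from collections import deque


class SlidingWindowRateLimiter:
    """Allow at most `max_calls` calls in any rolling `period` seconds."""

    def __init__(self, max_calls=200, period=60.0, clock=time.monotonic):
        self.max_calls = max_calls
        self.period = period
        self.clock = clock      # injectable so tests can use a fake clock
        self._calls = deque()   # timestamps of recent calls, oldest first

    def try_acquire(self):
        """Record the call and return True if under the limit, else return False."""
        now = self.clock()
        # Drop timestamps that have left the rolling window.
        while self._calls and now - self._calls[0] >= self.period:
            self._calls.popleft()
        if len(self._calls) < self.max_calls:
            self._calls.append(now)
            return True
        return False
```

An agent would call `try_acquire()` before each API request and delay or drop the request when it returns False, so rate-limit rejections never count toward the breaker's failure threshold.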


2. OrderExecutionAgent

Responsibility: Order submission, fills, cancellations, modifications

Capabilities:
- Submit market/limit/stop orders
- Cancel pending orders
- Modify order prices
- Track order status (submitted → filled/rejected/cancelled)

Message Inputs:
- order.submit - Place new order
- order.cancel - Cancel pending order
- order.modify - Change price/quantity
- order.get_status - Check order state

Message Outputs:
- order.submitted - Order sent to broker
- order.filled - Order executed
- order.partially_filled - Partial execution
- order.rejected - Order rejected by broker
- order.cancelled - Order cancelled

State: Maintains order tracking (order_id → order_status)

Circuit Breakers:
- Schwab API: 3 failures → 60s timeout
- Alpaca API: 3 failures → 60s timeout

Rate Limiting:
- Schwab: 120 req/min
- Alpaca: 200 req/min


3. StrategyAgent

Responsibility: Signal generation, entry/exit logic, trend following rules

Capabilities:
- Scan for entry candidates
- Evaluate setups (MA20, RSI, volume)
- Generate entry signals
- Manage position (trailing stops, profit targets)
- Decide when to exit

Message Inputs:
- strategy.scan_candidates - Find potential entries
- strategy.evaluate_setup - Check if criteria met
- strategy.manage_position - Monitor open trade
- strategy.should_exit - Check exit conditions

Message Outputs:
- strategy.candidate_found - Potential entry detected
- strategy.entry_signal - Criteria met, ready to enter
- strategy.hold_signal - Continue holding position
- strategy.exit_signal - Time to exit (target/stop/time)

State Machine: Trade Lifecycle (see below)

States:
- SCANNING - Looking for setups
- SETUP_DETECTED - Found candidate, evaluating
- ENTRY_APPROVED - Ready to place order
- HOLDING - Position open, monitoring
- EXIT_TRIGGERED - Exit condition met
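
The setup criteria this agent evaluates (price above MA20, RSI between 50 and 70, volume above average, as stated in the trade lifecycle transitions) can be sketched as a single predicate. This is an illustrative sketch only: the function name `is_entry_setup` is hypothetical, RSI is taken as a precomputed input, the RSI bounds are treated as inclusive, and "average volume" is assumed to mean a 20-bar average.

```python
from statistics import mean


def is_entry_setup(closes, volumes, rsi):
    """Check price > MA20, 50 <= RSI <= 70, and latest volume > average volume."""
    if len(closes) < 20 or len(volumes) < 20:
        return False  # not enough history for a 20-bar average
    ma20 = mean(closes[-20:])
    avg_volume = mean(volumes[-20:])  # ASSUMPTION: 20-bar volume average
    return closes[-1] > ma20 and 50 <= rsi <= 70 and volumes[-1] > avg_volume
```

Keeping the check a pure function of its inputs makes it easy to unit test without a market data feed.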


4. RiskManagementAgent

Responsibility: Position sizing, portfolio limits, drawdown management

Capabilities:
- Calculate position size (max $1000)
- Check portfolio limits (max 20% per position)
- Enforce cash reserve (min 50%)
- Track drawdown (max 20%)
- Approve/reject trades based on risk

Message Inputs:
- risk.calculate_size - Determine position size
- risk.check_limits - Validate against rules
- risk.assess_portfolio - Check overall risk
- risk.approve_trade - Final risk approval

Message Outputs:
- risk.size_calculated - Position size determined
- risk.limit_exceeded - Trade rejected (too risky)
- risk.trade_approved - Risk checks passed
- risk.drawdown_alert - Drawdown threshold reached

State: Portfolio-level risk metrics

Rules:
- Max position size: $1000
- Max portfolio exposure: 20% per position
- Min cash reserve: 50%
- Max drawdown: 20%
- Stop loss required: 5% below entry for longs, 5% above entry for shorts
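
The rules above can be combined into one sizing function: the spendable budget is the smallest of the three dollar limits, and the share count is that budget divided by price, rounded down. This is a sketch under stated assumptions, not the agent's implementation: `RiskLimits`, `size_position`, and `drawdown_exceeded` are hypothetical names, and the percentage limits are assumed to be fractions of total portfolio equity.

```python
import math
from dataclasses import dataclass


@dataclass(frozen=True)
class RiskLimits:
    max_position_usd: float = 1000.0
    max_position_pct: float = 0.20   # ASSUMPTION: fraction of portfolio equity
    min_cash_pct: float = 0.50       # ASSUMPTION: fraction of portfolio equity
    max_drawdown_pct: float = 0.20


def size_position(price, equity, cash, limits=RiskLimits()):
    """Largest whole-share quantity that satisfies every limit (0 if none)."""
    budget = min(
        limits.max_position_usd,
        limits.max_position_pct * equity,
        cash - limits.min_cash_pct * equity,  # cash that may be spent
    )
    if price <= 0 or budget <= 0:
        return 0
    return math.floor(budget / price)


def drawdown_exceeded(peak_equity, equity, limits=RiskLimits()):
    """True once equity has fallen by at least the max drawdown from its peak."""
    if peak_equity <= 0:
        return False
    return (peak_equity - equity) / peak_equity >= limits.max_drawdown_pct
```

For a $10,000 account holding $8,000 cash, `size_position(175.50, 10_000, 8_000)` is limited by the $1000 cap and yields 5 shares, matching the AAPL example used elsewhere in this document.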


5. PositionManagementAgent

Responsibility: Position tracking, stop loss management, P&L calculation

Capabilities:
- Track all open positions
- Set/update stop losses
- Calculate unrealized P&L
- Monitor for stop/target hits
- Handle short positions correctly

Message Inputs:
- position.track - Add new position
- position.set_stop - Place stop loss order
- position.update_stop - Modify stop (trail higher)
- position.calculate_pnl - Get current P&L
- position.check_exit - Stop/target hit?

Message Outputs:
- position.opened - New position tracked
- position.stop_set - Stop loss placed
- position.stop_hit - Stop loss triggered
- position.target_hit - Profit target reached
- position.pnl_updated - P&L recalculated

State: Position inventory (symbol → position_data)

Special Handling: Short positions (BUY stops ABOVE entry)
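
The long/short asymmetry can be made explicit in code: a long position is protected by a SELL stop below entry, a short position by a BUY stop above entry. The sketch below is illustrative; `stop_and_target` is a hypothetical helper, the 15% target is taken from the AAPL example in this document, mirroring the target for shorts is an assumption, and half-even rounding to cents is an assumption that happens to reproduce the document's $166.72 and $201.82 figures.

```python
from decimal import Decimal, ROUND_HALF_EVEN

CENT = Decimal("0.01")


def stop_and_target(entry, side, stop_pct=Decimal("0.05"), target_pct=Decimal("0.15")):
    """Return (stop_price, target_price, stop_order_side) for a position."""
    if side == "long":
        # Long: SELL stop BELOW entry, profit target above.
        stop, target, stop_side = entry * (1 - stop_pct), entry * (1 + target_pct), "SELL"
    elif side == "short":
        # Short: BUY stop ABOVE entry, profit target below (ASSUMPTION: mirrored).
        stop, target, stop_side = entry * (1 + stop_pct), entry * (1 - target_pct), "BUY"
    else:
        raise ValueError(f"unknown side: {side!r}")
    return (
        stop.quantize(CENT, rounding=ROUND_HALF_EVEN),
        target.quantize(CENT, rounding=ROUND_HALF_EVEN),
        stop_side,
    )
```

Using `Decimal` avoids binary floating-point surprises when prices land exactly on a half-cent boundary.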


Message Bus Architecture

Technology: Redis Pub/Sub

Event Naming Convention: <agent>.<action>.<status>

Examples:
- market_data.quote.updated
- order.submit.success
- strategy.entry.signal
- risk.limit.exceeded
- position.stop.hit

Message Format:

{
  "event": "strategy.entry.signal",
  "timestamp": "2025-10-28T14:30:00Z",
  "trace_id": "trade-123-abc",
  "data": {
    "symbol": "AAPL",
    "entry_price": 175.50,
    "stop_price": 166.72,
    "target_price": 201.82,
    "quantity": 5,
    "risk_reward_ratio": 3.0,
    "strategy": "trend_following_3_to_1"
  },
  "metadata": {
    "source": "StrategyAgent",
    "correlation_id": "scan-456",
    "priority": "high"
  }
}
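
A message in this format could be modeled as a small dataclass that serializes to and from JSON before it is handed to the bus. This is a sketch only: the `Message` class and its methods are hypothetical (the design places the real schema in `message_bus/message.py`), and treating `metadata` as optional is an assumption.

```python
import json
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone


def _utc_now():
    """Current UTC time in the same format as the example message."""
    return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


@dataclass
class Message:
    event: str
    trace_id: str
    data: dict
    metadata: dict = field(default_factory=dict)  # ASSUMPTION: optional
    timestamp: str = field(default_factory=_utc_now)

    def to_json(self):
        return json.dumps(asdict(self))

    @classmethod
    def from_json(cls, raw):
        payload = json.loads(raw)
        missing = {"event", "timestamp", "trace_id", "data"} - payload.keys()
        if missing:
            raise ValueError(f"message missing fields: {sorted(missing)}")
        return cls(**payload)
```

Validating required fields at the boundary means a malformed payload fails loudly in the subscriber rather than deep inside an agent.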

State Machines

Trade Lifecycle State Machine

States:
  SCANNING          - Looking for entry setups
  SETUP_DETECTED    - Found candidate, evaluating criteria
  ENTRY_APPROVED    - Risk approved, ready to place order
  ORDER_PLACED      - Entry order submitted to broker
  FILLED            - Entry order executed
  HOLDING           - Position open, stop loss set
  EXIT_TRIGGERED    - Exit condition met (target/stop/time)
  ORDER_CLOSING     - Exit order submitted
  CLOSED            - Position closed, trade complete

Transitions:
  SCANNING → SETUP_DETECTED
    Trigger: strategy.candidate_found
    Condition: Price > MA20, RSI 50-70, Volume > avg

  SETUP_DETECTED → ENTRY_APPROVED
    Trigger: risk.trade_approved
    Condition: Position size < $1000, portfolio < 20%, cash > 50%

  ENTRY_APPROVED → ORDER_PLACED
    Trigger: order.submit
    Condition: Market open

  ORDER_PLACED → FILLED
    Trigger: order.filled
    Condition: Broker confirms execution

  FILLED → HOLDING
    Trigger: position.stop_set
    Condition: Stop loss order placed successfully

  HOLDING → EXIT_TRIGGERED
    Trigger: position.target_hit OR position.stop_hit OR strategy.exit_signal
    Condition: Exit criteria met

  EXIT_TRIGGERED → ORDER_CLOSING
    Trigger: order.submit (exit)
    Condition: Market open

  ORDER_CLOSING → CLOSED
    Trigger: order.filled (exit)
    Condition: Broker confirms execution

Invalid Transitions (raise exception):
  - SCANNING → ORDER_PLACED (must evaluate setup first)
  - SETUP_DETECTED → FILLED (must place order first)
  - ORDER_PLACED → CLOSED (must fill and hold first)
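
The transition list above maps naturally onto a lookup table keyed by (current state, trigger). The following is a minimal sketch rather than the planned `TradeStateMachine`: the class name `TradeLifecycle` and its `handle` method are hypothetical, and the guard conditions (market open, risk limits, and so on) are deliberately left out.

```python
class InvalidTransitionError(Exception):
    """Raised when a trigger is not valid in the current state."""


# (current_state, trigger) -> next_state, taken from the transition list above.
TRANSITIONS = {
    ("SCANNING", "strategy.candidate_found"): "SETUP_DETECTED",
    ("SETUP_DETECTED", "risk.trade_approved"): "ENTRY_APPROVED",
    ("ENTRY_APPROVED", "order.submit"): "ORDER_PLACED",
    ("ORDER_PLACED", "order.filled"): "FILLED",
    ("FILLED", "position.stop_set"): "HOLDING",
    ("HOLDING", "position.target_hit"): "EXIT_TRIGGERED",
    ("HOLDING", "position.stop_hit"): "EXIT_TRIGGERED",
    ("HOLDING", "strategy.exit_signal"): "EXIT_TRIGGERED",
    ("EXIT_TRIGGERED", "order.submit"): "ORDER_CLOSING",
    ("ORDER_CLOSING", "order.filled"): "CLOSED",
}


class TradeLifecycle:
    def __init__(self, symbol):
        self.symbol = symbol
        self.state = "SCANNING"
        self.history = ["SCANNING"]  # visited states, useful for audit trails

    def handle(self, trigger):
        """Apply a trigger; raise InvalidTransitionError if it is not allowed."""
        key = (self.state, trigger)
        if key not in TRANSITIONS:
            raise InvalidTransitionError(
                f"Cannot apply {trigger!r} in state {self.state} for {self.symbol}"
            )
        self.state = TRANSITIONS[key]
        self.history.append(self.state)
        return self.state
```

Because every legal move is in one table, any pair not listed (such as SCANNING receiving `order.filled`) is rejected without writing a separate rule for it.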

Order State Machine

States:
  DRAFT             - Order created, not submitted
  SUBMITTED         - Sent to broker
  ACCEPTED          - Broker accepted order
  PARTIALLY_FILLED  - Some shares executed
  FILLED            - Fully executed
  CANCELLED         - User/broker cancelled
  REJECTED          - Broker rejected order
  EXPIRED           - Time-in-force expired

Transitions:
  DRAFT → SUBMITTED
    Trigger: order.submit

  SUBMITTED → ACCEPTED
    Trigger: broker confirmation

  ACCEPTED → PARTIALLY_FILLED
    Trigger: partial fill event

  PARTIALLY_FILLED → FILLED
    Trigger: remaining shares filled

  ACCEPTED → CANCELLED
    Trigger: order.cancel OR market closed

  SUBMITTED → REJECTED
    Trigger: broker rejection (insufficient funds, invalid symbol, etc)
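
The order states and transitions can be sketched as an enum plus an adjacency map. The listed transitions are copied from above; the `ASSUMED` map holds transitions that are not listed but seem likely to be needed (a full fill with no partial fill, expiry, cancelling a partially filled order) and is kept separate so the assumption stays visible. All names here are hypothetical.

```python
from enum import Enum


class OrderState(Enum):
    DRAFT = "DRAFT"
    SUBMITTED = "SUBMITTED"
    ACCEPTED = "ACCEPTED"
    PARTIALLY_FILLED = "PARTIALLY_FILLED"
    FILLED = "FILLED"
    CANCELLED = "CANCELLED"
    REJECTED = "REJECTED"
    EXPIRED = "EXPIRED"


S = OrderState

# Transitions exactly as listed in the document.
LISTED = {
    S.DRAFT: {S.SUBMITTED},
    S.SUBMITTED: {S.ACCEPTED, S.REJECTED},
    S.ACCEPTED: {S.PARTIALLY_FILLED, S.CANCELLED},
    S.PARTIALLY_FILLED: {S.FILLED},
}

# ASSUMPTION: not listed in the document, shown only as candidates.
ASSUMED = {
    S.ACCEPTED: {S.FILLED, S.EXPIRED},
    S.PARTIALLY_FILLED: {S.CANCELLED},
}


def can_transition(src, dst, include_assumed=False):
    """True if src -> dst is allowed by the listed (and optionally assumed) map."""
    allowed = set(LISTED.get(src, set()))
    if include_assumed:
        allowed |= ASSUMED.get(src, set())
    return dst in allowed


def is_terminal(state):
    """A state with no outgoing transitions in either map."""
    return state not in LISTED and state not in ASSUMED
```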

Circuit Breaker Implementation

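Pattern: Fail-fast with a fixed 60-second recovery timeout, followed by a single half-open probe request (the breakers below do not implement exponential backoff)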

SchwabCircuitBreaker

import logging
import time

# Assumed location of the shared exception types, per the directory structure.
from trading_agents.circuit_breakers.base_breaker import (
    CircuitBreakerOpenError,
    RateLimitError,
    TokenExpiredError,
)

logger = logging.getLogger(__name__)


class SchwabCircuitBreaker:
    """
    Circuit breaker for Schwab API with special token handling.

    States:
      CLOSED - Normal operation
      OPEN - API unavailable, reject requests immediately
      HALF_OPEN - Testing if API recovered

    Thresholds:
      - Failure count: 3 consecutive failures
      - Timeout: 60 seconds
      - Half-open retry: 1 request
    """

    def __init__(self):
        self.state = "CLOSED"
        self.failure_count = 0
        self.failure_threshold = 3
        self.timeout = 60  # seconds
        self.last_failure_time = None

    def reset(self):
        """Manually close the breaker, e.g. after refreshing the Schwab token."""
        self.state = "CLOSED"
        self.failure_count = 0

    def _open(self, reason):
        self.state = "OPEN"
        self.last_failure_time = time.time()
        logger.error("Schwab circuit breaker OPENED (%s)", reason)

    def call(self, func, *args, **kwargs):
        if self.state == "OPEN":
            if time.time() - self.last_failure_time > self.timeout:
                self.state = "HALF_OPEN"
                logger.info("Schwab circuit breaker entering HALF_OPEN")
            else:
                raise CircuitBreakerOpenError("Schwab API unavailable")

        try:
            result = func(*args, **kwargs)
        except TokenExpiredError:
            # Retrying cannot fix an expired token, so open immediately.
            logger.error("Schwab token expired. Run: uv run python3 tools/schwab_setup.py")
            self._open("token expired")
            raise
        except Exception as e:
            # Rate limits and all other failures count toward the threshold.
            self.failure_count += 1
            self.last_failure_time = time.time()
            # A failed HALF_OPEN probe reopens at once; otherwise open at the threshold.
            if self.state == "HALF_OPEN" or self.failure_count >= self.failure_threshold:
                self._open("rate limit" if isinstance(e, RateLimitError) else str(e))
            raise

        if self.state == "HALF_OPEN":
            logger.info("Schwab circuit breaker recovered, now CLOSED")
        # Any success closes the breaker and resets the consecutive-failure count.
        self.state = "CLOSED"
        self.failure_count = 0
        return result

AlpacaCircuitBreaker

import logging
import time

# Assumed location of the shared exception type, per the directory structure.
from trading_agents.circuit_breakers.base_breaker import CircuitBreakerOpenError

logger = logging.getLogger(__name__)


class AlpacaCircuitBreaker:
    """
    Circuit breaker for Alpaca API (paper + live accounts).

    Thresholds:
      - Failure count: 3 consecutive failures
      - Timeout: 60 seconds
      - Rate limit: 200 req/min
    """

    def __init__(self, account_type="paper"):
        self.account_type = account_type
        self.state = "CLOSED"
        self.failure_count = 0
        self.failure_threshold = 3
        self.timeout = 60  # seconds
        self.last_failure_time = None

    def call(self, func, *args, **kwargs):
        if self.state == "OPEN":
            if time.time() - self.last_failure_time > self.timeout:
                self.state = "HALF_OPEN"
                logger.info(f"Alpaca {self.account_type} circuit breaker entering HALF_OPEN")
            else:
                raise CircuitBreakerOpenError(f"Alpaca {self.account_type} API unavailable")

        try:
            result = func(*args, **kwargs)
        except Exception as e:
            self.failure_count += 1
            self.last_failure_time = time.time()
            # A failed HALF_OPEN probe reopens at once; otherwise open at the threshold.
            if self.state == "HALF_OPEN" or self.failure_count >= self.failure_threshold:
                self.state = "OPEN"
                logger.error(f"Alpaca {self.account_type} circuit breaker OPENED: {e}")
            raise

        if self.state == "HALF_OPEN":
            logger.info(f"Alpaca {self.account_type} circuit breaker recovered")
        # Any success closes the breaker and resets the consecutive-failure count.
        self.state = "CLOSED"
        self.failure_count = 0
        return result

Distributed Tracing

Technology: OpenTelemetry + Jaeger

Trace Structure:

Trade Execution Trace (trade-123-abc):
├─ strategy.scan_candidates [500ms]
│  ├─ market_data.get_quotes [200ms]
│  └─ strategy.evaluate_criteria [300ms]
├─ strategy.evaluate_setup [100ms]
├─ risk.approve_trade [50ms]
│  ├─ risk.calculate_size [20ms]
│  └─ risk.check_limits [30ms]
├─ order.submit [300ms]
│  └─ alpaca_api.place_order [280ms]
├─ position.track [50ms]
├─ position.set_stop [200ms]
│  └─ alpaca_api.place_stop_order [180ms]
└─ Total: 1200ms

Instrumentation Points:
- Agent method entry/exit
- External API calls (Schwab, Alpaca)
- State machine transitions
- Circuit breaker state changes
- Message bus publish/subscribe

Trace Attributes:
- trade.symbol: Stock symbol
- trade.entry_price: Entry price
- trade.quantity: Share count
- trade.strategy: Strategy name
- order.id: Order ID
- order.status: Order status
- circuit_breaker.state: CLOSED/OPEN/HALF_OPEN
- error.type: Exception type (if failed)
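
To illustrate how nested spans and attributes produce the trace tree above, here is a standard-library stand-in. It is explicitly not the OpenTelemetry API: `SpanRecorder` and its `span` method are hypothetical and only mimic the shape of the data (name, parent, attributes, duration) that a real tracer would export to Jaeger.

```python
import time
from contextlib import contextmanager


class SpanRecorder:
    """Toy tracer: records finished spans with their parent and duration."""

    def __init__(self, trace_id):
        self.trace_id = trace_id
        self.spans = []    # finished spans, in completion order
        self._stack = []   # names of currently open spans

    @contextmanager
    def span(self, name, attributes=None):
        record = {
            "name": name,
            "parent": self._stack[-1] if self._stack else None,
            "attributes": dict(attributes or {}),
        }
        self._stack.append(name)
        start = time.perf_counter()
        try:
            yield record
        except Exception as exc:
            # Mirror the error.type attribute listed above.
            record["attributes"]["error.type"] = type(exc).__name__
            raise
        finally:
            record["duration_ms"] = (time.perf_counter() - start) * 1000
            self._stack.pop()
            self.spans.append(record)
```

Wrapping `order.submit` around `alpaca_api.place_order` with two nested `with rec.span(...)` blocks yields a child span whose `parent` is `order.submit`, which is the relationship the tree diagram shows.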


Directory Structure

/Users/bertfrichot/mem-agent-mcp/
├── trading_agents/                    # NEW: Multi-agent trading system
│   ├── __init__.py
│   ├── base_agent.py                  # Base agent class with tracing
│   ├── market_data_agent.py           # MarketDataAgent
│   ├── order_execution_agent.py       # OrderExecutionAgent
│   ├── strategy_agent.py              # StrategyAgent
│   ├── risk_management_agent.py       # RiskManagementAgent
│   ├── position_management_agent.py   # PositionManagementAgent
│   ├── state_machines/
│   │   ├── __init__.py
│   │   ├── trade_state_machine.py     # Trade lifecycle
│   │   └── order_state_machine.py     # Order lifecycle
│   ├── circuit_breakers/
│   │   ├── __init__.py
│   │   ├── base_breaker.py            # Base circuit breaker
│   │   ├── schwab_breaker.py          # Schwab API breaker
│   │   └── alpaca_breaker.py          # Alpaca API breaker
│   ├── message_bus/
│   │   ├── __init__.py
│   │   ├── redis_bus.py               # Redis pub/sub
│   │   └── message.py                 # Message schema
│   ├── tracing/
│   │   ├── __init__.py
│   │   └── tracer.py                  # OpenTelemetry setup
│   └── config.py                      # Agent configuration
├── tests/
│   └── trading_agents/                # NEW: Agent tests
│       ├── test_market_data_agent.py
│       ├── test_order_execution_agent.py
│       ├── test_strategy_agent.py
│       ├── test_risk_management_agent.py
│       ├── test_position_management_agent.py
│       ├── test_trade_state_machine.py
│       ├── test_circuit_breakers.py
│       └── test_integration.py        # Full trade lifecycle
├── docker-compose.yml                 # Redis + Jaeger
└── docs/architecture/
    └── trading-multi-agent-design.md  # This file

Infrastructure Setup

Docker Compose

# docker-compose.yml

version: '3.8'

services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    command: redis-server --appendonly yes
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 3s
      retries: 3

  jaeger:
    image: jaegertracing/all-in-one:latest
    environment:
      - COLLECTOR_ZIPKIN_HOST_PORT=:9411
    ports:
      - "5775:5775/udp"
      - "6831:6831/udp"
      - "6832:6832/udp"
      - "5778:5778"
      - "16686:16686"  # Jaeger UI
      - "14268:14268"
      - "14250:14250"
      - "9411:9411"
    healthcheck:
      test: ["CMD", "wget", "-q", "--spider", "http://localhost:14269/"]
      interval: 10s
      timeout: 3s
      retries: 3

volumes:
  redis_data:

Starting Infrastructure

# Start Redis + Jaeger
docker-compose up -d

# Verify
docker-compose ps

# View logs
docker-compose logs -f

# Access Jaeger UI
open http://localhost:16686

# Test Redis
redis-cli ping
# Should return: PONG

Agent Coordination Flow

Example: Full Trade Execution

1. StrategyAgent scans for candidates
   ├─ Publishes: strategy.scan.started
   ├─ Calls: MarketDataAgent.get_quotes([symbols])
   ├─ Evaluates: MA20, RSI, volume for each symbol
   └─ Publishes: strategy.candidate_found (symbol=AAPL)

2. StrategyAgent evaluates setup
   ├─ Publishes: strategy.evaluate.started (symbol=AAPL)
   ├─ Calculates: entry, stop, target prices
   ├─ Validates: risk/reward > 2.5
   └─ Publishes: strategy.entry.signal (AAPL @ $175.50)

3. RiskManagementAgent approves trade
   ├─ Subscribes to: strategy.entry.signal
   ├─ Calculates: position size (floor($1000 / $175.50) = 5 shares)
   ├─ Checks: position limits (5 × $175.50 = $877.50 ≤ $1000 and ≤ 20% of portfolio ✓)
   ├─ Checks: cash reserve (cash left after the $877.50 purchase ≥ 50% of portfolio ✓)
   └─ Publishes: risk.trade.approved (AAPL, qty=5)

4. OrderExecutionAgent places entry order
   ├─ Subscribes to: risk.trade.approved
   ├─ Creates: limit order (AAPL, qty=5, price=$175.50)
   ├─ Calls: AlpacaAPI.submit_order() [with circuit breaker]
   └─ Publishes: order.submitted (order_id=abc123)

5. OrderExecutionAgent receives fill
   ├─ Monitors: order status via polling or webhook
   ├─ Detects: order status = FILLED
   ├─ Publishes: order.filled (AAPL, qty=5, fill_price=$175.50)
   └─ Updates: order state machine (SUBMITTED → FILLED)

6. PositionManagementAgent tracks position
   ├─ Subscribes to: order.filled
   ├─ Creates: position record (AAPL, qty=5, entry=$175.50)
   ├─ Calculates: stop=$166.72 (-5%), target=$201.82 (+15%)
   └─ Publishes: position.opened (AAPL)

7. PositionManagementAgent sets stop loss
   ├─ Creates: stop order (AAPL, qty=5, stop=$166.72)
   ├─ Calls: AlpacaAPI.submit_order(type=STOP) [with circuit breaker]
   └─ Publishes: position.stop.set (AAPL, stop=$166.72)

8. StrategyAgent monitors position
   ├─ Subscribes to: market_data.quote.updated (AAPL)
   ├─ Checks: current price vs target ($201.82)
   ├─ Updates: trade state machine (HOLDING)
   └─ Waits for: target hit OR stop hit

9. PositionManagementAgent detects target hit
   ├─ Monitors: real-time quotes
   ├─ Detects: current price >= $201.82
   ├─ Publishes: position.target.hit (AAPL)
   └─ Updates: trade state machine (HOLDING → EXIT_TRIGGERED)

10. OrderExecutionAgent places exit order
    ├─ Subscribes to: position.target.hit
    ├─ Creates: limit order (AAPL, qty=5, price=$201.82, side=SELL)
    ├─ Calls: AlpacaAPI.submit_order() [with circuit breaker]
    └─ Publishes: order.submitted (exit order_id=xyz789)

11. OrderExecutionAgent receives exit fill
    ├─ Monitors: exit order status
    ├─ Detects: order status = FILLED
    ├─ Publishes: order.filled (AAPL exit, fill_price=$201.82)
    └─ Updates: order state machine (SUBMITTED → FILLED)

12. PositionManagementAgent closes trade
    ├─ Subscribes to: order.filled (exit)
    ├─ Calculates: P&L ($201.82 - $175.50) × 5 = $131.60
    ├─ Updates: trade state machine (EXIT_TRIGGERED → ORDER_CLOSING → CLOSED)
    ├─ Publishes: position.closed (AAPL, pnl=$131.60, reason=TARGET_HIT)
    └─ Cancels: stop loss order (no longer needed)

13. StrategyAgent logs trade
    ├─ Subscribes to: position.closed
    ├─ Records: trade in journal
    ├─ Updates: strategy metrics (win rate, avg R/R)
    └─ Publishes: strategy.trade.logged (AAPL)

Total execution time: ~2-5 seconds (synchronous steps)
Total hold time: Hours to days (async monitoring)
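
The hand-offs in steps 2 to 4 (entry signal, risk approval, order submission) can be sketched with an in-memory publish/subscribe bus. This is an illustration only: `InMemoryBus` and `wire_entry_flow` are hypothetical, dispatch here is synchronous (Redis Pub/Sub delivery is asynchronous), and the handlers are stubs standing in for the real agents.

```python
from collections import defaultdict


class InMemoryBus:
    """Synchronous stand-in for the Redis message bus."""

    def __init__(self):
        self._handlers = defaultdict(list)
        self.log = []  # every published event name, in order

    def subscribe(self, event, handler):
        self._handlers[event].append(handler)

    def publish(self, event, data):
        self.log.append(event)
        for handler in list(self._handlers[event]):
            handler(data)


def wire_entry_flow(bus, max_position_usd=1000.0):
    """Connect stub risk and order handlers to the bus."""

    def on_entry_signal(data):
        # Stub RiskManagementAgent: size by the dollar cap only.
        qty = int(max_position_usd // data["entry_price"])
        if qty > 0:
            bus.publish("risk.trade.approved", {**data, "quantity": qty})

    def on_trade_approved(data):
        # Stub OrderExecutionAgent: pretend the broker accepted the order.
        bus.publish("order.submitted", {**data, "order_id": "abc123"})

    bus.subscribe("strategy.entry.signal", on_entry_signal)
    bus.subscribe("risk.trade.approved", on_trade_approved)
```

Publishing a single `strategy.entry.signal` for AAPL at $175.50 then produces `risk.trade.approved` and `order.submitted` in turn, with a quantity of 5, without any agent calling another directly.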

Error Handling

Circuit Breaker Scenarios

Scenario 1: Alpaca API Down

1. MarketDataAgent calls AlpacaAPI.get_quote()
2. Request fails (timeout, 500 error, network issue)
3. AlpacaCircuitBreaker.failure_count increments (1 → 2 → 3)
4. On 3rd failure: Circuit breaker opens
5. All subsequent requests fail immediately with CircuitBreakerOpenError
6. After 60 seconds: Circuit breaker enters HALF_OPEN
7. Next request succeeds → Circuit breaker closes
8. OR next request fails → Circuit breaker reopens for another 60s

Scenario 2: Schwab Token Expired

1. MarketDataAgent calls SchwabAPI.get_quote()
2. API returns 401 Unauthorized (token expired)
3. SchwabCircuitBreaker detects TokenExpiredError
4. Circuit breaker immediately opens (don't retry expired tokens)
5. Log error: "Schwab token expired. Run: uv run python3 tools/schwab_setup.py"
6. All agents stop using Schwab API
7. Manual intervention required (refresh token)
8. After token refresh: Circuit breaker can be manually reset

Scenario 3: Rate Limit Hit

1. Multiple agents call AlpacaAPI rapidly
2. API returns 429 Too Many Requests
3. AlpacaCircuitBreaker detects RateLimitError
4. Circuit breaker opens after 3 rate limit errors
5. Agents wait 60 seconds before retrying
6. During wait: All requests fail fast (no API calls)
7. After timeout: Circuit breaker enters HALF_OPEN
8. Rate limit should be cleared by now
9. Next request succeeds → Circuit breaker closes

State Machine Invalid Transitions

Scenario 4: Try to Fill Before Placing Order

1. StrategyAgent generates entry signal
2. Bug: OrderExecutionAgent misses order.submit step
3. PositionManagementAgent receives order.filled event (stale/duplicate)
4. TradeStateMachine.filled() called while state = ENTRY_APPROVED
5. Raises: InvalidTransitionError("Cannot transition from ENTRY_APPROVED to FILLED")
6. Error logged with trace_id for debugging
7. Trade aborted, no position opened

Scenario 5: Market Closed During Exit

1. PositionManagementAgent detects target hit
2. OrderExecutionAgent tries to place exit order
3. MarketDataAgent.check_status() returns CLOSED
4. Exit order rejected (cannot trade when market closed)
5. OrderStateMachine remains in DRAFT (not submitted)
6. Agent waits for market open
7. On market open: Retry exit order submission

Distributed Trace Error Analysis

Scenario 6: Order Rejected by Broker

Trace: trade-123-abc (FAILED)
├─ strategy.scan_candidates [✓ 500ms]
├─ strategy.evaluate_setup [✓ 100ms]
├─ risk.approve_trade [✓ 50ms]
├─ order.submit [✗ 300ms] ERROR: InsufficientFunds
│  └─ alpaca_api.place_order [280ms]
│     └─ HTTP 403: "Insufficient buying power"
└─ Total: 950ms (FAILED)

Trace attributes:
- error.type: InsufficientFunds
- error.message: "Insufficient buying power"
- order.symbol: AAPL
- order.quantity: 5
- order.estimated_cost: $877.50
- account.buying_power: $500.00  ← ROOT CAUSE

Resolution:
- RiskManagementAgent needs to check buying_power BEFORE approving trade
- Add check: required_capital < account.buying_power
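
The added check could look like the sketch below. It is illustrative only: `check_buying_power` is a hypothetical helper, and the strict inequality follows the resolution as written (`required_capital < account.buying_power`).

```python
from typing import Tuple


def check_buying_power(quantity: int, price: float, buying_power: float) -> Tuple[bool, str]:
    """Reject the trade before submission if it would exceed buying power."""
    required_capital = quantity * price
    if not required_capital < buying_power:
        return False, (
            f"insufficient buying power: need {required_capital:.2f}, "
            f"have {buying_power:.2f}"
        )
    return True, "approved"
```

With the values from the failed trace (5 shares at $175.50 against $500.00 of buying power), the check rejects the trade inside RiskManagementAgent, so the broker never sees the order.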


Testing Strategy

Unit Tests

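# tests/trading_agents/test_trade_state_machine.py
# (the circuit breaker tests below belong in test_circuit_breakers.py)
import time
from unittest.mock import MagicMock

import pytest

# Module paths follow the directory structure; the exported names are assumed.
from trading_agents.circuit_breakers.alpaca_breaker import AlpacaCircuitBreaker
from trading_agents.circuit_breakers.base_breaker import CircuitBreakerOpenError
from trading_agents.state_machines.trade_state_machine import (
    InvalidTransitionError,
    TradeStateMachine,
)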

def test_valid_transition_scanning_to_setup():
    sm = TradeStateMachine("AAPL")
    assert sm.state == "SCANNING"

    sm.detect_setup(entry=175.50, stop=166.72, target=201.82)
    assert sm.state == "SETUP_DETECTED"
    assert sm.entry_price == 175.50

def test_invalid_transition_scanning_to_filled():
    sm = TradeStateMachine("AAPL")

    with pytest.raises(InvalidTransitionError):
        sm.filled(fill_price=175.50)  # Can't fill without placing order

def test_circuit_breaker_opens_after_3_failures():
    breaker = AlpacaCircuitBreaker()
    mock_api = MagicMock(side_effect=TimeoutError)

    for i in range(3):
        with pytest.raises(TimeoutError):
            breaker.call(mock_api)

    assert breaker.state == "OPEN"

    with pytest.raises(CircuitBreakerOpenError):
        breaker.call(mock_api)

def test_circuit_breaker_half_open_after_timeout():
    breaker = AlpacaCircuitBreaker()
    breaker.state = "OPEN"
    breaker.last_failure_time = time.time() - 61  # 61 seconds ago

    mock_api = MagicMock(return_value="success")
    result = breaker.call(mock_api)

    assert result == "success"
    assert breaker.state == "CLOSED"

Integration Tests

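# tests/trading_agents/test_integration.py
# Requires the pytest-asyncio plugin for @pytest.mark.asyncio.
# Agent, RedisBus, and MockAlpacaAPI imports are omitted; their module paths are not fixed yet.
import asyncio

import pytest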

@pytest.mark.asyncio
async def test_full_trade_lifecycle():
    """
    Integration test: Scan → Setup → Entry → Holding → Exit
    Uses real agents with mocked broker APIs
    """

    # Setup
    redis_bus = RedisBus()
    mock_alpaca = MockAlpacaAPI()

    market_agent = MarketDataAgent(redis_bus, mock_alpaca)
    strategy_agent = StrategyAgent(redis_bus)
    risk_agent = RiskManagementAgent(redis_bus)
    order_agent = OrderExecutionAgent(redis_bus, mock_alpaca)
    position_agent = PositionManagementAgent(redis_bus, mock_alpaca)

    # Start agents
    await asyncio.gather(
        market_agent.start(),
        strategy_agent.start(),
        risk_agent.start(),
        order_agent.start(),
        position_agent.start()
    )

    # Trigger scan
    candidates = await strategy_agent.scan_candidates()
    assert len(candidates) > 0

    # Verify setup detection
    await asyncio.sleep(1)  # Wait for async processing
    assert strategy_agent.state == "SETUP_DETECTED"

    # Verify risk approval
    await asyncio.sleep(1)
    assert risk_agent.last_decision == "APPROVED"

    # Verify order placed
    await asyncio.sleep(1)
    assert len(mock_alpaca.orders) == 1
    assert mock_alpaca.orders[0].symbol == "AAPL"

    # Simulate fill
    mock_alpaca.fill_order(mock_alpaca.orders[0].id, price=175.50)
    await asyncio.sleep(1)

    # Verify position opened
    assert len(position_agent.positions) == 1
    assert position_agent.positions["AAPL"].entry_price == 175.50

    # Verify stop loss set
    await asyncio.sleep(1)
    stop_orders = [o for o in mock_alpaca.orders if o.type == "STOP"]
    assert len(stop_orders) == 1
    assert stop_orders[0].stop_price == 166.72

    # Simulate target hit
    mock_alpaca.update_quote("AAPL", price=201.82)
    await asyncio.sleep(2)  # Wait for monitoring loop

    # Verify exit order placed
    exit_orders = [o for o in mock_alpaca.orders if o.side == "SELL"]
    assert len(exit_orders) >= 1

    # Simulate exit fill
    mock_alpaca.fill_order(exit_orders[0].id, price=201.82)
    await asyncio.sleep(1)

    # Verify trade closed
    assert position_agent.positions["AAPL"].state == "CLOSED"
    assert position_agent.positions["AAPL"].pnl > 0

    # Cleanup
    await asyncio.gather(
        market_agent.stop(),
        strategy_agent.stop(),
        risk_agent.stop(),
        order_agent.stop(),
        position_agent.stop()
    )

Deployment Checklist

Infrastructure

  • [ ] Docker Compose running (Redis + Jaeger)
  • [ ] Redis accessible (redis-cli ping)
  • [ ] Jaeger UI accessible (http://localhost:16686)

Configuration

  • [ ] .env files present (trading-command-center/.env)
  • [ ] Alpaca API keys configured
  • [ ] Schwab API keys configured
  • [ ] Schwab OAuth tokens fresh (<7 days)

Agents

  • [ ] All 5 agents implemented
  • [ ] State machines tested (unit tests pass)
  • [ ] Circuit breakers tested (unit tests pass)
  • [ ] Integration tests pass (full trade lifecycle)

Observability

  • [ ] OpenTelemetry instrumentation added
  • [ ] Traces visible in Jaeger UI
  • [ ] Error traces include stack traces
  • [ ] Performance metrics captured

Safety

  • [ ] Paper trading only (ALPACA_PAPER_API_KEY used)
  • [ ] Position size limits enforced ($1000 max)
  • [ ] Stop losses required (5% for longs)
  • [ ] Portfolio limits checked (20% max per position)
  • [ ] Cash reserve maintained (50% minimum)

Agentic Parallelism Patterns

Competitive Ensemble Pattern (Pattern 6)

Status: 🔴 Not Implemented (MEDIUM PRIORITY)
Reference: docs/architecture/AGENTIC_PARALLELISM_PATTERNS.md

The Competitive Ensemble pattern addresses a fundamental limitation of single-strategy trading: each strategy has blind spots. By running multiple diverse strategy agents in parallel and selecting the best signal, we reduce the risk of any single strategy's failure mode.

Current Flow (Single Strategy):

Market Data → StrategyAgent → Single Signal → Risk Check → Order

Proposed Flow (Competitive Ensemble):

Market Data
    ├─→ [Parallel] TrendFollowingAgent
    │       Signal: BUY AAPL @ $175, confidence 0.72
    ├─→ [Parallel] MeanReversionAgent
    │       Signal: HOLD (no signal)
    ├─→ [Parallel] MomentumAgent
    │       Signal: BUY AAPL @ $175, confidence 0.85
    └─→ SignalAggregator (Judge)
            - Consensus: 2/3 agents agree on BUY
            - Best confidence: 0.85 (Momentum)
            - Final: BUY AAPL with position size weighted by consensus

Implementation for Trading System:

# trading_agents/strategy_ensemble.py
import logging
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, replace
from typing import List, Optional, Protocol

logger = logging.getLogger(__name__)


@dataclass
class TradingSignal:
    action: str  # BUY, SELL, HOLD
    symbol: str
    price: float
    confidence: float
    strategy: str
    reasoning: str


class BaseStrategy(Protocol):
    """Interface each strategy is assumed to expose (not defined elsewhere in this document)."""

    name: str

    def evaluate(self, symbol: str, market_data: dict) -> Optional[TradingSignal]: ...


class StrategyEnsemble:
    """Competitive ensemble of diverse trading strategies."""

    def __init__(self, strategies: List[BaseStrategy]):
        self.strategies = strategies
        # max_workers must be at least 1, even for an empty strategy list.
        self.executor = ThreadPoolExecutor(max_workers=max(1, len(strategies)))

    def get_signal(self, symbol: str, market_data: dict) -> Optional[TradingSignal]:
        """Run all strategies in parallel, aggregate signals."""

        # Launch parallel strategy evaluation
        futures = {
            strategy.name: self.executor.submit(strategy.evaluate, symbol, market_data)
            for strategy in self.strategies
        }

        # Collect actionable signals; a failed or slow strategy is skipped, not fatal
        signals: List[TradingSignal] = []
        for name, future in futures.items():
            try:
                signal = future.result(timeout=5.0)
                if signal and signal.action != "HOLD":
                    signals.append(signal)
            except Exception as e:
                logger.warning(f"Strategy {name} failed: {e}")

        if not signals:
            return None  # No actionable signals

        # Aggregate using voting + confidence weighting
        return self._aggregate_signals(signals)

    def _aggregate_signals(self, signals: List[TradingSignal]) -> TradingSignal:
        """Select best signal using consensus + confidence."""

        # Group by action
        buy_signals = [s for s in signals if s.action == "BUY"]
        sell_signals = [s for s in signals if s.action == "SELL"]

        # Consensus voting
        if len(buy_signals) > len(sell_signals):
            candidates = buy_signals
        elif len(sell_signals) > len(buy_signals):
            candidates = sell_signals
        else:
            # Tie: pick highest confidence
            candidates = signals

        # Select highest confidence from winning group
        best = max(candidates, key=lambda s: s.confidence)

        # Scale confidence by the share of ALL strategies that agree with the
        # chosen action, so HOLD, failed, and opposing strategies weaken it
        # (2 of 3 agreeing gives a ratio of 2/3, as in the flow above).
        agreeing = sum(1 for s in signals if s.action == best.action)
        consensus_ratio = agreeing / len(self.strategies)

        # Return a copy rather than mutating the strategy's own signal object.
        return replace(best, confidence=best.confidence * consensus_ratio)

Diverse Strategy Set:

| Strategy       | Strength         | Weakness            | Diversification Role    |
|----------------|------------------|---------------------|-------------------------|
| TrendFollowing | Strong in trends | Whipsaws in ranging | Captures extended moves |
| MeanReversion  | Strong in ranges | Killed by trends    | Catches overextensions  |
| Momentum       | Fast signals     | False breakouts     | Early entry timing      |
| Volatility     | Risk-adjusted    | Slow signals        | Position sizing         |

Integration Points:
1. trading_agents/strategy_agent.py - Replace single strategy with ensemble
2. trading_agents/config.py - Define strategy weights and confidence thresholds
3. Redis message bus - Add strategy.ensemble.signal event type

Expected Impact (targets, not yet measured):
- Win rate improvement: 10-20% (consensus filtering)
- Drawdown reduction: 15-25% (no single strategy failure)
- Signal quality: Higher confidence signals only


Redundant Execution Pattern (Pattern 9)

Status: 🟡 Partially Implemented (Circuit Breakers)
Reference: docs/architecture/AGENTIC_PARALLELISM_PATTERNS.md

Our existing circuit breaker implementation provides fail-fast behavior, but Pattern 9 extends this with parallel redundant execution for critical operations.

Current Flow (Circuit Breaker Only):

Order Request → Circuit Breaker → Alpaca API
                      ↓ (if open)
                    Fail Fast

Proposed Flow (Redundant Execution):

Critical Order Request
    ├─→ [Parallel] Alpaca Primary
    └─→ [Parallel] Alpaca Backup (different endpoint)

    → First successful response wins
    → Second request cancelled
    → If both fail, circuit opens

Use Cases for Redundant Execution:
1. Stop Loss Orders - Critical for risk management
2. Position Exit - Must execute even if primary API slow
3. Emergency Liquidation - All-or-nothing reliability
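
The "first successful response wins" rule can be sketched with asyncio. This is a minimal illustration, not the planned implementation: `first_success` is a hypothetical helper, and it does nothing to prevent both endpoints from accepting the same order. Real use would need broker-side deduplication (for example a client-supplied order ID, if the broker supports one), which is outside this sketch.

```python
import asyncio


async def first_success(*coros):
    """Return the first successful result, cancel the rest, raise if all fail."""
    tasks = [asyncio.create_task(c) for c in coros]
    errors = []
    try:
        # as_completed yields awaitables in the order the tasks finish.
        for finished in asyncio.as_completed(tasks):
            try:
                return await finished
            except Exception as exc:
                errors.append(exc)  # this attempt failed; wait for the next one
        raise RuntimeError(f"all {len(tasks)} attempts failed: {errors!r}")
    finally:
        for task in tasks:
            task.cancel()  # no effect on tasks that already finished
```

When every attempt fails, the `RuntimeError` is the point at which the caller would record a failure against the circuit breaker.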


Assembly Line Pattern (Pattern 7) for Email Correlation

Status: 🔴 Not Implemented (LOW PRIORITY)
Reference: docs/architecture/AGENTIC_PARALLELISM_PATTERNS.md

While not directly trading-related, the Assembly Line pattern applies to our email-to-transcript correlation pipeline, which feeds client intelligence into trading decisions.

Current Flow (Sequential):

Email → Parse → Extract Entities → Correlate → Store
        100ms    200ms             300ms       100ms
        Total: 700ms per email (sequential)

Proposed Flow (Pipeline Parallelism):

Stage 1 (Parse):      E1 → E2 → E3 → E4 → ...
Stage 2 (Extract):         E1 → E2 → E3 → E4 → ...
Stage 3 (Correlate):            E1 → E2 → E3 → E4 → ...
Stage 4 (Store):                     E1 → E2 → E3 → E4 → ...

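Throughput: 1 email every 300ms in steady state, bounded by the slowest stage (Correlate); about 2.3x the sequential rate of 1 email per 700ms. Per-email latency stays at 700ms.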
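
In a pipeline, steady-state throughput is set by the slowest stage, while the latency of a single item is still the sum of all stages. A small calculation with the stage times listed above (100, 200, 300, and 100 ms) makes this concrete; `pipeline_stats` is a hypothetical helper and assumes one worker per stage with no queueing overhead.

```python
def pipeline_stats(stage_ms):
    """Latency, steady-state interval, and speedup for a one-worker-per-stage pipeline."""
    sequential_ms = sum(stage_ms)   # time for one item through every stage
    bottleneck_ms = max(stage_ms)   # slowest stage paces the whole pipeline
    return {
        "latency_ms": sequential_ms,
        "interval_ms": bottleneck_ms,
        "speedup": sequential_ms / bottleneck_ms,
    }


stats = pipeline_stats([100, 200, 300, 100])
```

Pushing throughput beyond this bound would require speeding up or parallelizing the Correlate stage itself, since it is the bottleneck.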


Next Steps

Phase 1a: Foundation (This Week)

  1. Set up Docker Compose (Redis + Jaeger)
  2. Implement base agent class with tracing
  3. Implement message bus (Redis pub/sub)
  4. Implement circuit breakers (base, Schwab, Alpaca)
  5. Implement state machines (trade lifecycle, order lifecycle)

Phase 1b: Agents (Next Week)

  1. Implement MarketDataAgent
  2. Implement OrderExecutionAgent
  3. Implement StrategyAgent
  4. Implement RiskManagementAgent
  5. Implement PositionManagementAgent

Phase 1c: Testing (Week After)

  1. Unit tests for all agents
  2. Unit tests for state machines
  3. Unit tests for circuit breakers
  4. Integration test (full trade lifecycle)
  5. Load test (multiple concurrent trades)

Phase 1d: Validation (30 Days)

  1. Run 30 trades in paper account
  2. Verify state machines work correctly
  3. Verify circuit breakers prevent API abuse
  4. Analyze traces in Jaeger
  5. Measure performance and reliability

Status: Design Complete, Ready for Implementation
Next: Implement Docker Compose + Redis + Jaeger infrastructure
Timeline: 3-4 weeks for full implementation