Skip to content

Market Data Agent

Real-time market data fetching and distribution with circuit breaker protection


December 2025 Status

ACTIVE - Real-time quotes and historical bars from Alpaca API

Feature Status Details
Quote Fetching ✅ Active 50-100ms latency
Historical Bars ✅ Active 1min, 5min, 1hour, 1day timeframes
Market Status ✅ Active Open/closed detection
Circuit Breaker ✅ Active 60s recovery timeout

Overview

The Market Data Agent is responsible for fetching real-time quotes and historical bars from the Alpaca API, distributing this data to other agents via the Redis message bus.

Key Features: - Real-time quote fetching - Historical bars (1min, 5min, 1hour, 1day timeframes) - Market status checking (open/closed) - Circuit breaker protection for API reliability - Event-driven data distribution


Architecture

graph LR
    ST[Strategy Agent] -->|market_data.get_quote| REDIS[Redis Bus]
    REDIS -->|Request| MD[Market Data Agent]
    MD <-->|API Call| ALPACA[Alpaca API]
    MD -->|market_data.quote.updated| REDIS
    REDIS -->|Quote Data| ST
    REDIS -->|Quote Data| PM[Position Management]

    style MD fill:#e1f5ff
    style ALPACA fill:#fff3e0

Responsibilities

Primary Functions

  1. Quote Fetching: Fetch latest quote for a symbol (bid, ask, last price)
  2. Historical Bars: Fetch OHLCV data for technical analysis
  3. Market Status: Check if US market is open (9:30 AM - 4:00 PM ET)
  4. Circuit Breaker: Protect against API failures and rate limits

Events Published

market_data.quote.updated

Published when a quote is successfully fetched.

Data:

{
  "symbol": "AAPL",
  "ask_price": 175.50,
  "bid_price": 175.48,
  "last_price": 175.50,
  "timestamp": "2025-12-20T14:30:00"
}

market_data.bars.received

Published when historical bars are fetched.

Data:

{
  "symbol": "AAPL",
  "timeframe": "1Day",
  "bars": [
    {
      "timestamp": "2025-12-20T09:30:00",
      "open": 174.20,
      "high": 176.80,
      "low": 173.90,
      "close": 175.50,
      "volume": 42350000
    }
  ],
  "count": 50
}

market_data.status.updated

Published when market status is checked.

Data:

{
  "is_open": true,
  "timestamp": "2025-12-20T14:30:00"
}

market_data.quote.error

Published when quote fetching fails.

Data:

{
  "symbol": "AAPL",
  "error": "API rate limit exceeded"
}


Events Subscribed

market_data.get_quote

Request to fetch latest quote for a symbol.

Data:

{
  "symbol": "AAPL"
}

market_data.get_bars

Request to fetch historical bars.

Data:

{
  "symbol": "AAPL",
  "timeframe": "1Day",  // "1Min", "5Min", "1Hour", "1Day"
  "limit": 50          // Number of bars (default: 100)
}

market_data.check_status

Request to check market open/closed status.

Data: (empty)


Implementation Details

Quote Fetching with Circuit Breaker

def handle_market_data_get_quote(self, message: Message):
    """Handle quote request with circuit breaker protection."""
    symbol = message.data.get("symbol")

    with self.tracer.start_as_current_span("get_quote") as span:
        span.set_attribute("symbol", symbol)

        try:
            # Fetch quote with circuit breaker protection
            quote = self.circuit_breaker.call(
                self._fetch_quote,
                symbol
            )

            # Publish quote update event
            self.publish_event(
                event="market_data.quote.updated",
                data={
                    "symbol": symbol,
                    "ask_price": quote.ask_price,
                    "bid_price": quote.bid_price,
                    "last_price": quote.ask_price,
                    "timestamp": quote.timestamp.isoformat()
                },
                trace_id=message.trace_id
            )
        except Exception as e:
            logger.error(f"Failed to fetch quote for {symbol}: {e}")
            self.publish_event(
                event="market_data.quote.error",
                data={"symbol": symbol, "error": str(e)},
                trace_id=message.trace_id
            )

Historical Bars with Timeframe Parsing

def _parse_timeframe(self, timeframe_str: str) -> TimeFrame:
    """Parse timeframe string to Alpaca TimeFrame enum."""
    timeframe_map = {
        "1Min": TimeFrame.Minute,
        "5Min": TimeFrame(5, "Min"),
        "1Hour": TimeFrame.Hour,
        "1Day": TimeFrame.Day
    }

    timeframe = timeframe_map.get(timeframe_str)
    if timeframe is None:
        raise ValueError(f"Invalid timeframe: {timeframe_str}")

    return timeframe

def handle_market_data_get_bars(self, message: Message):
    """Handle bars request."""
    symbol = message.data.get("symbol")
    timeframe_str = message.data.get("timeframe", "1Day")
    limit = message.data.get("limit", 100)

    timeframe = self._parse_timeframe(timeframe_str)

    # Fetch bars with circuit breaker protection
    bars = self.circuit_breaker.call(
        self._fetch_bars,
        symbol,
        timeframe,
        limit
    )

    # Convert bars to list of dicts
    bars_data = []
    for bar in bars:
        bars_data.append({
            "timestamp": bar.timestamp.isoformat(),
            "open": bar.open,
            "high": bar.high,
            "low": bar.low,
            "close": bar.close,
            "volume": bar.volume
        })

    # Publish bars received event
    self.publish_event(
        event="market_data.bars.received",
        data={
            "symbol": symbol,
            "timeframe": timeframe_str,
            "bars": bars_data,
            "count": len(bars_data)
        },
        trace_id=message.trace_id
    )

Market Status Check

def _is_market_open(self) -> bool:
    """
    Check if US stock market is currently open.

    Market hours: 9:30 AM - 4:00 PM ET, Monday-Friday
    This is a simplified check (doesn't account for holidays).
    """
    now = datetime.now()

    # Check if weekday (0=Monday, 6=Sunday)
    if now.weekday() >= 5:  # Saturday or Sunday
        return False

    # Check time (simplified, assumes ET timezone)
    current_time = now.time()
    market_open = dt_time(9, 30)   # 9:30 AM
    market_close = dt_time(16, 0)  # 4:00 PM

    return market_open <= current_time <= market_close

Circuit Breaker Protection

The Market Data Agent uses a circuit breaker to protect against API failures:

Configuration: - Failure threshold: 50% failures in window - Minimum requests: 5 requests before opening circuit - Recovery timeout: 60 seconds - Half-open max calls: 1 call to test recovery

States:

CLOSED (normal) → OPEN (too many failures) → HALF_OPEN (testing) → CLOSED (recovered)

Example:

# Circuit breaker created at initialization
self.circuit_breaker = create_alpaca_paper_breaker() if paper else create_alpaca_live_breaker()

# All API calls protected
quote = self.circuit_breaker.call(
    self._fetch_quote,
    symbol
)

When the circuit is OPEN: - API calls are blocked immediately - Returns cached data or raises exception - Prevents cascading failures - Auto-recovery after timeout


Usage Examples

Request Quote

# Publish request for AAPL quote
market_data_agent.publish_event(
    event="market_data.get_quote",
    data={"symbol": "AAPL"}
)

# Agent will publish market_data.quote.updated event
# with current quote data

Request Historical Bars

# Request 50 daily bars for NVDA
market_data_agent.publish_event(
    event="market_data.get_bars",
    data={
        "symbol": "NVDA",
        "timeframe": "1Day",
        "limit": 50
    }
)

# Agent will publish market_data.bars.received event
# with OHLCV data for 50 days

Check Market Status

# Check if market is open
market_data_agent.publish_event(
    event="market_data.check_status",
    data={}
)

# Agent will publish market_data.status.updated event
# with is_open: true/false

Initialization

from trading_agents import MarketDataAgent

# Initialize with Alpaca Paper credentials
agent = MarketDataAgent(
    api_key=None,           # If None, loads from .env
    secret_key=None,        # If None, loads from .env
    paper=True,             # Use paper trading account
    redis_host="localhost",
    redis_port=6379
)

# Start agent (subscribes to events, starts message bus)
agent.start()

# Agent is now listening for market data requests

Performance Characteristics

Quote Latency

  • Average: 50-100ms (Alpaca API)
  • Circuit breaker overhead: <1ms
  • Event publishing: <5ms
  • Total: ~60-110ms from request to quote.updated event

Bars Latency

  • Average: 200-500ms (depends on number of bars)
  • 50 bars (1 day timeframe): ~250ms
  • 100 bars (1 hour timeframe): ~400ms

Rate Limits

  • Alpaca Paper: 200 requests/minute
  • Alpaca Live: 200 requests/minute
  • Circuit breaker prevents exceeding limits

Error Handling

Common Errors

1. Invalid Symbol

{
  "event": "market_data.quote.error",
  "data": {
    "symbol": "INVALID",
    "error": "Symbol not found"
  }
}

2. API Rate Limit

{
  "event": "market_data.quote.error",
  "data": {
    "symbol": "AAPL",
    "error": "API rate limit exceeded"
  }
}
Circuit breaker will OPEN to prevent further requests until recovery.

3. Market Closed

{
  "event": "market_data.quote.updated",
  "data": {
    "symbol": "AAPL",
    "ask_price": 175.50,
    "bid_price": 175.48,
    "last_price": 175.50,
    "timestamp": "2025-12-20T16:05:00"
  }
}
Quote will be stale (last quote before close). Check market status first.


Integration with Other Agents

Strategy Agent Integration

# Strategy requests quote for evaluation
strategy_agent.publish_event(
    event="market_data.get_quote",
    data={"symbol": "AAPL"}
)

# Market Data Agent responds
# market_data.quote.updated → Strategy Agent evaluates setup

Position Management Integration

# Position Management monitors positions
position_agent.subscribe_to("market_data.quote.updated")

# On each quote update:
# - Calculate unrealized P&L
# - Check for stop/target hits
# - Update trailing stops

Distributed Tracing

All market data operations are traced with OpenTelemetry:

market_data.get_quote [trace_id: abc123]
├─ get_quote (span)
│  ├─ symbol: AAPL
│  ├─ circuit_breaker.call
│  └─ alpaca.get_stock_latest_quote
└─ publish_event (market_data.quote.updated)

Trace Attributes: - symbol - Stock symbol - timeframe - For bars requests - limit - Number of bars requested - error - If operation failed - circuit_breaker.state - CLOSED, OPEN, or HALF_OPEN


Monitoring

Key Metrics to Monitor

  1. Quote Request Rate: Requests per minute
  2. Circuit Breaker State: CLOSED (healthy) vs OPEN (degraded)
  3. Error Rate: % of failed requests
  4. Latency: P50, P95, P99 response times
  5. Cache Hit Rate: For repeated symbol requests

Health Check

# Check if agent is running and healthy
if market_data_agent.running:
    print("Market Data Agent: RUNNING")

    # Check circuit breaker state
    if market_data_agent.circuit_breaker.state == "CLOSED":
        print("Circuit Breaker: HEALTHY")
    else:
        print(f"Circuit Breaker: {market_data_agent.circuit_breaker.state}")

Best Practices

1. Cache Quote Requests

Don't request the same symbol repeatedly within 1 second. Cache quotes locally.

# Bad: Request quote every 100ms
for i in range(10):
    market_data_agent.publish_event(
        event="market_data.get_quote",
        data={"symbol": "AAPL"}
    )
    time.sleep(0.1)

# Good: Request once, cache result
market_data_agent.publish_event(
    event="market_data.get_quote",
    data={"symbol": "AAPL"}
)
# Store in cache, reuse for 1 second

2. Check Market Status First

Before requesting quotes, check if market is open.

# Check market status
market_data_agent.publish_event(
    event="market_data.check_status",
    data={}
)

# Wait for response, then request quotes
# Avoids stale data from market closed

3. Use Appropriate Timeframes

Match timeframe to your strategy needs: - 1Min: High-frequency scalping (rarely needed) - 5Min: Intraday momentum trading - 1Hour: Swing trading setups - 1Day: Position trading (most common)

4. Limit Bars Requests

Request only the bars you need (typically 20-50 for MA calculations).

# Good: Request 50 bars for MA20 calculation
market_data_agent.publish_event(
    event="market_data.get_bars",
    data={
        "symbol": "AAPL",
        "timeframe": "1Day",
        "limit": 50  # Enough for MA20 + some buffer
    }
)

# Bad: Request 1000 bars (slow, wasteful)
market_data_agent.publish_event(
    event="market_data.get_bars",
    data={
        "symbol": "AAPL",
        "timeframe": "1Day",
        "limit": 1000  # Too many!
    }
)

Troubleshooting

Problem: No quote.updated events received

Solution: Check if agent is subscribed to market_data.get_quote events.

Problem: Circuit breaker stuck in OPEN state

Solution: Wait 60 seconds for auto-recovery or manually reset circuit breaker.

Problem: Stale quote data

Solution: Check market status. Quotes are only updated during market hours.

Problem: High latency (>500ms for quotes)

Solution: Check Alpaca API status, circuit breaker may be degraded.


See Also

References


Last Updated: January 10, 2026 Platform: Alpaca API | Redis Pub/Sub | Circuit Breaker Protection