Market Data Agent¶
Real-time market data fetching and distribution with circuit breaker protection
December 2025 Status
ACTIVE - Real-time quotes and historical bars from Alpaca API
| Feature | Status | Details |
|---|---|---|
| Quote Fetching | ✅ Active | 50-100ms latency |
| Historical Bars | ✅ Active | 1min, 5min, 1hour, 1day timeframes |
| Market Status | ✅ Active | Open/closed detection |
| Circuit Breaker | ✅ Active | 60s recovery timeout |
Overview¶
The Market Data Agent is responsible for fetching real-time quotes and historical bars from the Alpaca API, distributing this data to other agents via the Redis message bus.
Key Features: - Real-time quote fetching - Historical bars (1min, 5min, 1hour, 1day timeframes) - Market status checking (open/closed) - Circuit breaker protection for API reliability - Event-driven data distribution
Architecture¶
graph LR
ST[Strategy Agent] -->|market_data.get_quote| REDIS[Redis Bus]
REDIS -->|Request| MD[Market Data Agent]
MD <-->|API Call| ALPACA[Alpaca API]
MD -->|market_data.quote.updated| REDIS
REDIS -->|Quote Data| ST
REDIS -->|Quote Data| PM[Position Management]
style MD fill:#e1f5ff
style ALPACA fill:#fff3e0
Responsibilities¶
Primary Functions¶
- Quote Fetching: Fetch latest quote for a symbol (bid, ask, last price)
- Historical Bars: Fetch OHLCV data for technical analysis
- Market Status: Check if US market is open (9:30 AM - 4:00 PM ET)
- Circuit Breaker: Protect against API failures and rate limits
Events Published¶
market_data.quote.updated¶
Published when a quote is successfully fetched.
Data:
{
"symbol": "AAPL",
"ask_price": 175.50,
"bid_price": 175.48,
"last_price": 175.50,
"timestamp": "2025-12-20T14:30:00"
}
market_data.bars.received¶
Published when historical bars are fetched.
Data:
{
"symbol": "AAPL",
"timeframe": "1Day",
"bars": [
{
"timestamp": "2025-12-20T09:30:00",
"open": 174.20,
"high": 176.80,
"low": 173.90,
"close": 175.50,
"volume": 42350000
}
],
"count": 50
}
market_data.status.updated¶
Published when market status is checked.
Data:
market_data.quote.error¶
Published when quote fetching fails.
Data:
Events Subscribed¶
market_data.get_quote¶
Request to fetch latest quote for a symbol.
Data:
market_data.get_bars¶
Request to fetch historical bars.
Data:
{
"symbol": "AAPL",
"timeframe": "1Day", // "1Min", "5Min", "1Hour", "1Day"
"limit": 50 // Number of bars (default: 100)
}
market_data.check_status¶
Request to check market open/closed status.
Data: (empty)
Implementation Details¶
Quote Fetching with Circuit Breaker¶
def handle_market_data_get_quote(self, message: Message):
"""Handle quote request with circuit breaker protection."""
symbol = message.data.get("symbol")
with self.tracer.start_as_current_span("get_quote") as span:
span.set_attribute("symbol", symbol)
try:
# Fetch quote with circuit breaker protection
quote = self.circuit_breaker.call(
self._fetch_quote,
symbol
)
# Publish quote update event
self.publish_event(
event="market_data.quote.updated",
data={
"symbol": symbol,
"ask_price": quote.ask_price,
"bid_price": quote.bid_price,
"last_price": quote.ask_price,
"timestamp": quote.timestamp.isoformat()
},
trace_id=message.trace_id
)
except Exception as e:
logger.error(f"Failed to fetch quote for {symbol}: {e}")
self.publish_event(
event="market_data.quote.error",
data={"symbol": symbol, "error": str(e)},
trace_id=message.trace_id
)
Historical Bars with Timeframe Parsing¶
def _parse_timeframe(self, timeframe_str: str) -> TimeFrame:
"""Parse timeframe string to Alpaca TimeFrame enum."""
timeframe_map = {
"1Min": TimeFrame.Minute,
"5Min": TimeFrame(5, "Min"),
"1Hour": TimeFrame.Hour,
"1Day": TimeFrame.Day
}
timeframe = timeframe_map.get(timeframe_str)
if timeframe is None:
raise ValueError(f"Invalid timeframe: {timeframe_str}")
return timeframe
def handle_market_data_get_bars(self, message: Message):
"""Handle bars request."""
symbol = message.data.get("symbol")
timeframe_str = message.data.get("timeframe", "1Day")
limit = message.data.get("limit", 100)
timeframe = self._parse_timeframe(timeframe_str)
# Fetch bars with circuit breaker protection
bars = self.circuit_breaker.call(
self._fetch_bars,
symbol,
timeframe,
limit
)
# Convert bars to list of dicts
bars_data = []
for bar in bars:
bars_data.append({
"timestamp": bar.timestamp.isoformat(),
"open": bar.open,
"high": bar.high,
"low": bar.low,
"close": bar.close,
"volume": bar.volume
})
# Publish bars received event
self.publish_event(
event="market_data.bars.received",
data={
"symbol": symbol,
"timeframe": timeframe_str,
"bars": bars_data,
"count": len(bars_data)
},
trace_id=message.trace_id
)
Market Status Check¶
def _is_market_open(self) -> bool:
"""
Check if US stock market is currently open.
Market hours: 9:30 AM - 4:00 PM ET, Monday-Friday
This is a simplified check (doesn't account for holidays).
"""
now = datetime.now()
# Check if weekday (0=Monday, 6=Sunday)
if now.weekday() >= 5: # Saturday or Sunday
return False
# Check time (simplified, assumes ET timezone)
current_time = now.time()
market_open = dt_time(9, 30) # 9:30 AM
market_close = dt_time(16, 0) # 4:00 PM
return market_open <= current_time <= market_close
Circuit Breaker Protection¶
The Market Data Agent uses a circuit breaker to protect against API failures:
Configuration: - Failure threshold: 50% failures in window - Minimum requests: 5 requests before opening circuit - Recovery timeout: 60 seconds - Half-open max calls: 1 call to test recovery
States:
Example:
# Circuit breaker created at initialization
self.circuit_breaker = create_alpaca_paper_breaker() if paper else create_alpaca_live_breaker()
# All API calls protected
quote = self.circuit_breaker.call(
self._fetch_quote,
symbol
)
When the circuit is OPEN: - API calls are blocked immediately - Returns cached data or raises exception - Prevents cascading failures - Auto-recovery after timeout
Usage Examples¶
Request Quote¶
# Publish request for AAPL quote
market_data_agent.publish_event(
event="market_data.get_quote",
data={"symbol": "AAPL"}
)
# Agent will publish market_data.quote.updated event
# with current quote data
Request Historical Bars¶
# Request 50 daily bars for NVDA
market_data_agent.publish_event(
event="market_data.get_bars",
data={
"symbol": "NVDA",
"timeframe": "1Day",
"limit": 50
}
)
# Agent will publish market_data.bars.received event
# with OHLCV data for 50 days
Check Market Status¶
# Check if market is open
market_data_agent.publish_event(
event="market_data.check_status",
data={}
)
# Agent will publish market_data.status.updated event
# with is_open: true/false
Initialization¶
from trading_agents import MarketDataAgent
# Initialize with Alpaca Paper credentials
agent = MarketDataAgent(
api_key=None, # If None, loads from .env
secret_key=None, # If None, loads from .env
paper=True, # Use paper trading account
redis_host="localhost",
redis_port=6379
)
# Start agent (subscribes to events, starts message bus)
agent.start()
# Agent is now listening for market data requests
Performance Characteristics¶
Quote Latency¶
- Average: 50-100ms (Alpaca API)
- Circuit breaker overhead: <1ms
- Event publishing: <5ms
- Total: ~60-110ms from request to quote.updated event
Bars Latency¶
- Average: 200-500ms (depends on number of bars)
- 50 bars (1 day timeframe): ~250ms
- 100 bars (1 hour timeframe): ~400ms
Rate Limits¶
- Alpaca Paper: 200 requests/minute
- Alpaca Live: 200 requests/minute
- Circuit breaker prevents exceeding limits
Error Handling¶
Common Errors¶
1. Invalid Symbol
{
"event": "market_data.quote.error",
"data": {
"symbol": "INVALID",
"error": "Symbol not found"
}
}
2. API Rate Limit
{
"event": "market_data.quote.error",
"data": {
"symbol": "AAPL",
"error": "API rate limit exceeded"
}
}
3. Market Closed
{
"event": "market_data.quote.updated",
"data": {
"symbol": "AAPL",
"ask_price": 175.50,
"bid_price": 175.48,
"last_price": 175.50,
"timestamp": "2025-12-20T16:05:00"
}
}
Integration with Other Agents¶
Strategy Agent Integration¶
# Strategy requests quote for evaluation
strategy_agent.publish_event(
event="market_data.get_quote",
data={"symbol": "AAPL"}
)
# Market Data Agent responds
# market_data.quote.updated → Strategy Agent evaluates setup
Position Management Integration¶
# Position Management monitors positions
position_agent.subscribe_to("market_data.quote.updated")
# On each quote update:
# - Calculate unrealized P&L
# - Check for stop/target hits
# - Update trailing stops
Distributed Tracing¶
All market data operations are traced with OpenTelemetry:
market_data.get_quote [trace_id: abc123]
├─ get_quote (span)
│ ├─ symbol: AAPL
│ ├─ circuit_breaker.call
│ └─ alpaca.get_stock_latest_quote
└─ publish_event (market_data.quote.updated)
Trace Attributes:
- symbol - Stock symbol
- timeframe - For bars requests
- limit - Number of bars requested
- error - If operation failed
- circuit_breaker.state - CLOSED, OPEN, or HALF_OPEN
Monitoring¶
Key Metrics to Monitor¶
- Quote Request Rate: Requests per minute
- Circuit Breaker State: CLOSED (healthy) vs OPEN (degraded)
- Error Rate: % of failed requests
- Latency: P50, P95, P99 response times
- Cache Hit Rate: For repeated symbol requests
Health Check¶
# Check if agent is running and healthy
if market_data_agent.running:
print("Market Data Agent: RUNNING")
# Check circuit breaker state
if market_data_agent.circuit_breaker.state == "CLOSED":
print("Circuit Breaker: HEALTHY")
else:
print(f"Circuit Breaker: {market_data_agent.circuit_breaker.state}")
Best Practices¶
1. Cache Quote Requests¶
Don't request the same symbol repeatedly within 1 second. Cache quotes locally.
# Bad: Request quote every 100ms
for i in range(10):
market_data_agent.publish_event(
event="market_data.get_quote",
data={"symbol": "AAPL"}
)
time.sleep(0.1)
# Good: Request once, cache result
market_data_agent.publish_event(
event="market_data.get_quote",
data={"symbol": "AAPL"}
)
# Store in cache, reuse for 1 second
2. Check Market Status First¶
Before requesting quotes, check if market is open.
# Check market status
market_data_agent.publish_event(
event="market_data.check_status",
data={}
)
# Wait for response, then request quotes
# Avoids stale data from market closed
3. Use Appropriate Timeframes¶
Match timeframe to your strategy needs: - 1Min: High-frequency scalping (rarely needed) - 5Min: Intraday momentum trading - 1Hour: Swing trading setups - 1Day: Position trading (most common)
4. Limit Bars Requests¶
Request only the bars you need (typically 20-50 for MA calculations).
# Good: Request 50 bars for MA20 calculation
market_data_agent.publish_event(
event="market_data.get_bars",
data={
"symbol": "AAPL",
"timeframe": "1Day",
"limit": 50 # Enough for MA20 + some buffer
}
)
# Bad: Request 1000 bars (slow, wasteful)
market_data_agent.publish_event(
event="market_data.get_bars",
data={
"symbol": "AAPL",
"timeframe": "1Day",
"limit": 1000 # Too many!
}
)
Troubleshooting¶
Problem: No quote.updated events received¶
Solution: Check if agent is subscribed to market_data.get_quote events.
Problem: Circuit breaker stuck in OPEN state¶
Solution: Wait 60 seconds for auto-recovery or manually reset circuit breaker.
Problem: Stale quote data¶
Solution: Check market status. Quotes are only updated during market hours.
Problem: High latency (>500ms for quotes)¶
Solution: Check Alpaca API status, circuit breaker may be degraded.
See Also¶
- Strategy Agent - Uses market data for entry signals
- Risk Management Agent - Validates trades with live prices
- Position Management Agent - Monitors positions with quotes
- Order Execution Agent - Places orders using market data
- Trading Agents Overview - Architecture and event flow
- Trading Platform Architecture - System design
References¶
Last Updated: January 10, 2026 Platform: Alpaca API | Redis Pub/Sub | Circuit Breaker Protection