Order Execution Agent¶
Order submission and lifecycle tracking with state machine management
January 2026 Status
ACTIVE - Order submission and lifecycle tracking via Alpaca API
| Feature | Status | Details |
|---|---|---|
| MARKET Orders | ✅ Active | Immediate execution |
| LIMIT Orders | ✅ Active | Price-controlled entries |
| STOP Orders | ✅ Active | GTC for stop losses |
| Circuit Breaker | ✅ Active | 60s recovery timeout |
| Rate Limiting | ✅ Active | 200 req/min quota |
| Error Recovery | ✅ Active | Retry with backoff |
Overview¶
The Order Execution Agent is responsible for submitting orders to the Alpaca broker, tracking their lifecycle from submission to fill/cancellation, and managing order state machines with comprehensive error handling and monitoring.
Key Features:
- Submit MARKET, LIMIT, and STOP orders with validation
- Track order status (submitted, filled, rejected, cancelled)
- Handle partial fills with automatic state transitions
- Order state machine for lifecycle tracking
- Circuit breaker protection with configurable timeouts
- Rate limiting and API quota management
- Comprehensive monitoring and metrics collection
Architecture¶
graph TD
PM[Position Management] -->|order.submit| REDIS[Redis Bus]
REDIS -->|Request| OE[Order Execution Agent]
OE --> CB{Circuit Breaker}
CB -->|Open| RETRY[Retry Logic]
CB -->|Closed| ALPACA[Alpaca API]
OE --> OSM[Order State Machine]
OSM --> PENDING[PENDING_SUBMISSION]
PENDING --> SUBMITTED[SUBMITTED]
SUBMITTED --> ACCEPTED[ACCEPTED]
ACCEPTED --> FILLED[FILLED]
ACCEPTED --> PARTIAL[PARTIALLY_FILLED]
ACCEPTED --> REJECTED[REJECTED]
ACCEPTED --> CANCELLED[CANCELLED]
PARTIAL --> FILLED
OE --> METRICS[Monitoring]
METRICS --> LATENCY[Order Latency]
METRICS --> FILLS[Fill Rates]
METRICS --> REJECTIONS[Rejection Rates]
OE -->|order.filled| REDIS
OE -->|order.rejected| REDIS
REDIS -->|Fill Data| PM
style OE fill:#e8f5e9
style ALPACA fill:#fff3e0
style CB fill:#ffebee
style OSM fill:#f3e5f5
Redis Configuration¶
Connection Settings¶
# Environment Configuration
REDIS_HOST: "172.200.3.164"
REDIS_PORT: 6379
REDIS_DB: 0
REDIS_PASSWORD: "***" # From environment variable
CONNECTION_POOL_SIZE: 10
MAX_CONNECTIONS: 20
Channel Configuration¶
REDIS_CHANNELS = {
"ORDER_COMMANDS": "trading:orders:commands",
"ORDER_EVENTS": "trading:orders:events",
"ORDER_STATUS": "trading:orders:status",
"HEARTBEAT": "trading:orders:heartbeat"
}
# Message routing
ROUTING_CONFIG = {
"order.submit": "trading:orders:commands",
"order.cancel": "trading:orders:commands",
"order.status": "trading:orders:commands",
"order.filled": "trading:orders:events",
"order.rejected": "trading:orders:events",
"order.cancelled": "trading:orders:events"
}
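As an illustration of how the routing table might be used, the sketch below resolves the channel for an event name and serializes a message for publishing. The `resolve_channel` and `build_envelope` helpers and the envelope keys (`event`, `data`) are assumptions made for this example, not part of the agent's documented API.

```python
import json

# Routing table copied from the configuration above.
ROUTING_CONFIG = {
    "order.submit": "trading:orders:commands",
    "order.cancel": "trading:orders:commands",
    "order.status": "trading:orders:commands",
    "order.filled": "trading:orders:events",
    "order.rejected": "trading:orders:events",
    "order.cancelled": "trading:orders:events",
}


def resolve_channel(event: str) -> str:
    """Return the Redis channel for an event, or raise if no route exists."""
    try:
        return ROUTING_CONFIG[event]
    except KeyError:
        raise ValueError(f"No route configured for event {event!r}") from None


def build_envelope(event: str, data: dict) -> tuple[str, str]:
    """Return (channel, JSON payload) ready to hand to a Redis publish call."""
    channel = resolve_channel(event)
    # Hypothetical envelope layout: event name plus the event data
    payload = json.dumps({"event": event, "data": data}, sort_keys=True)
    return channel, payload


channel, payload = build_envelope("order.cancel", {"order_id": "abc123"})
print(channel)
```

Failing loudly on an unknown event name keeps a typo from silently publishing to the wrong channel.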
Responsibilities¶
Primary Functions¶
- Order Submission: Submit orders to broker with validation and error handling
- Order Tracking: Monitor order status from submission to completion
- Fill Handling: Process fills and partial fills with state management
- State Management: Manage order state machines with transition validation
- Circuit Breaker: Protect against API failures with configurable recovery
- Rate Limiting: Manage API quotas and prevent throttling
- Monitoring: Track metrics and performance indicators
Events Published¶
order.submitted¶
Published when an order is sent to the broker.
Data:
{
"order_id": "abc123",
"symbol": "AAPL",
"side": "BUY",
"quantity": 10,
"order_type": "LIMIT",
"limit_price": 175.50,
"status": "ACCEPTED",
"timestamp": "2026-01-07T10:30:00Z",
"trace_id": "trace-456"
}
order.filled¶
Published when an order is fully executed.
Data:
{
"order_id": "abc123",
"symbol": "AAPL",
"filled_qty": 10,
"fill_price": 175.48,
"side": "BUY",
"commission": 0.00,
"timestamp": "2026-01-07T10:30:15Z",
"execution_time_ms": 150
}
order.partially_filled¶
Published when an order is partially executed.
Data:
{
"order_id": "abc123",
"symbol": "AAPL",
"filled_qty": 5,
"remaining_qty": 5,
"fill_price": 175.48,
"cumulative_qty": 5,
"avg_fill_price": 175.48
}
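The `cumulative_qty` and `avg_fill_price` fields can be derived from the individual fills as a quantity-weighted average. The sketch below shows one way to do that, assuming each fill arrives as a quantity and a price; the `FillAccumulator` class is hypothetical and only illustrates the arithmetic.

```python
from dataclasses import dataclass


@dataclass
class FillAccumulator:
    order_qty: int
    cumulative_qty: int = 0
    notional: float = 0.0  # running sum of qty * price over all fills

    def add_fill(self, qty: int, price: float) -> dict:
        """Apply one fill and return the fields of an order.partially_filled event."""
        if qty <= 0 or self.cumulative_qty + qty > self.order_qty:
            raise ValueError("fill quantity out of range")
        self.cumulative_qty += qty
        self.notional += qty * price
        return {
            "filled_qty": qty,
            "remaining_qty": self.order_qty - self.cumulative_qty,
            "fill_price": price,
            "cumulative_qty": self.cumulative_qty,
            # Quantity-weighted average price across all fills so far
            "avg_fill_price": round(self.notional / self.cumulative_qty, 4),
        }


acc = FillAccumulator(order_qty=10)
first = acc.add_fill(5, 175.48)   # matches the example event above
second = acc.add_fill(5, 175.52)  # completes the order
print(first["remaining_qty"], second["remaining_qty"])
```

When `remaining_qty` reaches zero, the order is complete and the agent would publish `order.filled` rather than another partial-fill event.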
order.rejected¶
Published when broker rejects an order.
Data:
{
"order_id": "abc123",
"symbol": "AAPL",
"rejection_reason": "insufficient_buying_power",
"error_code": "40310000",
"error_message": "Insufficient buying power",
"timestamp": "2026-01-07T10:30:02Z"
}
order.cancelled¶
Published when an order is cancelled.
Data:
{
"order_id": "abc123",
"symbol": "AAPL",
"reason": "User requested",
"cancelled_qty": 5,
"timestamp": "2026-01-07T10:35:00Z"
}
Events Subscribed¶
order.submit¶
Request to place a new order.
Data:
{
"symbol": "AAPL",
"side": "BUY",
"quantity": 10,
"order_type": "LIMIT",
"limit_price": 175.50,
"time_in_force": "DAY",
"client_order_id": "pos_mgmt_001",
"trace_id": "trace-789"
}
Order Types:
- MARKET: Execute immediately at best price (slippage risk)
- LIMIT: Execute at specified price or better (execution risk)
- STOP: Execute when price reaches stop price (becomes market order)
Time In Force:
- DAY: Good for the trading day (expires at market close)
- GTC: Good till cancelled (persists across days)
- IOC: Immediate or cancel (fill whatever quantity is available immediately, cancel the remainder)
- FOK: Fill or kill (fill entire order immediately or cancel)
Broker-Specific Implementations:
- Alpaca: Supports all TIF types, auto-cancels DAY orders at 4:00 PM ET
- Market Hours: DAY orders auto-expire, GTC orders persist through weekends
- Extended Hours: Only limit orders allowed, reduced liquidity
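The time-in-force rules above could be checked before submission with something like the following sketch. The `validate_time_in_force` function and its `extended_hours` flag are assumptions for illustration; the documented `order.submit` payload does not define an extended-hours field.

```python
VALID_TIF = {"DAY", "GTC", "IOC", "FOK"}


def validate_time_in_force(
    order_type: str, time_in_force: str, extended_hours: bool = False
) -> tuple[bool, str]:
    """Return (is_valid, message) for an order type and time-in-force pair."""
    if time_in_force.upper() not in VALID_TIF:
        return False, f"Unsupported time in force: {time_in_force}"
    # Extended-hours sessions accept limit orders only
    if extended_hours and order_type.upper() != "LIMIT":
        return False, "Extended-hours orders must be LIMIT orders"
    return True, "Valid"


print(validate_time_in_force("MARKET", "DAY", extended_hours=True))
```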
order.cancel¶
Request to cancel a pending order.
Data:
order.status¶
Request for order status check.
Data:
Implementation Details¶
Order Submission with Error Handling¶
import asyncio
import logging
from typing import Optional
from datetime import datetime, timedelta

logger = logging.getLogger(__name__)

async def handle_order_submit(self, message: Message):
    """Handle order submission request with comprehensive error handling."""
    # Bind trace_id up front so the outer except handler can always reference it
    trace_id = None
    try:
        # Extract and validate order data
        symbol = message.data.get("symbol")
        side = message.data.get("side", "BUY").upper()
        quantity = message.data.get("quantity")
        order_type = message.data.get("order_type", "MARKET").upper()
        limit_price = message.data.get("limit_price")
        trace_id = message.data.get("trace_id")

        # Validate required fields
        if not all([symbol, quantity]):
            raise ValueError("Missing required fields: symbol, quantity")

        # Validate market hours for market orders
        if order_type == "MARKET" and not self._is_market_open():
            raise ValueError("Market orders not allowed when market is closed")

        # Create order state machine
        order_sm = OrderStateMachine(
            symbol=symbol,
            side=side,
            order_type=order_type,
            quantity=quantity,
            limit_price=limit_price
        )

        # Check rate limits
        await self._check_rate_limits()

        # Submit order to broker. _submit_order_to_broker applies the circuit
        # breaker itself, so the call is not wrapped in the breaker a second
        # time here (double wrapping would count each failure twice).
        start_time = datetime.utcnow()
        try:
            broker_order = await self._submit_order_to_broker(
                symbol=symbol,
                side=side,
                quantity=quantity,
                order_type=order_type,
                limit_price=limit_price,
                time_in_force=message.data.get("time_in_force", "DAY")
            )
            execution_time = (datetime.utcnow() - start_time).total_seconds() * 1000
            self.metrics.record_order_latency(execution_time)
        except CircuitBreakerError as e:
            logger.error(f"Circuit breaker open for order submission: {e}")
            self._handle_submission_failure(order_sm, "circuit_breaker_open", trace_id)
            return
        except Exception as e:
            logger.error(f"Order submission failed: {e}")
            self._handle_submission_failure(order_sm, str(e), trace_id)
            return

        # Update state machine
        order_sm.submit(order_id=broker_order.id)
        order_sm.accept()  # Alpaca auto-accepts orders

        # Store order state machine
        self.orders[broker_order.id] = order_sm

        # Publish order submitted event
        await self.publish_event(
            event="order.submitted",
            data={
                "order_id": broker_order.id,
                "symbol": symbol,
                "side": side,
                "quantity": quantity,
                "order_type": order_type,
                "limit_price": limit_price,
                "status": "ACCEPTED",
                "timestamp": datetime.utcnow().isoformat(),
                "trace_id": trace_id
            }
        )

        # Update metrics
        self.metrics.increment_orders_submitted()

        # Check order status (may already be filled)
        await self._check_order_status(broker_order.id, trace_id)
    except Exception as e:
        logger.error(f"Unexpected error in order submission: {e}")
        await self._publish_error_event("order_submission_error", str(e), trace_id)
Order Status Checking with Retry Logic¶
async def _check_order_status(self, order_id: str, trace_id: Optional[str] = None, max_retries: int = 3):
    """Check order status from broker with retry logic."""
    retry_count = 0
    while retry_count < max_retries:
        try:
            # Get order from broker with circuit breaker protection
            broker_order = await self.circuit_breaker.call(
                self.trading_client.get_order_by_id,
                order_id
            )

            # Get our state machine (if tracking)
            order_sm = self.orders.get(order_id)
            if not order_sm:
                logger.warning(f"No state machine found for order {order_id}")
                return

            # Map Alpaca status to our state machine
            status = broker_order.status
            if status == AlpacaOrderStatus.FILLED:
                await self._handle_order_filled(broker_order, order_sm, trace_id)
            elif status == AlpacaOrderStatus.PARTIALLY_FILLED:
                await self._handle_partial_fill(broker_order, order_sm, trace_id)
            elif status == AlpacaOrderStatus.REJECTED:
                await self._handle_order_rejected(broker_order, order_sm, trace_id)
            # alpaca-py's OrderStatus enum spells this member CANCELED (one L)
            elif status == AlpacaOrderStatus.CANCELED:
                await self._handle_order_cancelled(broker_order, order_sm, trace_id)

            # Successfully processed
            break
        except CircuitBreakerError:
            logger.warning(f"Circuit breaker open, skipping status check for {order_id}")
            break
        except Exception as e:
            retry_count += 1
            if retry_count >= max_retries:
                logger.error(f"Failed to check order status after {max_retries} retries: {e}")
                break
            # Exponential backoff: log the retry first, then wait 2s, 4s, ...
            delay = 2 ** retry_count
            logger.warning(f"Retry {retry_count}/{max_retries} for order status check in {delay}s: {e}")
            await asyncio.sleep(delay)
async def _handle_order_filled(self, broker_order, order_sm, trace_id):
    """Handle fully filled order."""
    if not order_sm.is_filled():
        # The broker may report quantities as strings such as "10" or "10.0",
        # so parse as float before truncating to a whole share count
        filled_qty = int(float(broker_order.filled_qty))
        fill_price = float(broker_order.filled_avg_price)
        order_sm.fill(filled_qty=filled_qty, fill_price=fill_price)

        # Publish filled event
        await self.publish_event(
            event="order.filled",
            data={
                "order_id": broker_order.id,
                "symbol": broker_order.symbol,
                "filled_qty": filled_qty,
                "fill_price": fill_price,
                "side": broker_order.side.value,
                "commission": float(broker_order.commission) if broker_order.commission else 0.00,
                "timestamp": datetime.utcnow().isoformat(),
                "execution_time_ms": getattr(broker_order, 'execution_time_ms', 0)
            },
            trace_id=trace_id
        )

        # Update metrics
        self.metrics.increment_orders_filled()
        self.metrics.record_fill_rate(broker_order.symbol, 1.0)
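The retry loop in `_check_order_status` can be factored into a reusable helper. The following is a minimal sketch of exponential backoff under the same schedule (2s, then 4s); the `retry_with_backoff` name and its injectable `sleep` parameter are assumptions for illustration, added so the behavior can be exercised without real waiting.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def retry_with_backoff(
    operation: Callable[[], Awaitable[T]],
    max_retries: int = 3,
    base_delay: float = 2.0,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> T:
    """Run `operation`, retrying on any exception with exponentially growing delays."""
    for attempt in range(1, max_retries + 1):
        try:
            return await operation()
        except Exception:
            if attempt == max_retries:
                raise  # out of attempts: surface the last error
            await sleep(base_delay ** attempt)  # 2s, 4s, ... with the defaults
    raise RuntimeError("max_retries must be at least 1")


async def _demo() -> tuple[str, list[float]]:
    delays: list[float] = []
    calls = {"n": 0}

    async def flaky() -> str:
        # Fails twice, then succeeds
        calls["n"] += 1
        if calls["n"] < 3:
            raise ConnectionError("transient")
        return "ok"

    async def fake_sleep(seconds: float) -> None:
        delays.append(seconds)  # record the delay instead of waiting

    result = await retry_with_backoff(flaky, sleep=fake_sleep)
    return result, delays


result, delays = asyncio.run(_demo())
print(result, delays)
```

In the agent, a circuit-breaker error would normally be excluded from retries, as the original loop does, since retrying against an open breaker only adds latency.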
Rate Limiting and API Quota Management¶
class RateLimiter:
    def __init__(self, requests_per_minute: int = 200):
        self.requests_per_minute = requests_per_minute
        self.requests = []
        self.lock = asyncio.Lock()

    async def check_rate_limit(self):
        """Wait until a request can be made without exceeding the rate limit."""
        async with self.lock:
            while True:
                now = datetime.utcnow()
                # Remove requests older than 1 minute
                self.requests = [req_time for req_time in self.requests
                                 if now - req_time < timedelta(minutes=1)]
                if len(self.requests) < self.requests_per_minute:
                    break
                # Wait until the oldest request leaves the 1-minute window,
                # then re-check the window
                oldest_request = min(self.requests)
                wait_time = 60 - (now - oldest_request).total_seconds()
                logger.warning(f"Rate limit reached, waiting {wait_time:.2f}s")
                await asyncio.sleep(max(wait_time, 0))
            # Record the request at the time it is actually allowed through,
            # not at the time the caller started waiting
            self.requests.append(datetime.utcnow())

async def _check_rate_limits(self):
    """Ensure we don't exceed API rate limits."""
    await self.rate_limiter.check_rate_limit()
Market Hours and Order Validation¶
def _is_market_open(self) -> bool:
    """Check if market is currently open."""
    try:
        clock = self.trading_client.get_clock()
        return clock.is_open
    except Exception as e:
        logger.error(f"Failed to check market status: {e}")
        # Default to closed for safety
        return False

def _validate_order_request(self, order_data: dict) -> tuple[bool, str]:
    """Validate order request before submission."""
    symbol = order_data.get("symbol")
    quantity = order_data.get("quantity")
    order_type = order_data.get("order_type")
    limit_price = order_data.get("limit_price")

    # Basic validation
    if not symbol:
        return False, "Symbol is required"
    if not quantity or quantity <= 0:
        return False, "Quantity must be positive"
    if order_type not in ["MARKET", "LIMIT", "STOP"]:
        return False, "Invalid order type"
    if order_type in ["LIMIT", "STOP"] and not limit_price:
        return False, f"{order_type} order requires price"

    # Market hours validation
    if order_type == "MARKET" and not self._is_market_open():
        return False, "Market orders not allowed when market is closed"

    # Check if symbol is tradeable
    try:
        asset = self.trading_client.get_asset(symbol)
        if not asset.tradable:
            return False, f"Asset {symbol} is not tradeable"
    except Exception as e:
        logger.warning(f"Could not validate asset {symbol}: {e}")

    return True, "Valid"
Order State Machine¶
PENDING_SUBMISSION
└→ SUBMITTED (sent to broker)
└→ ACCEPTED (broker acknowledged)
├→ FILLED (fully executed)
├→ PARTIALLY_FILLED (partial execution)
│ └→ FILLED (completed)
├→ REJECTED (broker rejected)
└→ CANCELLED (user cancelled)
State Transitions:
- submit(): PENDING_SUBMISSION → SUBMITTED
- accept(): SUBMITTED → ACCEPTED
- fill(): ACCEPTED/PARTIALLY_FILLED → FILLED
- partially_fill(): ACCEPTED → PARTIALLY_FILLED
- reject(): ACCEPTED → REJECTED
- cancel(): ACCEPTED/PARTIALLY_FILLED → CANCELLED
State Validation:
class OrderStateMachine:
    def __init__(self, symbol: str, side: str, order_type: str, quantity: int, limit_price: Optional[float] = None):
        self.state = OrderState.PENDING_SUBMISSION
        self.order_id = None  # Set once the broker assigns an ID in submit()
        self.symbol = symbol
        self.side = side
        self.order_type = order_type
        self.quantity = quantity
        self.limit_price = limit_price
        self.filled_qty = 0
        self.avg_fill_price = 0.0
        self.transitions = []

    def submit(self, order_id: str):
        if self.state != OrderState.PENDING_SUBMISSION:
            raise InvalidStateTransition(f"Cannot submit from state {self.state}")
        self.state = OrderState.SUBMITTED
        self.order_id = order_id
        self.transitions.append(("SUBMIT", datetime.utcnow()))

    def accept(self):
        # SUBMITTED → ACCEPTED, as listed under State Transitions
        if self.state != OrderState.SUBMITTED:
            raise InvalidStateTransition(f"Cannot accept from state {self.state}")
        self.state = OrderState.ACCEPTED
        self.transitions.append(("ACCEPT", datetime.utcnow()))

    def fill(self, filled_qty: int, fill_price: float):
        if self.state not in [OrderState.ACCEPTED, OrderState.PARTIALLY_FILLED]:
            raise InvalidStateTransition(f"Cannot fill from state {self.state}")
        self.state = OrderState.FILLED
        self.filled_qty = filled_qty
        self.avg_fill_price = fill_price
        self.transitions.append(("FILL", datetime.utcnow()))

    def is_filled(self) -> bool:
        return self.state == OrderState.FILLED
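The lifecycle can also be expressed as a lookup table, which keeps every allowed transition in one place. The sketch below is one possible table-driven form, built from the state diagram and the `fill()` guard above; the `TRANSITIONS` table and `next_state` function are illustrative names, and the `OrderState` and `InvalidStateTransition` definitions are assumed since the source does not show them.

```python
from enum import Enum


class OrderState(Enum):
    PENDING_SUBMISSION = "PENDING_SUBMISSION"
    SUBMITTED = "SUBMITTED"
    ACCEPTED = "ACCEPTED"
    PARTIALLY_FILLED = "PARTIALLY_FILLED"
    FILLED = "FILLED"
    REJECTED = "REJECTED"
    CANCELLED = "CANCELLED"


class InvalidStateTransition(Exception):
    """Raised when an action is not allowed from the current state."""


# (action, current state) -> next state
TRANSITIONS = {
    ("submit", OrderState.PENDING_SUBMISSION): OrderState.SUBMITTED,
    ("accept", OrderState.SUBMITTED): OrderState.ACCEPTED,
    ("fill", OrderState.ACCEPTED): OrderState.FILLED,
    ("fill", OrderState.PARTIALLY_FILLED): OrderState.FILLED,
    ("partially_fill", OrderState.ACCEPTED): OrderState.PARTIALLY_FILLED,
    ("reject", OrderState.ACCEPTED): OrderState.REJECTED,
    ("cancel", OrderState.ACCEPTED): OrderState.CANCELLED,
    ("cancel", OrderState.PARTIALLY_FILLED): OrderState.CANCELLED,
}


def next_state(current: OrderState, action: str) -> OrderState:
    """Return the state reached by `action`, or raise if it is not allowed."""
    try:
        return TRANSITIONS[(action, current)]
    except KeyError:
        raise InvalidStateTransition(
            f"Cannot {action} from state {current.name}"
        ) from None


# Walk one order through a partial fill to completion
state = OrderState.PENDING_SUBMISSION
for action in ("submit", "accept", "partially_fill", "fill"):
    state = next_state(state, action)
print(state.name)
```

FILLED, REJECTED, and CANCELLED have no outgoing entries, so any action on a terminal order raises.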
Circuit Breaker Protection¶
The circuit breaker protects against API failures with configurable timeout values and retry logic:
class CircuitBreakerConfig:
    FAILURE_THRESHOLD = 0.5  # 50% failure rate triggers open
    RECOVERY_TIMEOUT = 60    # 60 seconds in open state
    EXPECTED_EXCEPTION = (
        AlpacaAPIError,
        ConnectionError,
        TimeoutError,
        HTTPException
    )

# Create circuit breakers for different environments
def create_alpaca_circuit_breaker(environment: str):
    if environment == "paper":
        return CircuitBreaker(
            failure_threshold=5,  # More lenient for paper trading
            recovery_timeout=30,  # Shorter recovery for testing
            expected_exception=CircuitBreakerConfig.EXPECTED_EXCEPTION
        )
    else:  # live trading
        return CircuitBreaker(
            failure_threshold=3,  # Stricter for live trading
            recovery_timeout=60,  # Standard recovery timeout
            expected_exception=CircuitBreakerConfig.EXPECTED_EXCEPTION
        )

# Usage in order submission
async def _submit_order_to_broker(self, **order_params):
    """Submit order with circuit breaker protection."""
    try:
        order_request = self._create_order_request(**order_params)
        # Protected API call
        result = await self.circuit_breaker.call(
            self.trading_client.submit_order,
            order_request
        )
        return result
    except CircuitBreakerError:
        # Same exception type the order handlers catch
        logger.error("Circuit breaker open - API temporarily unavailable")
        self.metrics.increment_circuit_breaker_opens()
        raise
    except Exception as e:
        logger.error(f"Order submission failed: {e}")
        self.metrics.increment_order_failures()
        raise

# Circuit breaker state monitoring
def _monitor_circuit_breaker_state(self):
    """Monitor and log circuit breaker state changes."""
    if self.circuit_breaker.current_state == "open":
        self.metrics.increment_circuit_breaker_opens()
        logger.warning(
            f"Circuit breaker opened due to failures. "
            f"Recovery timeout: {self.circuit_breaker.recovery_timeout}s"
        )
    elif self.circuit_breaker.current_state == "half_open":
        logger.info("Circuit breaker half-open - testing recovery")
    elif self.circuit_breaker.current_state == "closed":
        logger.info("Circuit breaker closed - normal operation")
Failure Handling Strategy:
- Open State: All requests fail fast, no API calls made
- Half-Open State: Single test call to check API recovery
- Closed State: Normal operation with failure monitoring
- Retry Logic: Exponential backoff for transient failures
- Fallback: Graceful degradation for non-critical operations
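To make the open, half-open, and closed states concrete, here is a minimal synchronous sketch of a count-based breaker. It is not the `CircuitBreaker` class the agent uses, whose implementation is not shown here; `SimpleCircuitBreaker`, `CircuitOpenError`, and the injectable `clock` are hypothetical names chosen for this example, and the sketch does not limit concurrent probes in the half-open state.

```python
import time
from typing import Callable, Optional


class CircuitOpenError(Exception):
    """Raised when a call is refused because the breaker is open."""


class SimpleCircuitBreaker:
    def __init__(self, failure_threshold: int = 3, recovery_timeout: float = 60.0,
                 clock: Callable[[], float] = time.monotonic):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self._clock = clock
        self._failures = 0
        self._opened_at: Optional[float] = None  # None means not open

    @property
    def current_state(self) -> str:
        if self._opened_at is None:
            return "closed"
        if self._clock() - self._opened_at >= self.recovery_timeout:
            return "half_open"
        return "open"

    def call(self, func, *args, **kwargs):
        state = self.current_state
        if state == "open":
            raise CircuitOpenError("circuit open, failing fast")
        try:
            result = func(*args, **kwargs)
        except Exception:
            self._failures += 1
            # A failed half-open probe, or too many failures, (re)opens the breaker
            if state == "half_open" or self._failures >= self.failure_threshold:
                self._opened_at = self._clock()
            raise
        # Success closes the breaker and resets the failure count
        self._failures = 0
        self._opened_at = None
        return result


def failing():
    raise ConnectionError("broker unreachable")


now = [0.0]  # fake clock so the example runs instantly
breaker = SimpleCircuitBreaker(failure_threshold=2, recovery_timeout=60,
                               clock=lambda: now[0])
for _ in range(2):
    try:
        breaker.call(failing)
    except ConnectionError:
        pass
state_after_failures = breaker.current_state
now[0] = 61.0  # recovery timeout has elapsed
state_after_timeout = breaker.current_state
breaker.call(lambda: "ok")  # successful probe
state_after_probe = breaker.current_state
print(state_after_failures, state_after_timeout, state_after_probe)
```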
Monitoring and Metrics¶
class OrderMetrics:
    def __init__(self, circuit_breaker=None):
        self.orders_submitted = 0
        self.orders_filled = 0
        self.orders_rejected = 0
        self.orders_cancelled = 0
        self.order_failures = 0
        self.circuit_breaker_opens = 0
        self.latency_measurements = []
        self.fill_rates = {}
        # Optional reference so the summary can report the breaker state
        self.circuit_breaker = circuit_breaker

    # Counter helpers called by the order handlers
    def increment_orders_submitted(self):
        self.orders_submitted += 1

    def increment_orders_filled(self):
        self.orders_filled += 1

    def increment_order_failures(self):
        self.order_failures += 1

    def increment_circuit_breaker_opens(self):
        self.circuit_breaker_opens += 1

    def record_order_latency(self, latency_ms: float):
        """Record order execution latency."""
        self.latency_measurements.append({
            "latency_ms": latency_ms,
            "timestamp": datetime.utcnow()
        })
        # Keep only last 1000 measurements
        if len(self.latency_measurements) > 1000:
            self.latency_measurements = self.latency_measurements[-1000:]

    def record_fill_rate(self, symbol: str, fill_rate: float):
        """Record fill rate by symbol."""
        if symbol not in self.fill_rates:
            self.fill_rates[symbol] = []
        self.fill_rates[symbol].append({
            "fill_rate": fill_rate,
            "timestamp": datetime.utcnow()
        })

    def get_metrics_summary(self) -> dict:
        """Get comprehensive metrics summary."""
        avg_latency = 0
        if self.latency_measurements:
            avg_latency = sum(m["latency_ms"] for m in self.latency_measurements) / len(self.latency_measurements)
        total_orders = self.orders_submitted
        success_rate = (self.orders_filled / total_orders) if total_orders > 0 else 0
        return {
            "orders_submitted": self.orders_submitted,
            "orders_filled": self.orders_filled,
            "orders_rejected": self.orders_rejected,
            "orders_cancelled": self.orders_cancelled,
            "success_rate": success_rate,
            "avg_latency_ms": avg_latency,
            # The breaker is optional; report "unknown" when none was provided
            "circuit_breaker_state": self.circuit_breaker.current_state if self.circuit_breaker else "unknown"
        }
Configuration Examples¶
Development Environment¶
# config/development.yml
order_execution:
  alpaca:
    api_key: "${ALPACA_PAPER_API_KEY}"
    secret_key: "${ALPACA_PAPER_SECRET_KEY}"
    base_url: "https://paper-api.alpaca.markets"
  circuit_breaker:
    failure_threshold: 5
    recovery_timeout: 30
    expected_exceptions: ["AlpacaAPIError", "ConnectionError"]
  rate_limiting:
    requests_per_minute: 200
    burst_limit: 10
  redis:
    host: "172.200.3.164"
    port: 6379
    db: 0
    channels:
      commands: "trading:orders:commands:dev"
      events: "trading:orders:events:dev"
Production Environment¶
# config/production.yml
order_execution:
  alpaca:
    api_key: "${ALPACA_LIVE_API_KEY}"
    secret_key: "${ALPACA_LIVE_SECRET_KEY}"
    base_url: "https://api.alpaca.markets"
  circuit_breaker:
    failure_threshold: 3
    recovery_timeout: 60
    max_failures: 10
  rate_limiting:
    requests_per_minute: 180  # Conservative for live trading
    burst_limit: 5
  redis:
    host: "172.200.3.164"
    port: 6379
    db: 1  # Separate DB for production
    channels:
      commands: "trading:orders:commands:prod"
      events: "trading:orders:events:prod"
  monitoring:
    metrics_interval: 60
    alert_thresholds:
      rejection_rate: 0.1
      avg_latency_ms: 1000
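The `alert_thresholds` values could be applied to a metrics summary along these lines. This is a sketch only: the `check_alerts` function is hypothetical, and it assumes the summary carries the `orders_submitted`, `orders_rejected`, and `avg_latency_ms` keys returned by `get_metrics_summary()`.

```python
# Thresholds taken from the production configuration above.
ALERT_THRESHOLDS = {"rejection_rate": 0.1, "avg_latency_ms": 1000}


def check_alerts(summary: dict, thresholds: dict = ALERT_THRESHOLDS) -> list[str]:
    """Return a message for every threshold the summary exceeds."""
    alerts = []
    submitted = summary.get("orders_submitted", 0)
    rejected = summary.get("orders_rejected", 0)
    # Avoid dividing by zero before any order has been submitted
    rejection_rate = rejected / submitted if submitted else 0.0
    if rejection_rate > thresholds["rejection_rate"]:
        alerts.append(
            f"rejection_rate {rejection_rate:.2f} exceeds {thresholds['rejection_rate']}"
        )
    latency = summary.get("avg_latency_ms", 0)
    if latency > thresholds["avg_latency_ms"]:
        alerts.append(
            f"avg_latency_ms {latency:.0f} exceeds {thresholds['avg_latency_ms']}"
        )
    return alerts


alerts = check_alerts(
    {"orders_submitted": 50, "orders_rejected": 8, "avg_latency_ms": 420.0}
)
print(alerts)
```

With 8 rejections out of 50 submissions the rejection rate is 0.16, which exceeds the 0.1 threshold, while the 420 ms average latency stays under the limit.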
Order Types¶
MARKET Orders¶
Execute immediately at best available price.
order_request = MarketOrderRequest(
symbol="AAPL",
qty=10,
side=OrderSide.BUY,
time_in_force=TimeInForce.DAY
)
Pros:
- Guaranteed execution (if market is open)
- Fast (executes immediately)
Cons:
- No price control
- Slippage on low-liquidity stocks
Use Case: Exit positions quickly, high-liquidity stocks
LIMIT Orders¶
Execute at specified price or better.
order_request = LimitOrderRequest(
symbol="AAPL",
qty=10,
side=OrderSide.BUY,
time_in_force=TimeInForce.DAY,
limit_price=175.50
)
Pros:
- Price control
- Fills at the limit price or better
Cons:
- May not execute
- Requires price monitoring
Use Case: Entry orders, precise price targeting
STOP Orders¶
Execute when price reaches stop price.
order_request = StopOrderRequest(
symbol="AAPL",
qty=10,
side=OrderSide.SELL,
time_in_force=TimeInForce.GTC,
stop_price=166.70
)
Pros:
- Automatic exit on adverse moves
- Good till cancelled (GTC)
Cons:
- Becomes market order when triggered
- Slippage on fast moves
Use Case: Stop losses (used by Position Management Agent)
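The three request types differ only in their price field, so the mapping from an `order.submit` payload to a request can be centralized. The sketch below returns plain dictionaries rather than constructing the broker's request objects, so it runs without the Alpaca SDK. The `build_order_params` function and the `stop_price` payload field are assumptions for illustration, since the documented payload only shows `limit_price`.

```python
from typing import Optional


def build_order_params(
    symbol: str,
    side: str,
    quantity: int,
    order_type: str,
    time_in_force: str = "DAY",
    limit_price: Optional[float] = None,
    stop_price: Optional[float] = None,
) -> dict:
    """Return the request class name and keyword arguments for an order."""
    order_type = order_type.upper()
    params = {
        "symbol": symbol,
        "qty": quantity,
        "side": side.upper(),
        "time_in_force": time_in_force.upper(),
    }
    if order_type == "MARKET":
        return {"request": "MarketOrderRequest", **params}
    if order_type == "LIMIT":
        if limit_price is None:
            raise ValueError("LIMIT order requires limit_price")
        return {"request": "LimitOrderRequest", **params, "limit_price": limit_price}
    if order_type == "STOP":
        if stop_price is None:
            raise ValueError("STOP order requires stop_price")
        return {"request": "StopOrderRequest", **params, "stop_price": stop_price}
    raise ValueError(f"Invalid order type: {order_type}")


stop_params = build_order_params(
    "AAPL", "sell", 10, "stop", time_in_force="gtc", stop_price=166.70
)
print(stop_params["request"], stop_params["stop_price"])
```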
Usage Examples¶
Submit Market Order with Error Handling¶
# Request market order with validation
try:
    await order_agent.publish_event(
        event="order.submit",
        data={
            "symbol": "AAPL",
            "side": "BUY",
            "quantity": 10,
            "order_type": "MARKET",
            "time_in_force": "DAY",
            "trace_id": "trade-001"
        }
    )
    # Agent will:
    # 1. Validate market hours and order parameters
    # 2. Check rate limits
    # 3. Submit to broker with circuit breaker protection
    # 4. Publish order.submitted
    # 5. Monitor status and publish order.filled
except ValidationError as e:
    logger.error(f"Invalid order request: {e}")
except RateLimitError as e:
    logger.warning(f"Rate limit exceeded: {e}")
Submit Limit Order with Monitoring¶
# Request limit order at $175.50
order_data = {
"symbol": "AAPL",
"side": "BUY",
"quantity": 10,
"order_type": "LIMIT",
"limit_price": 175.50,
"time_in_force": "DAY",
"client_order_id": "entry_001",
"trace_id": "strategy-abc"
}
# Validate before submission
is_valid, error_msg = order_agent._validate_order_request(order_data)
if not is_valid:
    logger.error(f"Order validation failed: {error_msg}")
    return
await order_agent.publish_event(
    event="order.submit",
    data=order_data
)
# Monitor for events
@order_agent.event_handler("order.filled")
async def handle_fill(event_data):
    logger.info(f"Order filled: {event_data['order_id']} at {event_data['fill_price']}")
Cancel Order with Status Check¶
# Cancel pending order
await order_agent.publish_event(
event="order.cancel",
data={
"order_id": "abc123",
"reason": "strategy_exit",
"trace_id": "cancel-001"
}
)
# Verify cancellation
await order_agent.publish_event(
event="order.status",
data={
"order_id": "abc123",
"include_fills": True
}
)
Error Handling¶
Common Errors and Solutions¶
1. Insufficient Funds
Error: "insufficient_buying_power"
Code: 40310000
# Solution: Check buying power before order submission
try:
    account = trading_client.get_account()
    # Compare against buying power, which is what the broker checks
    buying_power = float(account.buying_power)
    order_value = quantity * limit_price
    if order_value > buying_power:
        raise InsufficientFundsError(f"Order value {order_value} exceeds buying power {buying_power}")
except AlpacaAPIError as e:
    # The code may arrive as an int or a string, so normalize before comparing
    if str(e.code) == "40310000":
        # Handle insufficient funds
        await self._handle_insufficient_funds_error(order_data)
2. Invalid Symbol
Error: "asset_not_found"
Code: 40410000
# Solution: Validate symbol before submission
try:
    asset = trading_client.get_asset(symbol)
    if not asset.tradable:
        raise AssetNotTradableError(f"Symbol {symbol} is not tradable")
except AssetNotFoundError:
    logger.error(f"Invalid symbol: {symbol}")
    await self._publish_error_event("invalid_symbol", symbol)