Position Management Agent¶
Position tracking, stop losses, P&L calculation, and automatic profit-taking
January 2026 Status
ACTIVE - Managing ~20 positions with ATR-based dynamic stops
| Feature | Status | Details |
|---|---|---|
| Position Tracking | ✅ Active | Long and short positions |
| ATR-Based Stops | ✅ Active | 15-24% micro-cap, 8-15% large-cap |
| Trailing Stops | ✅ Active | Lock in profits as price rises |
| Profit-Taking | ✅ Active | 50% at +20%, remaining at +50% |
| Pre-Earnings Reduction | ✅ Active | 50% reduction 7 days before |
Overview¶
The Position Management Agent tracks positions, manages stop losses, calculates P&L, and implements automatic trailing stops and profit-taking. The agent interfaces with Alpaca's current v2 API and integrates with our Redis-based message bus for real-time coordination.
Responsibilities:
- Track all open positions (long and short)
- Set and update stop loss orders with error handling
- Calculate unrealized P&L with market hours awareness
- Monitor for stop/target hits with rate limiting
- Implement trailing stops for profitable positions
- Execute automatic profit-taking at +20% and +50%
- Reduce positions before earnings (using Alpha Vantage data)
- Handle after-hours and pre-market scenarios
Position Data Structure¶
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

@dataclass
class Position:
    symbol: str
    quantity: int  # Positive for long, negative for short
    entry_price: float
    stop_price: float
    target_price: float
    stop_order_id: Optional[str]
    current_price: Optional[float]
    unrealized_pnl: float
    unrealized_pnl_pct: float
    high_water_mark: Optional[float]  # Highest price since entry
    profit_taken_20: bool  # 20% profit already taken?
    profit_taken_50: bool  # 50% profit already taken?
    original_quantity: Optional[int]  # Original size before profit-taking
    last_updated: datetime  # Last price update timestamp
    market_session: str  # REGULAR, PRE, POST
    earnings_reduced: bool = False  # Pre-earnings reduction already applied?
Event Architecture¶
Events Published¶
| Event | Description | Data |
|---|---|---|
| `position.opened` | New position tracked | symbol, quantity, entry_price, stop_price |
| `position.stop_set` | Stop loss order placed | symbol, stop_price, stop_order_id |
| `position.stop_hit` | Stop loss triggered | symbol, current_price, pnl |
| `position.target_hit` | Profit target reached | symbol, current_price, pnl |
| `position.pnl_updated` | P&L recalculated | symbol, unrealized_pnl |
| `position.closed` | Position fully closed | symbol, exit_price, pnl |
| `position.trailing_stop_updated` | Trailing stop adjusted | symbol, old_stop, new_stop |
| `position.profit_taking` | Partial/full profit taken | symbol, trigger, shares_sold |
| `position.error` | Position management error | symbol, error_type, message |
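To make the event contract concrete, here is a minimal sketch of publishing `position.stop_set` with the fields listed in the table. `InMemoryBus` is a hypothetical stand-in for the Redis-based message bus, not the agent's actual transport, and the symbol and order ID are made-up values.

```python
import json
from collections import defaultdict
from typing import Callable, Dict, List


class InMemoryBus:
    """Hypothetical stand-in for the Redis message bus (illustration only)."""

    def __init__(self) -> None:
        self._handlers: Dict[str, List[Callable[[dict], None]]] = defaultdict(list)

    def subscribe(self, event: str, handler: Callable[[dict], None]) -> None:
        self._handlers[event].append(handler)

    def publish(self, event: str, data: dict) -> int:
        """Deliver a JSON round-tripped copy of data; return the handler count."""
        payload = json.loads(json.dumps(data))  # mimic wire serialization
        handlers = self._handlers[event]
        for handler in handlers:
            handler(payload)
        return len(handlers)


received: List[dict] = []
bus = InMemoryBus()
bus.subscribe("position.stop_set", received.append)
delivered = bus.publish(
    "position.stop_set",
    {"symbol": "XYZ", "stop_price": 8.5, "stop_order_id": "demo-order-1"},
)
```

Round-tripping the payload through JSON in the sketch catches non-serializable values (such as `datetime` objects) before they reach a real transport.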
Events Subscribed¶
| Event | Handler | Description |
|---|---|---|
| `risk.trade_approved` | `handle_risk_trade_approved` | Place entry order |
| `order.filled` | `handle_order_filled` | Track new position |
| `order.failed` | `handle_order_failed` | Handle failed stop orders |
| `market_data.quote.updated` | `handle_market_data_quote_updated` | Monitor positions |
| `market.session.changed` | `handle_market_session_changed` | Adjust for market hours |
ATR-Based Dynamic Stops¶
Phase 5 Enhancement¶
Instead of fixed percentage stops, the agent calculates stops based on Average True Range (ATR) using 14-day historical data:
| Asset Type | Stop Method | Stop Range | Target R/R |
|---|---|---|---|
| Micro-cap | 2x ATR | 15-24% | 3:1 |
| Large-cap | 1.5x ATR | 8-15% | 2:1 |
ATR Calculation Implementation¶
def get_atr(self, symbol: str, period: int = 14) -> Optional[float]:
    """Calculate Average True Range for the symbol, or None if unavailable."""
    try:
        # Request 3x the period in calendar days so that at least period + 1
        # trading days come back (one extra bar supplies the first previous close);
        # 20 calendar days contain only about 14 trading days
        bars = self.trading_client.get_bars(
            symbol_or_symbols=symbol,
            timeframe=TimeFrame.Day,
            start=datetime.now() - timedelta(days=period * 3),
            end=datetime.now()
        ).df
        if len(bars) < period + 1:
            self.logger.warning(f"Insufficient data for ATR calculation: {symbol}")
            return None

        # Calculate True Range for each day
        bars['prev_close'] = bars['close'].shift(1)
        bars['tr1'] = bars['high'] - bars['low']
        bars['tr2'] = abs(bars['high'] - bars['prev_close'])
        bars['tr3'] = abs(bars['low'] - bars['prev_close'])
        bars['true_range'] = bars[['tr1', 'tr2', 'tr3']].max(axis=1)

        # ATR = Simple moving average of True Range
        atr = float(bars['true_range'].rolling(window=period).mean().iloc[-1])

        # Cache the result for 60 minutes
        self.redis_client.setex(
            f"atr:{symbol}:{period}",
            3600,
            str(atr)
        )
        return atr
    except Exception as e:
        self.logger.error(f"Error calculating ATR for {symbol}: {e}")
        return None
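The True Range arithmetic is easier to check by hand without a DataFrame. The following is a dependency-free sketch of the same simple-average ATR; the function name `average_true_range` and the `(high, low, close)` tuple layout are assumptions made for illustration, not part of the agent.

```python
from typing import List, Optional, Tuple

Bar = Tuple[float, float, float]  # (high, low, close), oldest first


def average_true_range(bars: List[Bar], period: int = 14) -> Optional[float]:
    """Simple-average ATR over the last `period` bars; None if too few bars."""
    if len(bars) < period + 1:
        return None
    true_ranges = []
    # Pair each bar with the one before it to get the previous close
    for (high, low, _), (_, _, prev_close) in zip(bars[1:], bars):
        true_ranges.append(
            max(high - low, abs(high - prev_close), abs(low - prev_close))
        )
    return sum(true_ranges[-period:]) / period


sample_bars = [(10.0, 9.0, 9.5), (11.0, 10.0, 10.5), (12.0, 10.0, 11.0), (11.5, 11.0, 11.2)]
atr_3 = average_true_range(sample_bars, period=3)
```

For the sample bars the three true ranges are 1.5, 2.0, and 0.5, so the 3-period ATR is 4.0 / 3.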
Stop Calculation with Error Handling¶
def calculate_atr_stop(self, symbol: str, entry_price: float) -> tuple:
    """Calculate ATR-based stop and target prices."""
    try:
        # Check cache first
        cached_atr = self.redis_client.get(f"atr:{symbol}:14")
        if cached_atr:
            atr = float(cached_atr)
        else:
            atr = self.get_atr(symbol)

        if atr is None:
            # Fallback to fixed percentages
            self.logger.warning(f"Using fallback stops for {symbol}")
            if self.is_micro_cap(symbol):
                stop_pct = 0.20  # 20% fallback
                target_pct = 0.60  # 3:1 R/R
            else:
                stop_pct = 0.12  # 12% fallback
                target_pct = 0.24  # 2:1 R/R
        else:
            # Calculate ATR-based percentages, clamped to the allowed range
            if self.is_micro_cap(symbol):
                stop_pct = max(0.15, min(0.24, 2 * atr / entry_price))
                target_pct = 3 * stop_pct  # 3:1 R/R
            else:
                stop_pct = max(0.08, min(0.15, 1.5 * atr / entry_price))
                target_pct = 2 * stop_pct  # 2:1 R/R

        # Stop below entry and target above entry (long-side prices)
        stop_price = entry_price * (1 - stop_pct)
        target_price = entry_price * (1 + target_pct)
        return stop_price, target_price
    except Exception as e:
        self.logger.error(f"Error calculating stops for {symbol}: {e}")
        # Return conservative fallback
        return entry_price * 0.85, entry_price * 1.30
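A worked example shows how the clamp behaves at both ends of the range. This is a standalone sketch for a long entry; `atr_stop_levels` is a hypothetical helper and the prices and ATR values are illustrative, not taken from the platform.

```python
from typing import Tuple


def atr_stop_levels(
    entry_price: float,
    atr: float,
    multiplier: float,
    min_pct: float,
    max_pct: float,
    reward_risk: float,
) -> Tuple[float, float]:
    """Return (stop_price, target_price) for a long entry."""
    raw_pct = multiplier * atr / entry_price
    stop_pct = max(min_pct, min(max_pct, raw_pct))  # clamp into [min_pct, max_pct]
    target_pct = reward_risk * stop_pct
    return entry_price * (1 - stop_pct), entry_price * (1 + target_pct)


# Micro-cap: $5.00 entry, ATR $0.40 -> raw 16%, already inside the 15-24% band
micro_stop, micro_target = atr_stop_levels(5.00, 0.40, 2.0, 0.15, 0.24, 3.0)

# Large-cap: $200 entry, ATR $4.00 -> raw 3%, raised to the 8% floor
large_stop, large_target = atr_stop_levels(200.0, 4.0, 1.5, 0.08, 0.15, 2.0)
```

The micro-cap case yields a stop near $4.20 and a target near $7.40. In the large-cap case the 8% floor produces a stop near $184 and a target near $232, which is considerably wider than the raw 1.5x ATR distance of $6.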
Market Hours Handling¶
Session Awareness¶
The agent adjusts behavior based on market session:
| Session | Behavior | Stop Orders |
|---|---|---|
| PRE | Monitor only | No modifications |
| REGULAR | Full management | Active trailing |
| POST | Limited updates | Hold existing |
| CLOSED | Cache updates | No new orders |
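The session labels in the table can be derived from a clock time. The sketch below assumes typical US equity hours in Eastern Time (pre-market 04:00-09:30, regular 09:30-16:00, post-market 16:00-20:00), takes a naive `datetime` already expressed in Eastern Time, and ignores exchange holidays; the source does not state the agent's actual boundaries, and `classify_session` is a hypothetical name.

```python
from datetime import datetime, time

# Assumed session boundaries in Eastern Time (not confirmed by the agent's docs)
PRE_OPEN = time(4, 0)
REGULAR_OPEN = time(9, 30)
REGULAR_CLOSE = time(16, 0)
POST_CLOSE = time(20, 0)


def classify_session(eastern_now: datetime) -> str:
    """Map an Eastern Time datetime to PRE, REGULAR, POST, or CLOSED."""
    if eastern_now.weekday() >= 5:  # Saturday or Sunday
        return "CLOSED"
    current = eastern_now.time()
    if PRE_OPEN <= current < REGULAR_OPEN:
        return "PRE"
    if REGULAR_OPEN <= current < REGULAR_CLOSE:
        return "REGULAR"
    if REGULAR_CLOSE <= current < POST_CLOSE:
        return "POST"
    return "CLOSED"


monday_morning = classify_session(datetime(2026, 1, 12, 10, 0))
```

A production version would also need a holiday and early-close calendar, which this sketch leaves out.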
Implementation¶
def handle_market_session_changed(self, session: str):
    """Adjust agent behavior for market session."""
    self.current_session = session
    if session == "REGULAR":
        self.logger.info("Market open - resuming full position management")
        self.active_management = True
    elif session in ["PRE", "POST"]:
        self.logger.info(f"Extended hours ({session}) - limited management")
        self.active_management = False
    else:  # CLOSED
        self.logger.info("Market closed - monitoring only")
        self.active_management = False

def should_update_stops(self) -> bool:
    """Check if stop updates are allowed in current session."""
    return self.current_session == "REGULAR" and self.active_management
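When stop updates are allowed, a trailing stop only ever moves in the protective direction. The sketch below shows one way the update could work for a long position, using the high water mark together with the `TRAIL_PERCENT_*` and `MIN_STOP_ADJUSTMENT` settings. It assumes `MIN_STOP_ADJUSTMENT` is a dollar amount, which the source does not specify, and `trailing_stop_update` is a hypothetical helper rather than the agent's implementation.

```python
from typing import Optional


def trailing_stop_update(
    current_stop: float,
    high_water_mark: float,
    trail_pct: float,
    min_adjustment: float,
) -> Optional[float]:
    """Return a raised stop for a long position, or None to leave it unchanged."""
    candidate = round(high_water_mark * (1 - trail_pct), 2)
    # Only move the stop upward, and only when the move is large enough
    # to justify replacing the order (assumed to be measured in dollars)
    if candidate - current_stop >= min_adjustment:
        return candidate
    return None


raised = trailing_stop_update(8.50, 10.00, trail_pct=0.09, min_adjustment=0.10)
too_small = trailing_stop_update(8.50, 9.40, trail_pct=0.09, min_adjustment=0.10)
lower = trailing_stop_update(8.50, 9.00, trail_pct=0.09, min_adjustment=0.10)
```

With a 9% trail, a high water mark of $10.00 moves the stop from $8.50 to $9.10. A high water mark of $9.40 would only move it by about five cents, so the existing order is kept, which avoids churning orders against the API.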
Error Handling & Rate Limiting¶
Order Failure Handling¶
def handle_order_failed(self, event_data: dict):
    """Handle failed stop order placement."""
    symbol = event_data.get('symbol')
    error_msg = event_data.get('error', 'Unknown error')
    self.logger.error(f"Stop order failed for {symbol}: {error_msg}")

    # Retry logic for transient errors
    if "insufficient buying power" not in error_msg.lower():
        self.retry_stop_order(symbol, max_retries=3)
    else:
        # Alert for position sizing issues
        self.publish_event('position.error', {
            'symbol': symbol,
            'error_type': 'INSUFFICIENT_FUNDS',
            'message': error_msg
        })

def retry_stop_order(self, symbol: str, max_retries: int = 3):
    """Retry failed stop order with exponential backoff."""
    for attempt in range(max_retries):
        try:
            time.sleep(2 ** attempt)  # Exponential backoff: 1s, 2s, 4s
            position = self.positions.get(symbol)
            if not position or position.stop_order_id:
                break  # Position closed or already protected
            # place_stop_order returns None on failure instead of raising,
            # so stop retrying only once an order ID comes back
            stop_order_id = self.place_stop_order(position)
            if stop_order_id:
                position.stop_order_id = stop_order_id
                break
        except Exception as e:
            self.logger.warning(f"Stop retry {attempt+1} failed for {symbol}: {e}")
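The backoff schedule can be tested without waiting in real time by injecting the sleep function. The following is a generic sketch; `retry_with_backoff` is a hypothetical helper, and unlike the method above it waits only between attempts rather than before the first one.

```python
import time
from typing import Callable, List, Optional


def retry_with_backoff(
    action: Callable[[], Optional[str]],
    max_retries: int = 3,
    sleep: Callable[[float], None] = time.sleep,
) -> Optional[str]:
    """Call action until it returns a non-None value; wait 1s, 2s, 4s, ... between tries."""
    for attempt in range(max_retries):
        if attempt > 0:
            sleep(2 ** (attempt - 1))
        try:
            result = action()
        except Exception:
            continue  # treat an exception like a failed attempt
        if result is not None:
            return result
    return None


calls = {"count": 0}
waits: List[float] = []


def flaky_place_stop() -> Optional[str]:
    """Fails twice, then returns a made-up order ID."""
    calls["count"] += 1
    return "order-3" if calls["count"] == 3 else None


order_id = retry_with_backoff(flaky_place_stop, max_retries=3, sleep=waits.append)
```

Passing `waits.append` as the sleep function records the delays instead of blocking, which keeps unit tests fast.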
API Rate Limiting¶
import time

class RateLimitedAlpacaClient:
    def __init__(self, trading_client):
        self.client = trading_client
        self.last_call = {}
        self.min_interval = 0.2  # 200ms between API calls

    def submit_order(self, request):
        """Rate-limited order submission."""
        now = time.time()
        if 'submit_order' in self.last_call:
            elapsed = now - self.last_call['submit_order']
            if elapsed < self.min_interval:
                time.sleep(self.min_interval - elapsed)
        self.last_call['submit_order'] = time.time()
        return self.client.submit_order(request)
Automatic Profit-Taking¶
Dynamic Profit Targets¶
Configuration-based profit targets (stored in environment):
| Trigger | Action | Default % | Shares Sold |
|---|---|---|---|
| First Target | Partial exit | 20% gain | 50% of position |
| Second Target | Full exit | 50% gain | Remaining shares |
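The two-stage exit in the table translates into concrete trigger prices and share counts. Here is a small sketch for a long position; `plan_profit_exits` is a hypothetical helper, and the entry price and sizes are illustrative.

```python
from typing import List, Tuple


def plan_profit_exits(
    entry_price: float,
    original_qty: int,
    first_target: float = 0.20,
    second_target: float = 0.50,
) -> List[Tuple[float, int]]:
    """Return [(trigger_price, shares_to_sell), ...] for a long position."""
    first_qty = original_qty // 2  # half the position, rounded down
    remaining_qty = original_qty - first_qty
    return [
        (entry_price * (1 + first_target), first_qty),
        (entry_price * (1 + second_target), remaining_qty),
    ]


even_plan = plan_profit_exits(10.0, 100)
odd_plan = plan_profit_exits(10.0, 25)
```

For 100 shares bought at $10.00 the plan sells 50 shares at about $12.00 and the other 50 at about $15.00. With an odd size such as 25 shares, integer division sells 12 first and leaves 13 for the second target.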
Implementation with Error Handling¶
def check_profit_targets(self, position: Position):
    """Check and execute profit-taking targets."""
    try:
        gain_pct = position.unrealized_pnl_pct / 100

        # Get configurable targets from environment
        target_1_pct = float(os.getenv('PROFIT_TARGET_1', '0.20'))
        target_2_pct = float(os.getenv('PROFIT_TARGET_2', '0.50'))

        # First target: Partial exit
        if gain_pct >= target_1_pct and not position.profit_taken_20:
            sell_qty = position.original_quantity // 2
            # Use market order during regular hours for better fills
            order_type = "market" if self.current_session == "REGULAR" else "limit"
            limit_price = position.current_price * 0.995 if order_type == "limit" else None
            try:
                order = self.submit_profit_taking_order(
                    position.symbol, sell_qty, order_type, limit_price
                )
                position.profit_taken_20 = True
                self.publish_event('position.profit_taking', {
                    'symbol': position.symbol,
                    'trigger': f'{target_1_pct*100:.0f}%',
                    'shares_sold': sell_qty,
                    'order_id': order.id
                })
            except Exception as e:
                self.logger.error(f"Failed to execute 20% profit taking for {position.symbol}: {e}")
                return

        # Second target: Full exit
        if gain_pct >= target_2_pct and not position.profit_taken_50:
            try:
                # Cancel existing stop order first
                if position.stop_order_id:
                    self.cancel_order(position.stop_order_id)
                # Sell all remaining shares
                remaining_qty = position.quantity
                order = self.submit_profit_taking_order(
                    position.symbol, remaining_qty, "market"
                )
                position.profit_taken_50 = True
                self.publish_event('position.profit_taking', {
                    'symbol': position.symbol,
                    'trigger': f'{target_2_pct*100:.0f}%',
                    'shares_sold': remaining_qty,
                    'order_id': order.id
                })
            except Exception as e:
                self.logger.error(f"Failed to execute 50% profit taking for {position.symbol}: {e}")
    except Exception as e:
        self.logger.error(f"Error in profit target check for {position.symbol}: {e}")
Pre-Earnings Risk Reduction¶
Alpha Vantage Integration¶
Uses Alpha Vantage API for earnings calendar data:
import csv
import io

def get_earnings_date(self, symbol: str) -> Optional[datetime]:
    """Get next earnings date from Alpha Vantage."""
    try:
        api_key = os.getenv('ALPHA_VANTAGE_API_KEY')
        if not api_key:
            self.logger.warning("Alpha Vantage API key not configured")
            return None

        # Check cache first
        cached_date = self.redis_client.get(f"earnings:{symbol}")
        if cached_date:
            return datetime.fromisoformat(cached_date.decode())

        url = "https://www.alphavantage.co/query"
        params = {
            'function': 'EARNINGS_CALENDAR',
            'symbol': symbol,
            'apikey': api_key,
            'horizon': '3month'
        }
        response = requests.get(url, params=params, timeout=10)
        response.raise_for_status()

        # Parse the CSV response by header name instead of column position:
        # splitting on ',' breaks when a company name contains a comma
        reader = csv.DictReader(io.StringIO(response.text))
        first_row = next(reader, None)
        if first_row and first_row.get('reportDate'):
            next_earnings = datetime.strptime(first_row['reportDate'], '%Y-%m-%d')
            # Cache for 24 hours
            self.redis_client.setex(
                f"earnings:{symbol}",
                86400,
                next_earnings.isoformat()
            )
            return next_earnings
        return None
    except Exception as e:
        self.logger.error(f"Error fetching earnings date for {symbol}: {e}")
        return None

def check_pre_earnings_reduction(self):
    """Check and execute pre-earnings position reduction."""
    for symbol, position in self.positions.items():
        try:
            earnings_date = self.get_earnings_date(symbol)
            if not earnings_date:
                continue
            days_until = (earnings_date.date() - datetime.now().date()).days
            if 0 <= days_until <= 7 and not position.earnings_reduced:
                sell_qty = position.quantity // 2
                if sell_qty > 0:
                    order = self.submit_earnings_reduction_order(symbol, sell_qty)
                    position.earnings_reduced = True
                    self.logger.info(f"Pre-earnings reduction: {symbol} - sold {sell_qty} shares")
                    self.publish_event('position.earnings_reduction', {
                        'symbol': symbol,
                        'days_until_earnings': days_until,
                        'shares_sold': sell_qty
                    })
        except Exception as e:
            self.logger.error(f"Error in pre-earnings check for {symbol}: {e}")
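The CSV parsing and the 7-day window can be exercised offline. The sketch below uses a made-up sample response; its header layout is an assumption about the `EARNINGS_CALENDAR` CSV format, and both helper names are hypothetical. Reading the `reportDate` column by name keeps the parser correct even when the company name contains a comma.

```python
import csv
import io
from datetime import date, datetime
from typing import Optional

# Made-up sample; the column layout is assumed, not copied from a real response
SAMPLE_CSV = (
    "symbol,name,reportDate,fiscalDateEnding,estimate,currency\n"
    'XYZ,"Example Holdings, Inc.",2026-02-03,2025-12-31,1.25,USD\n'
)


def parse_next_report_date(csv_text: str) -> Optional[date]:
    """Return the reportDate of the first data row, or None if absent."""
    row = next(csv.DictReader(io.StringIO(csv_text)), None)
    if not row or not row.get("reportDate"):
        return None
    return datetime.strptime(row["reportDate"], "%Y-%m-%d").date()


def in_reduction_window(report_date: date, today: date, window_days: int = 7) -> bool:
    """True when earnings fall within the next window_days days (inclusive)."""
    return 0 <= (report_date - today).days <= window_days


report_date = parse_next_report_date(SAMPLE_CSV)
naive_field = SAMPLE_CSV.splitlines()[1].split(",")[2]  # what a plain split returns
```

In this sample a plain `split(",")` returns part of the quoted company name at index 2 rather than the date, while the `csv` module handles the quoting correctly.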
Configuration Management¶
Environment Variables¶
| Variable | Default | Description |
|---|---|---|
| `ALPACA_API_KEY` | Required | Alpaca API key |
| `ALPACA_SECRET_KEY` | Required | Alpaca secret key |
| `ALPACA_PAPER` | "true" | Use paper trading |
| `REDIS_HOST` | "localhost" | Redis message bus host |
| `REDIS_PORT` | "6379" | Redis port |
| `REDIS_PASSWORD` | None | Redis password |
| `ALPHA_VANTAGE_API_KEY` | Required | Earnings data API key |
| `PROFIT_TARGET_1` | "0.20" | First profit target (20%) |
| `PROFIT_TARGET_2` | "0.50" | Second profit target (50%) |
| `TRAIL_PERCENT_MICRO` | "0.09" | Micro-cap trail % (9%) |
| `TRAIL_PERCENT_LARGE` | "0.05" | Large-cap trail % (5%) |
| `MIN_STOP_ADJUSTMENT` | "0.10" | Minimum stop movement |
| `ATR_CACHE_TTL` | "3600" | ATR cache time (seconds) |
Configuration Loading¶
import os
from dataclasses import dataclass
from typing import Optional

@dataclass
class PositionManagerConfig:
    alpaca_api_key: str
    alpaca_secret_key: str
    paper_trading: bool = True
    redis_host: str = "localhost"
    redis_port: int = 6379
    redis_password: Optional[str] = None
    profit_target_1: float = 0.20
    profit_target_2: float = 0.50
    trail_percent_micro: float = 0.09
    trail_percent_large: float = 0.05
    min_stop_adjustment: float = 0.10
    atr_cache_ttl: int = 3600

    @classmethod
    def from_env(cls):
        return cls(
            alpaca_api_key=os.getenv('ALPACA_API_KEY'),
            alpaca_secret_key=os.getenv('ALPACA_SECRET_KEY'),
            paper_trading=os.getenv('ALPACA_PAPER', 'true').lower() == 'true',
            redis_host=os.getenv('REDIS_HOST', 'localhost'),
            redis_port=int(os.getenv('REDIS_PORT', '6379')),
            redis_password=os.getenv('REDIS_PASSWORD'),
            profit_target_1=float(os.getenv('PROFIT_TARGET_1', '0.20')),
            profit_target_2=float(os.getenv('PROFIT_TARGET_2', '0.50')),
            trail_percent_micro=float(os.getenv('TRAIL_PERCENT_MICRO', '0.09')),
            trail_percent_large=float(os.getenv('TRAIL_PERCENT_LARGE', '0.05')),
            min_stop_adjustment=float(os.getenv('MIN_STOP_ADJUSTMENT', '0.10')),
            atr_cache_ttl=int(os.getenv('ATR_CACHE_TTL', '3600'))
        )
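Because `os.getenv` returns `None` for an unset variable, a missing required key would otherwise surface only at the first API call. A small pre-flight check is one way to fail early; `missing_required` is a hypothetical helper, and the list of required names simply mirrors the variables marked Required in the table.

```python
from typing import List, Mapping

REQUIRED_VARS = ("ALPACA_API_KEY", "ALPACA_SECRET_KEY", "ALPHA_VANTAGE_API_KEY")


def missing_required(env: Mapping[str, str]) -> List[str]:
    """Return the required variable names that are unset or empty."""
    return [name for name in REQUIRED_VARS if not env.get(name)]


partial_env = {"ALPACA_API_KEY": "demo-key", "ALPACA_SECRET_KEY": ""}
missing = missing_required(partial_env)
```

In the agent this could be called as `missing_required(os.environ)` before `PositionManagerConfig.from_env()`, raising an error if the returned list is non-empty.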
Monitoring and Alerting¶
Health Checks¶
def health_check(self) -> dict:
    """Return agent health status."""
    try:
        return {
            'status': 'healthy',
            'positions_count': len(self.positions),
            'active_stops': sum(1 for p in self.positions.values() if p.stop_order_id),
            'total_pnl': sum(p.unrealized_pnl for p in self.positions.values()),
            'last_update': self.last_update_time.isoformat() if self.last_update_time else None,
            'market_session': self.current_session,
            'active_management': self.active_management,
            'redis_connected': self.redis_client.ping(),
            'alpaca_connected': self._test_alpaca_connection()
        }
    except Exception as e:
        return {'status': 'unhealthy', 'error': str(e)}

def _test_alpaca_connection(self) -> bool:
    """Test Alpaca API connectivity."""
    try:
        account = self.trading_client.get_account()
        return account is not None
    except Exception:  # Avoid a bare except, which would also swallow KeyboardInterrupt
        return False
Alert Conditions¶
def check_alert_conditions(self):
    """Check for conditions that require alerts."""
    # Large unrealized loss
    large_losses = [p for p in self.positions.values()
                    if p.unrealized_pnl_pct < -15]
    if large_losses:
        self.send_alert('LARGE_LOSSES',
                        f"{len(large_losses)} positions with >15% loss")

    # Missing stop orders
    unprotected = [p for p in self.positions.values()
                   if not p.stop_order_id and abs(p.unrealized_pnl_pct) > 5]
    if unprotected:
        self.send_alert('UNPROTECTED_POSITIONS',
                        f"{len(unprotected)} positions without stops")

    # Stale data: total_seconds() covers the whole gap, while .seconds
    # drops full days and would miss an outage longer than 24 hours
    if self.last_update_time and \
            (datetime.now() - self.last_update_time).total_seconds() > 300:
        self.send_alert('STALE_DATA', 'No price updates for 5+ minutes')
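The stale-data check above compares elapsed time against a 5-minute limit. One detail matters for any such check: `timedelta.seconds` holds only the seconds component (0 to 86399) and ignores whole days, whereas `total_seconds()` returns the full span. A small sketch, with `is_stale` as a hypothetical helper name:

```python
from datetime import datetime, timedelta

gap = timedelta(days=1, seconds=10)
component_only = gap.seconds        # seconds component, whole days dropped
full_span = gap.total_seconds()     # entire duration in seconds


def is_stale(last_update: datetime, now: datetime, max_age_seconds: float = 300) -> bool:
    """True when more than max_age_seconds have passed since last_update."""
    return (now - last_update).total_seconds() > max_age_seconds


last_seen = datetime(2026, 1, 9, 15, 59, 50)
checked_at = datetime(2026, 1, 10, 16, 0, 0)  # a little over one day later
stale = is_stale(last_seen, checked_at)
```

Here the gap is one day and ten seconds, so a comparison based on `.seconds` would see only 10 seconds and stay silent, while `total_seconds()` correctly reports stale data.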
Usage Examples¶
Initialize Agent with Configuration¶
from trading_agents.position_management_agent import (
    PositionManagementAgent,
    PositionManagerConfig,
)

# Load configuration from environment
config = PositionManagerConfig.from_env()

# Initialize agent
agent = PositionManagementAgent(config)

# Start monitoring
try:
    agent.start()
    agent.logger.info("Position Management Agent started successfully")
except Exception as e:
    agent.logger.error(f"Failed to start agent: {e}")
Get All Positions with Health Check¶
# Check agent health first
health = agent.health_check()
if health['status'] != 'healthy':
    print(f"Agent unhealthy: {health}")

# Get positions
positions = agent.get_positions()
total_pnl = sum(pos.unrealized_pnl for pos in positions.values())
print("Portfolio Summary:")
print(f"  Positions: {len(positions)}")
print(f"  Total P/L: ${total_pnl:,.2f}")
print(f"  Market Session: {health['market_session']}")

for symbol, pos in positions.items():
    print(f"\n{symbol}: {pos.quantity} @ ${pos.entry_price:.2f}")
    print(f"  Current: ${pos.current_price:.2f}")
    print(f"  P/L: ${pos.unrealized_pnl:.2f} ({pos.unrealized_pnl_pct:+.1f}%)")
    print(f"  Stop: ${pos.stop_price:.2f}")
    print(f"  Session: {pos.market_session}")
    if pos.profit_taken_20:
        print("  ✓ 20% profit taken")
    if pos.stop_order_id:
        print(f"  ✓ Protected (Order: {pos.stop_order_id})")
Startup Behavior¶
On agent start, `_scan_and_protect_existing_positions()` runs as part of the following startup sequence:
- Load Configuration: Validate all required environment variables
- Test Connections: Verify Alpaca API and Redis connectivity
- Fetch Positions: Get all current positions from Alpaca v2 API
- Load Stop Orders: Retrieve all open stop orders
- Reconcile Data: Match stops to positions, identify unprotected
- Calculate Stops: Use ATR-based calculation for missing stops
- Place Orders: Submit stop orders with error handling
- Initialize Monitoring: Start real-time P/L tracking
def _scan_and_protect_existing_positions(self):
    """Scan existing positions and ensure all have protective stops."""
    try:
        # Get current positions from Alpaca
        alpaca_positions = self.trading_client.list_positions()

        # Get current open orders
        open_orders = self.trading_client.list_orders(
            status=QueryOrderStatus.OPEN,
            limit=500
        )
        stops_by_symbol = {
            order.symbol: order for order in open_orders
            if order.order_type == OrderType.STOP
        }

        protected_count = 0
        new_stops_count = 0
        for alpaca_pos in alpaca_positions:
            symbol = alpaca_pos.symbol
            # qty may arrive as a string such as "10" or "10.0";
            # parse via float so the decimal form does not raise
            raw_qty = float(alpaca_pos.qty)
            qty = int(raw_qty)

            # Create Position object
            position = Position(
                symbol=symbol,
                quantity=qty,
                entry_price=float(alpaca_pos.avg_entry_price),
                current_price=float(alpaca_pos.market_value) / raw_qty,
                unrealized_pnl=float(alpaca_pos.unrealized_pl),
                unrealized_pnl_pct=float(alpaca_pos.unrealized_plpc) * 100,
                original_quantity=qty,
                last_updated=datetime.now(),
                market_session=self.current_session,
                stop_price=0,  # Will be calculated
                target_price=0,  # Will be calculated
                stop_order_id=None,
                profit_taken_20=False,
                profit_taken_50=False,
                high_water_mark=None,
                earnings_reduced=False
            )

            # Check for existing stop
            if symbol in stops_by_symbol:
                stop_order = stops_by_symbol[symbol]
                position.stop_order_id = stop_order.id
                position.stop_price = float(stop_order.stop_price)
                protected_count += 1
                self.logger.info(f"Found existing stop for {symbol}: ${position.stop_price:.2f}")
            else:
                # Calculate and place new stop
                try:
                    stop_price, target_price = self.calculate_atr_stop(
                        symbol, position.entry_price
                    )
                    position.stop_price = stop_price
                    position.target_price = target_price

                    # Place stop order
                    stop_order_id = self.place_stop_order(position)
                    if stop_order_id:
                        position.stop_order_id = stop_order_id
                        new_stops_count += 1
                        self.logger.info(f"Placed new stop for {symbol}: ${stop_price:.2f}")
                except Exception as e:
                    self.logger.error(f"Failed to protect {symbol}: {e}")

            # Initialize high water mark
            if position.unrealized_pnl > 0:
                position.high_water_mark = position.current_price

            # Add to tracking
            self.positions[symbol] = position

        self.logger.info("Startup scan complete:")
        self.logger.info(f"  Total positions: {len(self.positions)}")
        self.logger.info(f"  Already protected: {protected_count}")
        self.logger.info(f"  New stops placed: {new_stops_count}")
        return len(self.positions)
    except Exception as e:
        self.logger.error(f"Error in startup position scan: {e}")
        return 0
P&L Calculation with Session Awareness¶
Long Position¶
def calculate_pnl(self, position: Position, current_price: float) -> tuple:
    """Calculate P&L with session-aware pricing.

    Returns (unrealized_pnl, unrealized_pnl_pct, extended_hours_flag).
    """
    # Basic P&L calculation
    if position.quantity > 0:  # Long position
        unrealized_pnl = (current_price - position.entry_price) * position.quantity
    else:  # Short position
        unrealized_pnl = (position.entry_price - current_price) * abs(position.quantity)
    unrealized_pnl_pct = (unrealized_pnl / (position.entry_price * abs(position.quantity))) * 100

    # Flag P&L computed from extended-hours prices (informational only)
    extended_hours_flag = position.market_session in ["PRE", "POST"]
    return unrealized_pnl, unrealized_pnl_pct, extended_hours_flag
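A pair of worked numbers shows how the sign convention works for both sides. This standalone sketch uses a signed quantity, as in the `Position` structure, which folds the long and short branches into one expression; `unrealized_pnl` is a hypothetical helper and the prices are illustrative.

```python
from typing import Tuple


def unrealized_pnl(entry_price: float, current_price: float, quantity: int) -> Tuple[float, float]:
    """Return (pnl_dollars, pnl_percent); quantity is negative for shorts."""
    # A negative quantity flips the sign, so a falling price profits a short
    pnl = (current_price - entry_price) * quantity
    pnl_pct = pnl / (entry_price * abs(quantity)) * 100
    return pnl, pnl_pct


# Long 100 shares from $10.00, now $12.00
long_pnl, long_pct = unrealized_pnl(10.0, 12.0, 100)

# Short 50 shares from $20.00, now $18.00
short_pnl, short_pct = unrealized_pnl(20.0, 18.0, -50)
```

The long position gains $200 (+20%), and the short position gains $100 (+10%) because the price moved down by $2 on 50 shares.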
Circuit Breaker Protection¶
All Alpaca API calls are wrapped in a circuit breaker that opens after 5 consecutive failures and allows a new attempt after a 30-second recovery timeout:
def create_position_manager_breaker():
    """Create circuit breaker for position management operations."""
    return CircuitBreaker(
        failure_threshold=5,
        recovery_timeout=30,
        expected_exception=Exception
    )

# Usage in agent (inside the agent's __init__)
self.circuit_breaker = create_position_manager_breaker()

def place_stop_order(self, position: Position) -> Optional[str]:
    """Place stop order with circuit breaker protection."""
    try:
        stop_request = StopOrderRequest(
            symbol=position.symbol,
            qty=abs(position.quantity),
            side=OrderSide.SELL if position.quantity > 0 else OrderSide.BUY,
            time_in_force=TimeInForce.GTC,
            stop_price=position.stop_price
        )

        # Protected API call
        stop_order = self.circuit_breaker.call(
            self.rate_limited_client.submit_order,
            stop_request
        )
        self.logger.info(f"Stop order placed: {position.symbol} @ ${position.stop_price:.2f}")
        return stop_order.id
    except Exception as e:
        self.logger.error(f"Failed to place stop for {position.symbol}: {e}")
        # Publish error event for monitoring
        self.publish_event('position.error', {
            'symbol': position.symbol,
            'error_type': 'STOP_ORDER_FAILED',
            'message': str(e)
        })
        return None
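For readers unfamiliar with the pattern, the following is a minimal sketch of what a circuit breaker with a failure threshold and recovery timeout does. `SimpleCircuitBreaker` is a hypothetical illustration written for this page, not the `CircuitBreaker` class the agent imports; the injectable `clock` exists only to make the behavior testable without waiting.

```python
import time
from typing import Any, Callable, Optional


class SimpleCircuitBreaker:
    """Illustrative breaker: opens after N consecutive failures."""

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 30.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self._clock = clock
        self._failures = 0
        self._opened_at: Optional[float] = None

    def call(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        if self._opened_at is not None:
            if self._clock() - self._opened_at < self.recovery_timeout:
                raise RuntimeError("circuit open: call rejected")
            # Timeout elapsed: fall through and allow one trial call
        try:
            result = func(*args, **kwargs)
        except Exception:
            self._failures += 1
            if self._failures >= self.failure_threshold:
                self._opened_at = self._clock()  # open (or re-open) the circuit
            raise
        # Success closes the circuit and resets the failure count
        self._failures = 0
        self._opened_at = None
        return result


fake_now = [0.0]
breaker = SimpleCircuitBreaker(failure_threshold=2, recovery_timeout=30.0,
                               clock=lambda: fake_now[0])
```

While the circuit is open, calls are rejected immediately without touching the API, which gives a failing upstream service time to recover instead of being hit with retries.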
See Also¶
- Strategy Agent - Generates entry and exit signals
- Risk Management Agent - Validates position sizes
- Order Execution Agent - Places stop orders
- Market Data Agent - Provides real-time quotes
- Trading Agents Overview - Architecture and event flow
- Trading Platform Architecture - System design
Last Updated: January 10, 2026