
Position Management Agent

Position tracking, stop losses, P&L calculation, and automatic profit-taking


January 2026 Status

ACTIVE - Managing ~20 positions with ATR-based dynamic stops

| Feature | Status | Details |
|---|---|---|
| Position Tracking | ✅ Active | Long and short positions |
| ATR-Based Stops | ✅ Active | 15-24% micro-cap, 8-15% large-cap |
| Trailing Stops | ✅ Active | Lock in profits as price rises |
| Profit-Taking | ✅ Active | 50% at +20%, remaining at +50% |
| Pre-Earnings Reduction | ✅ Active | 50% reduction 7 days before |

Overview

The Position Management Agent tracks positions, manages stop losses, calculates P&L, and implements automatic trailing stops and profit-taking. The agent interfaces with Alpaca's current v2 API and integrates with our Redis-based message bus for real-time coordination.

trading_agents/position_management_agent.py

Responsibilities:

- Track all open positions (long and short)
- Set and update stop loss orders with error handling
- Calculate unrealized P&L with market hours awareness
- Monitor for stop/target hits with rate limiting
- Implement trailing stops for profitable positions
- Execute automatic profit-taking at +20% and +50%
- Reduce positions before earnings (using Alpha Vantage data)
- Handle after-hours and pre-market scenarios


Position Data Structure

from dataclasses import dataclass
from datetime import datetime
from typing import Optional

@dataclass
class Position:
    symbol: str
    quantity: int              # Positive for long, negative for short
    entry_price: float
    stop_price: float
    target_price: float
    stop_order_id: Optional[str]
    current_price: Optional[float]
    unrealized_pnl: float
    unrealized_pnl_pct: float         # Percentage points, e.g. 12.5 for +12.5%
    high_water_mark: Optional[float]  # Highest price since entry
    profit_taken_20: bool             # 20% profit already taken?
    profit_taken_50: bool             # 50% profit already taken?
    original_quantity: Optional[int]  # Original size before profit-taking
    last_updated: datetime            # Last price update timestamp
    market_session: str               # REGULAR, PRE, POST
    earnings_reduced: bool = False    # Pre-earnings reduction already done?

Event Architecture

Events Published

| Event | Description | Data |
|---|---|---|
| position.opened | New position tracked | symbol, quantity, entry_price, stop_price |
| position.stop_set | Stop loss order placed | symbol, stop_price, stop_order_id |
| position.stop_hit | Stop loss triggered | symbol, current_price, pnl |
| position.target_hit | Profit target reached | symbol, current_price, pnl |
| position.pnl_updated | P&L recalculated | symbol, unrealized_pnl |
| position.closed | Position fully closed | symbol, exit_price, pnl |
| position.trailing_stop_updated | Trailing stop adjusted | symbol, old_stop, new_stop |
| position.profit_taking | Partial/full profit taken | symbol, trigger, shares_sold |
| position.earnings_reduction | Position reduced before earnings | symbol, days_until_earnings, shares_sold |
| position.error | Position management error | symbol, error_type, message |

Events Subscribed

| Event | Handler | Description |
|---|---|---|
| risk.trade_approved | handle_risk_trade_approved | Place entry order |
| order.filled | handle_order_filled | Track new position |
| order.failed | handle_order_failed | Handle failed stop orders |
| market_data.quote.updated | handle_market_data_quote_updated | Monitor positions |
| market.session.changed | handle_market_session_changed | Adjust for market hours |
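
The handler names in the table follow one pattern: dots in the event name become underscores and the result is prefixed with handle_. A minimal sketch of a dispatcher built on that convention is shown here; `handler_name_for`, `dispatch`, and `DemoAgent` are hypothetical names for illustration, not the agent's actual routing code.

```python
from typing import Any, Callable, Optional


def handler_name_for(event_name: str) -> str:
    """Map 'order.filled' to 'handle_order_filled' (naming pattern from the table)."""
    return "handle_" + event_name.replace(".", "_")


def dispatch(agent: Any, event_name: str, payload: dict) -> bool:
    """Call the matching handler on `agent`; return False if none exists."""
    handler: Optional[Callable[[dict], None]] = getattr(
        agent, handler_name_for(event_name), None
    )
    if handler is None:
        return False
    handler(payload)
    return True


class DemoAgent:
    """Hypothetical stand-in that records the events it receives."""

    def __init__(self) -> None:
        self.seen: list = []

    def handle_order_filled(self, payload: dict) -> None:
        self.seen.append(("order.filled", payload["symbol"]))


demo = DemoAgent()
dispatch(demo, "order.filled", {"symbol": "AAPL"})
```

Returning False for unknown events, rather than raising, lets a subscriber ignore topics it has no handler for.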

ATR-Based Dynamic Stops

Phase 5 Enhancement

Instead of fixed percentage stops, the agent calculates stops based on Average True Range (ATR) using 14-day historical data:

| Asset Type | Stop Method | Stop Range | Target R/R |
|---|---|---|---|
| Micro-cap | 2x ATR | 15-24% | 3:1 |
| Large-cap | 1.5x ATR | 8-15% | 2:1 |
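
As a worked illustration of the table, the sketch below turns an ATR value into a stop and target for a long entry. The function names and the sample prices are made up for the example; the multipliers, clamp ranges, and reward ratios come from the table.

```python
def atr_stop_pct(atr: float, entry_price: float, micro_cap: bool) -> float:
    """Stop distance as a fraction of entry, clamped to the table's range."""
    multiplier, low, high = (2.0, 0.15, 0.24) if micro_cap else (1.5, 0.08, 0.15)
    raw = multiplier * atr / entry_price
    return max(low, min(high, raw))


def stop_and_target(atr: float, entry_price: float, micro_cap: bool):
    """Return (stop_price, target_price) for a long entry."""
    stop_pct = atr_stop_pct(atr, entry_price, micro_cap)
    reward_multiple = 3 if micro_cap else 2  # target R/R from the table
    target_pct = reward_multiple * stop_pct
    return entry_price * (1 - stop_pct), entry_price * (1 + target_pct)


# Large-cap at $100 with ATR $6: 1.5 * 6 / 100 = 9% stop, 18% target
large_stop, large_target = stop_and_target(6.0, 100.0, micro_cap=False)

# Micro-cap at $5 with ATR $0.20: 2 * 0.20 / 5 = 8%, raised to the 15% floor
micro_stop, micro_target = stop_and_target(0.20, 5.0, micro_cap=True)
```

The second case shows why the clamp matters: a quiet micro-cap would otherwise get a stop tighter than the 15% minimum.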

ATR Calculation Implementation

def get_atr(self, symbol: str, period: int = 14) -> Optional[float]:
    """Calculate Average True Range for the symbol; return None on failure."""
    try:
        # We need period + 1 daily bars (one extra for the first previous close).
        # Request twice that many calendar days so weekends and holidays
        # do not leave the window short.
        end = datetime.now()
        bars = self.trading_client.get_bars(
            symbol_or_symbols=symbol,
            timeframe=TimeFrame.Day,
            start=end - timedelta(days=2 * (period + 1)),
            end=end
        ).df

        if len(bars) < period + 1:
            self.logger.warning(f"Insufficient data for ATR calculation: {symbol}")
            return None

        # Calculate True Range for each day
        bars['prev_close'] = bars['close'].shift(1)
        bars['tr1'] = bars['high'] - bars['low']
        bars['tr2'] = abs(bars['high'] - bars['prev_close'])
        bars['tr3'] = abs(bars['low'] - bars['prev_close'])
        bars['true_range'] = bars[['tr1', 'tr2', 'tr3']].max(axis=1)

        # ATR = Simple moving average of True Range
        atr = bars['true_range'].rolling(window=period).mean().iloc[-1]

        # Cache the result for 60 minutes
        self.redis_client.setex(
            f"atr:{symbol}:{period}", 
            3600, 
            str(atr)
        )

        return atr

    except Exception as e:
        self.logger.error(f"Error calculating ATR for {symbol}: {e}")
        return None
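
To make the True Range arithmetic easy to check by hand, here is a dependency-free sketch of the same calculation on four made-up daily bars. `true_range` and `simple_atr` are illustrative names, and the simple average matches the rolling mean used above rather than Wilder's smoothing.

```python
def true_range(high: float, low: float, prev_close: float) -> float:
    """Largest of the intraday range and the two gaps from the previous close."""
    return max(high - low, abs(high - prev_close), abs(low - prev_close))


def simple_atr(bars: list, period: int):
    """bars: list of (high, low, close), oldest first. None if too few bars."""
    if len(bars) < period + 1:
        return None
    ranges = [
        true_range(high, low, bars[i - 1][2])
        for i, (high, low, _close) in enumerate(bars)
        if i > 0  # the first bar has no previous close
    ]
    return sum(ranges[-period:]) / period


# Four hypothetical daily bars as (high, low, close)
sample_bars = [
    (10.0, 9.0, 9.5),
    (10.5, 9.5, 10.0),   # TR = max(1.0, 1.0, 0.0) = 1.0
    (12.0, 11.0, 11.5),  # gap up: TR = max(1.0, 2.0, 1.0) = 2.0
    (11.5, 10.0, 10.5),  # TR = max(1.5, 0.0, 1.5) = 1.5
]
atr_3 = simple_atr(sample_bars, period=3)  # (1.0 + 2.0 + 1.5) / 3 = 1.5
```

The third bar shows why True Range is used instead of high minus low: the gap from the previous close doubles the measured range.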

Stop Calculation with Error Handling

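def calculate_atr_stop(self, symbol: str, entry_price: float) -> tuple[float, float]:
    """Calculate ATR-based stop and target prices for a long entry.

    The stop sits below the entry and the target above it; a short
    position would need both mirrored around the entry price.
    """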
    try:
        # Check cache first
        cached_atr = self.redis_client.get(f"atr:{symbol}:14")
        if cached_atr:
            atr = float(cached_atr)
        else:
            atr = self.get_atr(symbol)

        if atr is None:
            # Fallback to fixed percentages
            self.logger.warning(f"Using fallback stops for {symbol}")
            if self.is_micro_cap(symbol):
                stop_pct = 0.20  # 20% fallback
                target_pct = 0.60  # 3:1 R/R
            else:
                stop_pct = 0.12  # 12% fallback  
                target_pct = 0.24  # 2:1 R/R
        else:
            # Calculate ATR-based percentages
            if self.is_micro_cap(symbol):
                stop_pct = max(0.15, min(0.24, 2 * atr / entry_price))
                target_pct = 3 * stop_pct  # 3:1 R/R
            else:
                stop_pct = max(0.08, min(0.15, 1.5 * atr / entry_price))
                target_pct = 2 * stop_pct  # 2:1 R/R

        stop_price = entry_price * (1 - stop_pct)
        target_price = entry_price * (1 + target_pct)

        return stop_price, target_price

    except Exception as e:
        self.logger.error(f"Error calculating stops for {symbol}: {e}")
        # Return conservative fallback
        return entry_price * 0.85, entry_price * 1.30

Market Hours Handling

Session Awareness

The agent adjusts behavior based on market session:

| Session | Behavior | Stop Orders |
|---|---|---|
| PRE | Monitor only | No modifications |
| REGULAR | Full management | Active trailing |
| POST | Limited updates | Hold existing |
| CLOSED | Cache updates | No new orders |
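
The page does not show how the session label is derived, so the following is only a sketch of one way to classify a timestamp. It assumes US equity hours in Eastern Time (pre-market 04:00-09:30, regular 09:30-16:00, post-market 16:00-20:00), takes a naive datetime already expressed in Eastern Time, and ignores holidays and early closes; `classify_session` and `stop_updates_allowed` are hypothetical names.

```python
from datetime import datetime, time


def classify_session(now_et: datetime) -> str:
    """Classify a naive Eastern Time datetime; holidays are not handled."""
    if now_et.weekday() >= 5:  # Saturday or Sunday
        return "CLOSED"
    t = now_et.time()
    if time(4, 0) <= t < time(9, 30):
        return "PRE"
    if time(9, 30) <= t < time(16, 0):
        return "REGULAR"
    if time(16, 0) <= t < time(20, 0):
        return "POST"
    return "CLOSED"


def stop_updates_allowed(session: str) -> bool:
    """Per the table, only the regular session trails stops actively."""
    return session == "REGULAR"


monday_morning = classify_session(datetime(2026, 1, 12, 10, 0))
saturday = classify_session(datetime(2026, 1, 10, 10, 0))
```

In production the broker's market clock is a more reliable source than wall-clock rules, since it accounts for holidays.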

Implementation

def handle_market_session_changed(self, session: str):
    """Adjust agent behavior for market session."""
    self.current_session = session

    if session == "REGULAR":
        self.logger.info("Market open - resuming full position management")
        self.active_management = True
    elif session in ["PRE", "POST"]:
        self.logger.info(f"Extended hours ({session}) - limited management")
        self.active_management = False
    else:  # CLOSED
        self.logger.info("Market closed - monitoring only")
        self.active_management = False

def should_update_stops(self) -> bool:
    """Check if stop updates are allowed in current session."""
    return self.current_session == "REGULAR" and self.active_management

Error Handling & Rate Limiting

Order Failure Handling

def handle_order_failed(self, event_data: dict):
    """Handle failed stop order placement."""
    symbol = event_data.get('symbol')
    error_msg = event_data.get('error', 'Unknown error')

    self.logger.error(f"Stop order failed for {symbol}: {error_msg}")

    # Retry logic for transient errors
    if "insufficient buying power" not in error_msg.lower():
        self.retry_stop_order(symbol, max_retries=3)
    else:
        # Alert for position sizing issues
        self.publish_event('position.error', {
            'symbol': symbol,
            'error_type': 'INSUFFICIENT_FUNDS',
            'message': error_msg
        })

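def retry_stop_order(self, symbol: str, max_retries: int = 3):
    """Retry failed stop order with exponential backoff (1s, 2s, 4s, ...)."""
    for attempt in range(max_retries):
        time.sleep(2 ** attempt)  # Exponential backoff
        position = self.positions.get(symbol)
        if not position or position.stop_order_id:
            return  # Position closed or already protected
        # place_stop_order logs failures and returns None instead of raising,
        # so success is detected from the returned order id
        stop_order_id = self.place_stop_order(position)
        if stop_order_id:
            position.stop_order_id = stop_order_id
            return
        self.logger.warning(f"Stop retry {attempt+1} failed for {symbol}")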
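
The backoff schedule can be exercised without waiting by injecting the sleep function. The helper below is a hypothetical, generic version of the retry loop, not part of the agent; it treats any truthy return value (such as an order id) as success.

```python
import time
from typing import Callable, Optional


def retry_with_backoff(
    action: Callable[[], Optional[str]],
    max_retries: int = 3,
    sleep: Callable[[float], None] = time.sleep,
) -> Optional[str]:
    """Call `action` until it returns a truthy value or retries run out.

    Waits 2**attempt seconds before each attempt: 1s, 2s, 4s, ...
    """
    for attempt in range(max_retries):
        sleep(2 ** attempt)
        result = action()
        if result:
            return result
    return None


# Demo with a fake sleep so the example runs instantly
delays = []
outcomes = iter([None, None, "order-123"])  # fails twice, then succeeds
order_id = retry_with_backoff(lambda: next(outcomes), sleep=delays.append)
```

With three retries the worst case blocks for 1 + 2 + 4 = 7 seconds, which is worth keeping in mind if the handler runs on the event loop thread.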

API Rate Limiting

class RateLimitedAlpacaClient:
    def __init__(self, trading_client):
        self.client = trading_client
        self.last_call = {}
        self.min_interval = 0.2  # 200ms between API calls

    def submit_order(self, request):
        """Rate-limited order submission."""
        now = time.time()
        if 'submit_order' in self.last_call:
            elapsed = now - self.last_call['submit_order']
            if elapsed < self.min_interval:
                time.sleep(self.min_interval - elapsed)

        self.last_call['submit_order'] = time.time()
        return self.client.submit_order(request)

Automatic Profit-Taking

Dynamic Profit Targets

Configuration-based profit targets (stored in environment):

| Trigger | Action | Default Threshold | Shares Sold |
|---|---|---|---|
| First Target | Partial exit | 20% gain | 50% of position |
| Second Target | Full exit | 50% gain | Remaining shares |
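
The share arithmetic behind the table can be sketched as a pure function. This is a simplified illustration for long positions only; `shares_to_sell` is a hypothetical helper, and unlike the agent's method it folds both targets into a single decision.

```python
def shares_to_sell(
    original_qty: int,
    remaining_qty: int,
    gain: float,
    first_done: bool,
    target_1: float = 0.20,
    target_2: float = 0.50,
) -> int:
    """Return shares to sell now for a long position (0 if no target is hit)."""
    if gain >= target_2:
        return remaining_qty        # full exit
    if gain >= target_1 and not first_done:
        return original_qty // 2    # partial exit, rounded down
    return 0


# 101 shares bought; price rises 22%, then 55%
first_sale = shares_to_sell(101, 101, gain=0.22, first_done=False)
remaining = 101 - first_sale
second_sale = shares_to_sell(101, remaining, gain=0.55, first_done=True)
```

Because the first sale rounds down, an odd-sized position leaves the extra share in the remainder that exits at the second target.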

Implementation with Error Handling

def check_profit_targets(self, position: Position):
    """Check and execute profit-taking targets."""
    try:
        gain_pct = position.unrealized_pnl_pct / 100

        # Get configurable targets from environment
        target_1_pct = float(os.getenv('PROFIT_TARGET_1', '0.20'))
        target_2_pct = float(os.getenv('PROFIT_TARGET_2', '0.50'))

        # First target: Partial exit
        if gain_pct >= target_1_pct and not position.profit_taken_20:
            sell_qty = position.original_quantity // 2
            # Use market order during regular hours for better fills
            order_type = "market" if self.current_session == "REGULAR" else "limit"
            limit_price = position.current_price * 0.995 if order_type == "limit" else None

            try:
                order = self.submit_profit_taking_order(
                    position.symbol, sell_qty, order_type, limit_price
                )
                position.profit_taken_20 = True

                self.publish_event('position.profit_taking', {
                    'symbol': position.symbol,
                    'trigger': f'{target_1_pct*100:.0f}%',
                    'shares_sold': sell_qty,
                    'order_id': order.id
                })

            except Exception as e:
                self.logger.error(f"Failed to execute {target_1_pct:.0%} profit taking for {position.symbol}: {e}")
                return

        # Second target: Full exit
        if gain_pct >= target_2_pct and not position.profit_taken_50:
            try:
                # Cancel existing stop order first
                if position.stop_order_id:
                    self.cancel_order(position.stop_order_id)

                # Sell all remaining shares
                remaining_qty = position.quantity
                order = self.submit_profit_taking_order(
                    position.symbol, remaining_qty, "market"
                )
                position.profit_taken_50 = True

                self.publish_event('position.profit_taking', {
                    'symbol': position.symbol,
                    'trigger': f'{target_2_pct*100:.0f}%',
                    'shares_sold': remaining_qty,
                    'order_id': order.id
                })

            except Exception as e:
                self.logger.error(f"Failed to execute {target_2_pct:.0%} profit taking for {position.symbol}: {e}")

    except Exception as e:
        self.logger.error(f"Error in profit target check for {position.symbol}: {e}")

Pre-Earnings Risk Reduction

Alpha Vantage Integration

Uses Alpha Vantage API for earnings calendar data:

def get_earnings_date(self, symbol: str) -> Optional[datetime]:
    """Get next earnings date from Alpha Vantage."""
    try:
        api_key = os.getenv('ALPHA_VANTAGE_API_KEY')
        if not api_key:
            self.logger.warning("Alpha Vantage API key not configured")
            return None

        # Check cache first
        cached_date = self.redis_client.get(f"earnings:{symbol}")
        if cached_date:
            return datetime.fromisoformat(cached_date.decode())

        url = "https://www.alphavantage.co/query"
        params = {
            'function': 'EARNINGS_CALENDAR',
            'symbol': symbol,
            'apikey': api_key,
            'horizon': '3month'
        }

        response = requests.get(url, params=params, timeout=10)
        response.raise_for_status()

        # Parse the CSV response by column name rather than by position:
        # the name column precedes reportDate and may itself contain commas
        reader = csv.DictReader(io.StringIO(response.text))  # requires: import csv, io
        first_row = next(reader, None)
        if first_row and first_row.get('reportDate'):
            next_earnings = datetime.strptime(first_row['reportDate'], '%Y-%m-%d')

            # Cache for 24 hours
            self.redis_client.setex(
                f"earnings:{symbol}",
                86400,
                next_earnings.isoformat()
            )
            return next_earnings

        return None

    except Exception as e:
        self.logger.error(f"Error fetching earnings date for {symbol}: {e}")
        return None

def check_pre_earnings_reduction(self):
    """Check and execute pre-earnings position reduction."""
    for symbol, position in self.positions.items():
        try:
            earnings_date = self.get_earnings_date(symbol)
            if not earnings_date:
                continue

            days_until = (earnings_date.date() - datetime.now().date()).days

            if 0 <= days_until <= 7 and not position.earnings_reduced:
                sell_qty = position.quantity // 2
                if sell_qty > 0:
                    order = self.submit_earnings_reduction_order(symbol, sell_qty)
                    position.earnings_reduced = True

                    self.logger.info(f"Pre-earnings reduction: {symbol} - sold {sell_qty} shares")
                    self.publish_event('position.earnings_reduction', {
                        'symbol': symbol,
                        'days_until_earnings': days_until,
                        'shares_sold': sell_qty
                    })

        except Exception as e:
            self.logger.error(f"Error in pre-earnings check for {symbol}: {e}")
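
The window test in check_pre_earnings_reduction is easiest to reason about in isolation. The sketch below restates it as two small functions with made-up dates; the names are hypothetical and the 7-day default mirrors the condition above.

```python
from datetime import date


def days_until(earnings: date, today: date) -> int:
    """Whole calendar days from `today` to the earnings date (negative if past)."""
    return (earnings - today).days


def in_reduction_window(earnings: date, today: date, window_days: int = 7) -> bool:
    """True from `window_days` before earnings through earnings day itself."""
    return 0 <= days_until(earnings, today) <= window_days


earnings_day = date(2026, 1, 20)
eight_days_out = in_reduction_window(earnings_day, date(2026, 1, 12))
seven_days_out = in_reduction_window(earnings_day, date(2026, 1, 13))
day_after = in_reduction_window(earnings_day, date(2026, 1, 21))
```

The window counts calendar days, so a Monday report date first triggers on the preceding Monday, five trading days earlier.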

Configuration Management

Environment Variables

| Variable | Default | Description |
|---|---|---|
| ALPACA_API_KEY | Required | Alpaca API key |
| ALPACA_SECRET_KEY | Required | Alpaca secret key |
| ALPACA_PAPER | "true" | Use paper trading |
| REDIS_HOST | "localhost" | Redis message bus host |
| REDIS_PORT | "6379" | Redis port |
| REDIS_PASSWORD | None | Redis password |
| ALPHA_VANTAGE_API_KEY | Required | Earnings data API key |
| PROFIT_TARGET_1 | "0.20" | First profit target (20%) |
| PROFIT_TARGET_2 | "0.50" | Second profit target (50%) |
| TRAIL_PERCENT_MICRO | "0.09" | Micro-cap trail % (9%) |
| TRAIL_PERCENT_LARGE | "0.05" | Large-cap trail % (5%) |
| MIN_STOP_ADJUSTMENT | "0.10" | Minimum stop movement |
| ATR_CACHE_TTL | "3600" | ATR cache time (seconds) |

Configuration Loading

@dataclass
class PositionManagerConfig:
    alpaca_api_key: str
    alpaca_secret_key: str
    paper_trading: bool = True
    redis_host: str = "localhost"
    redis_port: int = 6379
    redis_password: Optional[str] = None
    profit_target_1: float = 0.20
    profit_target_2: float = 0.50
    trail_percent_micro: float = 0.09
    trail_percent_large: float = 0.05
    min_stop_adjustment: float = 0.10
    atr_cache_ttl: int = 3600

    @classmethod
    def from_env(cls):
        return cls(
            # os.environ[...] raises KeyError at startup if a required key is missing
            alpaca_api_key=os.environ['ALPACA_API_KEY'],
            alpaca_secret_key=os.environ['ALPACA_SECRET_KEY'],
            paper_trading=os.getenv('ALPACA_PAPER', 'true').lower() == 'true',
            redis_host=os.getenv('REDIS_HOST', 'localhost'),
            redis_port=int(os.getenv('REDIS_PORT', '6379')),
            redis_password=os.getenv('REDIS_PASSWORD'),
            profit_target_1=float(os.getenv('PROFIT_TARGET_1', '0.20')),
            profit_target_2=float(os.getenv('PROFIT_TARGET_2', '0.50')),
            trail_percent_micro=float(os.getenv('TRAIL_PERCENT_MICRO', '0.09')),
            trail_percent_large=float(os.getenv('TRAIL_PERCENT_LARGE', '0.05')),
            min_stop_adjustment=float(os.getenv('MIN_STOP_ADJUSTMENT', '0.10')),
            atr_cache_ttl=int(os.getenv('ATR_CACHE_TTL', '3600'))
        )
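
The configuration carries trailing-stop settings (trail_percent_micro, trail_percent_large, min_stop_adjustment), but the trailing logic itself is not shown on this page. The sketch below is one plausible reading for long positions, not the agent's implementation: the stop trails the high-water mark by the trail percentage, never moves down, and is left alone when the move would be smaller than the minimum adjustment. Treating min_stop_adjustment as a dollar amount is an assumption, since the unit is not stated.

```python
def trailing_stop_update(
    current_price: float,
    high_water_mark: float,
    current_stop: float,
    trail_pct: float = 0.09,
    min_adjustment: float = 0.10,  # ASSUMPTION: dollars, not a percentage
):
    """Return (new_high_water_mark, new_stop) for a long position.

    The stop trails the high-water mark by `trail_pct` and never moves down.
    Moves smaller than `min_adjustment` are skipped to avoid order churn.
    """
    new_hwm = max(high_water_mark, current_price)
    candidate = new_hwm * (1 - trail_pct)
    if candidate - current_stop >= min_adjustment:
        return new_hwm, candidate
    return new_hwm, current_stop


# Price runs from 10 to 12: the stop is raised to 9% below the new high
hwm, stop = trailing_stop_update(12.0, high_water_mark=10.0, current_stop=9.10)

# Price pulls back to 11: high-water mark and stop both hold
hwm_after_dip, stop_after_dip = trailing_stop_update(11.0, hwm, stop)
```

Skipping small adjustments keeps the number of cancel-and-replace orders down, which also helps with API rate limits.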

Monitoring and Alerting

Health Checks

def health_check(self) -> dict:
    """Return agent health status."""
    try:
        return {
            'status': 'healthy',
            'positions_count': len(self.positions),
            'active_stops': sum(1 for p in self.positions.values() if p.stop_order_id),
            'total_pnl': sum(p.unrealized_pnl for p in self.positions.values()),
            'last_update': self.last_update_time.isoformat() if self.last_update_time else None,
            'market_session': self.current_session,
            'active_management': self.active_management,
            'redis_connected': self.redis_client.ping(),
            'alpaca_connected': self._test_alpaca_connection()
        }
    except Exception as e:
        return {'status': 'unhealthy', 'error': str(e)}

def _test_alpaca_connection(self) -> bool:
    """Test Alpaca API connectivity."""
    try:
        account = self.trading_client.get_account()
        return account is not None
    except Exception:  # a bare except would also swallow KeyboardInterrupt
        return False

Alert Conditions

def check_alert_conditions(self):
    """Check for conditions that require alerts."""

    # Large unrealized loss
    large_losses = [p for p in self.positions.values() 
                   if p.unrealized_pnl_pct < -15]
    if large_losses:
        self.send_alert('LARGE_LOSSES', 
                       f"{len(large_losses)} positions with >15% loss")

    # Missing stop orders
    unprotected = [p for p in self.positions.values() 
                  if not p.stop_order_id and abs(p.unrealized_pnl_pct) > 5]
    if unprotected:
        self.send_alert('UNPROTECTED_POSITIONS',
                       f"{len(unprotected)} positions without stops")

    # Stale data (total_seconds() is used because .seconds wraps after one day)
    if self.last_update_time and \
       (datetime.now() - self.last_update_time).total_seconds() > 300:
        self.send_alert('STALE_DATA', 'No price updates for 5+ minutes')

Usage Examples

Initialize Agent with Configuration

from trading_agents.position_management_agent import (
    PositionManagementAgent,
    PositionManagerConfig,
)

# Load configuration from environment
config = PositionManagerConfig.from_env()

# Initialize agent
agent = PositionManagementAgent(config)

# Start monitoring
try:
    agent.start()
    agent.logger.info("Position Management Agent started successfully")
except Exception as e:
    agent.logger.error(f"Failed to start agent: {e}")

Get All Positions with Health Check

# Check agent health first
health = agent.health_check()
if health['status'] != 'healthy':
    print(f"Agent unhealthy: {health}")

# Get positions
positions = agent.get_positions()
total_pnl = sum(pos.unrealized_pnl for pos in positions.values())

print(f"Portfolio Summary:")
print(f"  Positions: {len(positions)}")
print(f"  Total P/L: ${total_pnl:,.2f}")
print(f"  Market Session: {health['market_session']}")

for symbol, pos in positions.items():
    print(f"\n{symbol}: {pos.quantity} @ ${pos.entry_price:.2f}")
    print(f"  Current: ${pos.current_price:.2f}")
    print(f"  P/L: ${pos.unrealized_pnl:.2f} ({pos.unrealized_pnl_pct:+.1f}%)")
    print(f"  Stop: ${pos.stop_price:.2f}")
    print(f"  Session: {pos.market_session}")
    if pos.profit_taken_20:
        print(f"  ✓ 20% profit taken")
    if pos.stop_order_id:
        print(f"  ✓ Protected (Order: {pos.stop_order_id})")

Startup Behavior

On agent start, the agent runs the sequence below. Steps 3 to 7 are implemented by _scan_and_protect_existing_positions():

  1. Load Configuration: Validate all required environment variables
  2. Test Connections: Verify Alpaca API and Redis connectivity
  3. Fetch Positions: Get all current positions from Alpaca v2 API
  4. Load Stop Orders: Retrieve all open stop orders
  5. Reconcile Data: Match stops to positions, identify unprotected
  6. Calculate Stops: Use ATR-based calculation for missing stops
  7. Place Orders: Submit stop orders with error handling
  8. Initialize Monitoring: Start real-time P/L tracking

def _scan_and_protect_existing_positions(self):
    """Scan existing positions and ensure all have protective stops."""
    try:
        # Get current positions from Alpaca
        alpaca_positions = self.trading_client.list_positions()

        # Get current open orders
        open_orders = self.trading_client.list_orders(
            status=QueryOrderStatus.OPEN,
            limit=500
        )

        stops_by_symbol = {
            order.symbol: order for order in open_orders 
            if order.order_type == OrderType.STOP
        }

        protected_count = 0
        new_stops_count = 0

        for alpaca_pos in alpaca_positions:
            symbol = alpaca_pos.symbol

            # Create Position object
            position = Position(
                symbol=symbol,
                quantity=int(alpaca_pos.qty),
                entry_price=float(alpaca_pos.avg_entry_price),
                current_price=float(alpaca_pos.market_value) / int(alpaca_pos.qty),
                unrealized_pnl=float(alpaca_pos.unrealized_pl),
                unrealized_pnl_pct=float(alpaca_pos.unrealized_plpc) * 100,
                original_quantity=int(alpaca_pos.qty),
                last_updated=datetime.now(),
                market_session=self.current_session,
                stop_price=0,  # Will be calculated
                target_price=0,  # Will be calculated
                stop_order_id=None,
                profit_taken_20=False,
                profit_taken_50=False,
                high_water_mark=None,
                earnings_reduced=False
            )

            # Check for existing stop
            if symbol in stops_by_symbol:
                stop_order = stops_by_symbol[symbol]
                position.stop_order_id = stop_order.id
                position.stop_price = float(stop_order.stop_price)
                protected_count += 1
                self.logger.info(f"Found existing stop for {symbol}: ${position.stop_price:.2f}")
            else:
                # Calculate and place new stop
                try:
                    stop_price, target_price = self.calculate_atr_stop(
                        symbol, position.entry_price
                    )
                    position.stop_price = stop_price
                    position.target_price = target_price

                    # Place stop order
                    stop_order_id = self.place_stop_order(position)
                    if stop_order_id:
                        position.stop_order_id = stop_order_id
                        new_stops_count += 1
                        self.logger.info(f"Placed new stop for {symbol}: ${stop_price:.2f}")

                except Exception as e:
                    self.logger.error(f"Failed to protect {symbol}: {e}")

            # Initialize high water mark
            if position.unrealized_pnl > 0:
                position.high_water_mark = position.current_price

            # Add to tracking
            self.positions[symbol] = position

        self.logger.info(f"Startup scan complete:")
        self.logger.info(f"  Total positions: {len(self.positions)}")
        self.logger.info(f"  Already protected: {protected_count}")
        self.logger.info(f"  New stops placed: {new_stops_count}")

        return len(self.positions)

    except Exception as e:
        self.logger.error(f"Error in startup position scan: {e}")
        return 0

P&L Calculation with Session Awareness

Long and Short Positions

def calculate_pnl(self, position: Position, current_price: float) -> tuple:
    """Calculate P&L with session-aware pricing."""

    # Basic P&L calculation
    if position.quantity > 0:  # Long position
        unrealized_pnl = (current_price - position.entry_price) * position.quantity
    else:  # Short position
        unrealized_pnl = (position.entry_price - current_price) * abs(position.quantity)

    unrealized_pnl_pct = (unrealized_pnl / (position.entry_price * abs(position.quantity))) * 100

    # Flag P&L computed from extended-hours quotes (informational only;
    # the values themselves are not adjusted)
    extended_hours_flag = position.market_session in ["PRE", "POST"]

    return unrealized_pnl, unrealized_pnl_pct, extended_hours_flag
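
A short worked example helps confirm the sign conventions. The standalone function below is an illustrative restatement, not the agent's method: because quantity is negative for shorts, a single signed formula covers both branches of calculate_pnl.

```python
def unrealized_pnl(quantity: int, entry_price: float, current_price: float):
    """Return (pnl_dollars, pnl_percent); quantity is negative for shorts."""
    pnl = (current_price - entry_price) * quantity
    pct = pnl / (entry_price * abs(quantity)) * 100
    return pnl, pct


# Long 100 shares from $10, now $12: gains $200 (+20%)
long_pnl, long_pct = unrealized_pnl(100, 10.0, 12.0)

# Short 100 shares from $10, now $12: loses $200 (-20%)
short_pnl, short_pct = unrealized_pnl(-100, 10.0, 12.0)

# Short 50 shares from $20, now $18: gains $100 (+10%)
cover_pnl, cover_pct = unrealized_pnl(-50, 20.0, 18.0)
```

The percentage is measured against the cost basis (entry price times absolute quantity), so it has the same sign as the dollar figure for both longs and shorts.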

Circuit Breaker Protection

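Alpaca API calls are wrapped in a circuit breaker configured with failure_threshold=5 and recovery_timeout=30, so repeated failures stop further calls until the timeout has passed: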

def create_position_manager_breaker():
    """Create circuit breaker for position management operations."""
    return CircuitBreaker(
        failure_threshold=5,
        recovery_timeout=30,
        expected_exception=Exception
    )

# Usage in the agent (for example, inside __init__)
self.circuit_breaker = create_position_manager_breaker()

def place_stop_order(self, position: Position) -> Optional[str]:
    """Place stop order with circuit breaker protection."""
    try:
        stop_request = StopOrderRequest(
            symbol=position.symbol,
            qty=abs(position.quantity),
            side=OrderSide.SELL if position.quantity > 0 else OrderSide.BUY,
            time_in_force=TimeInForce.GTC,
            stop_price=position.stop_price
        )

        # Protected API call
        stop_order = self.circuit_breaker.call(
            self.rate_limited_client.submit_order,
            stop_request
        )

        self.logger.info(f"Stop order placed: {position.symbol} @ ${position.stop_price:.2f}")
        return stop_order.id

    except Exception as e:
        self.logger.error(f"Failed to place stop for {position.symbol}: {e}")

        # Publish error event for monitoring
        self.publish_event('position.error', {
            'symbol': position.symbol,
            'error_type': 'STOP_ORDER_FAILED',
            'message': str(e)
        })
        return None
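
The CircuitBreaker class used above is not defined on this page. For readers who want to see the mechanism, here is a minimal stand-in that accepts the same constructor arguments and exposes the same call() method. It is a sketch under those assumptions, not the class the agent imports; the `clock` parameter and `CircuitOpenError` are additions for the example, and recovery_timeout is taken to be in seconds.

```python
import time
from typing import Any, Callable, Optional, Type


class CircuitOpenError(Exception):
    """Raised when a call is rejected because the breaker is open."""


class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 30,
        expected_exception: Type[BaseException] = Exception,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception
        self._clock = clock
        self._failures = 0
        self._opened_at: Optional[float] = None  # None means closed

    def call(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        if self._opened_at is not None:
            if self._clock() - self._opened_at < self.recovery_timeout:
                raise CircuitOpenError("circuit open; call rejected")
            # Timeout elapsed: let one trial call through (half-open state)
        try:
            result = func(*args, **kwargs)
        except self.expected_exception:
            self._failures += 1
            if self._opened_at is not None or self._failures >= self.failure_threshold:
                self._opened_at = self._clock()  # open, or re-open after a failed trial
            raise
        # Success closes the circuit and clears the failure count
        self._failures = 0
        self._opened_at = None
        return result
```

While the circuit is open, place_stop_order would fail fast with CircuitOpenError, which its existing except block turns into a position.error event.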


Last Updated: January 10, 2026