Real-time Data Streaming

Comprehensive real-time WebSocket implementation reference for live market data and portfolio tracking.

Module Overview

The Real-time module provides WebSocket-based live data streaming capabilities:

Core Features

  • Live Price Feeds: Real-time asset price updates from multiple data sources

  • Portfolio Tracking: Live portfolio value calculations with position updates

  • WebSocket Management: Connection handling, authentication, and message routing

  • Scalable Architecture: High-frequency updates for multiple concurrent users

  • Production Ready: Comprehensive error handling, logging, and monitoring

WebSocket Architecture

Connection Management

The connection manager handles all WebSocket connections, user authentication, and subscription management:

from personal_finance.realtime.connection_manager import ConnectionManager
from personal_finance.realtime.consumers import RealtimeConsumer

# Get the global connection manager instance
connection_manager = ConnectionManager()

# Connection stats
print(f\"Total connections: {connection_manager.get_connection_count()}\")
print(f\"Authenticated users: {connection_manager.get_authenticated_user_count()}\")

# User-specific connections
user_connections = connection_manager.get_user_connections(user)
print(f\"User has {len(user_connections)} active connections\")

# Broadcast to all user connections
connection_manager.broadcast_to_user(user, {
    'type': 'notification',
    'data': {'message': 'Portfolio updated successfully'}
})

# Broadcast to specific subscribers
connection_manager.broadcast_to_asset_subscribers('AAPL', {
    'type': 'asset_update',
    'data': {'symbol': 'AAPL', 'price': 150.25, 'change': 2.50}
})

WebSocket Consumer

Main WebSocket consumer handling client connections and message routing:

# RealtimeConsumer handles all WebSocket connections
class RealtimeConsumer(AsyncWebsocketConsumer):
    \"\"\"Real-time WebSocket consumer for live data streaming.\"\"\"

    async def connect(self):
        \"\"\"Handle new WebSocket connection.\"\"\"
        # Authenticate user
        user = self.scope.get('user')
        if not user or not user.is_authenticated:
            await self.close()
            return

        # Register connection
        await self.connection_manager.add_connection(self)
        await self.accept()

        # Send connection confirmation
        await self.send_json({
            'type': 'connection',
            'data': {
                'status': 'connected',
                'connection_id': str(self.connection_id),
                'authenticated': True,
                'user_id': user.id
            }
        })

Message Protocol

All WebSocket messages follow a standardized JSON protocol:

Base Message Structure:

{
    \"type\": \"message_type\",
    \"data\": {
        // Message-specific payload
    },
    \"timestamp\": \"2024-01-15T10:30:00Z\",
    \"connection_id\": \"uuid-string\"
}

Client to Server Messages:

// Ping/Pong for connection health
{
    \"type\": \"ping\",
    \"data\": {}
}

// Subscribe to asset price updates
{
    \"type\": \"subscribe_asset\",
    \"data\": {
        \"symbol\": \"AAPL\"
    }
}

// Subscribe to portfolio updates
{
    \"type\": \"subscribe_portfolio\",
    \"data\": {
        \"portfolio_id\": 123
    }
}

// Unsubscribe from updates
{
    \"type\": \"unsubscribe_asset\",
    \"data\": {
        \"symbol\": \"AAPL\"
    }
}

Server to Client Messages:

// Asset price update
{
    \"type\": \"asset_update\",
    \"data\": {
        \"symbol\": \"AAPL\",
        \"price\": 150.25,
        \"change\": 2.50,
        \"change_percent\": 1.69,
        \"volume\": 50000000,
        \"high\": 151.00,
        \"low\": 148.50,
        \"market_cap\": 2400000000000,
        \"last_updated\": \"2024-01-15T15:30:00Z\"
    }
}

// Portfolio value update
{
    \"type\": \"portfolio_update\",
    \"data\": {
        \"portfolio_id\": 123,
        \"name\": \"Growth Portfolio\",
        \"total_value\": 50000.00,
        \"daily_change\": 125.50,
        \"daily_change_percent\": 0.25,
        \"positions_updated\": [
            {
                \"symbol\": \"AAPL\",
                \"quantity\": 100,
                \"current_value\": 15025.00,
                \"daily_change\": 250.00
            }
        ]
    }
}

Price Feed Service

Real-time Price Updates

Core service managing real-time price data updates:

from personal_finance.realtime.services import PriceFeedService
from decimal import Decimal

# Initialize the price feed service
price_service = PriceFeedService(
    update_interval=30,  # Update every 30 seconds
    batch_size=50,       # Process 50 assets per batch
    enable_caching=True  # Enable Redis caching
)

# Start the price feed (blocking operation)
price_service.start_price_feed()

# Update specific assets manually
symbols = ['AAPL', 'GOOGL', 'MSFT', 'AMZN', 'TSLA']
updated_prices = price_service.update_asset_prices(symbols)

for symbol, price_data in updated_prices.items():
    print(f\"{symbol}: ${price_data['price']} ({price_data['change']:+.2f})\")

# Get current service status
status = price_service.get_service_status()
print(f\"Service running: {status['running']}\")
print(f\"Last update: {status['last_update']}\")
print(f\"Assets tracked: {status['assets_tracked']}\")
print(f\"Update frequency: {status['update_interval']}s\")

Batch Processing

Efficient batch processing for high-volume price updates:

from personal_finance.realtime.batch import BatchPriceProcessor

# Initialize batch processor
batch_processor = BatchPriceProcessor(
    batch_size=100,
    concurrent_batches=3,
    retry_failed=True
)

# Process large asset list efficiently
all_tracked_symbols = Asset.objects.filter(
    is_active=True,
    positions__quantity__gt=0
).values_list('symbol', flat=True).distinct()

print(f\"Processing {len(all_tracked_symbols)} assets in batches...\")

# Process with progress tracking
for batch_num, results in batch_processor.process_in_batches(all_tracked_symbols):
    successful = len([r for r in results if r['success']])
    print(f\"Batch {batch_num}: {successful}/{len(results)} successful\")

# Handle failed updates
failed_updates = batch_processor.get_failed_updates()
if failed_updates:
    print(f\"Retrying {len(failed_updates)} failed updates...\")
    retry_results = batch_processor.retry_failed_updates(failed_updates)

Data Source Integration

Multiple market data source integration:

from personal_finance.realtime.data_sources import (
    YahooFinanceSource,
    AlphaVantageSource,
    DataSourceManager
)

# Configure data sources with fallback
data_source_manager = DataSourceManager()

# Primary source
yahoo_source = YahooFinanceSource(
    api_key=settings.YAHOO_FINANCE_API_KEY,
    rate_limit=100  # requests per minute
)
data_source_manager.add_source(yahoo_source, priority=1)

# Fallback source
alpha_vantage_source = AlphaVantageSource(
    api_key=settings.ALPHA_VANTAGE_API_KEY,
    rate_limit=5  # requests per minute (free tier)
)
data_source_manager.add_source(alpha_vantage_source, priority=2)

# Fetch data with automatic fallback
price_data = data_source_manager.get_asset_data(['AAPL', 'GOOGL'])

for symbol, data in price_data.items():
    if data['success']:
        print(f\"{symbol}: ${data['price']} (source: {data['source']})\")
    else:
        print(f\"{symbol}: Failed - {data['error']}\")

Portfolio Value Tracking

Real-time Portfolio Updates

Service for real-time portfolio value calculations and updates:

from personal_finance.realtime.portfolio_tracker import PortfolioTracker

# Initialize portfolio tracker
tracker = PortfolioTracker()

# Get user's portfolios for real-time tracking
user_portfolios = Portfolio.objects.filter(user=user)

# Start tracking all user portfolios
for portfolio in user_portfolios:
    tracker.start_tracking_portfolio(portfolio)

# Update specific portfolio based on price changes
portfolio = user_portfolios.first()
price_updates = {
    'AAPL': Decimal('150.25'),
    'GOOGL': Decimal('2800.75'),
    'MSFT': Decimal('420.30')
}

updated_portfolio = tracker.update_portfolio_values(portfolio, price_updates)

print(f\"Portfolio '{portfolio.name}' updated:\")
print(f\"  Total value: ${updated_portfolio['total_value']}\")
print(f\"  Daily change: ${updated_portfolio['daily_change']} ({updated_portfolio['daily_change_percent']:.2f}%)\")

# Broadcast updates to WebSocket subscribers
if updated_portfolio['value_changed']:
    connection_manager.broadcast_to_portfolio_subscribers(
        portfolio.id,
        {
            'type': 'portfolio_update',
            'data': updated_portfolio
        }
    )

Position-Level Updates

Track individual position changes within portfolios:

# Update individual positions and propagate to portfolio
position = Position.objects.get(portfolio=portfolio, asset__symbol='AAPL')

position_update = tracker.update_position_value(
    position,
    new_price=Decimal('150.25')
)

print(f\"Position update for {position.asset.symbol}:\")
print(f\"  Quantity: {position.quantity}\")
print(f\"  New price: ${position_update['current_price']}\")
print(f\"  Position value: ${position_update['current_value']}\")
print(f\"  Daily change: ${position_update['daily_change']}\")
print(f\"  Total return: ${position_update['total_return']} ({position_update['total_return_percent']:.2f}%)\")

# Send position-specific updates to subscribers
if position_update['significant_change']:  # > 1% change
    connection_manager.broadcast_to_user(
        portfolio.user,
        {
            'type': 'position_update',
            'data': position_update
        }
    )

Client-Side Integration

JavaScript WebSocket Client

Comprehensive JavaScript client for real-time data consumption:

class PersonalFinanceWebSocket {
    constructor(options = {}) {
        this.url = options.url || 'ws://localhost:8000/ws/realtime/';
        this.reconnectInterval = options.reconnectInterval || 5000;
        this.maxReconnectAttempts = options.maxReconnectAttempts || 10;
        this.reconnectAttempts = 0;

        this.socket = null;
        this.isConnected = false;
        this.subscriptions = new Set();
        this.messageHandlers = new Map();
        this.connectionId = null;

        // Event callbacks
        this.onConnect = options.onConnect || (() => {});
        this.onDisconnect = options.onDisconnect || (() => {});
        this.onError = options.onError || console.error;
        this.onMessage = options.onMessage || (() => {});
    }

    connect() {
        try {
            this.socket = new WebSocket(this.url);
            this.setupEventHandlers();
        } catch (error) {
            this.onError('WebSocket connection failed:', error);
            this.scheduleReconnect();
        }
    }

    setupEventHandlers() {
        this.socket.onopen = (event) => {
            console.log('WebSocket connected');
            this.isConnected = true;
            this.reconnectAttempts = 0;

            // Send ping to establish connection
            this.sendMessage('ping', {});
            this.onConnect(event);
        };

        this.socket.onmessage = (event) => {
            try {
                const message = JSON.parse(event.data);
                this.handleMessage(message);
            } catch (error) {
                this.onError('Failed to parse WebSocket message:', error);
            }
        };

        this.socket.onclose = (event) => {
            console.log('WebSocket disconnected');
            this.isConnected = false;
            this.connectionId = null;
            this.onDisconnect(event);

            if (!event.wasClean) {
                this.scheduleReconnect();
            }
        };

        this.socket.onerror = (error) => {
            console.error('WebSocket error:', error);
            this.onError(error);
        };
    }

    sendMessage(type, data) {
        if (!this.isConnected || !this.socket) {
            console.warn('WebSocket not connected, cannot send message');
            return false;
        }

        const message = {
            type: type,
            data: data,
            timestamp: new Date().toISOString()
        };

        this.socket.send(JSON.stringify(message));
        return true;
    }

    handleMessage(message) {
        // Handle built-in message types
        switch (message.type) {
            case 'connection':
                this.connectionId = message.data.connection_id;
                break;

            case 'pong':
                console.log('Received pong from server');
                break;

            case 'asset_update':
                this.handleAssetUpdate(message.data);
                break;

            case 'portfolio_update':
                this.handlePortfolioUpdate(message.data);
                break;

            case 'error':
                this.onError('Server error:', message.data);
                break;
        }

        // Call registered message handlers
        if (this.messageHandlers.has(message.type)) {
            const handler = this.messageHandlers.get(message.type);
            handler(message.data, message);
        }

        // Call general message callback
        this.onMessage(message);
    }

    // Asset subscription management
    subscribeToAsset(symbol) {
        if (this.sendMessage('subscribe_asset', { symbol: symbol })) {
            this.subscriptions.add(`asset:${symbol}`);
            return true;
        }
        return false;
    }

    unsubscribeFromAsset(symbol) {
        if (this.sendMessage('unsubscribe_asset', { symbol: symbol })) {
            this.subscriptions.delete(`asset:${symbol}`);
            return true;
        }
        return false;
    }

    // Portfolio subscription management
    subscribeToPortfolio(portfolioId) {
        if (this.sendMessage('subscribe_portfolio', { portfolio_id: portfolioId })) {
            this.subscriptions.add(`portfolio:${portfolioId}`);
            return true;
        }
        return false;
    }

    unsubscribeFromPortfolio(portfolioId) {
        if (this.sendMessage('unsubscribe_portfolio', { portfolio_id: portfolioId })) {
            this.subscriptions.delete(`portfolio:${portfolioId}`);
            return true;
        }
        return false;
    }

    // Message handler registration
    onMessageType(type, handler) {
        this.messageHandlers.set(type, handler);
    }

    // Built-in update handlers
    handleAssetUpdate(data) {
        console.log(`Asset update: ${data.symbol} = $${data.price}`);

        // Update DOM elements
        const priceElement = document.getElementById(`price-${data.symbol}`);
        if (priceElement) {
            priceElement.textContent = `$${data.price.toFixed(2)}`;

            // Add visual feedback for price changes
            const changeClass = data.change >= 0 ? 'price-up' : 'price-down';
            priceElement.className = `price ${changeClass}`;
        }

        const changeElement = document.getElementById(`change-${data.symbol}`);
        if (changeElement) {
            changeElement.textContent = `${data.change >= 0 ? '+' : ''}${data.change.toFixed(2)} (${data.change_percent.toFixed(2)}%)`;
        }
    }

    handlePortfolioUpdate(data) {
        console.log(`Portfolio update: ${data.name} = $${data.total_value}`);

        // Update portfolio display
        const valueElement = document.getElementById(`portfolio-value-${data.portfolio_id}`);
        if (valueElement) {
            valueElement.textContent = `$${data.total_value.toFixed(2)}`;
        }

        const changeElement = document.getElementById(`portfolio-change-${data.portfolio_id}`);
        if (changeElement) {
            const changeText = `${data.daily_change >= 0 ? '+' : ''}$${data.daily_change.toFixed(2)} (${data.daily_change_percent.toFixed(2)}%)`;
            changeElement.textContent = changeText;
            changeElement.className = data.daily_change >= 0 ? 'change-positive' : 'change-negative';
        }
    }

    // Connection management
    scheduleReconnect() {
        if (this.reconnectAttempts >= this.maxReconnectAttempts) {
            console.error('Max reconnection attempts reached');
            return;
        }

        this.reconnectAttempts++;
        console.log(`Scheduling reconnection attempt ${this.reconnectAttempts} in ${this.reconnectInterval}ms`);

        setTimeout(() => {
            console.log('Attempting to reconnect...');
            this.connect();
        }, this.reconnectInterval);
    }

    disconnect() {
        if (this.socket) {
            this.socket.close();
        }
    }
}

// Usage example
const wsClient = new PersonalFinanceWebSocket({
    url: 'ws://localhost:8000/ws/realtime/',
    onConnect: () => console.log('Connected to real-time feed'),
    onDisconnect: () => console.log('Disconnected from real-time feed'),
    onError: (error) => console.error('WebSocket error:', error)
});

// Connect and subscribe
wsClient.connect();

// Subscribe to specific assets
wsClient.subscribeToAsset('AAPL');
wsClient.subscribeToAsset('GOOGL');

// Subscribe to portfolio updates
wsClient.subscribeToPortfolio(123);

// Register custom message handlers
wsClient.onMessageType('custom_alert', (data) => {
    alert(`Custom alert: ${data.message}`);
});

React Integration

React hook for WebSocket integration:

import { useState, useEffect, useRef, useCallback } from 'react';

// Custom hook for WebSocket connection
function usePersonalFinanceWebSocket(url, options = {}) {
    const [isConnected, setIsConnected] = useState(false);
    const [lastMessage, setLastMessage] = useState(null);
    const [connectionError, setConnectionError] = useState(null);
    const [assetPrices, setAssetPrices] = useState({});
    const [portfolioValues, setPortfolioValues] = useState({});

    const wsClient = useRef(null);
    const subscriptions = useRef(new Set());

    useEffect(() => {
        // Initialize WebSocket client
        wsClient.current = new PersonalFinanceWebSocket({
            url,
            onConnect: () => {
                setIsConnected(true);
                setConnectionError(null);
            },
            onDisconnect: () => {
                setIsConnected(false);
            },
            onError: (error) => {
                setConnectionError(error);
            },
            onMessage: (message) => {
                setLastMessage(message);
            }
        });

        // Register message handlers
        wsClient.current.onMessageType('asset_update', (data) => {
            setAssetPrices(prev => ({
                ...prev,
                [data.symbol]: data
            }));
        });

        wsClient.current.onMessageType('portfolio_update', (data) => {
            setPortfolioValues(prev => ({
                ...prev,
                [data.portfolio_id]: data
            }));
        });

        // Connect
        wsClient.current.connect();

        // Cleanup on unmount
        return () => {
            wsClient.current.disconnect();
        };
    }, [url]);

    const subscribeToAsset = useCallback((symbol) => {
        if (wsClient.current && !subscriptions.current.has(`asset:${symbol}`)) {
            wsClient.current.subscribeToAsset(symbol);
            subscriptions.current.add(`asset:${symbol}`);
        }
    }, []);

    const subscribeToPortfolio = useCallback((portfolioId) => {
        if (wsClient.current && !subscriptions.current.has(`portfolio:${portfolioId}`)) {
            wsClient.current.subscribeToPortfolio(portfolioId);
            subscriptions.current.add(`portfolio:${portfolioId}`);
        }
    }, []);

    const unsubscribeFromAsset = useCallback((symbol) => {
        if (wsClient.current && subscriptions.current.has(`asset:${symbol}`)) {
            wsClient.current.unsubscribeFromAsset(symbol);
            subscriptions.current.delete(`asset:${symbol}`);
        }
    }, []);

    return {
        isConnected,
        lastMessage,
        connectionError,
        assetPrices,
        portfolioValues,
        subscribeToAsset,
        subscribeToPortfolio,
        unsubscribeFromAsset,
        sendMessage: wsClient.current?.sendMessage.bind(wsClient.current)
    };
}

// Portfolio component using the hook
function PortfolioTracker({ portfolioId, watchlist = [] }) {
    const {
        isConnected,
        assetPrices,
        portfolioValues,
        subscribeToAsset,
        subscribeToPortfolio
    } = usePersonalFinanceWebSocket('ws://localhost:8000/ws/realtime/');

    useEffect(() => {
        if (isConnected) {
            // Subscribe to portfolio updates
            subscribeToPortfolio(portfolioId);

            // Subscribe to watchlist assets
            watchlist.forEach(symbol => {
                subscribeToAsset(symbol);
            });
        }
    }, [isConnected, portfolioId, watchlist, subscribeToAsset, subscribeToPortfolio]);

    const portfolioData = portfolioValues[portfolioId];

    return (
        <div className=\"portfolio-tracker\">
            <div className=\"connection-status\">
                Status: {isConnected ?
                    <span className=\"connected\">Connected</span> :
                    <span className=\"disconnected\">Disconnected</span>
                }
            </div>

            {portfolioData && (
                <div className=\"portfolio-summary\">
                    <h2>{portfolioData.name}</h2>
                    <div className=\"portfolio-value\">
                        ${portfolioData.total_value.toFixed(2)}
                    </div>
                    <div className={`portfolio-change ${portfolioData.daily_change >= 0 ? 'positive' : 'negative'}`}>
                        {portfolioData.daily_change >= 0 ? '+' : ''}
                        ${portfolioData.daily_change.toFixed(2)}
                        ({portfolioData.daily_change_percent.toFixed(2)}%)
                    </div>
                </div>
            )}

            <div className=\"watchlist\">
                <h3>Watchlist</h3>
                {watchlist.map(symbol => {
                    const priceData = assetPrices[symbol];
                    return (
                        <div key={symbol} className=\"watchlist-item\">
                            <span className=\"symbol\">{symbol}</span>
                            {priceData ? (
                                <>
                                    <span className=\"price\">${priceData.price.toFixed(2)}</span>
                                    <span className={`change ${priceData.change >= 0 ? 'positive' : 'negative'}`}>
                                        {priceData.change >= 0 ? '+' : ''}{priceData.change.toFixed(2)}
                                        ({priceData.change_percent.toFixed(2)}%)
                                    </span>
                                </>
                            ) : (
                                <span className=\"loading\">Loading...</span>
                            )}
                        </div>
                    );
                })}
            </div>
        </div>
    );
}

REST API Integration

WebSocket Status Endpoints

Get WebSocket Connection Information

Get Real-time Service Status

Price Update Endpoints

Force Price Update

Get Price History

Management Commands

Start Price Feed Service

Start the real-time price feed service:

# Basic usage - start with default settings
python manage.py start_price_feed

# Custom configuration
python manage.py start_price_feed \\
    --interval 15 \\           # Update every 15 seconds
    --batch-size 100 \\        # Process 100 assets per batch
    --max-workers 4 \\         # Use 4 parallel workers
    --verbose                  # Enable detailed logging

# Production deployment with background execution
nohup python manage.py start_price_feed \\
    --interval 30 \\
    --batch-size 50 \\
    > /var/log/price_feed.log 2>&1 &

Command Options:

Update Asset Prices (Manual)

Perform one-time price updates:

# Update all tracked assets
python manage.py update_asset_prices

# Update specific symbols
python manage.py update_asset_prices --symbols AAPL MSFT GOOGL AMZN TSLA

# Update with historical data refresh
python manage.py update_asset_prices --historical --days 7

# Update and broadcast to WebSocket subscribers
python manage.py update_asset_prices --broadcast

# Dry run to test without making changes
python manage.py update_asset_prices --dry-run --verbose

Monitor WebSocket Connections

Monitor active WebSocket connections:

# Show connection statistics
python manage.py monitor_websockets

# Show detailed connection information
python manage.py monitor_websockets --detailed

# Monitor specific user
python manage.py monitor_websockets --user admin

# Export connection statistics
python manage.py monitor_websockets --export /tmp/websocket_stats.json

Configuration

Django Settings

Add real-time configuration to your Django settings:

# config/settings/base.py

# Real-time WebSocket Settings
REALTIME_UPDATE_INTERVAL = env.int('REALTIME_UPDATE_INTERVAL', 30)  # seconds
REALTIME_BATCH_SIZE = env.int('REALTIME_BATCH_SIZE', 50)           # assets per batch
REALTIME_CACHE_TIMEOUT = env.int('REALTIME_CACHE_TIMEOUT', 300)    # 5 minutes
REALTIME_MAX_WORKERS = env.int('REALTIME_MAX_WORKERS', 2)          # parallel workers

# WebSocket Settings
WEBSOCKET_TIMEOUT = env.int('WEBSOCKET_TIMEOUT', 300)              # 5 minutes
WEBSOCKET_MAX_CONNECTIONS_PER_USER = env.int('WEBSOCKET_MAX_CONNECTIONS_PER_USER', 5)
WEBSOCKET_MAX_SUBSCRIPTIONS_PER_CONNECTION = env.int('WEBSOCKET_MAX_SUBSCRIPTIONS_PER_CONNECTION', 50)
WEBSOCKET_MESSAGE_SIZE_LIMIT = env.int('WEBSOCKET_MESSAGE_SIZE_LIMIT', 1024)  # bytes
WEBSOCKET_RATE_LIMIT_MESSAGES_PER_MINUTE = env.int('WEBSOCKET_RATE_LIMIT_MESSAGES_PER_MINUTE', 60)

# Market Data Sources
MARKET_DATA_SOURCE_PRIORITIES = {
    'yahoo_finance': 1,      # Primary source
    'alpha_vantage': 2,      # Fallback source
    'iex_cloud': 3,          # Secondary fallback
}

# Redis Configuration for WebSocket (if using Redis)
CHANNEL_LAYERS = {
    'default': {
        'BACKEND': 'channels_redis.core.RedisChannelLayer',
        'CONFIG': {
            'hosts': [env('REDIS_URL', 'redis://localhost:6379/3')],
            'capacity': 1500,
            'expiry': 60,
        },
    },
}

Environment Variables

# .env file for real-time configuration

# Price Update Settings
REALTIME_UPDATE_INTERVAL=30
REALTIME_BATCH_SIZE=50
REALTIME_CACHE_TIMEOUT=300
REALTIME_MAX_WORKERS=2

# WebSocket Configuration
WEBSOCKET_TIMEOUT=300
WEBSOCKET_MAX_CONNECTIONS_PER_USER=5
WEBSOCKET_MAX_SUBSCRIPTIONS_PER_CONNECTION=50
WEBSOCKET_MESSAGE_SIZE_LIMIT=1024
WEBSOCKET_RATE_LIMIT_MESSAGES_PER_MINUTE=60

# Market Data API Keys
YAHOO_FINANCE_API_KEY=your-yahoo-finance-api-key
ALPHA_VANTAGE_API_KEY=your-alpha-vantage-api-key
IEX_CLOUD_API_KEY=your-iex-cloud-api-key

# Redis for WebSocket channels (optional)
REDIS_WEBSOCKET_URL=redis://localhost:6379/3

Production Deployment

Docker Configuration

Configure Docker for WebSocket support:

# Dockerfile - Add WebSocket support
FROM python:3.11-slim

# Install system dependencies
RUN apt-get update && apt-get install -y \\
    build-essential \\
    && rm -rf /var/lib/apt/lists/*

# Set working directory
WORKDIR /app

# Copy requirements and install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Expose ports
EXPOSE 8000

# Command for WebSocket-enabled server
CMD [\"daphne\", \"-b\", \"0.0.0.0\", \"-p\", \"8000\", \"config.asgi:application\"]
# docker-compose.yml - Real-time services
version: '3.8'

services:
  web:
    build: .
    ports:
      - \"8000:8000\"
    environment:
      - DJANGO_SETTINGS_MODULE=config.settings.production
      - REALTIME_UPDATE_INTERVAL=30
      - WEBSOCKET_TIMEOUT=300
    depends_on:
      - redis
      - db

  redis:
    image: redis:7-alpine
    ports:
      - \"6379:6379\"
    volumes:
      - redis_data:/data

  price-feed:
    build: .
    command: python manage.py start_price_feed --interval 30 --batch-size 50
    environment:
      - DJANGO_SETTINGS_MODULE=config.settings.production
    depends_on:
      - redis
      - db

volumes:
  redis_data:

Nginx Configuration

Configure Nginx for WebSocket proxying:

# nginx.conf - WebSocket proxy configuration
upstream django_app {
    server web:8000;
}

server {
    listen 80;
    server_name yourfinance.com;

    # Regular HTTP traffic
    location / {
        proxy_pass http://django_app;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
    }

    # WebSocket traffic
    location /ws/ {
        proxy_pass http://django_app;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection \"upgrade\";
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;

        # WebSocket specific settings
        proxy_read_timeout 86400s;
        proxy_send_timeout 86400s;
        proxy_connect_timeout 60s;
    }
}

Systemd Service Configuration

Configure systemd service for price feed:

# /etc/systemd/system/personal-finance-price-feed.service
[Unit]
Description=Personal Finance Price Feed Service
After=network.target redis.service postgresql.service
Requires=redis.service postgresql.service

[Service]
Type=simple
User=finance
Group=finance
WorkingDirectory=/opt/personal-finance
Environment=DJANGO_SETTINGS_MODULE=config.settings.production
Environment=PATH=/opt/personal-finance/venv/bin:/usr/bin:/bin
ExecStart=/opt/personal-finance/venv/bin/python manage.py start_price_feed --interval 30 --batch-size 50
ExecReload=/bin/kill -HUP $MAINPID
Restart=always
RestartSec=5
StandardOutput=journal
StandardError=journal
SyslogIdentifier=price-feed

[Install]
WantedBy=multi-user.target
# Enable and start the service
sudo systemctl daemon-reload
sudo systemctl enable personal-finance-price-feed
sudo systemctl start personal-finance-price-feed

# Check service status
sudo systemctl status personal-finance-price-feed

# View logs
sudo journalctl -u personal-finance-price-feed -f

Monitoring and Logging

Health Check Endpoints

Implement comprehensive health checks:

# realtime/views.py
from django.http import JsonResponse
from django.utils import timezone
from .services import PriceFeedService
from .connection_manager import ConnectionManager

def health_check(request):
    \"\"\"Comprehensive health check for real-time services.\"\"\"
    price_service = PriceFeedService()
    connection_manager = ConnectionManager()

    # Check service status
    price_feed_status = price_service.get_service_status()

    # Check WebSocket connections
    connection_stats = {
        'total_connections': connection_manager.get_connection_count(),
        'authenticated_users': connection_manager.get_authenticated_user_count(),
        'asset_subscriptions': connection_manager.get_total_asset_subscriptions(),
        'portfolio_subscriptions': connection_manager.get_total_portfolio_subscriptions()
    }

    # Check data freshness
    from .models import AssetPrice
    latest_price_update = AssetPrice.objects.order_by('-last_updated').first()

    data_freshness = None
    if latest_price_update:
        time_since_update = (timezone.now() - latest_price_update.last_updated).total_seconds()
        data_freshness = {
            'last_update': latest_price_update.last_updated.isoformat(),
            'seconds_since_update': int(time_since_update),
            'is_stale': time_since_update > 300  # 5 minutes
        }

    return JsonResponse({
        'status': 'healthy' if price_feed_status['running'] else 'degraded',
        'timestamp': timezone.now().isoformat(),
        'services': {
            'price_feed': price_feed_status,
            'websocket_connections': connection_stats,
            'data_freshness': data_freshness
        }
    })

Logging Configuration

Configure detailed logging for real-time services:

# config/settings/production.py
LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'verbose': {
            'format': '{levelname} {asctime} {module} {process:d} {thread:d} {message}',
            'style': '{',
        },
        'json': {
            '()': 'pythonjsonlogger.jsonlogger.JsonFormatter',
            'format': '%(levelname)s %(asctime)s %(name)s %(message)s'
        },
    },
    'handlers': {
        'realtime_file': {
            'level': 'INFO',
            'class': 'logging.handlers.RotatingFileHandler',
            'filename': '/var/log/personal-finance/realtime.log',
            'maxBytes': 10485760,  # 10MB
            'backupCount': 5,
            'formatter': 'json',
        },
        'websocket_file': {
            'level': 'INFO',
            'class': 'logging.handlers.RotatingFileHandler',
            'filename': '/var/log/personal-finance/websocket.log',
            'maxBytes': 10485760,
            'backupCount': 3,
            'formatter': 'json',
        },
        'price_feed_file': {
            'level': 'INFO',
            'class': 'logging.handlers.RotatingFileHandler',
            'filename': '/var/log/personal-finance/price_feed.log',
            'maxBytes': 10485760,
            'backupCount': 3,
            'formatter': 'json',
        }
    },
    'loggers': {
        'personal_finance.realtime': {
            'handlers': ['realtime_file'],
            'level': 'INFO',
            'propagate': False,
        },
        'personal_finance.realtime.consumers': {
            'handlers': ['websocket_file'],
            'level': 'INFO',
            'propagate': False,
        },
        'personal_finance.realtime.services': {
            'handlers': ['price_feed_file'],
            'level': 'INFO',
            'propagate': False,
        }
    }
}

Performance Monitoring

Implement performance monitoring and metrics:

# realtime/monitoring.py
import time
import logging
from django.core.cache import cache
from django.utils import timezone

class PerformanceMonitor:
    \"\"\"Monitor real-time service performance.\"\"\"

    def __init__(self):
        self.logger = logging.getLogger('personal_finance.realtime.performance')

    def record_price_update_latency(self, symbol, latency_ms):
        \"\"\"Record price update latency for monitoring.\"\"\"
        cache_key = f'price_update_latency:{symbol}'
        latencies = cache.get(cache_key, [])
        latencies.append(latency_ms)

        # Keep only last 100 measurements
        if len(latencies) > 100:
            latencies = latencies[-100:]

        cache.set(cache_key, latencies, 3600)  # 1 hour

        # Log if latency is high
        if latency_ms > 1000:  # > 1 second
            self.logger.warning(f'High price update latency: {symbol} took {latency_ms}ms')

    def record_websocket_message_latency(self, message_type, latency_ms):
        \"\"\"Record WebSocket message processing latency.\"\"\"
        cache_key = f'websocket_latency:{message_type}'
        latencies = cache.get(cache_key, [])
        latencies.append(latency_ms)

        if len(latencies) > 1000:
            latencies = latencies[-1000:]

        cache.set(cache_key, latencies, 3600)

    def get_performance_metrics(self):
        \"\"\"Get current performance metrics.\"\"\"
        metrics = {}

        # Price update metrics
        price_symbols = ['AAPL', 'GOOGL', 'MSFT']  # Sample symbols
        for symbol in price_symbols:
            cache_key = f'price_update_latency:{symbol}'
            latencies = cache.get(cache_key, [])

            if latencies:
                metrics[f'price_update_avg_latency_{symbol}'] = sum(latencies) / len(latencies)
                metrics[f'price_update_max_latency_{symbol}'] = max(latencies)

        # WebSocket metrics
        message_types = ['asset_update', 'portfolio_update']
        for msg_type in message_types:
            cache_key = f'websocket_latency:{msg_type}'
            latencies = cache.get(cache_key, [])

            if latencies:
                metrics[f'websocket_avg_latency_{msg_type}'] = sum(latencies) / len(latencies)

        return metrics

Troubleshooting

Common Issues and Solutions

WebSocket Connection Failures

# Debug WebSocket connection issues
def debug_websocket_connection(request):
    \"\"\"Debug helper for WebSocket connection issues.\"\"\"

    checks = {
        'user_authenticated': request.user.is_authenticated,
        'websocket_endpoint_accessible': True,  # Test this programmatically
        'redis_connection': check_redis_connection(),
        'price_feed_running': PriceFeedService().get_service_status()['running']
    }

    return JsonResponse({
        'debug_info': checks,
        'recommendations': generate_connection_recommendations(checks)
    })

def check_redis_connection():
    \"\"\"Check Redis connection for WebSocket channels.\"\"\"
    try:
        from django.core.cache import cache
        cache.set('test_key', 'test_value', 10)
        return cache.get('test_key') == 'test_value'
    except:
        return False

No Price Updates

# Debug price update issues

# Check service status
python manage.py monitor_websockets --detailed

# Test specific asset update
python manage.py update_asset_prices --symbols AAPL --dry-run --verbose

# Check API connectivity
curl -H \"Authorization: Bearer $API_KEY\" https://api.finance.yahoo.com/v1/quote/AAPL

High Memory Usage

# Monitor memory usage
import psutil

def monitor_memory_usage():
    \"\"\"Monitor real-time service memory usage.\"\"\"
    process = psutil.Process()
    memory_info = process.memory_info()

    print(f\"RSS Memory: {memory_info.rss / 1024 / 1024:.1f} MB\")
    print(f\"VMS Memory: {memory_info.vms / 1024 / 1024:.1f} MB\")

    # Check connection counts
    connection_manager = ConnectionManager()
    print(f\"Active connections: {connection_manager.get_connection_count()}\")

Performance Issues

# Optimize price update performance
class OptimizedPriceFeedService:
    def __init__(self):
        self.connection_pool = {}  # Reuse HTTP connections
        self.cache_timeout = 30    # Cache results briefly

    def batch_update_with_caching(self, symbols):
        \"\"\"Update prices with intelligent caching.\"\"\"
        cached_symbols = []
        fresh_symbols = []

        for symbol in symbols:
            cache_key = f'price_cache:{symbol}'
            if cache.get(cache_key):
                cached_symbols.append(symbol)
            else:
                fresh_symbols.append(symbol)

        # Only fetch uncached symbols
        if fresh_symbols:
            results = self.fetch_prices(fresh_symbols)

            # Cache results
            for symbol, price_data in results.items():
                cache_key = f'price_cache:{symbol}'
                cache.set(cache_key, price_data, self.cache_timeout)

        print(f\"Used cache for {len(cached_symbols)} symbols, fetched {len(fresh_symbols)}\")

Best Practices

For Developers

  1. Handle Connection Failures Gracefully

    // Implement exponential backoff for reconnection
    class RobustWebSocketClient {
        constructor(options) {
            this.maxRetries = options.maxRetries || 10;
            this.retryDelay = options.retryDelay || 1000;
            this.retryMultiplier = options.retryMultiplier || 2;
        }
    
        scheduleReconnect() {
            const delay = Math.min(
                this.retryDelay * Math.pow(this.retryMultiplier, this.reconnectAttempts),
                30000  // Max 30 second delay
            );
    
            setTimeout(() => this.connect(), delay);
        }
    }
    
  2. Optimize Message Handling

    # Use async processing for message handling
    import asyncio
    
    async def handle_price_update(self, message_data):
        \"\"\"Handle price updates asynchronously.\"\"\"
        tasks = []
    
        for subscriber in self.subscribers:
            task = asyncio.create_task(
                subscriber.send_message(message_data)
            )
            tasks.append(task)
    
        await asyncio.gather(*tasks, return_exceptions=True)
    
  3. Implement Proper Rate Limiting

    from django.core.cache import cache
    
    def rate_limit_websocket_messages(user, message_type):
        \"\"\"Rate limit WebSocket messages per user.\"\"\"
        cache_key = f'websocket_rate_limit:{user.id}:{message_type}'
    
        current_count = cache.get(cache_key, 0)
        if current_count >= 60:  # 60 messages per minute
            return False
    
        cache.set(cache_key, current_count + 1, 60)
        return True
    

For System Administrators

  1. Monitor Resource Usage

    # Monitor WebSocket connections and memory usage
    watch -n 5 'ps aux | grep "start_price_feed\\|daphne" | grep -v grep'
    
    # Monitor Redis memory usage
    redis-cli info memory
    
  2. Set Up Alerts

    # Set up alerts for service degradation
    def check_service_health():
        status = get_realtime_service_status()
    
        if status['price_feed_down_time'] > 300:  # 5 minutes
            send_alert('Price feed has been down for 5+ minutes')
    
        if status['connection_count'] > 1000:
            send_alert('High WebSocket connection count detected')
    
  3. Regular Maintenance

    # Clean up old logs
    find /var/log/personal-finance/ -name \"*.log\" -mtime +7 -delete
    
    # Restart services weekly
    sudo systemctl restart personal-finance-price-feed
    

Security Considerations

  1. Authentication and Authorization

    # Ensure proper user authentication
    class SecureRealtimeConsumer(AsyncWebsocketConsumer):
        async def connect(self):
            user = self.scope.get('user')
    
            if not user or not user.is_authenticated:
                await self.close(code=4401)  # Unauthorized
                return
    
            # Check user permissions
            if not user.has_perm('realtime.access_websocket'):
                await self.close(code=4403)  # Forbidden
                return
    
  2. Data Validation

    # Validate all incoming messages
    def validate_websocket_message(message):
        \"\"\"Validate WebSocket message structure and content.\"\"\"
        if not isinstance(message, dict):
            raise ValueError(\"Message must be a dictionary\")
    
        if 'type' not in message:
            raise ValueError(\"Message must have a 'type' field\")
    
        if message['type'] not in ALLOWED_MESSAGE_TYPES:
            raise ValueError(f\"Invalid message type: {message['type']}\")
    
        # Validate message-specific data
        if message['type'] == 'subscribe_asset':
            symbol = message.get('data', {}).get('symbol')
            if not symbol or not validate_asset_symbol(symbol):
                raise ValueError(\"Invalid asset symbol\")
    
  3. Rate Limiting and DoS Protection

    # Implement comprehensive rate limiting
    class RateLimitedWebSocketConsumer(AsyncWebsocketConsumer):
        async def receive(self, text_data):
            # Check rate limits
            if not self.check_rate_limits():
                await self.close(code=4429)  # Too Many Requests
                return
    
            # Check message size
            if len(text_data) > settings.WEBSOCKET_MESSAGE_SIZE_LIMIT:
                await self.close(code=4413)  # Request Entity Too Large
                return
    
            await super().receive(text_data)
    

See Also

  • REST API Endpoints Reference - REST API integration for real-time services

  • ../modules/portfolio - Portfolio integration for real-time tracking

  • Django Settings Reference - Real-time configuration settings

  • ../deployment/production - Production deployment for real-time services