Core Technology Stack
- Framework: FastAPI (Python)
- Database: PostgreSQL with SQLModel (ORM)
- Message Broker: MQTT (Mosquitto)
- Scheduling: APScheduler (AsyncIOScheduler) for periodic background tasks
- Real-time: WebSocket support
- Authentication: JWT tokens
Service Components
MQTTService
- Manages paho-mqtt client connection
- Subscribes to topics:
  - users/+/feeds/+
  - users/+/devices/+/feeds/+
  - users/+/devices/+/status
- Queues messages for async processing
- Queue Strategy: Separate queues for feed data (message_queue) and device status (device_status_queue)
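The queue strategy above can be sketched as a small routing function. This is a minimal illustration using only the standard library, not the service's actual code: `route_message`, `matches`, and the filter constants are hypothetical names, and only the single-level `+` wildcard is handled. It assumes the function would be called from the paho-mqtt `on_message` callback with `msg.topic` and `msg.payload`.

```python
import queue
from typing import Optional

# Topic filters taken from the subscription list above.
FEED_FILTERS = ["users/+/feeds/+", "users/+/devices/+/feeds/+"]
STATUS_FILTER = "users/+/devices/+/status"

# Thread-safe queues: MQTT callbacks may run on a network thread,
# while the scheduler drains the queues elsewhere (assumption).
message_queue: "queue.Queue[tuple]" = queue.Queue()
device_status_queue: "queue.Queue[tuple]" = queue.Queue()


def matches(topic_filter: str, topic: str) -> bool:
    """Match a topic against a filter that uses only the '+' wildcard."""
    f_parts = topic_filter.split("/")
    t_parts = topic.split("/")
    if len(f_parts) != len(t_parts):
        return False
    return all(f == "+" or f == t for f, t in zip(f_parts, t_parts))


def route_message(topic: str, payload: bytes) -> Optional[str]:
    """Put the message on the matching queue and report which one was used."""
    if matches(STATUS_FILTER, topic):
        device_status_queue.put((topic, payload))
        return "status"
    if any(matches(f, topic) for f in FEED_FILTERS):
        message_queue.put((topic, payload))
        return "feed"
    return None  # Unknown topic: dropped in this sketch.
```

Keeping status messages on their own queue lets the fast status job drain them without waiting behind bulk feed data. In a real client, paho-mqtt's own `topic_matches_sub` helper could replace the hand-written matcher.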
SchedulerService
- APScheduler (AsyncIOScheduler)
- Jobs:
  - Expire Pending Orders (5 min interval)
  - Process MQTT Messages (10s, batch insert FeedValues)
  - Process Device Status (500ms, high-priority live updates)
  - Media Schedule Trigger (5s, automation execution)
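The job list above could be registered roughly as follows. This is a sketch under assumptions: the job functions are empty placeholders with hypothetical names, and `register_jobs` accepts any object exposing an APScheduler-style `add_job` method. The `max_instances`, `coalesce`, and `replace_existing` settings are illustrative choices, not documented behaviour of this service.

```python
from typing import Any, Dict, List


# Placeholder coroutines standing in for the real jobs (hypothetical names).
async def expire_pending_orders() -> None: ...
async def process_mqtt_messages() -> None: ...
async def process_device_status() -> None: ...
async def media_schedule_trigger() -> None: ...


# Intervals in seconds, taken from the job list above.
JOBS: List[Dict[str, Any]] = [
    {"id": "expire_pending_orders", "func": expire_pending_orders, "seconds": 300},
    {"id": "process_mqtt_messages", "func": process_mqtt_messages, "seconds": 10},
    {"id": "process_device_status", "func": process_device_status, "seconds": 0.5},
    {"id": "media_schedule_trigger", "func": media_schedule_trigger, "seconds": 5},
]


def register_jobs(scheduler: Any, jobs: List[Dict[str, Any]] = JOBS) -> List[str]:
    """Register each job with an interval trigger and return the job ids."""
    for job in jobs:
        scheduler.add_job(
            job["func"],
            "interval",
            seconds=job["seconds"],
            id=job["id"],
            max_instances=1,        # assumption: never overlap runs of one job
            coalesce=True,          # assumption: merge missed runs into one
            replace_existing=True,  # assumption: safe to call again on restart
        )
    return [job["id"] for job in jobs]
```

With APScheduler 3.x, this would typically be called as `register_jobs(AsyncIOScheduler())` followed by `scheduler.start()` from inside the running event loop.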
DeviceRealtimeHub
- Tracks active WebSocket connections per user
- Maintains event loop reference for thread-safe broadcasting
- Broadcasts device updates to connected clients
- Handles dead connection cleanup
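A hub with these responsibilities might look like the sketch below. It is an illustration only: the method names (`bind_loop`, `connect`, `broadcast_threadsafe`, and so on) are hypothetical, and the connection objects are assumed to expose an async `send_json` method, as FastAPI/Starlette WebSocket objects do.

```python
import asyncio
from typing import Any, Dict, Optional, Set


class DeviceRealtimeHub:
    """Tracks connections per user and broadcasts updates to them."""

    def __init__(self) -> None:
        self._connections: Dict[Any, Set[Any]] = {}
        self._loop: Optional[asyncio.AbstractEventLoop] = None

    def bind_loop(self, loop: asyncio.AbstractEventLoop) -> None:
        # Keep the server's event loop so other threads can schedule sends on it.
        self._loop = loop

    def connect(self, user_id: Any, ws: Any) -> None:
        self._connections.setdefault(user_id, set()).add(ws)

    def disconnect(self, user_id: Any, ws: Any) -> None:
        conns = self._connections.get(user_id)
        if conns is None:
            return
        conns.discard(ws)
        if not conns:
            del self._connections[user_id]

    def connection_count(self, user_id: Any) -> int:
        return len(self._connections.get(user_id, ()))

    async def broadcast(self, user_id: Any, payload: dict) -> int:
        """Send payload to every connection of a user; drop the ones that fail."""
        delivered = 0
        dead = []
        # Iterate over a copy so cleanup cannot mutate the set mid-loop.
        for ws in list(self._connections.get(user_id, ())):
            try:
                await ws.send_json(payload)
                delivered += 1
            except Exception:
                dead.append(ws)
        for ws in dead:
            self.disconnect(user_id, ws)
        return delivered

    def broadcast_threadsafe(self, user_id: Any, payload: dict):
        """Schedule a broadcast from a non-loop thread (e.g. an MQTT callback)."""
        if self._loop is None:
            raise RuntimeError("event loop not bound; call bind_loop() first")
        return asyncio.run_coroutine_threadsafe(
            self.broadcast(user_id, payload), self._loop
        )
```

The stored loop reference matters because WebSocket sends must run on the event loop that owns the connections; `asyncio.run_coroutine_threadsafe` hands the coroutine to that loop and returns a future the calling thread can optionally wait on.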
DeviceStatusProcessor
- Parses MQTT device status payloads
- Updates Device model (status, temperature, humidity, location, last_seen)
- Triggers WebSocket broadcasts
- Processes queued status messages in batches for efficiency
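The processing steps above can be sketched as one batch function. Everything here is an assumption made for illustration: the payload is taken to be a JSON object whose keys match the device field names, `DeviceState` is a plain dataclass standing in for the SQLModel `Device` model, and an in-memory dict replaces the database. The returned mapping is what a caller would pass on to the WebSocket broadcast.

```python
import json
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Dict, Iterable, Optional, Tuple

DeviceKey = Tuple[str, str]  # (user id, device id) as they appear in the topic


@dataclass
class DeviceState:
    status: str = "unknown"
    temperature: Optional[float] = None
    humidity: Optional[float] = None
    location: Optional[str] = None
    last_seen: Optional[datetime] = None


def parse_status_topic(topic: str) -> Optional[DeviceKey]:
    """Extract (user, device) from users/<user>/devices/<device>/status."""
    parts = topic.split("/")
    if (len(parts) != 5 or parts[0] != "users"
            or parts[2] != "devices" or parts[4] != "status"):
        return None
    return parts[1], parts[3]


def process_status_batch(
    messages: Iterable[Tuple[str, bytes]],
    devices: Dict[DeviceKey, DeviceState],
    now: Optional[datetime] = None,
) -> Dict[DeviceKey, DeviceState]:
    """Apply a batch of status messages; return the devices that changed."""
    now = now or datetime.now(timezone.utc)
    updated: Dict[DeviceKey, DeviceState] = {}
    for topic, raw in messages:
        key = parse_status_topic(topic)
        if key is None:
            continue  # Not a status topic.
        try:
            data = json.loads(raw)
        except (ValueError, TypeError):
            continue  # Malformed payload: skipped in this sketch.
        if not isinstance(data, dict):
            continue
        device = devices.setdefault(key, DeviceState())
        for name in ("status", "temperature", "humidity", "location"):
            if name in data:
                setattr(device, name, data[name])
        device.last_seen = now
        # Later messages for the same device overwrite earlier ones,
        # so each device is reported once per batch.
        updated[key] = device
    return updated
```

Collapsing several messages for one device into a single entry means one write and one broadcast per device per batch, which is where the efficiency gain of batching comes from.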