Backend Architecture

Core Technology Stack

  • Framework: FastAPI (Python)
  • Database: PostgreSQL with SQLModel (ORM)
  • Message Broker: MQTT (Mosquitto)
  • Scheduling: APScheduler (AsyncIOScheduler) for periodic background jobs
  • Real-time: WebSocket support
  • Authentication: JWT (JSON Web Tokens)

Service Components

MQTTService

  • Manages paho-mqtt client connection
  • Subscribes to topics: users/+/feeds/+, users/+/devices/+/feeds/+, users/+/devices/+/status
  • Queues messages for async processing
  • Queue Strategy: Separate queues for feed data (message_queue) and device status (device_status_queue)
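
The two-queue strategy can be sketched roughly as follows. This is a minimal illustration using only the standard library, not the service's actual code: the helper names `topic_matches` and `route_message` are hypothetical, and the matcher handles only the single-level `+` wildcard used by the subscriptions above.

```python
import queue
from typing import Tuple

# Topic filters taken from the subscription list above.
FEED_FILTERS = ("users/+/feeds/+", "users/+/devices/+/feeds/+")
STATUS_FILTERS = ("users/+/devices/+/status",)

# queue.Queue is thread-safe, which matters when the MQTT client
# delivers messages from its own network thread.
message_queue: "queue.Queue[Tuple[str, bytes]]" = queue.Queue()
device_status_queue: "queue.Queue[Tuple[str, bytes]]" = queue.Queue()


def topic_matches(topic_filter: str, topic: str) -> bool:
    """Match a topic against a filter that uses only the '+' wildcard."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    return len(filter_levels) == len(topic_levels) and all(
        f == "+" or f == t for f, t in zip(filter_levels, topic_levels)
    )


def route_message(topic: str, payload: bytes) -> str:
    """Put a message on the matching queue (hypothetical helper).

    Returns "status", "feed", or "ignored" so callers can log the outcome.
    """
    if any(topic_matches(f, topic) for f in STATUS_FILTERS):
        device_status_queue.put_nowait((topic, payload))
        return "status"
    if any(topic_matches(f, topic) for f in FEED_FILTERS):
        message_queue.put_nowait((topic, payload))
        return "feed"
    return "ignored"
```

With paho-mqtt, a function like this would typically be called from the client's `on_message` callback, passing `msg.topic` and `msg.payload`. Keeping the callback to a single enqueue leaves the slower database work to the scheduled jobs.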

SchedulerService

  • APScheduler (AsyncIOScheduler)
  • Jobs:
    • Expire Pending Orders (every 5 min)
    • Process MQTT Messages (every 10 s; batch-inserts FeedValues)
    • Process Device Status (every 500 ms; high-priority live updates)
    • Media Schedule Trigger (every 5 s; executes automations)
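
As an illustration, the job table above could be registered along these lines. This is a sketch under assumptions: the job ids, the `register_jobs` helper, and the `max_instances` and `coalesce` settings are not from the original design. The `add_job` call follows the APScheduler 3.x style; passing a fractional `seconds` value for the 500 ms job is assumed to be accepted by the interval trigger in the version in use.

```python
from typing import Any, Callable, Dict, List

# Intervals in seconds, taken from the job list above.
# The job ids themselves are hypothetical names.
JOB_INTERVALS: Dict[str, float] = {
    "expire_pending_orders": 300.0,   # 5 min
    "process_mqtt_messages": 10.0,    # batch insert of feed values
    "process_device_status": 0.5,     # 500 ms, live updates
    "media_schedule_trigger": 5.0,    # automation execution
}


def register_jobs(scheduler: Any, handlers: Dict[str, Callable[..., Any]]) -> List[str]:
    """Register one interval job per entry in JOB_INTERVALS.

    `scheduler` is any object with an APScheduler-style `add_job` method,
    for example an AsyncIOScheduler instance.
    """
    registered = []
    for job_id, seconds in JOB_INTERVALS.items():
        scheduler.add_job(
            handlers[job_id],
            "interval",
            seconds=seconds,
            id=job_id,
            max_instances=1,  # assumption: never overlap runs of the same job
            coalesce=True,    # assumption: collapse missed runs into one
        )
        registered.append(job_id)
    return registered
```

In an application, `scheduler` would be an `AsyncIOScheduler` from `apscheduler.schedulers.asyncio`, started with `scheduler.start()` once the event loop is running. Keeping the intervals in one table makes the relative priorities easy to see and adjust.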

DeviceRealtimeHub

  • Tracks active WebSocket connections per user
  • Maintains event loop reference for thread-safe broadcasting
  • Broadcasts device updates to connected clients
  • Handles dead connection cleanup
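
A hub with these responsibilities might look roughly like the sketch below. It is not the project's implementation: the method names are hypothetical, and the only thing assumed of a connection is an async `send_json` method, which FastAPI's `WebSocket` provides.

```python
import asyncio
from collections import defaultdict
from concurrent.futures import Future
from typing import Any, Dict, Optional, Protocol, Set


class JsonSocket(Protocol):
    """Anything with an async send_json method, e.g. a FastAPI WebSocket."""

    async def send_json(self, data: Any) -> None: ...


class DeviceRealtimeHub:
    def __init__(self) -> None:
        self._connections: Dict[str, Set[JsonSocket]] = defaultdict(set)
        self._loop: Optional[asyncio.AbstractEventLoop] = None

    def bind_loop(self, loop: asyncio.AbstractEventLoop) -> None:
        # Keep a reference so other threads can schedule broadcasts on it.
        self._loop = loop

    def connect(self, user_id: str, ws: JsonSocket) -> None:
        self._connections[user_id].add(ws)

    def disconnect(self, user_id: str, ws: JsonSocket) -> None:
        self._connections[user_id].discard(ws)
        if not self._connections[user_id]:
            del self._connections[user_id]

    def connection_count(self, user_id: str) -> int:
        return len(self._connections.get(user_id, ()))

    async def broadcast(self, user_id: str, message: Any) -> int:
        """Send to every connection of one user; drop those that fail."""
        dead = []
        sent = 0
        for ws in list(self._connections.get(user_id, ())):
            try:
                await ws.send_json(message)
                sent += 1
            except Exception:
                dead.append(ws)  # treat any send failure as a dead connection
        for ws in dead:
            self.disconnect(user_id, ws)
        return sent

    def broadcast_threadsafe(self, user_id: str, message: Any) -> Future:
        """Schedule a broadcast from a non-event-loop thread."""
        if self._loop is None:
            raise RuntimeError("event loop not bound; call bind_loop() first")
        return asyncio.run_coroutine_threadsafe(
            self.broadcast(user_id, message), self._loop
        )
```

The stored loop reference is what lets code running outside the event loop, such as an MQTT callback thread, hand work back via `asyncio.run_coroutine_threadsafe`. Cleaning up failed sockets inside `broadcast` avoids a separate sweep for dead connections.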

DeviceStatusProcessor

  • Parses MQTT device status payloads
  • Updates Device model (status, temp, humidity, location, last_seen)
  • Triggers WebSocket broadcasts
  • Processes status messages in batches for efficiency
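
The processing steps above can be sketched as follows. The payload shape is an assumption: a JSON object with optional `status`, `temp`, `humidity`, and `location` keys. The `Device` dataclass here is a plain stand-in for the real SQLModel model, and `process_status_batch` is a hypothetical name. Sending one broadcast per device per batch is a design choice of this sketch, not something the original specifies.

```python
import json
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from typing import Any, Callable, Dict, Iterable, Optional, Tuple


@dataclass
class Device:
    """Stand-in for the Device model; field names follow the list above."""
    device_id: str
    status: str = "unknown"
    temp: Optional[float] = None
    humidity: Optional[float] = None
    location: Optional[str] = None
    last_seen: Optional[datetime] = None


def parse_status_topic(topic: str) -> Optional[Tuple[str, str]]:
    """Extract (user_id, device_id) from users/<u>/devices/<d>/status."""
    parts = topic.split("/")
    if len(parts) == 5 and (parts[0], parts[2], parts[4]) == ("users", "devices", "status"):
        return parts[1], parts[3]
    return None


def process_status_batch(
    batch: Iterable[Tuple[str, bytes]],
    devices: Dict[str, Device],
    broadcast: Callable[[str, Dict[str, Any]], None],
    now: Optional[datetime] = None,
) -> int:
    """Apply a batch of status messages; return the number of devices updated."""
    now = now or datetime.now(timezone.utc)
    touched: Dict[str, str] = {}  # device_id -> user_id
    for topic, payload in batch:
        ids = parse_status_topic(topic)
        if ids is None:
            continue
        try:
            data = json.loads(payload)
        except ValueError:  # covers malformed JSON and undecodable bytes
            continue
        if not isinstance(data, dict):
            continue
        user_id, device_id = ids
        device = devices.setdefault(device_id, Device(device_id))
        for field in ("status", "temp", "humidity", "location"):
            if field in data:
                setattr(device, field, data[field])
        device.last_seen = now
        touched[device_id] = user_id
    # One broadcast per device, carrying its state after the whole batch.
    for device_id, user_id in touched.items():
        broadcast(user_id, asdict(devices[device_id]))
    return len(touched)
```

In the real service the `broadcast` callback would presumably forward to DeviceRealtimeHub and the device updates would be committed to PostgreSQL in a single transaction per batch. Coalescing to one broadcast per device keeps WebSocket traffic bounded when a device reports several times within one 500 ms window.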