MQTT Message Processing

Overview

The MQTT message processing system handles incoming sensor data and device status messages from IoT devices, queues them for batch processing, and persists data to the database.

MQTTService

Manages the paho-mqtt client connection and routes incoming messages:

  • Connection: Connects to MQTT broker with configured host, port, credentials
  • Subscriptions: Subscribes to feed topics and device status topics
  • Callbacks: Handles on_connect, on_message, on_disconnect events
  • Queue Strategy: Separate thread-safe queues for different message types
    • message_queue: Feed data messages (maxsize: 100000)
    • device_status_queue: Device status messages (maxsize: 100000)
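
A minimal sketch of how such a service could be laid out, not the actual implementation. It assumes paho-mqtt 2.x (where the client constructor takes a callback API version); the topic filters, the constructor parameters, and the method names are hypothetical. Message routing is left out here.

```python
import logging
import queue

logger = logging.getLogger(__name__)


class MQTTService:
    """Sketch: two bounded thread-safe queues plus a paho-mqtt client."""

    QUEUE_MAXSIZE = 100_000  # maxsize stated for both queues

    # Hypothetical topic filters; the real topic layout is not documented here.
    TOPICS = ("users/+/devices/+/feeds/+", "users/+/devices/+/status")

    def __init__(self, host, port=1883, username=None, password=None):
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.message_queue = queue.Queue(maxsize=self.QUEUE_MAXSIZE)
        self.device_status_queue = queue.Queue(maxsize=self.QUEUE_MAXSIZE)
        self.client = None

    def start(self):
        # Imported lazily so the class can be built without paho-mqtt installed.
        import paho.mqtt.client as mqtt

        # Assumes paho-mqtt 2.x; 1.x has no callback_api_version argument.
        self.client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
        if self.username:
            self.client.username_pw_set(self.username, self.password)
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message
        self.client.on_disconnect = self.on_disconnect
        self.client.connect(self.host, self.port, keepalive=60)
        self.client.loop_start()  # callbacks now run on a background thread

    def on_connect(self, client, userdata, flags, reason_code, properties):
        # Subscribing here means subscriptions are restored after a reconnect.
        for topic in self.TOPICS:
            client.subscribe(topic, qos=1)

    def on_message(self, client, userdata, msg):
        # Topic parsing and queue routing are omitted in this sketch.
        logger.debug("Received message on %s", msg.topic)

    def on_disconnect(self, client, userdata, disconnect_flags, reason_code, properties):
        logger.warning("Disconnected from broker: %s", reason_code)
```

Constructing the service only creates the queues; the broker connection is opened when start() is called.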

Key Implementation Detail

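Uses queue.Queue (the thread-safe standard-library queue) instead of asyncio.Queue because paho-mqtt invokes its callbacks on the background network thread started by client.loop_start(). asyncio.Queue is not thread-safe, so calling it directly from that thread would risk race conditions and data corruption.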
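
The handoff between the two threads can be illustrated with a small sketch. It assumes the consumer runs inside an asyncio event loop; the network_thread function merely stands in for paho's loop thread, and all function names are hypothetical.

```python
import asyncio
import queue
import threading

message_queue = queue.Queue(maxsize=100_000)


def network_thread(count):
    # Stands in for paho's background thread calling on_message.
    for i in range(count):
        message_queue.put_nowait({"seq": i})


def drain(q):
    """Remove and return everything currently queued, without blocking."""
    items = []
    while True:
        try:
            items.append(q.get_nowait())
        except queue.Empty:
            return items


async def consume(expected):
    # Polls the thread-safe queue from the event loop without blocking it.
    received = []
    while len(received) < expected:
        received.extend(drain(message_queue))
        await asyncio.sleep(0.01)  # yield to other tasks between polls
    return received


def run_demo(count=1000):
    producer = threading.Thread(target=network_thread, args=(count,))
    producer.start()
    result = asyncio.run(consume(count))
    producer.join()
    return result
```

Because queue.Queue does its own locking, the producer thread and the event loop never need to coordinate beyond put_nowait() and get_nowait().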

Message Processing Flow

on_message Callback

  1. Receives MQTT message from broker
  2. Parses topic to extract: user_id, device_code, feed_key, message type
  3. Creates message data object with: topic, parsed info, payload, QoS, timestamp
  4. Routes to appropriate queue:
    • Device status messages → device_status_queue (priority)
    • Feed data messages → message_queue (standard)
  5. Non-blocking queue insertion using put_nowait()
  6. Drops the message and logs a warning if the queue is full
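
The steps above can be sketched as follows. The topic layout is an assumption made only for illustration (users/<user_id>/devices/<device_code>/feeds/<feed_key> for feed data and users/<user_id>/devices/<device_code>/status for device status), as are the parse_topic helper and the dictionary field names. The callback uses the three-argument on_message signature from paho-mqtt.

```python
import logging
import queue
import time

logger = logging.getLogger(__name__)

message_queue = queue.Queue(maxsize=100_000)
device_status_queue = queue.Queue(maxsize=100_000)


def parse_topic(topic):
    """Parse the assumed topic layouts; return None for anything else."""
    parts = topic.split("/")
    if len(parts) < 5 or parts[0] != "users" or parts[2] != "devices":
        return None
    info = {"user_id": parts[1], "device_code": parts[3], "feed_key": None}
    if len(parts) == 5 and parts[4] == "status":
        info["type"] = "device_status"
    elif len(parts) == 6 and parts[4] == "feeds":
        info["type"] = "feed_data"
        info["feed_key"] = parts[5]
    else:
        return None
    return info


def on_message(client, userdata, msg):
    info = parse_topic(msg.topic)
    if info is None:
        logger.warning("Ignoring message on unrecognized topic %s", msg.topic)
        return
    data = {
        "topic": msg.topic,
        "parsed": info,
        "payload": msg.payload.decode("utf-8", errors="replace"),
        "qos": msg.qos,
        "timestamp": time.time(),
    }
    if info["type"] == "device_status":
        target = device_status_queue
    else:
        target = message_queue
    try:
        # Never block paho's network thread waiting for queue space.
        target.put_nowait(data)
    except queue.Full:
        logger.warning("Queue full, dropping message on %s", msg.topic)
```

Using put_nowait() keeps the callback fast: a blocked callback would stall the network loop, so dropping with a warning is the trade-off when a queue is full.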

Batch Processing (10s Interval)

MQTTMessageProcessor.process_batch()

  1. Drain all pending messages from message_queue as one batch
  2. For each message:
    • Parse message data
    • Extract user_id and feed_key
    • Validate user_id format (UUID)
    • Look up Feed by user_id + feed_key in database
    • Skip if feed not found or not active
    • Parse payload value
    • Validate threshold if configured
    • Create FeedValue record
  3. Batch insert all FeedValues
  4. Update Feed.last_value and Feed.updated_at
  5. Commit single database transaction

Result: Each 10-second batch is written in one transaction instead of one transaction per message, which reduces database load and transaction overhead.
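
A rough sketch of the batch steps, using an in-memory-friendly sqlite3 connection as a stand-in for the real database layer. The table and column names, the min_value/max_value threshold columns, and the choice to skip out-of-range values are all assumptions for illustration; the actual models and threshold handling may differ.

```python
import queue
import sqlite3
import uuid

# Hypothetical schema standing in for the Feed and FeedValue models.
SCHEMA = """
CREATE TABLE feeds (
    id INTEGER PRIMARY KEY, user_id TEXT, feed_key TEXT, is_active INTEGER,
    min_value REAL, max_value REAL, last_value REAL, updated_at REAL
);
CREATE TABLE feed_values (feed_id INTEGER, value REAL, created_at REAL);
"""


def drain(q):
    """Remove and return everything currently queued, without blocking."""
    items = []
    while True:
        try:
            items.append(q.get_nowait())
        except queue.Empty:
            return items


def process_batch(q, conn):
    """Drain q and persist valid feed values in a single transaction."""
    rows = []    # (feed_id, value, timestamp) tuples for the bulk insert
    latest = {}  # feed_id -> (value, timestamp) of the last message seen
    for msg in drain(q):
        info = msg["parsed"]
        try:
            uuid.UUID(info["user_id"])     # validate user_id format
            value = float(msg["payload"])  # parse payload value
        except (ValueError, TypeError):
            continue
        feed = conn.execute(
            "SELECT id, is_active, min_value, max_value FROM feeds "
            "WHERE user_id = ? AND feed_key = ?",
            (info["user_id"], info["feed_key"]),
        ).fetchone()
        if feed is None or not feed[1]:
            continue  # feed not found or not active
        feed_id, _, low, high = feed
        # Assumption: a value outside a configured threshold is skipped.
        if (low is not None and value < low) or (high is not None and value > high):
            continue
        rows.append((feed_id, value, msg["timestamp"]))
        latest[feed_id] = (value, msg["timestamp"])
    if not rows:
        return 0
    with conn:  # commits once on success, rolls back on error
        conn.executemany(
            "INSERT INTO feed_values (feed_id, value, created_at) VALUES (?, ?, ?)",
            rows,
        )
        conn.executemany(
            "UPDATE feeds SET last_value = ?, updated_at = ? WHERE id = ?",
            [(v, ts, fid) for fid, (v, ts) in latest.items()],
        )
    return len(rows)
```

The function returns the number of stored values, so a scheduler calling it every 10 seconds could log throughput per batch.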