MQTT Message Processing
Overview
The MQTT message processing system handles incoming sensor data and device status messages from IoT devices, queues them for batch processing, and persists data to the database.
MQTTService
Manages paho-mqtt client connection and message routing:
- Connection: Connects to MQTT broker with configured host, port, credentials
- Subscriptions: Subscribes to feed topics and device status topics
- Callbacks: Handles on_connect, on_message, on_disconnect events
- Queue Strategy: Separate thread-safe queues for different message types
message_queue: Feed data messages (maxsize: 100000)device_status_queue: Device status messages (maxsize: 100000)
Key Implementation Detail
Uses queue.Queue (thread-safe standard library) instead of asyncio.Queue because MQTT callbacks run in a background thread via client.loop_start(). This prevents data corruption and race conditions.
Message Processing Flow
on_message Callback
- Receives MQTT message from broker
- Parses topic to extract: user_id, device_code, feed_key, message type
- Creates message data object with: topic, parsed info, payload, QoS, timestamp
- Routes to appropriate queue:
- Device status messages →
device_status_queue(priority) - Feed data messages →
message_queue(standard)
- Device status messages →
- Non-blocking queue insertion using
put_nowait() - Drops message if queue is full (logging warning)
Batch Processing (10s Interval)
MQTTMessageProcessor.process_batch()
- Pull all messages from
message_queuein batch - For each message:
- Parse message data
- Extract user_id and feed_key
- Validate user_id format (UUID)
- Lookup Feed by user_id + feed_key in database
- Skip if feed not found or not active
- Parse payload value
- Validate threshold if configured
- Create FeedValue record
- Batch insert all FeedValues
- Update Feed.last_value and Feed.updated_at
- Commit single database transaction
Result: Efficient batch processing reduces database load and transaction overhead.