Real-time Updates and WebSocket Broadcasting
DeviceRealtimeHub
Manages real-time WebSocket connections and broadcasts device updates to connected clients.
Connection Management
- Per User: Tracks connected WebSocket clients by user_id
- Set Storage: Uses a set per user, so one user can hold multiple connections at once
- Disconnect Handling: Automatically removes disconnected clients
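The bookkeeping described above could look roughly like the following. This is a minimal sketch, not the hub's actual code: the class name ConnectionRegistry and its method names are hypothetical, and connection objects are treated as opaque hashable values.

```python
from collections import defaultdict
from typing import Any


class ConnectionRegistry:
    """Hypothetical per-user registry; names are illustrative only."""

    def __init__(self) -> None:
        # user_id -> set of live connection objects
        self._connections: dict[int, set[Any]] = defaultdict(set)

    def connect(self, user_id: int, websocket: Any) -> None:
        # A set makes repeated registration of the same socket harmless.
        self._connections[user_id].add(websocket)

    def disconnect(self, user_id: int, websocket: Any) -> None:
        clients = self._connections.get(user_id)
        if clients is None:
            return
        clients.discard(websocket)
        if not clients:
            # Drop empty entries so the mapping does not grow without bound.
            del self._connections[user_id]

    def clients_for(self, user_id: int) -> set[Any]:
        # Return a copy so callers can iterate while connections change.
        return set(self._connections.get(user_id, ()))
```

Using a set rather than a list means a disconnect is a single discard() call and duplicate registrations cannot cause duplicate sends.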
Event Loop Thread-Safety
- Maintains reference to main event loop
- Allows background threads (MQTT) to safely schedule broadcasts
- Uses asyncio.run_coroutine_threadsafe() for thread-safe coroutine scheduling
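To illustrate the pattern, here is a small self-contained sketch of a background thread handing a coroutine to a running event loop. The function names broadcast and mqtt_worker are placeholders invented for the example; only asyncio.run_coroutine_threadsafe() comes from the description above.

```python
import asyncio
import threading

received: list[str] = []


async def broadcast(message: str) -> None:
    # Stand-in for a WebSocket send; always runs on the event loop thread.
    received.append(message)


def mqtt_worker(loop: asyncio.AbstractEventLoop) -> None:
    # Runs in a background thread. Calling broadcast() directly here would
    # not execute it; the coroutine has to be handed to the loop instead.
    future = asyncio.run_coroutine_threadsafe(broadcast("status update"), loop)
    # Optional: block this thread until the loop has run the coroutine.
    future.result(timeout=5)


async def main() -> None:
    loop = asyncio.get_running_loop()  # the reference a hub would keep
    worker = threading.Thread(target=mqtt_worker, args=(loop,))
    worker.start()
    # Join the thread in an executor so the loop stays free to run broadcast().
    await loop.run_in_executor(None, worker.join)


asyncio.run(main())
```

run_coroutine_threadsafe() returns a concurrent.futures.Future, so the calling thread can either wait on it, as here, or attach a callback and move on.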
Broadcast Methods
broadcast_to_user() - Async method for within-loop calls
- Sends the JSON payload to every WebSocket client connected for the user
- Removes dead connections (those whose send fails)
- Handles exceptions gracefully
broadcast_to_user_threadsafe() - Thread-safe method for background threads
- Schedules broadcast coroutine in main event loop
- Safe to call from MQTT background thread
- Logs exceptions without crashing
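The two methods might be structured along these lines. This is a sketch under stated assumptions rather than the real DeviceRealtimeHub: the class HubSketch, the helpers set_loop, add_client and _log_failure, and the assumption that each connection exposes an async send_text(str) method (as Starlette's WebSocket does) are all illustrative.

```python
import asyncio
import json
import logging
from typing import Any, Optional

logger = logging.getLogger(__name__)


class HubSketch:
    """Illustrative only; the real hub may be organized differently."""

    def __init__(self) -> None:
        self._clients: dict[int, set[Any]] = {}
        self._loop: Optional[asyncio.AbstractEventLoop] = None

    def set_loop(self, loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop

    def add_client(self, user_id: int, ws: Any) -> None:
        self._clients.setdefault(user_id, set()).add(ws)

    async def broadcast_to_user(self, user_id: int, payload: dict) -> None:
        text = json.dumps(payload)
        dead = []
        # Iterate over a copy: the set may change while we await sends.
        for ws in list(self._clients.get(user_id, ())):
            try:
                await ws.send_text(text)  # assumed connection API
            except Exception:
                # A failed send is treated as a dead connection.
                dead.append(ws)
        for ws in dead:
            self._clients.get(user_id, set()).discard(ws)

    def broadcast_to_user_threadsafe(self, user_id: int, payload: dict) -> None:
        if self._loop is None:
            logger.warning("No event loop registered; dropping broadcast")
            return
        future = asyncio.run_coroutine_threadsafe(
            self.broadcast_to_user(user_id, payload), self._loop
        )
        # Log failures from the loop side without raising in the caller.
        future.add_done_callback(self._log_failure)

    @staticmethod
    def _log_failure(future) -> None:
        if future.cancelled():
            return
        exc = future.exception()
        if exc is not None:
            logger.error("Broadcast failed: %r", exc)
```

Collecting dead connections in a separate list and discarding them after the loop avoids mutating the set while it is being traversed.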
DeviceStatusProcessor
Processes incoming device status messages, then updates the database and notifies WebSocket clients.
Processing Steps
- Parse Message
  - Extract from topic: user_identifier, device_code
  - Parse JSON payload
  - Fallback to payload.device_code if not in topic
- Lookup Device
  - Query database by device_code
  - Load relationships (groups)
  - Return if not found
- Update Device Fields
  - Status (online/offline/warning/error)
  - Temperature (with null check)
  - Humidity (with null check)
  - Latitude/Longitude (GPS coordinates)
  - Last_seen (updated to current time)
- Persist to Database
  - Commit changes to PostgreSQL
  - Update updated_at timestamp
- Broadcast to Frontend
  - Serialize device via _serialize_device_for_ws()
  - Call broadcast_to_user_threadsafe(user_id, message)
  - WebSocket clients receive immediate update
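The parsing and field-update steps above can be sketched without a database. Several things here are assumptions made for illustration: the topic layout devices/<user_identifier>/<device_code>/status (the actual layout is not stated), the DeviceSketch dataclass standing in for the ORM model, the function names, and the reading of "null check" as "skip fields whose value is null so the stored value is kept".

```python
import json
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass
class DeviceSketch:
    """Hypothetical stand-in for the ORM device model."""
    device_code: str
    status: str = "offline"
    temperature: Optional[float] = None
    humidity: Optional[float] = None
    latitude: Optional[float] = None
    longitude: Optional[float] = None
    last_seen: Optional[datetime] = None


def parse_status_message(topic: str, raw_payload: bytes):
    """Return (user_identifier, device_code, payload)."""
    # Assumed layout: devices/<user_identifier>/<device_code>/status
    parts = topic.split("/")
    user_identifier = parts[1] if len(parts) > 1 else None
    device_code = parts[2] if len(parts) == 4 else None
    payload = json.loads(raw_payload)
    if device_code is None:
        # Fall back to the code carried inside the payload.
        device_code = payload.get("device_code")
    return user_identifier, device_code, payload


def apply_status(device: DeviceSketch, payload: dict) -> None:
    """Copy reported fields onto the device, skipping null values."""
    if payload.get("status") is not None:
        device.status = payload["status"]
    for name in ("temperature", "humidity", "latitude", "longitude"):
        value = payload.get(name)
        if value is not None:  # null check: keep the previous reading
            setattr(device, name, value)
    device.last_seen = datetime.now(timezone.utc)
```

In the real processor the lookup, commit, and broadcast steps would sit between and after these two calls.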
Serialization Format
{
  "id": device_id,
  "key": device_key,
  "device_code": device_code,
  "product_type": product_type,
  "name": name,
  "status": current_status,
  "last_seen": last_seen_iso,
  "is_active": is_active,
  "device_type": device_type,
  "firmware_version": firmware_version,
  "temperature": temperature,
  "humidity": humidity,
  "latitude": latitude,
  "longitude": longitude,
  "groups": [{"id": group_id, "name": group_name}]
}
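A serializer producing this shape might look like the sketch below. The Group and Device dataclasses, their field types, and the function name serialize_device_for_ws are assumptions standing in for the project's models and its _serialize_device_for_ws(); only the output keys are taken from the format above.

```python
import json
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Optional


@dataclass
class Group:
    id: int
    name: str


@dataclass
class Device:
    """Hypothetical model; field types are guesses."""
    id: int
    key: str
    device_code: str
    product_type: str
    name: str
    status: str
    is_active: bool
    device_type: str
    firmware_version: str
    last_seen: Optional[datetime] = None
    temperature: Optional[float] = None
    humidity: Optional[float] = None
    latitude: Optional[float] = None
    longitude: Optional[float] = None
    groups: list = field(default_factory=list)


def serialize_device_for_ws(device: Device) -> dict[str, Any]:
    return {
        "id": device.id,
        "key": device.key,
        "device_code": device.device_code,
        "product_type": device.product_type,
        "name": device.name,
        "status": device.status,
        # datetime is not JSON-serializable, so emit an ISO 8601 string.
        "last_seen": device.last_seen.isoformat() if device.last_seen else None,
        "is_active": device.is_active,
        "device_type": device.device_type,
        "firmware_version": device.firmware_version,
        "temperature": device.temperature,
        "humidity": device.humidity,
        "latitude": device.latitude,
        "longitude": device.longitude,
        "groups": [{"id": g.id, "name": g.name} for g in device.groups],
    }
```

Converting last_seen to a string up front keeps the result directly usable with json.dumps().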