Skip to main content

Real-time Updates and WebSocket Broadcasting

DeviceRealtimeHub

Manages real-time WebSocket connections and broadcasts device updates to connected clients.

Connection Management

  • Per User: Tracks connected WebSocket clients per user
  • Set Storage: Uses set to manage multiple connections per user
  • Disconnect Handling: Automatically removes disconnected clients

Event Loop Thread-Safety

  • Maintains reference to main event loop
  • Allows background threads (MQTT) to safely schedule broadcasts
  • Uses asyncio.run_coroutine_threadsafe() for thread-safe coroutine scheduling

Broadcast Methods

broadcast_to_user() - Async method for within-loop calls

  • Sends JSON payload to all connected WebSocket clients for user
  • Removes dead connections (send failures)
  • Handles exceptions gracefully

broadcast_to_user_threadsafe() - Thread-safe method for background threads

  • Schedules broadcast coroutine in main event loop
  • Safe to call from MQTT background thread
  • Logs exceptions without crashing

DeviceStatusProcessor

Processes incoming device status messages and updates database and WebSocket clients.

Processing Steps

  1. Parse Message

    • Extract from topic: user_identifier, device_code
    • Parse JSON payload
    • Fallback to payload.device_code if not in topic
  2. Lookup Device

    • Query database by device_code
    • Load relationships (groups)
    • Return if not found
  3. Update Device Fields

    • Status (online/offline/warning/error)
    • Temperature (with null check)
    • Humidity (with null check)
    • Latitude/Longitude (GPS coordinates)
    • Last_seen (updated to current time)
  4. Persist to Database

    • Commit changes to PostgreSQL
    • Update updated_at timestamp
  5. Broadcast to Frontend

    • Serialize device as _serialize_device_for_ws()
    • Call broadcast_to_user_threadsafe(user_id, message)
    • WebSocket clients receive immediate update

Serialization Format

{
"id": device_id,
"key": device_key,
"device_code": device_code,
"product_type": product_type,
"name": name,
"status": current_status,
"last_seen": last_seen_iso,
"is_active": is_active,
"device_type": device_type,
"firmware_version": firmware_version,
"temperature": temperature,
"humidity": humidity,
"latitude": latitude,
"longitude": longitude,
"groups": [{"id": group_id, "name": group_name}]
}