Skip to main content

Data Flows and Architecture Patterns

Main Flows

1. Device Data Ingestion

Device (IoT)
↓ (MQTT publish: users/{user_id}/devices/{device_code}/feeds/{feed_key})
MQTT Broker
↓ (on_message callback)
MQTTService
├─ Parse topic → extract user_id, feed_key
├─ Queue message (thread-safe queue.Queue)
└─ (non-blocking put_nowait())
Scheduler (10s interval)
├─ Pull batch from message_queue
├─ MQTT Message Processor:
│ ├─ Lookup Feed by user_id + feed_key
│ ├─ Validate threshold
│ └─ Create FeedValue record in DB
└─ Commit transaction

Key Detail: Uses queue.Queue (thread-safe) not asyncio.Queue because MQTT callbacks run in background thread.

2. Device Status Updates

Device Status Message (MQTT)
↓ (Topic: users/{user_id}/devices/{device_code}/status)
MQTTService.on_message()
├─ Parse topic
└─ Queue to device_status_queue
Scheduler (500ms interval, priority processing)
├─ Pull device status batch
├─ Device Status Processor:
│ ├─ Update Device: status, temperature, humidity, location
│ ├─ Update last_seen timestamp
│ └─ Broadcast via WebSocket to frontend
└─ Real-time hub publishes to connected websocket clients

3. Real-time WebSocket Broadcasting

Device Status Processor
↓ (after device update)
DeviceRealtimeHub.broadcast_to_user_threadsafe()
├─ Schedule coroutine in main event loop
└─ Send JSON to all connected WebSocket client
Frontend
└─ Receives update, renders immediately

4. Media Schedule Triggers

Scheduler (5s interval)
├─ Pull pending MediaSchedule records
├─ Check if `trigger_time <= now`
├─ Build MQTT message with ScheduleMessageBuilder
├─ Publish to device topic
├─ Update schedule status (triggered)
└─ Device receives and executes command

Data Flow: Device → API → Dashboard

Flow 1: Sensor Data Publication (⏱ ~11 seconds)

┌──────────────────────────────────────────────────────────────┐
│ 1. IoT DEVICE │
│ Sensor reading: temp=25.5°C, humidity=60% │
│ MQTT publish to: users/{user_id}/devices/{device_code} │
│ /feeds/temperature │
│ Payload: {"value": "25.5"} │
└────────────────┬─────────────────────────────────────────────┘

┌────────────────────────────────────────────────────────────────┐
│ 2. MQTT BROKER (Mosquitto) - immediate │
│ Receives message, broadcasts to subscribers │
└────────────────┬──────────────────────────────────────────────┘

┌────────────────────────────────────────────────────────────────┐
│ 3. BACKEND (MQTTService.on_message) - immediate │
│ - Parse topic → extract user_id, feed_key │
│ - Validate format │
│ - Put message in thread-safe queue (non-blocking) │
│ - Return immediately │
└────────────────┬──────────────────────────────────────────────┘

⏳ Wait 10 seconds (scheduler interval)

┌────────────────────────────────────────────────────────────────┐
│ 4. SCHEDULER JOB: process_mqtt_messages_wrapper (10s) │
│ - Pull ALL messages from queue (batch) │
│ - For each message: │
│ └─ MQTTMessageProcessor.process_batch() │
│ ├─ Find Feed by user_id + feed_key in DB │
│ ├─ Validate: feed enabled? threshold OK? │
│ ├─ Create FeedValue record │
│ ├─ Update Feed.last_value │
│ └─ Commit to PostgreSQL │
└────────────────┬──────────────────────────────────────────────┘

┌────────────────────────────────────────────────────────────────┐
│ 5. FRONTEND (via API polling or dashboard view) │
│ - Next.js component: FeedValuesDisplay │
│ - Call: GET /api/v1/feed-values/{feed_id} │
│ - Render latest value from database │
│ │
│ OR: Call feedValueService.getFeedValues() │
│ └─ apiClient.get('/feed-values/?feed_id=...') │
└────────────────────────────────────────────────────────────────┘

Total Delay: ~11 seconds (batch + scheduler) for DB persistence
Live Updates: Via device status WebSocket (500ms priority processing)

Flow 2: Device Status Real-time Update (⏱ ~500ms)

┌────────────────────────────────────────────────────────────────┐
│ 1. IoT DEVICE - STATUS MESSAGE │
│ Goes online, publishes status │
│ Topic: users/{user_id}/devices/{device_code}/status │
│ Payload: { │
│ "status": "online", │
│ "temperature": 25.5, │
│ "humidity": 60, │
│ "latitude": 21.0285, │
│ "longitude": 105.8542 │
│ } │
└────────────────┬─────────────────────────────────────────────┘

┌────────────────────────────────────────────────────────────────┐
│ 2. MQTT BROKER - IMMEDIATE │
│ Routes to backend subscriber │
└────────────────┬──────────────────────────────────────────────┘

┌────────────────────────────────────────────────────────────────┐
│ 3. MQTTService.on_message() - IMMEDIATE │
│ - Route to device_status_queue (priority) │
│ - NON-BLOCKING put_nowait() │
└────────────────┬──────────────────────────────────────────────┘

⏳ VERY SHORT WAIT: 500ms scheduler

┌────────────────────────────────────────────────────────────────┐
│ 4. SCHEDULER: process_device_status_messages_wrapper (500ms) │
│ - Pull all from device_status_queue (batch) │
│ - For each message: │
│ └─ DeviceStatusProcessor.process_batch() │
│ ├─ Find Device by device_code │
│ ├─ Update Device fields: │
│ │ - status = "online" │
│ │ - temperature = 25.5 │
│ │ - humidity = 60 │
│ │ - latitude/longitude (GPS) │
│ │ - last_seen = now │
│ └─ Commit to PostgreSQL │
└────────────────┬──────────────────────────────────────────────┘

┌────────────────────────────────────────────────────────────────┐
│ 5. BROADCAST VIA WEBSOCKET - IMMEDIATE AFTER DB │
│ - DeviceStatusProcessor calls: │
│ device_realtime_hub.broadcast_to_user_threadsafe() │
│ - Schedule coroutine in main event loop │
│ - Serialize device: _serialize_device_for_ws() │
│ - Send JSON to ALL connected websockets for this user │
└────────────────┬──────────────────────────────────────────────┘

┌────────────────────────────────────────────────────────────────┐
│ 6. FRONTEND - WEBSOCKET RECEIVED (500ms total) │
│ - useEffect/MQTTContext listener │
│ - Receives DeviceUpdate message: │
│ { │
│ "type": "device_update", │
│ "device": { │
│ "id": "123...", │
│ "name": "Sensor-01", │
│ "status": "online", │
│ "temperature": 25.5, │
│ "humidity": 60, │
│ "last_seen": "2024-04-04T10:30:00Z" │
│ } │
│ } │
│ - Update component state │
│ - Re-render Dashboard/DeviceMap with new status │
└────────────────────────────────────────────────────────────────┘

Total Latency: ~500ms for real-time device status updates

Flow 3: Dashboard Analytics (Flexible)

┌────────────────────────────────────────────────────────────────┐
│ FRONTEND: Dashboard/(overview)/page.tsx │
│ - Server Component (async) │
│ - Renders: │
│ 1. CardWrapper (stats cards - SWR polling) │
│ 2. MediaStatsComponent (upload metrics) │
│ 3. OrderChart (sales trend) │
│ 4. ProductChart (inventory) │
└────────────────┬──────────────────────────────────────────────┘

┌────────────────────────────────────────────────────────────────┐
│ BACKEND: GET /api/v1/dashboard/stats │
│ - DashboardService aggregates: │
│ ├─ Total devices (count) │
│ ├─ Online devices (status filter) │
│ ├─ Recent orders (last 24h) │
│ ├─ Feed values (time range aggregation) │
│ └─ Return JSON analytics payload │
└────────────────────────────────────────────────────────────────┘

Real-time Component: FeedChart.tsx
├─ Displays historical feed data
├─ Uses feed_values table (time-series)
├─ Renders line/chart with Chart.js or similar
└─ Updates on WebSocket feed_value messages

Authentication & Authorization Flow

┌─────────────────────────────────────────────────────────────┐
│ LOGIN FLOW │
├─────────────────────────────────────────────────────────────┤

1. Frontend: login-form.tsx
- User enters email/password
- POST /api/v1/login/access-token

2. Backend: LoginService
- Validate credentials
- Create JWT token (8 day expiry)
- Return { access_token, token_type }

3. Frontend: AuthService
- Store token in Cookie (secure, httpOnly)
- Redirect to /dashboard

4. Subsequent Requests:
- All API calls include header:
x-auth-token: <JWT_TOKEN>

5. Backend: get_current_user dependency
- Decode JWT
- Extract user_id from token.sub
- Lookup User in database
- Inject into route handler

┌─────────────────────────────────────────────────────────────┐
│ ZALO OAUTH FLOW (Alt) │
├─────────────────────────────────────────────────────────────┤

1. Frontend: ZaloLoginButton
- POST /api/v1/zalo-user-management/callback
- Pass OAuth code

2. Backend: Zalo Service
- Verify code with Zalo API
- Extract Zalo user info (id, name, picture)
- Check if user exists
- Create user if first-time
- Generate JWT (zalo_id in token.sub)

3. Same as above from step 3 onwards

Key Architectural Patterns

1. Thread-Safe Message Queuing

# ✅ Correct: queue.Queue (thread-safe for background callbacks)
self.message_queue: queue.Queue = queue.Queue(maxsize=100000)

# ❌ Wrong: asyncio.Queue (not thread-safe with background MQTT thread)
# Causes data corruption when called from on_message callback

2. Batch Processing for Efficiency

High-frequency MQTT messages → Non-blocking queue →
Scheduled batch processor (10s interval) → Single DB transaction

3. Priority Queues

Feed data (10s) ← Lower priority
Device Status (500ms) ← Higher priority (for real-time map)

4. Event Loop Thread-Safety

# Background thread (MQTT) → Safe call to async context (FastAPI)
device_realtime_hub.broadcast_to_user_threadsafe(
user_id, message
)
# Uses: asyncio.run_coroutine_threadsafe()

5. Real-time WebSocket Architecture

WebSocket /ws/devices
├─ Authentication: JWT token in query param
├─ Initial: Send device list (device_list message type)
├─ Live: Broadcast device updates (device_update message type)
└─ Auto-cleanup: Disconnect handler removes from hub