87 lines
2.4 KiB
Python
87 lines
2.4 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
|
|
import structlog
|
|
|
|
from app.communication.mqtt.client import mqtt
|
|
from app.models.mongodb.device_log import DeviceLog
|
|
from app.models.mongodb.telemetry import TelemetryData
|
|
|
|
logger = structlog.get_logger("mqtt")
|
|
|
|
|
|
def _extract_device_uid(topic: str) -> str:
|
|
parts = topic.split("/")
|
|
return parts[1] if len(parts) >= 3 else "unknown"
|
|
|
|
|
|
@mqtt.on_message()
|
|
async def on_message(client, topic: str, payload: bytes, qos: int, properties) -> None: # type: ignore[no-untyped-def]
|
|
device_uid = _extract_device_uid(topic)
|
|
|
|
try:
|
|
data = json.loads(payload.decode())
|
|
except (json.JSONDecodeError, UnicodeDecodeError):
|
|
logger.warning("invalid_mqtt_payload", topic=topic)
|
|
return
|
|
|
|
if "/telemetry" in topic:
|
|
await _handle_telemetry(device_uid, data)
|
|
elif "/status" in topic:
|
|
await _handle_status(device_uid, data)
|
|
elif "/log" in topic:
|
|
await _handle_log(device_uid, data)
|
|
elif "/response" in topic:
|
|
await _handle_response(device_uid, data)
|
|
|
|
|
|
async def _handle_telemetry(device_uid: str, data: dict) -> None:
|
|
telemetry = TelemetryData(device_id=device_uid, metrics=data)
|
|
await telemetry.insert()
|
|
|
|
# Broadcast via Socket.IO
|
|
from app.communication.socketio.server import sio
|
|
|
|
await sio.emit(
|
|
"telemetry",
|
|
{"device_uid": device_uid, "data": data},
|
|
namespace="/monitoring",
|
|
)
|
|
logger.debug("telemetry_saved", device_uid=device_uid)
|
|
|
|
|
|
async def _handle_status(device_uid: str, data: dict) -> None:
|
|
log = DeviceLog(device_id=device_uid, event_type="status_change", payload=data)
|
|
await log.insert()
|
|
|
|
from app.communication.socketio.server import sio
|
|
|
|
await sio.emit(
|
|
"device_status",
|
|
{"device_uid": device_uid, "status": data},
|
|
namespace="/device",
|
|
)
|
|
logger.debug("status_update", device_uid=device_uid)
|
|
|
|
|
|
async def _handle_log(device_uid: str, data: dict) -> None:
|
|
log = DeviceLog(
|
|
device_id=device_uid,
|
|
event_type=data.get("event_type", "log"),
|
|
payload=data,
|
|
)
|
|
await log.insert()
|
|
logger.debug("device_log_saved", device_uid=device_uid)
|
|
|
|
|
|
async def _handle_response(device_uid: str, data: dict) -> None:
|
|
from app.communication.socketio.server import sio
|
|
|
|
await sio.emit(
|
|
"device_response",
|
|
{"device_uid": device_uid, "data": data},
|
|
namespace="/device",
|
|
)
|
|
logger.debug("device_response", device_uid=device_uid)
|