Files
python-api/app/tasks/device_tasks.py
2026-03-01 07:44:19 +09:00

65 lines
2.2 KiB
Python

from __future__ import annotations
import structlog
from app.tasks.celery_app import celery_app
logger = structlog.get_logger("tasks.device")
@celery_app.task(name="app.tasks.device_tasks.check_device_health")
def check_device_health() -> dict:
"""Check all devices for heartbeat timeout and mark offline."""
import asyncio
from datetime import datetime, timedelta
from sqlalchemy import select, update
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
from app.core.config import settings
from app.core.constants import DeviceStatus
from app.models.mariadb.device import Device
async def _check() -> int:
engine = create_async_engine(settings.MARIADB_DSN)
async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
threshold = datetime.utcnow() - timedelta(minutes=10)
async with async_session() as session:
stmt = (
update(Device)
.where(
Device.status == DeviceStatus.ONLINE,
Device.last_seen_at < threshold,
Device.is_deleted == False, # noqa: E712
)
.values(status=DeviceStatus.OFFLINE)
)
result = await session.execute(stmt)
await session.commit()
count = result.rowcount # type: ignore[assignment]
await engine.dispose()
return count
count = asyncio.get_event_loop().run_until_complete(_check())
logger.info("device_health_check", offline_count=count)
return {"marked_offline": count}
@celery_app.task(name="app.tasks.device_tasks.batch_firmware_update")
def batch_firmware_update(device_uids: list[str], firmware_url: str) -> dict:
"""Trigger OTA firmware update for a batch of devices."""
import asyncio
from app.communication.mqtt.publisher import publish_ota
async def _update() -> int:
for uid in device_uids:
await publish_ota(uid, {"url": firmware_url, "action": "update"})
return len(device_uids)
count = asyncio.get_event_loop().run_until_complete(_update())
logger.info("batch_firmware_update", count=count)
return {"updated": count}