from __future__ import annotations import structlog from app.tasks.celery_app import celery_app logger = structlog.get_logger("tasks.analytics") @celery_app.task(name="app.tasks.analytics_tasks.run_daily_analytics") def run_daily_analytics() -> dict: """Run daily aggregation analytics on telemetry data.""" import asyncio from datetime import datetime, timedelta from app.models.mongodb.analytics_result import AnalyticsResult from app.models.mongodb.telemetry import TelemetryData async def _run() -> dict: yesterday = datetime.utcnow().replace(hour=0, minute=0, second=0) - timedelta(days=1) today = yesterday + timedelta(days=1) pipeline = [ {"$match": {"timestamp": {"$gte": yesterday, "$lt": today}}}, {"$group": { "_id": "$device_id", "count": {"$sum": 1}, "avg_metrics": {"$avg": "$metrics.value"}, }}, ] collection = TelemetryData.get_motor_collection() results = await collection.aggregate(pipeline).to_list(length=1000) for r in results: await AnalyticsResult( analysis_type="daily_telemetry", device_id=r["_id"], parameters={"date": yesterday.isoformat()}, result={"count": r["count"], "avg_value": r.get("avg_metrics")}, period_start=yesterday, period_end=today, ).insert() return {"devices_analyzed": len(results)} result = asyncio.get_event_loop().run_until_complete(_run()) logger.info("daily_analytics_done", **result) return result @celery_app.task(name="app.tasks.analytics_tasks.run_device_analysis") def run_device_analysis(device_id: str, analysis_type: str, params: dict) -> dict: """Run on-demand analysis for a specific device.""" logger.info("device_analysis_started", device_id=device_id, type=analysis_type) return {"status": "completed", "device_id": device_id, "type": analysis_type}