from __future__ import annotations from datetime import datetime import polars as pl from app.models.mongodb.telemetry import TelemetryData async def aggregate_telemetry( device_id: str, start: datetime, end: datetime, interval: str = "1h", ) -> pl.DataFrame: """Aggregate telemetry data for a device over a time range.""" docs = await ( TelemetryData.find( TelemetryData.device_id == device_id, TelemetryData.timestamp >= start, TelemetryData.timestamp <= end, ) .sort("+timestamp") .to_list() ) if not docs: return pl.DataFrame() records = [ {"timestamp": d.timestamp, "device_id": d.device_id, **d.metrics} for d in docs ] df = pl.DataFrame(records) return df.sort("timestamp").group_by_dynamic("timestamp", every=interval).agg( pl.all().exclude("timestamp", "device_id").mean() ) async def get_latest_telemetry(device_id: str, limit: int = 100) -> pl.DataFrame: """Get latest telemetry records as a Polars DataFrame.""" docs = await ( TelemetryData.find(TelemetryData.device_id == device_id) .sort("-timestamp") .limit(limit) .to_list() ) if not docs: return pl.DataFrame() records = [ {"timestamp": d.timestamp, "device_id": d.device_id, **d.metrics} for d in docs ] return pl.DataFrame(records)