Files
python-api/app/processing/pipelines/telemetry_pipeline.py
2026-03-01 07:44:19 +09:00

58 lines
1.4 KiB
Python

from __future__ import annotations
from datetime import datetime
import polars as pl
from app.models.mongodb.telemetry import TelemetryData
async def aggregate_telemetry(
    device_id: str,
    start: datetime,
    end: datetime,
    interval: str = "1h",
) -> pl.DataFrame:
    """Aggregate telemetry data for a device over a time range.

    Fetches every ``TelemetryData`` document for ``device_id`` whose
    timestamp lies in ``[start, end]`` (both bounds inclusive), flattens each
    document's ``metrics`` mapping into columns, and averages every metric
    column per time window.

    Args:
        device_id: Device whose telemetry is aggregated.
        start: Inclusive lower bound on the document timestamp.
        end: Inclusive upper bound on the document timestamp.
        interval: Window length, passed as ``every`` to
            ``DataFrame.group_by_dynamic`` (for example ``"1h"``).

    Returns:
        One row per window holding the ``timestamp`` column and the mean of
        each metric column; ``device_id`` is not part of the result. An
        empty, column-less DataFrame is returned when no documents match.
    """
    docs = await (
        TelemetryData.find(
            TelemetryData.device_id == device_id,
            TelemetryData.timestamp >= start,
            TelemetryData.timestamp <= end,
        )
        .sort("+timestamp")
        .to_list()
    )
    if not docs:
        return pl.DataFrame()

    records = [
        {"timestamp": d.timestamp, "device_id": d.device_id, **d.metrics}
        for d in docs
    ]
    # Metric keys and value types can differ between documents. Polars only
    # inspects the first 100 rows by default when inferring a schema from
    # row dicts, so scan every record to keep late-appearing metrics and to
    # resolve int/float mixes to a common dtype.
    df = pl.DataFrame(records, infer_schema_length=None)

    # group_by_dynamic expects the index column to be sorted; the query
    # already orders by timestamp, the explicit sort is kept as a safeguard.
    return df.sort("timestamp").group_by_dynamic("timestamp", every=interval).agg(
        pl.all().exclude("timestamp", "device_id").mean()
    )
async def get_latest_telemetry(device_id: str, limit: int = 100) -> pl.DataFrame:
    """Get latest telemetry records as a Polars DataFrame.

    Queries ``TelemetryData`` for ``device_id`` ordered by descending
    timestamp and flattens each document's ``metrics`` mapping into columns.

    Args:
        device_id: Device whose telemetry is fetched.
        limit: Value passed to the query's ``limit``; defaults to 100.

    Returns:
        A DataFrame with ``timestamp``, ``device_id`` and one column per
        metric key, in the order returned by the query (newest first). An
        empty, column-less DataFrame is returned when no documents match.
    """
    docs = await (
        TelemetryData.find(TelemetryData.device_id == device_id)
        .sort("-timestamp")
        .limit(limit)
        .to_list()
    )
    if not docs:
        return pl.DataFrame()

    records = [
        {"timestamp": d.timestamp, "device_id": d.device_id, **d.metrics}
        for d in docs
    ]
    # Callers may request more than 100 rows, but Polars infers the schema
    # from only the first 100 row dicts by default. Scan all records so
    # metrics that first appear in later rows are kept and mixed numeric
    # types resolve to a common dtype.
    return pl.DataFrame(records, infer_schema_length=None)