74 lines
2.3 KiB
Python
74 lines
2.3 KiB
Python
from __future__ import annotations
|
|
|
|
from datetime import datetime
|
|
|
|
from fastapi import APIRouter, Depends, Query
|
|
|
|
from app.core.constants import Role
|
|
from app.core.dependencies import require_role
|
|
from app.schemas.analytics import (
|
|
AnalyticsResultRead,
|
|
ReportResponse,
|
|
TelemetryAggregateResponse,
|
|
)
|
|
from app.services.analytics_service import AnalyticsService
|
|
|
|
router = APIRouter(prefix="/analytics", tags=["analytics"])
|
|
|
|
|
|
@router.get("/telemetry/{device_id}", response_model=TelemetryAggregateResponse)
async def get_telemetry_aggregate(
    device_id: str,
    start: datetime = Query(...),
    end: datetime = Query(...),
    interval: str = Query("1h"),
    _: dict = Depends(require_role(*Role.MANAGEMENT_ROLES)),
) -> TelemetryAggregateResponse:
    """Return aggregated telemetry for one device over a time window.

    Access is gated by the ``require_role(*Role.MANAGEMENT_ROLES)``
    dependency; its resolved value is not used by the handler.

    Args:
        device_id: Device identifier taken from the URL path.
        start: Required query parameter, start of the window.
        end: Required query parameter, end of the window.
        interval: Query parameter forwarded unchanged to the service;
            defaults to ``"1h"``.

    Returns:
        Whatever ``AnalyticsService.get_telemetry_aggregate`` produces,
        serialized through ``TelemetryAggregateResponse``.
    """
    # A fresh service instance is created for every request.
    aggregate = await AnalyticsService().get_telemetry_aggregate(
        device_id, start, end, interval
    )
    return aggregate
|
|
|
|
|
|
@router.post("/reports/{device_id}", response_model=ReportResponse)
async def generate_report(
    device_id: str,
    start: datetime = Query(...),
    end: datetime = Query(...),
    _: dict = Depends(require_role(*Role.MANAGEMENT_ROLES)),
) -> ReportResponse:
    """Generate a report for one device over a time window.

    Access is gated by the ``require_role(*Role.MANAGEMENT_ROLES)``
    dependency; its resolved value is not used by the handler.

    Args:
        device_id: Device identifier taken from the URL path.
        start: Required query parameter, start of the window.
        end: Required query parameter, end of the window.

    Returns:
        The result of ``AnalyticsService.generate_report``, serialized
        through ``ReportResponse``.
    """
    # A fresh service instance is created for every request.
    report = await AnalyticsService().generate_report(device_id, start, end)
    return report
|
|
|
|
|
|
@router.get("/status/{device_id}")
async def device_status_analysis(
    device_id: str,
    start: datetime = Query(...),
    end: datetime = Query(...),
    _: dict = Depends(require_role(*Role.MANAGEMENT_ROLES)),
) -> dict:
    """Return the status analysis for one device over a time window.

    Access is gated by the ``require_role(*Role.MANAGEMENT_ROLES)``
    dependency; its resolved value is not used by the handler.

    Args:
        device_id: Device identifier taken from the URL path.
        start: Required query parameter, start of the window.
        end: Required query parameter, end of the window.

    Returns:
        The value produced by
        ``AnalyticsService.get_device_status_analysis`` (annotated as a
        ``dict``; no ``response_model`` is declared on the route).
    """
    # A fresh service instance is created for every request.
    status_analysis = await AnalyticsService().get_device_status_analysis(
        device_id, start, end
    )
    return status_analysis
|
|
|
|
|
|
@router.get("/trends/{device_id}")
async def trend_analysis(
    device_id: str,
    start: datetime = Query(...),
    end: datetime = Query(...),
    _: dict = Depends(require_role(*Role.MANAGEMENT_ROLES)),
) -> dict:
    """Return the trend analysis for one device over a time window.

    Access is gated by the ``require_role(*Role.MANAGEMENT_ROLES)``
    dependency; its resolved value is not used by the handler.

    Args:
        device_id: Device identifier taken from the URL path.
        start: Required query parameter, start of the window.
        end: Required query parameter, end of the window.

    Returns:
        The value produced by ``AnalyticsService.get_trend_analysis``
        (annotated as a ``dict``; no ``response_model`` is declared on
        the route).
    """
    # A fresh service instance is created for every request.
    trends = await AnalyticsService().get_trend_analysis(device_id, start, end)
    return trends
|
|
|
|
|
|
@router.get("/results", response_model=list[AnalyticsResultRead])
async def list_analytics_results(
    analysis_type: str = Query(...),
    device_id: str | None = Query(None),
    skip: int = Query(0, ge=0),
    limit: int = Query(20, ge=1, le=100),
    _: dict = Depends(require_role(*Role.MANAGEMENT_ROLES)),
) -> list[AnalyticsResultRead]:
    """List stored analytics results with offset/limit paging.

    Access is gated by the ``require_role(*Role.MANAGEMENT_ROLES)``
    dependency; its resolved value is not used by the handler.

    Args:
        analysis_type: Required query parameter, forwarded unchanged to
            the service.
        device_id: Optional query parameter; ``None`` when omitted.
        skip: Query parameter constrained to ``>= 0``; defaults to 0.
        limit: Query parameter constrained to ``1..100``; defaults to 20.

    Returns:
        The result of ``AnalyticsService.list_results``, serialized as a
        list of ``AnalyticsResultRead``.
    """
    # A fresh service instance is created for every request.
    results = await AnalyticsService().list_results(
        analysis_type, device_id, skip, limit
    )
    return results
|