Files
python-api/app/api/v1/endpoints/analytics.py
2026-03-01 07:44:19 +09:00

74 lines
2.3 KiB
Python

from __future__ import annotations
from datetime import datetime
from fastapi import APIRouter, Depends, Query
from app.core.constants import Role
from app.core.dependencies import require_role
from app.schemas.analytics import (
AnalyticsResultRead,
ReportResponse,
TelemetryAggregateResponse,
)
from app.services.analytics_service import AnalyticsService
# Router for the analytics endpoints defined in this module; it is created
# with the "/analytics" path prefix and the "analytics" tag.
router = APIRouter(prefix="/analytics", tags=["analytics"])
@router.get("/telemetry/{device_id}", response_model=TelemetryAggregateResponse)
async def get_telemetry_aggregate(
    device_id: str,
    start: datetime = Query(...),
    end: datetime = Query(...),
    interval: str = Query("1h"),
    _: dict = Depends(require_role(*Role.MANAGEMENT_ROLES)),
) -> TelemetryAggregateResponse:
    """Return the telemetry aggregate for one device.

    The request is forwarded unchanged to
    ``AnalyticsService.get_telemetry_aggregate``.

    Args:
        device_id: Device identifier taken from the URL path.
        start: Required ``start`` query parameter.
        end: Required ``end`` query parameter.
        interval: Optional ``interval`` query parameter; defaults to "1h".
        _: Unused. Present only so the ``require_role`` dependency for
            ``Role.MANAGEMENT_ROLES`` runs for this route.

    Returns:
        Whatever the service call yields, serialized through
        ``TelemetryAggregateResponse``.
    """
    # A new service object is built for every request, as in the original.
    analytics = AnalyticsService()
    aggregate = await analytics.get_telemetry_aggregate(
        device_id, start, end, interval
    )
    return aggregate
@router.post("/reports/{device_id}", response_model=ReportResponse)
async def generate_report(
    device_id: str,
    start: datetime = Query(...),
    end: datetime = Query(...),
    _: dict = Depends(require_role(*Role.MANAGEMENT_ROLES)),
) -> ReportResponse:
    """Generate a report for one device.

    The request is forwarded unchanged to
    ``AnalyticsService.generate_report``.

    Args:
        device_id: Device identifier taken from the URL path.
        start: Required ``start`` query parameter.
        end: Required ``end`` query parameter.
        _: Unused. Present only so the ``require_role`` dependency for
            ``Role.MANAGEMENT_ROLES`` runs for this route.

    Returns:
        Whatever the service call yields, serialized through
        ``ReportResponse``.
    """
    # A new service object is built for every request, as in the original.
    analytics = AnalyticsService()
    report = await analytics.generate_report(device_id, start, end)
    return report
@router.get("/status/{device_id}")
async def device_status_analysis(
    device_id: str,
    start: datetime = Query(...),
    end: datetime = Query(...),
    _: dict = Depends(require_role(*Role.MANAGEMENT_ROLES)),
) -> dict:
    """Return the device status analysis for one device.

    The request is forwarded unchanged to
    ``AnalyticsService.get_device_status_analysis``.

    Args:
        device_id: Device identifier taken from the URL path.
        start: Required ``start`` query parameter.
        end: Required ``end`` query parameter.
        _: Unused. Present only so the ``require_role`` dependency for
            ``Role.MANAGEMENT_ROLES`` runs for this route.

    Returns:
        The value produced by the service call.
    """
    # NOTE(review): no response_model is declared on this route, so the
    # payload shape is decided by the service — confirm that is intended.
    analytics = AnalyticsService()
    status_analysis = await analytics.get_device_status_analysis(
        device_id, start, end
    )
    return status_analysis
@router.get("/trends/{device_id}")
async def trend_analysis(
    device_id: str,
    start: datetime = Query(...),
    end: datetime = Query(...),
    _: dict = Depends(require_role(*Role.MANAGEMENT_ROLES)),
) -> dict:
    """Return the trend analysis for one device.

    The request is forwarded unchanged to
    ``AnalyticsService.get_trend_analysis``.

    Args:
        device_id: Device identifier taken from the URL path.
        start: Required ``start`` query parameter.
        end: Required ``end`` query parameter.
        _: Unused. Present only so the ``require_role`` dependency for
            ``Role.MANAGEMENT_ROLES`` runs for this route.

    Returns:
        The value produced by the service call.
    """
    # NOTE(review): no response_model is declared on this route, so the
    # payload shape is decided by the service — confirm that is intended.
    analytics = AnalyticsService()
    trends = await analytics.get_trend_analysis(device_id, start, end)
    return trends
@router.get("/results", response_model=list[AnalyticsResultRead])
async def list_analytics_results(
    analysis_type: str = Query(...),
    device_id: str | None = Query(None),
    skip: int = Query(0, ge=0),
    limit: int = Query(20, ge=1, le=100),
    _: dict = Depends(require_role(*Role.MANAGEMENT_ROLES)),
) -> list[AnalyticsResultRead]:
    """List stored analytics results.

    The request is forwarded unchanged to ``AnalyticsService.list_results``.

    Args:
        analysis_type: Required ``analysis_type`` query parameter.
        device_id: Optional ``device_id`` query parameter; ``None`` when
            omitted.
        skip: ``skip`` query parameter; defaults to 0 and must be >= 0.
        limit: ``limit`` query parameter; defaults to 20 and must be
            between 1 and 100 inclusive.
        _: Unused. Present only so the ``require_role`` dependency for
            ``Role.MANAGEMENT_ROLES`` runs for this route.

    Returns:
        Whatever the service call yields, serialized as a list of
        ``AnalyticsResultRead``.
    """
    # A new service object is built for every request, as in the original.
    analytics = AnalyticsService()
    results = await analytics.list_results(analysis_type, device_id, skip, limit)
    return results