Files
python-api/app/api/v1/endpoints/devices.py
2026-03-01 07:44:19 +09:00

71 lines
2.4 KiB
Python

from __future__ import annotations
from fastapi import APIRouter, Depends, Query
from sqlalchemy.ext.asyncio import AsyncSession
from app.api.deps import get_session
from app.core.constants import Role
from app.core.dependencies import get_current_user_payload, require_role
from app.schemas.common import PaginatedResponse
from app.schemas.device import DeviceCreate, DeviceRead, DeviceUpdate
from app.services.device_service import DeviceService
# Router collecting every device endpoint in this module; all routes declared
# on it are served under the "/devices" prefix and tagged "devices".
router = APIRouter(prefix="/devices", tags=["devices"])
@router.get("", response_model=PaginatedResponse[DeviceRead])
async def list_devices(
    page: int = Query(1, ge=1),
    size: int = Query(20, ge=1, le=100),
    _: dict = Depends(get_current_user_payload),
    session: AsyncSession = Depends(get_session),
) -> PaginatedResponse[DeviceRead]:
    """Return one page of devices together with pagination totals.

    Args:
        page: 1-based page number (minimum 1, defaults to 1).
        size: Number of devices per page (1 to 100, defaults to 20).
        _: Value produced by the ``get_current_user_payload`` dependency.
            It is not read here; the dependency is declared so that it runs
            for every request.
        session: Database session supplied by ``get_session``.

    Returns:
        A ``PaginatedResponse`` holding the requested items, the total device
        count, the echoed ``page`` and ``size``, and the number of pages
        needed to hold ``total`` items at ``size`` items per page.
    """
    # NOTE(review): get_current_user_payload presumably rejects unauthenticated
    # callers; its implementation is not visible in this module.
    devices = DeviceService(session)
    offset = size * (page - 1)

    # Both queries go through the same service instance, one after the other:
    # the page of rows first, then the overall count.
    page_items = await devices.list_devices(skip=offset, limit=size)
    total_count = await devices.count_devices()

    # Ceiling division: a partially filled last page still counts as a page.
    # ``size`` is validated to be at least 1, so the divisor is never zero.
    full_pages, leftover = divmod(total_count, size)
    page_count = full_pages + 1 if leftover else full_pages

    return PaginatedResponse(
        items=page_items,
        total=total_count,
        page=page,
        size=size,
        pages=page_count,
    )
@router.get("/{device_id}", response_model=DeviceRead)
async def get_device(
    device_id: int,
    _: dict = Depends(get_current_user_payload),
    session: AsyncSession = Depends(get_session),
) -> DeviceRead:
    """Return the device identified by ``device_id``.

    Args:
        device_id: Integer identifier taken from the URL path.
        _: Value produced by the ``get_current_user_payload`` dependency.
            It is not read here; the dependency is declared so that it runs
            for every request.
        session: Database session supplied by ``get_session``.

    Returns:
        Whatever ``DeviceService.get_device`` yields for ``device_id``,
        serialised through the ``DeviceRead`` response model.
    """
    # NOTE(review): handling of an unknown device_id is delegated entirely to
    # DeviceService.get_device; confirm there how a missing row is reported.
    device = await DeviceService(session).get_device(device_id)
    return device
@router.post("", response_model=DeviceRead, status_code=201)
async def create_device(
    body: DeviceCreate,
    _: dict = Depends(require_role(Role.SUPERADMIN, Role.ADMIN, Role.MANAGER)),
    session: AsyncSession = Depends(get_session),
) -> DeviceRead:
    """Create a device from the request body and respond with status 201.

    Args:
        body: Creation payload validated against ``DeviceCreate``.
        _: Value produced by the ``require_role`` dependency built for the
            SUPERADMIN, ADMIN and MANAGER roles. It is not read here; the
            dependency is declared so that it runs for every request.
        session: Database session supplied by ``get_session``.

    Returns:
        The result of ``DeviceService.create_device``, serialised through the
        ``DeviceRead`` response model.
    """
    # NOTE(review): require_role presumably rejects callers holding none of
    # the listed roles; its implementation is not visible in this module.
    created = await DeviceService(session).create_device(body)
    return created
@router.patch("/{device_id}", response_model=DeviceRead)
async def update_device(
    device_id: int,
    body: DeviceUpdate,
    _: dict = Depends(require_role(Role.SUPERADMIN, Role.ADMIN, Role.MANAGER)),
    session: AsyncSession = Depends(get_session),
) -> DeviceRead:
    """Apply the request body to the device identified by ``device_id``.

    Args:
        device_id: Integer identifier taken from the URL path.
        body: Update payload validated against ``DeviceUpdate``.
        _: Value produced by the ``require_role`` dependency built for the
            SUPERADMIN, ADMIN and MANAGER roles. It is not read here; the
            dependency is declared so that it runs for every request.
        session: Database session supplied by ``get_session``.

    Returns:
        The result of ``DeviceService.update_device``, serialised through the
        ``DeviceRead`` response model.
    """
    # NOTE(review): which fields are changed and how an unknown device_id is
    # reported are decided inside DeviceService.update_device.
    updated = await DeviceService(session).update_device(device_id, body)
    return updated
@router.delete("/{device_id}", status_code=204)
async def delete_device(
    device_id: int,
    _: dict = Depends(require_role(Role.SUPERADMIN, Role.ADMIN)),
    session: AsyncSession = Depends(get_session),
) -> None:
    """Delete the device identified by ``device_id`` and respond with 204.

    Args:
        device_id: Integer identifier taken from the URL path.
        _: Value produced by the ``require_role`` dependency built for the
            SUPERADMIN and ADMIN roles. It is not read here; the dependency
            is declared so that it runs for every request.
        session: Database session supplied by ``get_session``.
    """
    # The role list here omits MANAGER, unlike the create and update
    # endpoints in this module.
    # NOTE(review): how an unknown device_id is reported is decided inside
    # DeviceService.delete_device.
    await DeviceService(session).delete_device(device_id)