71 lines
2.4 KiB
Python
71 lines
2.4 KiB
Python
from __future__ import annotations
|
|
|
|
from fastapi import APIRouter, Depends, Query
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.api.deps import get_session
|
|
from app.core.constants import Role
|
|
from app.core.dependencies import get_current_user_payload, require_role
|
|
from app.schemas.common import PaginatedResponse
|
|
from app.schemas.device import DeviceCreate, DeviceRead, DeviceUpdate
|
|
from app.services.device_service import DeviceService
|
|
|
|
# Router that collects the device endpoints defined in this module; every
# route registered on it is served under the "/devices" prefix and carries
# the "devices" tag.
router = APIRouter(prefix="/devices", tags=["devices"])
|
|
|
|
|
|
# GET /devices -- return one page of devices plus pagination metadata.
#
# Documented with comments rather than a docstring on purpose: FastAPI
# publishes an endpoint's docstring as its OpenAPI description, which would
# change the generated schema.
@router.get("", response_model=PaginatedResponse[DeviceRead])
async def list_devices(
    page: int = Query(1, ge=1),
    size: int = Query(20, ge=1, le=100),
    _: dict = Depends(get_current_user_payload),
    session: AsyncSession = Depends(get_session),
) -> PaginatedResponse[DeviceRead]:
    # `page` is 1-based (ge=1) and `size` is constrained to 1..100 by the
    # Query validators. `_` exists only so the get_current_user_payload
    # dependency runs; its value is never read here.
    devices = DeviceService(session)

    # Translate the 1-based page number into a row offset.
    offset = size * (page - 1)

    # The page query is awaited before the count query.
    page_items = await devices.list_devices(skip=offset, limit=size)
    device_count = await devices.count_devices()

    # Ceiling division: a partially filled last page still counts as a page.
    # `size` is at least 1, so the division cannot be by zero.
    full_pages, leftover = divmod(device_count, size)
    page_count = full_pages + 1 if leftover else full_pages

    return PaginatedResponse(
        items=page_items,
        total=device_count,
        page=page,
        size=size,
        pages=page_count,
    )
|
|
|
|
|
|
# GET /devices/{device_id} -- fetch a single device by its integer id.
#
# Documented with comments rather than a docstring on purpose: FastAPI
# publishes an endpoint's docstring as its OpenAPI description.
@router.get("/{device_id}", response_model=DeviceRead)
async def get_device(
    device_id: int,
    _: dict = Depends(get_current_user_payload),
    session: AsyncSession = Depends(get_session),
) -> DeviceRead:
    # `_` exists only so the get_current_user_payload dependency runs.
    # Nothing here handles a missing device; presumably the service raises
    # for unknown ids -- TODO confirm against DeviceService.get_device.
    device = await DeviceService(session).get_device(device_id)
    return device
|
|
|
|
|
|
# POST /devices -- create a device from the request body; responds with 201.
#
# Documented with comments rather than a docstring on purpose: FastAPI
# publishes an endpoint's docstring as its OpenAPI description.
@router.post("", response_model=DeviceRead, status_code=201)
async def create_device(
    body: DeviceCreate,
    _: dict = Depends(require_role(Role.SUPERADMIN, Role.ADMIN, Role.MANAGER)),
    session: AsyncSession = Depends(get_session),
) -> DeviceRead:
    # Access is gated by the require_role(SUPERADMIN, ADMIN, MANAGER)
    # dependency; its return value (`_`) is not used in the handler.
    # The validated payload is handed to the service unchanged.
    return await DeviceService(session).create_device(body)
|
|
|
|
|
|
# PATCH /devices/{device_id} -- apply the request body to an existing device.
#
# Documented with comments rather than a docstring on purpose: FastAPI
# publishes an endpoint's docstring as its OpenAPI description.
@router.patch("/{device_id}", response_model=DeviceRead)
async def update_device(
    device_id: int,
    body: DeviceUpdate,
    _: dict = Depends(require_role(Role.SUPERADMIN, Role.ADMIN, Role.MANAGER)),
    session: AsyncSession = Depends(get_session),
) -> DeviceRead:
    # Access is gated by the require_role(SUPERADMIN, ADMIN, MANAGER)
    # dependency; its return value (`_`) is not used in the handler.
    # How the body is merged into the device is decided by the service and
    # is not visible from this handler.
    updated = await DeviceService(session).update_device(device_id, body)
    return updated
|
|
|
|
|
|
# DELETE /devices/{device_id} -- remove a device; responds 204 with no body.
#
# Documented with comments rather than a docstring on purpose: FastAPI
# publishes an endpoint's docstring as its OpenAPI description.
@router.delete("/{device_id}", status_code=204)
async def delete_device(
    device_id: int,
    _: dict = Depends(require_role(Role.SUPERADMIN, Role.ADMIN)),
    session: AsyncSession = Depends(get_session),
) -> None:
    # The role list here is narrower than for create/update: only SUPERADMIN
    # and ADMIN are passed to require_role (no MANAGER).
    # The service result is intentionally discarded; the handler returns None.
    await DeviceService(session).delete_device(device_id)
|