Files
python-api/app/services/user_service.py
2026-03-01 07:44:19 +09:00

95 lines
3.3 KiB
Python

from __future__ import annotations
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.exceptions import ConflictException, NotFoundException
from app.core.security import hash_password
from app.models.mariadb.user import User
from app.repositories.user_repo import UserRepository
from app.schemas.user import UserCreate, UserRead, UserUpdate
class UserService:
    """User-facing operations built on top of ``UserRepository``.

    Methods that return user data convert the repository's ``User`` entities
    into ``UserRead`` schemas via ``_to_read``.
    """

    # ``UserUpdate`` attributes that are written to ``user.profile`` rather
    # than to the user entity itself.
    _PROFILE_FIELDS = ("full_name", "phone", "organization", "avatar_url")

    def __init__(self, session: AsyncSession):
        """Create the service with a ``UserRepository`` bound to *session*."""
        self.user_repo = UserRepository(session)

    async def _require_user(self, user_id: int) -> User:
        """Return the user (loaded via ``get_with_profile``) or raise.

        Raises:
            NotFoundException: if the repository returns nothing for *user_id*.
        """
        user = await self.user_repo.get_with_profile(user_id)
        if not user:
            raise NotFoundException("User not found")
        return user

    async def get_user(self, user_id: int) -> UserRead:
        """Return the user identified by *user_id* as a ``UserRead``.

        Raises:
            NotFoundException: if no such user is found.
        """
        return self._to_read(await self._require_user(user_id))

    async def list_users(self, skip: int = 0, limit: int = 20) -> list[UserRead]:
        """Return one page of users; *skip* and *limit* are passed to the repository."""
        users = await self.user_repo.get_all(skip=skip, limit=limit)
        return [self._to_read(u) for u in users]

    async def count_users(self) -> int:
        """Return the user count reported by the repository."""
        return await self.user_repo.count()

    async def create_user(self, data: UserCreate) -> UserRead:
        """Create a user together with its profile and return it as a ``UserRead``.

        The password from *data* is passed through ``hash_password`` before it
        is stored on the entity.

        Raises:
            ConflictException: if a user with ``data.email`` already exists.
        """
        # NOTE(review): this pre-check alone cannot stop two concurrent
        # requests from both passing it; presumably a unique constraint on the
        # email column backs it up -- confirm.
        existing = await self.user_repo.get_by_email(data.email)
        if existing:
            raise ConflictException("Email already registered")

        user = User(
            email=data.email,
            hashed_password=hash_password(data.password),
            role=data.role,
        )
        user = await self.user_repo.create_with_profile(
            user,
            full_name=data.full_name,
            phone=data.phone,
            organization=data.organization,
        )
        return self._to_read(user)

    async def update_user(self, user_id: int, data: UserUpdate) -> UserRead:
        """Apply the non-``None`` fields of *data* to the user and its profile.

        ``is_active`` and ``role`` are written through ``user_repo.update``;
        the profile fields are set directly on ``user.profile``. A value of
        ``None`` means "leave unchanged".

        Raises:
            NotFoundException: if the user is not found, either before the
                update or when it is re-read afterwards.
        """
        user = await self._require_user(user_id)

        # ``is not None`` (not truthiness) so that e.g. ``is_active=False`` is
        # still applied.
        user_fields = {}
        if data.is_active is not None:
            user_fields["is_active"] = data.is_active
        if data.role is not None:
            user_fields["role"] = data.role

        profile_fields = {}
        for field in self._PROFILE_FIELDS:
            value = getattr(data, field, None)
            if value is not None:
                profile_fields[field] = value

        if user_fields:
            await self.user_repo.update(user, user_fields)

        # NOTE(review): profile fields are silently skipped when the user has
        # no profile -- confirm this is intended. The changes are only set on
        # the loaded object here; presumably the session persists them on
        # flush/commit -- TODO confirm.
        if profile_fields and user.profile:
            for name, value in profile_fields.items():
                setattr(user.profile, name, value)

        # Re-read so the response is built from what the repository returns
        # after the writes. The lookup may come back empty, so it goes through
        # the guarded helper and raises NotFoundException instead of failing
        # with AttributeError inside ``_to_read``.
        return self._to_read(await self._require_user(user_id))

    async def delete_user(self, user_id: int) -> None:
        """Soft-delete the user by setting ``is_deleted`` to ``True``.

        Raises:
            NotFoundException: if ``get_by_id`` finds no such user.
        """
        user = await self.user_repo.get_by_id(user_id)
        if not user:
            raise NotFoundException("User not found")
        await self.user_repo.update(user, {"is_deleted": True})

    @staticmethod
    def _to_read(user: User) -> UserRead:
        """Build a ``UserRead`` from *user*.

        Profile-derived fields fall back to an empty string when the user has
        no profile.
        """
        profile = user.profile
        return UserRead(
            id=user.id,  # type: ignore[arg-type]
            email=user.email,
            role=user.role,
            is_active=user.is_active,
            is_verified=user.is_verified,
            full_name=profile.full_name if profile else "",
            phone=profile.phone if profile else "",
            organization=profile.organization if profile else "",
            avatar_url=profile.avatar_url if profile else "",
            created_at=user.created_at,
        )