Files
python-api/app/services/notification_service.py
2026-03-01 07:44:19 +09:00

50 lines
1.5 KiB
Python

from __future__ import annotations
from app.communication.socketio.server import sio
from app.models.mongodb.notification import Notification
class NotificationService:
    """Create, list, and acknowledge per-user notifications.

    Notifications are stored through the ``Notification`` document model and
    newly created ones are also pushed over Socket.IO to the room named
    ``user:<user_id>`` on the ``/notification`` namespace.
    """

    async def create_notification(
        self, user_id: int, title: str, message: str, notification_type: str = "info"
    ) -> Notification:
        """Store a notification for *user_id* and emit it to that user's room.

        Args:
            user_id: Owner of the notification; also selects the Socket.IO room.
            title: Short heading, stored and sent to the client.
            message: Body text, stored and sent to the client.
            notification_type: Free-form category label; sent to the client
                under the ``"type"`` key.

        Returns:
            The inserted ``Notification`` document.
        """
        notification = Notification(
            user_id=user_id,
            title=title,
            message=message,
            notification_type=notification_type,
        )
        await notification.insert()

        # NOTE(review): the emit runs after the insert, so an exception raised
        # here reaches the caller even though the document is already stored.
        # Confirm whether real-time delivery should be best-effort instead.
        await sio.emit(
            "notification",
            {"title": title, "message": message, "type": notification_type},
            room=f"user:{user_id}",
            namespace="/notification",
        )
        return notification

    async def get_user_notifications(
        self, user_id: int, skip: int = 0, limit: int = 20, unread_only: bool = False
    ) -> list[Notification]:
        """Return one page of notifications belonging to *user_id*.

        Args:
            user_id: Owner whose notifications are listed.
            skip: Number of matching documents to skip.
            limit: Maximum number of documents to return.
            unread_only: When true, only documents with ``is_read == False``
                are returned.

        Returns:
            The matching documents, sorted by ``"-created_at"``
            (presumably newest first -- confirm against the ODM's sort syntax).
        """
        filters: dict = {"user_id": user_id}
        if unread_only:
            filters["is_read"] = False

        cursor = Notification.find(filters).sort("-created_at")
        return await cursor.skip(skip).limit(limit).to_list()

    async def mark_as_read(self, notification_id: str, user_id: int) -> None:
        """Flag a notification as read if it exists and belongs to *user_id*.

        A missing notification, or one owned by a different user, is silently
        ignored: nothing is written and no error is raised.

        Args:
            notification_id: Identifier passed to ``Notification.get``.
            user_id: User requesting the change; must match the document's
                ``user_id`` for the update to happen.
        """
        from datetime import datetime, timezone

        notification = await Notification.get(notification_id)
        if notification is None or notification.user_id != user_id:
            return

        notification.is_read = True
        # datetime.utcnow() is deprecated since Python 3.12; take the current
        # time with an explicit UTC timezone instead.
        notification.read_at = datetime.now(timezone.utc)
        await notification.save()