Files
python-api/app/tasks/celery_app.py
2026-03-01 07:44:19 +09:00

50 lines
1.4 KiB
Python

from __future__ import annotations
from celery import Celery
from celery.schedules import crontab
from app.core.config import settings
# Project-wide Celery application instance. The broker and result backend
# locations are taken from the application settings object.
celery_app = Celery(
    main="core_api",
    broker=settings.CELERY_BROKER_URL,
    backend=settings.CELERY_RESULT_BACKEND,
)
# Runtime configuration for the Celery app: serialization, time handling,
# queue routing and the periodic (beat) schedule.
celery_app.conf.update(
    # Messages and results are JSON only; other content types are not accepted.
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    # Per Celery's periodic-task docs, crontab schedules follow the
    # ``timezone`` setting, so the hours below are Asia/Seoul wall-clock hours.
    timezone="Asia/Seoul",
    enable_utc=True,
    task_track_started=True,
    # Route each task module to its own queue by task-name glob.
    task_routes={
        "app.tasks.analytics_tasks.*": {"queue": "analytics"},
        "app.tasks.notification_tasks.*": {"queue": "notifications"},
        "app.tasks.device_tasks.*": {"queue": "devices"},
        "app.tasks.auth_tasks.*": {"queue": "default"},
    },
    # Periodic jobs dispatched by celery beat.
    beat_schedule={
        # Daily at 03:00.
        "cleanup-expired-tokens": {
            "task": "app.tasks.auth_tasks.cleanup_expired_tokens",
            "schedule": crontab(minute=0, hour=3),
        },
        # Every five minutes.
        "check-device-health": {
            "task": "app.tasks.device_tasks.check_device_health",
            "schedule": crontab(minute="*/5"),
        },
        # Daily at 01:00.
        "daily-analytics": {
            "task": "app.tasks.analytics_tasks.run_daily_analytics",
            "schedule": crontab(minute=0, hour=1),
        },
    },
)
# Register the task modules with the Celery app.
#
# The entries are the task modules themselves, not packages that contain a
# ``tasks`` submodule. With the default ``related_name="tasks"`` Celery would
# look for e.g. ``app.tasks.auth_tasks.tasks``, which is not what is meant;
# ``related_name=None`` tells it to import each listed module directly.
celery_app.autodiscover_tasks(
    [
        "app.tasks.auth_tasks",
        "app.tasks.device_tasks",
        "app.tasks.notification_tasks",
        "app.tasks.analytics_tasks",
        "app.tasks.scheduled",
    ],
    related_name=None,
)