"""MongoDB connection lifecycle helpers.

Holds a module-level Motor client/database pair, initialises Beanie with the
document models listed in ``init_mongodb``, and exposes an accessor for the
currently active database handle.
"""

from __future__ import annotations

from beanie import init_beanie
from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase

from app.core.config import settings

# Module-wide handles: set by init_mongodb(), cleared again by close_mongodb().
mongo_client: AsyncIOMotorClient | None = None
mongo_db: AsyncIOMotorDatabase | None = None


async def init_mongodb() -> None:
    """Create the Motor client, select the database and initialise Beanie.

    Reads ``settings.MONGODB_URL`` and ``settings.MONGODB_DATABASE``, stores
    the resulting client and database in the module globals, then registers
    the document models with Beanie.
    """
    global mongo_client, mongo_db

    mongo_client = AsyncIOMotorClient(settings.MONGODB_URL)
    mongo_db = mongo_client[settings.MONGODB_DATABASE]

    # NOTE(review): the model imports are function-local in the original;
    # presumably to avoid a circular import at module load time -- confirm
    # before hoisting them to the top of the file.
    from app.models.mongodb.analytics_result import AnalyticsResult
    from app.models.mongodb.device_log import DeviceLog
    from app.models.mongodb.notification import Notification
    from app.models.mongodb.telemetry import TelemetryData

    await init_beanie(
        database=mongo_db,
        document_models=[DeviceLog, TelemetryData, AnalyticsResult, Notification],
    )


async def close_mongodb() -> None:
    """Close the Motor client (if any) and clear the module-level handles.

    Safe to call when ``init_mongodb`` was never run or when the connection
    has already been closed.
    """
    global mongo_client, mongo_db

    if mongo_client is not None:
        mongo_client.close()

    # Drop both references so get_mongo_db() fails loudly after shutdown
    # instead of handing out a database bound to a closed client.
    mongo_client = None
    mongo_db = None


def get_mongo_db() -> AsyncIOMotorDatabase:
    """Return the active database handle.

    Raises:
        AssertionError: if ``init_mongodb`` has not been called yet, or the
            connection has since been closed.
    """
    # Explicit raise rather than an `assert` statement so the guard still runs
    # under `python -O`; AssertionError and its message are kept so existing
    # callers see the same exception as before.
    if mongo_db is None:
        raise AssertionError("MongoDB not initialized")
    return mongo_db