초기 커밋

This commit is contained in:
2026-03-01 07:44:19 +09:00
commit 09359f30be
146 changed files with 6120 additions and 0 deletions

0
tests/__init__.py Normal file
View File

72
tests/conftest.py Normal file
View File

@@ -0,0 +1,72 @@
from __future__ import annotations
from collections.abc import AsyncGenerator
import pytest
import pytest_asyncio
from httpx import ASGITransport, AsyncClient
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
from sqlmodel import SQLModel
from app.api.deps import get_session
from app.core.config import settings
from app.core.constants import Role
from app.core.security import create_access_token, hash_password
from app.main import create_app
# Use a test database
TEST_MARIADB_DSN = settings.MARIADB_DSN.replace(
settings.MARIADB_DATABASE, f"{settings.MARIADB_DATABASE}_test"
)
test_engine = create_async_engine(TEST_MARIADB_DSN, echo=False)
TestSessionLocal = sessionmaker(bind=test_engine, class_=AsyncSession, expire_on_commit=False)
@pytest_asyncio.fixture
async def db_session() -> AsyncGenerator[AsyncSession, None]:
async with test_engine.begin() as conn:
await conn.run_sync(SQLModel.metadata.create_all)
async with TestSessionLocal() as session:
yield session
async with test_engine.begin() as conn:
await conn.run_sync(SQLModel.metadata.drop_all)
await test_engine.dispose()
@pytest_asyncio.fixture
async def client(db_session: AsyncSession) -> AsyncGenerator[AsyncClient, None]:
app = create_app()
async def override_get_session() -> AsyncGenerator[AsyncSession, None]:
yield db_session
app.dependency_overrides[get_session] = override_get_session
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as ac:
yield ac
@pytest.fixture
def admin_token() -> str:
return create_access_token(subject=1, role=Role.SUPERADMIN)
@pytest.fixture
def user_token() -> str:
return create_access_token(subject=2, role=Role.USER)
@pytest.fixture
def auth_headers(admin_token: str) -> dict[str, str]:
return {"Authorization": f"Bearer {admin_token}"}
@pytest.fixture
def user_headers(user_token: str) -> dict[str, str]:
return {"Authorization": f"Bearer {user_token}"}

0
tests/e2e/__init__.py Normal file
View File

View File

@@ -0,0 +1,42 @@
from __future__ import annotations
import pytest
from httpx import AsyncClient
@pytest.mark.asyncio
async def test_device_crud_flow(client: AsyncClient, auth_headers: dict) -> None:
"""Test full device CRUD lifecycle."""
# Create
response = await client.post(
"/api/v1/devices",
json={"device_uid": "test-device-001", "name": "Test Sensor", "device_type": "temperature"},
headers=auth_headers,
)
assert response.status_code == 201
device = response.json()
device_id = device["id"]
assert device["device_uid"] == "test-device-001"
# Read
response = await client.get(f"/api/v1/devices/{device_id}", headers=auth_headers)
assert response.status_code == 200
assert response.json()["name"] == "Test Sensor"
# Update
response = await client.patch(
f"/api/v1/devices/{device_id}",
json={"name": "Updated Sensor"},
headers=auth_headers,
)
assert response.status_code == 200
assert response.json()["name"] == "Updated Sensor"
# List
response = await client.get("/api/v1/devices", headers=auth_headers)
assert response.status_code == 200
assert response.json()["total"] >= 1
# Delete
response = await client.delete(f"/api/v1/devices/{device_id}", headers=auth_headers)
assert response.status_code == 204

View File

View File

@@ -0,0 +1,68 @@
from __future__ import annotations
import pytest
from httpx import AsyncClient
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.security import hash_password
from app.models.mariadb.user import User, UserProfile
@pytest.mark.asyncio
async def test_register(client: AsyncClient) -> None:
response = await client.post(
"/api/v1/auth/register",
json={"email": "test@example.com", "password": "password123", "full_name": "Test User"},
)
assert response.status_code == 201
data = response.json()
assert "access_token" in data
assert "refresh_token" in data
assert data["token_type"] == "bearer"
@pytest.mark.asyncio
async def test_register_duplicate_email(client: AsyncClient) -> None:
await client.post(
"/api/v1/auth/register",
json={"email": "dup@example.com", "password": "pass123"},
)
response = await client.post(
"/api/v1/auth/register",
json={"email": "dup@example.com", "password": "pass456"},
)
assert response.status_code == 409
@pytest.mark.asyncio
async def test_login(client: AsyncClient, db_session: AsyncSession) -> None:
user = User(email="login@example.com", hashed_password=hash_password("pass123"))
db_session.add(user)
await db_session.flush()
profile = UserProfile(user_id=user.id, full_name="Login User")
db_session.add(profile)
await db_session.commit()
response = await client.post(
"/api/v1/auth/login",
json={"email": "login@example.com", "password": "pass123"},
)
assert response.status_code == 200
data = response.json()
assert "access_token" in data
@pytest.mark.asyncio
async def test_login_wrong_password(client: AsyncClient, db_session: AsyncSession) -> None:
user = User(email="wrong@example.com", hashed_password=hash_password("correct"))
db_session.add(user)
await db_session.flush()
profile = UserProfile(user_id=user.id)
db_session.add(profile)
await db_session.commit()
response = await client.post(
"/api/v1/auth/login",
json={"email": "wrong@example.com", "password": "incorrect"},
)
assert response.status_code == 401

View File

@@ -0,0 +1,13 @@
from __future__ import annotations
import pytest
from httpx import AsyncClient
@pytest.mark.asyncio
async def test_health_check(client: AsyncClient) -> None:
response = await client.get("/api/v1/system/health")
assert response.status_code == 200
data = response.json()
assert data["status"] == "ok"
assert data["service"] == "core-api"

0
tests/unit/__init__.py Normal file
View File

View File

@@ -0,0 +1,25 @@
from __future__ import annotations
from app.core.constants import Role
from app.core.permissions import can_manage_user, is_admin, is_management
def test_is_admin() -> None:
assert is_admin(Role.SUPERADMIN)
assert is_admin(Role.ADMIN)
assert not is_admin(Role.MANAGER)
assert not is_admin(Role.USER)
def test_is_management() -> None:
assert is_management(Role.SUPERADMIN)
assert is_management(Role.ADMIN)
assert is_management(Role.MANAGER)
assert not is_management(Role.USER)
def test_can_manage_user() -> None:
assert can_manage_user(Role.SUPERADMIN, Role.ADMIN)
assert can_manage_user(Role.ADMIN, Role.USER)
assert not can_manage_user(Role.USER, Role.ADMIN)
assert not can_manage_user(Role.ADMIN, Role.ADMIN)

View File

@@ -0,0 +1,40 @@
from __future__ import annotations
from app.core.constants import TokenType
from app.core.security import (
create_access_token,
create_refresh_token,
decode_token,
hash_password,
verify_password,
)
def test_password_hash_and_verify() -> None:
password = "securepassword123"
hashed = hash_password(password)
assert hashed != password
assert verify_password(password, hashed)
assert not verify_password("wrongpassword", hashed)
def test_create_access_token() -> None:
token = create_access_token(subject=1, role="admin")
payload = decode_token(token)
assert payload is not None
assert payload["sub"] == "1"
assert payload["role"] == "admin"
assert payload["type"] == TokenType.ACCESS
def test_create_refresh_token() -> None:
token = create_refresh_token(subject=1)
payload = decode_token(token)
assert payload is not None
assert payload["sub"] == "1"
assert payload["type"] == TokenType.REFRESH
def test_decode_invalid_token() -> None:
result = decode_token("invalid.token.string")
assert result is None

View File

@@ -0,0 +1,30 @@
from __future__ import annotations
from app.processing.utils.statistics import detect_anomalies, moving_average, percentile_stats
def test_moving_average() -> None:
values = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0]
result = moving_average(values, window=3)
assert len(result) == 5
assert abs(result[0] - 2.0) < 0.001
def test_moving_average_short_input() -> None:
values = [1.0, 2.0]
result = moving_average(values, window=5)
assert result == values
def test_detect_anomalies() -> None:
values = [10.0, 10.1, 9.9, 10.0, 50.0, 10.0, 9.8]
anomalies = detect_anomalies(values, threshold=2.0)
assert len(anomalies) >= 1
assert any(a["value"] == 50.0 for a in anomalies)
def test_percentile_stats() -> None:
values = list(range(1, 101))
stats = percentile_stats([float(v) for v in values])
assert abs(stats["p50"] - 50.5) < 1.0
assert stats["p99"] > stats["p95"]

View File

@@ -0,0 +1,18 @@
from __future__ import annotations
from app.utils.validators import is_valid_device_uid, is_valid_email
def test_valid_device_uid() -> None:
assert is_valid_device_uid("device-001")
assert is_valid_device_uid("SENSOR_ABC_123")
assert not is_valid_device_uid("ab") # too short
assert not is_valid_device_uid("device uid") # space
assert not is_valid_device_uid("")
def test_valid_email() -> None:
assert is_valid_email("user@example.com")
assert is_valid_email("test.user+tag@domain.co.kr")
assert not is_valid_email("not-an-email")
assert not is_valid_email("@domain.com")