"""Database engines and session factories (sync and async) for the app.This module centralizes SQLAlchemy setup. It exposes reusable, process-wideengine singletons and companion session factories for both synchronous andasynchronous usage. When targeting SQLite, it applies pragmatic defaults toimprove concurrency (WAL) and resilience (busy timeout) via connection eventhooks. All ORM models inherit from the exported ``Base``.See Also--------app.models : Declarative ORM models (bound to ``app.database.Base``).app.schemas : Pydantic schemas mirroring ORM shapes.app.main : FastAPI app wiring request-scoped DB sessions.app.config : Source of ``settings.DATABASE_URL``.Notes------ Primary role: provide ready-to-use SQLAlchemy engines and session factories and a declarative ``Base`` for model definitions.- Key dependencies: ``app.config.settings.DATABASE_URL`` must be defined; SQLAlchemy (sync/async) drivers must be available for the chosen URL.- Invariants: engines are module-level singletons; SQLite engines use ``check_same_thread=False`` and have PRAGMA ``journal_mode=WAL`` and ``busy_timeout`` set at connect time.Examples-------->>> # Ensure schema exists (tables are created if missing) # doctest: +SKIP>>> from app.database import ensure_database_schema>>> ensure_database_schema()>>> # Sync usage # doctest: +SKIP>>> from app.database import SessionLocal>>> with SessionLocal() as session:... pass>>> # Async usage # doctest: +SKIP>>> from app.database import AsyncSessionLocal>>> async def get_count() -> int:... async with AsyncSessionLocal() as session: # doctest: +SKIP... return 0"""importloggingimportosfromtypingimportAny,Optionalfromsqlalchemyimportcreate_engine,eventfromsqlalchemy.engineimportEnginefromsqlalchemy.excimportSQLAlchemyErrorfromsqlalchemy.ext.asyncioimport(AsyncEngine,AsyncSession,async_sessionmaker,create_async_engine,)fromsqlalchemy.ormimportDeclarativeBase,sessionmakerfrom.configimportsettingslogger=logging.getLogger(__name__)DATABASE_URL:str=settings.DATABASE_URL# URL prefixes and connection argumentsSQLITE_URL_PREFIX:str="sqlite"SQLITE_SYNC_URL_PREFIX:str="sqlite:///"SQLITE_ASYNC_URL_PREFIX:str="sqlite+aiosqlite:///"POSTGRESQL_SYNC_URL_PREFIX:str="postgresql://"POSTGRESQL_ASYNC_URL_PREFIX:str="postgresql+asyncpg://"SQLITE_CONNECT_ARGS:dict[str,Any]={"check_same_thread":False}# PRAGMA constants for SQLitePRAGMA_JOURNAL_MODE:str="WAL"PRAGMA_BUSY_TIMEOUT_MS:int=60000def_get_engine(database_url:str)->Engine:"""Create a SQLAlchemy engine with optional SQLite tuning. Parameters ---------- database_url : str Database URL (e.g., ``sqlite:///...`` or ``postgresql://...``). Returns ------- sqlalchemy.engine.Engine Configured engine instance with SQLite PRAGMAs applied when relevant. Raises ------ SQLAlchemyError If engine creation fails (e.g., invalid URL or driver error). Notes ----- - For SQLite URLs, ``check_same_thread=False`` is used for thread safety in multi-threaded contexts like FastAPI, and WAL/timeout PRAGMAs are set. Examples -------- >>> from app.database import _get_engine >>> engine = _get_engine("sqlite:///:memory:") >>> engine.dialect.name == "sqlite" True """assert(database_url),f"Database URL must be a non-empty string, got {database_url!r}"try:logger.debug("Creating SQLAlchemy engine for URL %s",database_url)ifdatabase_url.startswith(SQLITE_URL_PREFIX):connect_args=SQLITE_CONNECT_ARGSelse:connect_args={}engine=create_engine(database_url,connect_args=connect_args)ifdatabase_url.startswith(SQLITE_URL_PREFIX):_configure_sqlite_engine(engine)returnengineexceptSQLAlchemyError:logger.error(f"Failed to create SQLAlchemy engine for URL {database_url}",exc_info=True)raisedef_configure_sqlite_engine(engine:Engine)->None:"""Apply SQLite-specific PRAGMA statements via event listeners. Parameters ---------- engine : sqlalchemy.engine.Engine Engine connected to a SQLite database. Notes ----- - Sets ``journal_mode=WAL`` and a busy timeout to improve concurrency. Examples -------- >>> from sqlalchemy import create_engine >>> from app.database import _configure_sqlite_engine >>> engine = create_engine("sqlite:///:memory:") >>> _configure_sqlite_engine(engine) # doctest: +SKIP """assertisinstance(engine,Engine),f"Engine must be SQLAlchemy Engine, got {type(engine).__name__}"defset_sqlite_pragmas(dbapi_connection:Any,connection_record:Any)->None:withdbapi_connection.cursor()ascursor:cursor.execute(f"PRAGMA journal_mode={PRAGMA_JOURNAL_MODE}")cursor.execute(f"PRAGMA busy_timeout={PRAGMA_BUSY_TIMEOUT_MS}")event.listen(engine,"connect",set_sqlite_pragmas)logger.debug("Registered SQLite PRAGMA listeners (journal_mode=%s, busy_timeout=%dms)",PRAGMA_JOURNAL_MODE,PRAGMA_BUSY_TIMEOUT_MS,)ifos.environ.get("SKIP_ENGINE_INIT"):engine:Optional[Engine]=NoneSessionLocal:Optional[sessionmaker[Any]]=Noneelse:engine:Engine=_get_engine(DATABASE_URL)SessionLocal:sessionmaker[Any]=sessionmaker(autocommit=False,autoflush=False,bind=engine,)# --- ASYNC SQLALCHEMY SUPPORT ---def_get_async_database_url(database_url:str)->str:"""Convert a sync URL to an async-compatible SQLAlchemy URL. Parameters ---------- database_url : str Original database URL (sync or already async). Returns ------- str Async driver URL (``sqlite+aiosqlite:///`` or ``postgresql+asyncpg://``). Raises ------ ValueError If the input URL uses an unsupported scheme for async operation. Examples -------- >>> from app.database import _get_async_database_url >>> _get_async_database_url("sqlite:///test.db") 'sqlite+aiosqlite:///test.db' >>> _get_async_database_url("postgresql://u:p@h/db") 'postgresql+asyncpg://u:p@h/db' """assert(database_url),f"Database URL must be a non-empty string, got {database_url!r}"ifdatabase_url.startswith(SQLITE_SYNC_URL_PREFIX):returndatabase_url.replace(SQLITE_SYNC_URL_PREFIX,SQLITE_ASYNC_URL_PREFIX)ifdatabase_url.startswith(POSTGRESQL_SYNC_URL_PREFIX):returndatabase_url.replace(POSTGRESQL_SYNC_URL_PREFIX,POSTGRESQL_ASYNC_URL_PREFIX)ifdatabase_url.startswith(SQLITE_ASYNC_URL_PREFIX)ordatabase_url.startswith(POSTGRESQL_ASYNC_URL_PREFIX):returndatabase_urlraiseValueError(f"Unsupported database URL for async: {database_url}")ifos.environ.get("SKIP_ENGINE_INIT"):ASYNC_DATABASE_URL:Optional[str]=Noneasync_engine:Optional[AsyncEngine]=NoneAsyncSessionLocal:Optional[async_sessionmaker[AsyncSession]]=Noneelse:ASYNC_DATABASE_URL:str=_get_async_database_url(DATABASE_URL)async_engine:AsyncEngine=create_async_engine(ASYNC_DATABASE_URL,echo=False,future=True,)AsyncSessionLocal:async_sessionmaker[AsyncSession]=async_sessionmaker(bind=async_engine,expire_on_commit=False,autoflush=False,autocommit=False,)classBase(DeclarativeBase):"""Declarative base for all ORM models. All SQLAlchemy ORM models in this project must inherit from this base. The metadata bound to this base is used by :func:`ensure_database_schema` to create database tables. Notes ----- - Exposed so that ``app.models`` can subclass it for all entities. - Keeps a single metadata registry for consistent schema management. """passdefensure_database_schema()->None:"""Create missing tables based on ORM metadata. Delegates to SQLAlchemy's ``Base.metadata.create_all`` using the configured synchronous engine. This is idempotent and safe to call during startup or migrations bootstrap. Raises ------ SQLAlchemyError If the schema creation step fails (for example due to missing privileges or an unavailable database). Examples -------- >>> from app.database import ensure_database_schema # doctest: +SKIP >>> ensure_database_schema() # doctest: +SKIP See Also -------- app.models : Entities whose tables are created from the metadata. app.main : Application startup that may call this function. """assertisinstance(engine,Engine),f"Engine must be SQLAlchemy Engine, got {type(engine).__name__}"logger.info("Ensuring database schema exists; creating tables if missing.")try:Base.metadata.create_all(engine)exceptSQLAlchemyError:logger.error("Failed to create database schema",exc_info=True)raise__all__=["engine","SessionLocal","Base","ensure_database_schema","async_engine","AsyncSessionLocal",]