app.main module
FastAPI application entrypoint for the weather ML platform.
This module wires the HTTP interface (pages, JSON APIs, HTMX partials),
initializes persistent logging, and coordinates background tasks that ingest
data, impute gaps, and train models. It delegates Slurm job dispatch to the
app.slurm_job_trigger module and uses SQLAlchemy for persistence.
See Also
app.slurm_job_trigger.create_and_dispatch_training_job: Dispatch path to Slurm
app.slurm_job_trigger.trigger_slurm_jobs: Batch submission helper
app.ml_utils.get_latest_training_logs: Collects latest scores per horizon
app.ml_utils.get_historical_scores: Groups historical scores
app.database.SessionLocal: Request-scoped sync session factory
app.models.TrainingStatus: Tracks training lifecycle state
app.schemas.TrainingStatusSchema: Response schema for training status
app.imputation.run_imputation_cycle: Imputation job implementation
app.weather_ingest.trigger_weather_ingestion_cycle: Weather ingestion job
Notes
Primary role: expose the web UI and APIs; schedule recurring maintenance and training jobs; orchestrate job dispatch to Slurm.
Key dependencies: a reachable database (settings.DATABASE_URL), the in-process APScheduler scheduler, and a writable shared volume at /data for logs, synchronized app code, and model artifacts.
Invariants: persistent logging is configured on import via app.utils.configure_persistent_logging(). The background scheduler runs in process; job IDs are stable and unique.
Examples
>>> # Start locally with uvicorn (from project root)
>>> # $ cd src && uvicorn app.main:app --reload
- app.main.check_and_reset_training_status() → None
Reset training status when no Slurm training jobs are active.
- Returns:
None. Updates the training_status record in place when applicable.
See also
app.models.TrainingStatus: Persistent training state record.
app.slurm_job_trigger.create_and_dispatch_training_job: Training dispatch.
Notes
Queries the Slurm queue via _is_slurm_training_job_running; if no jobs are present but the DB flag indicates training, the flag is reset.
Database failures are caught and logged; no exception is propagated to the scheduler loop, which keeps the loop resilient.
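The reset rule in the notes above can be sketched in a few lines. This is a minimal illustration under stated assumptions, not the module's code: reset_stale_training_flag, the dict-based status record, and the is_job_running callable are hypothetical stand-ins for the training_status row and _is_slurm_training_job_running.

```python
import logging
from typing import Callable, MutableMapping

logger = logging.getLogger("training_status_sketch")


def reset_stale_training_flag(
    status: MutableMapping[str, bool],
    is_job_running: Callable[[], bool],
) -> bool:
    """Clear status["is_training"] when no training job is active.

    Returns True only when the flag was actually reset.
    """
    try:
        if status.get("is_training") and not is_job_running():
            status["is_training"] = False
            return True
    except Exception:
        # Log and swallow so a periodic scheduler loop keeps running.
        logger.exception("Could not check or reset training status")
    return False


# The flag says "training" but the queue is empty: the flag is cleared.
stale = {"is_training": True}
was_reset = reset_stale_training_flag(stale, lambda: False)

# A job is still queued: the flag is left untouched.
active = {"is_training": True}
left_alone = reset_stale_training_flag(active, lambda: True)
```

Passing the queue check in as a callable keeps the rule testable without a Slurm cluster; the real function presumably reads and writes the database row instead of a dict.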
- async app.main.get_all_coordinates_endpoint() → List[CoordinateSchema]
Return all configured coordinates.
- Returns:
list[app.schemas.CoordinateSchema]: All coordinate definitions stored in the database.
- Raises:
fastapi.HTTPException: 500 if the database query fails.
See also
app.coordinates_manager.get_coordinates: Database retrieval function.
Examples
>>> from fastapi.testclient import TestClient
>>> from app.main import app
>>> client = TestClient(app)
>>> r = client.get("/api/coordinates")
>>> r.status_code in (200, 500)
True
- async app.main.get_dashboard_stats() → Dict[str, int]
Return general dashboard statistics.
See also
app.models.WeatherObservation: Counts observations (actual/imputed).
app.models.TrainingLog: Counts global training runs.
Notes
On database errors, the error is logged and the response may be empty, depending on when the failure occurred.
Examples
>>> from fastapi.testclient import TestClient
>>> from app.main import app
>>> client = TestClient(app)
>>> r = client.get("/api/stats")
>>> r.status_code in (200, 500)
True
- async app.main.get_latest_data_for_coord_endpoint(lat: float, lon: float) → WeatherObservationSchema
Return the latest observation for a given coordinate.
- Parameters:
- lat (float): Latitude of the coordinate.
- lon (float): Longitude of the coordinate.
- Returns:
app.schemas.WeatherObservationSchema: Most recent observation at the coordinate.
- Raises:
fastapi.HTTPException: 404 if no observation exists, 500 on database failure.
See also
app.models.WeatherObservation: Query target model.
Examples
>>> from fastapi.testclient import TestClient
>>> from app.main import app
>>> client = TestClient(app)
>>> r = client.get("/api/latest-data?lat=0&lon=0")
>>> r.status_code in (200, 404, 500)
True
- async app.main.get_prediction_data_endpoint() → PredictionDataResponse
Return prediction history for charts.
- Returns:
app.schemas.PredictionDataResponse: Historical scores grouped by horizon key.
- Raises:
fastapi.HTTPException: 500 if fetching historical scores fails.
See also
app.ml_utils.get_historical_scores: Historical score aggregation.
Examples
>>> from fastapi.testclient import TestClient
>>> from app.main import app
>>> client = TestClient(app)
>>> r = client.get("/prediction-data")
>>> r.status_code in (200, 500)
True
- async app.main.get_scheduled_jobs_endpoint() → fastapi.responses.JSONResponse
Return currently scheduled APScheduler jobs.
- Returns:
fastapi.responses.JSONResponse: A list of job metadata including id, name, trigger, and next run time.
Examples
>>> from fastapi.testclient import TestClient
>>> from app.main import app
>>> client = TestClient(app)
>>> r = client.get("/jobs")
>>> r.status_code in (200, 500)
True
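One plausible way to shape scheduler jobs into the metadata listed under Returns is sketched below. The helper name jobs_to_payload, the output key names, and the demo job IDs are assumptions made for illustration; the stand-in objects only mimic the id, name, trigger, and next_run_time attributes that an APScheduler job exposes.

```python
from datetime import datetime, timezone
from types import SimpleNamespace
from typing import Any, Dict, Iterable, List


def jobs_to_payload(jobs: Iterable[Any]) -> List[Dict[str, Any]]:
    """Shape scheduler job objects into JSON-serializable dictionaries."""
    payload = []
    for job in jobs:
        # A job may have no next run time (for example, while paused).
        next_run = getattr(job, "next_run_time", None)
        payload.append(
            {
                "id": job.id,
                "name": job.name,
                "trigger": str(job.trigger),
                "next_run_time": next_run.isoformat() if next_run else None,
            }
        )
    return payload


# Stand-in objects; the IDs and triggers here are hypothetical.
demo_jobs = [
    SimpleNamespace(
        id="imputation_cycle",
        name="Imputation",
        trigger="interval[0:05:00]",
        next_run_time=datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc),
    ),
    SimpleNamespace(id="paused_job", name="Paused", trigger="cron", next_run_time=None),
]
payload = jobs_to_payload(demo_jobs)
```

Converting the trigger and the timestamp to strings up front avoids serialization errors when the list is handed to a JSON response.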
- async app.main.get_training_history(horizon: str = '5min') → Dict[str, Any]
Return training history for a specific horizon.
- Parameters:
- horizon (str, default "5min"): One of {"5min", "1h", "12h", "24h"}.
- Returns:
dict: Dictionary with timestamps, sklearn_data, pytorch_data, and data_counts for the requested horizon.
- Raises:
fastapi.HTTPException: 500 on database failure.
See also
app.models.TrainingLog: Source of historical entries.
Examples
>>> from fastapi.testclient import TestClient
>>> from app.main import app
>>> client = TestClient(app)
>>> r = client.get("/api/training-history?horizon=5min")
>>> r.status_code in (200, 500)
True
- async app.main.health_check() → fastapi.responses.JSONResponse
Return service health including job freshness, scheduler state, and uptime.
- Returns:
fastapi.responses.JSONResponse: JSON payload with keys ml_job_data_freshness, scheduler_status, recent_errors (up to 10), and fastapi_uptime.
See also
app.utils.configure_persistent_logging: Ensures log file existence.
Notes
Log parsing is best-effort; failures are logged and omitted from output.
Examples
>>> from fastapi.testclient import TestClient
>>> from app.main import app
>>> client = TestClient(app)
>>> r = client.get("/api/health")
>>> r.status_code in (200, 500)
True
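The best-effort collection of recent_errors could look roughly like the sketch below. The log line layout, the whitespace-delimited ERROR token, and the helper name collect_recent_errors are assumptions; only the cap of 10 entries comes from the description above.

```python
from collections import deque
from typing import Iterable, List

MAX_RECENT_ERRORS = 10


def collect_recent_errors(
    lines: Iterable[object], limit: int = MAX_RECENT_ERRORS
) -> List[str]:
    """Return the last `limit` log lines that carry an ERROR level token."""
    recent = deque(maxlen=limit)  # older entries fall off automatically
    for raw in lines:
        if not isinstance(raw, str):
            # Best-effort: skip anything that is not a text line.
            continue
        line = raw.strip()
        if "ERROR" in line.split():
            recent.append(line)
    return list(recent)


# Twelve error lines, one info line, and one malformed entry.
sample_log = [
    f"2024-01-01 12:00:{i:02d} ERROR app.main: failure {i}" for i in range(12)
]
sample_log += ["2024-01-01 12:01:00 INFO app.main: ok", None]
recent = collect_recent_errors(sample_log)
```

A bounded deque keeps memory use constant even for a large log file, since only the newest matches are retained.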
- async app.main.manual_impute_endpoint(background_tasks: fastapi.BackgroundTasks) → fastapi.responses.JSONResponse
Trigger a manual imputation cycle.
- Parameters:
- background_tasks (fastapi.BackgroundTasks): FastAPI task scheduler used to offload imputation.
- Returns:
fastapi.responses.JSONResponse: 202 if accepted.
- Raises:
fastapi.HTTPException: 500 on unexpected failure.
See also
app.imputation.run_imputation_cycle: Imputation task implementation.
Examples
>>> from fastapi.testclient import TestClient
>>> from app.main import app
>>> client = TestClient(app)
>>> r = client.post("/impute")
>>> r.status_code in (202, 500)
True
- async app.main.render_map(request: fastapi.Request) → fastapi.responses.HTMLResponse
Render a Leaflet map of all coordinates with latest observations.
- Parameters:
- request (fastapi.Request): Current request object.
- Returns:
fastapi.responses.HTMLResponse: Rendered HTML page with a map and per-coordinate metadata.
See also
app.models.Coordinate: Coordinate source records.
app.models.WeatherObservation: Latest observation per coordinate.
Notes
On database errors, the map renders with an empty coordinate list.
Examples
>>> from fastapi.testclient import TestClient
>>> from app.main import app
>>> client = TestClient(app)
>>> r = client.get("/map")
>>> r.status_code in (200, 500)
True
- async app.main.render_model_performance_partial(request: fastapi.Request) → fastapi.responses.HTMLResponse
Render an HTML partial for model performance status.
- Parameters:
- request (fastapi.Request): Current request for template rendering.
- Returns:
fastapi.responses.HTMLResponse: Rendered partial including recent ML scores.
See also
app.ml_utils.get_latest_training_logs: Collects last run per horizon.
app.utils.build_status_data: Shapes logs for the template.
Notes
On errors, returns a small failure fragment with HTTP 500 status.
Examples
>>> from fastapi.testclient import TestClient
>>> from app.main import app
>>> client = TestClient(app)
>>> r = client.get("/status")
>>> r.status_code in (200, 500)
True
- async app.main.render_root(request: fastapi.Request) → fastapi.responses.HTMLResponse
Render the homepage with total observation count.
- Parameters:
- request (fastapi.Request): Current request object for template rendering.
- Returns:
fastapi.responses.HTMLResponse: Rendered index template including db_row_count.
See also
app.models.WeatherObservation: ORM model for observations.
Notes
On database errors, the page still renders with a fallback count of 0.
Examples
>>> from fastapi.testclient import TestClient
>>> from app.main import app
>>> client = TestClient(app)
>>> r = client.get("/")
>>> r.status_code in (200, 500)
True
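The fallback behavior in the notes above amounts to guarding the count query so the page can still render. The sketch below assumes a hypothetical helper, count_or_zero, and uses plain callables in place of the real SQLAlchemy query.

```python
import logging
from typing import Callable

logger = logging.getLogger("render_root_sketch")


def count_or_zero(count_rows: Callable[[], int]) -> int:
    """Run a row-count query, falling back to 0 if it raises."""
    try:
        return int(count_rows())
    except Exception:
        # The page should still render, so log the failure and use 0.
        logger.exception("Row count failed; rendering with db_row_count=0")
        return 0


def failing_query() -> int:
    """Simulate an unreachable database."""
    raise ConnectionError("database unreachable")


healthy_count = count_or_zero(lambda: 1234)
fallback_count = count_or_zero(failing_query)
```

The trade-off is that a count of 0 on the page is ambiguous between an empty table and a failed query, so the log entry is the only place the failure is visible.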
- async app.main.render_training_status_partial(request: fastapi.Request) → fastapi.responses.HTMLResponse
Render an HTML partial for current training status.
- Parameters:
- request (fastapi.Request): Current request for template rendering.
- Returns:
fastapi.responses.HTMLResponse: Partial containing flags like is_training and train_count.
See also
app.schemas.TrainingStatusSchema: Response schema.
app.utils.format_sweden_time: Human-readable timestamp formatting.
Notes
On database errors, returns a small failure fragment with HTTP 500.
Used by the dashboard via HTMX polling.
Examples
>>> from fastapi.testclient import TestClient
>>> from app.main import app
>>> client = TestClient(app)
>>> r = client.get("/training-status")
>>> r.status_code in (200, 500)
True
- app.main.shutdown_event() → None
Stop scheduler and clean up process-scoped resources on shutdown.
See also
app.slurm_job_trigger._sync_app_source: Uses the same target directory.
Notes
Shuts down APScheduler if running and removes /data/app_code_for_slurm to avoid stale code on the shared volume.
Non-fatal cleanup errors are logged as warnings.
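The directory cleanup half of this handler can be illustrated as follows. This is a sketch only: remove_synced_code is a hypothetical helper, the scheduler shutdown step is omitted, and the demo runs against a temporary directory rather than the real /data/app_code_for_slurm path.

```python
import logging
import shutil
import tempfile
from pathlib import Path

logger = logging.getLogger("shutdown_sketch")


def remove_synced_code(target: Path) -> bool:
    """Delete the synchronized code directory; return True if it is gone."""
    try:
        shutil.rmtree(target)
    except FileNotFoundError:
        # Nothing to clean up, which is fine on shutdown.
        return True
    except OSError as exc:
        # Non-fatal: log a warning and let shutdown continue.
        logger.warning("Could not remove %s: %s", target, exc)
        return False
    return True


# Demonstrate against a temporary directory instead of the shared volume.
with tempfile.TemporaryDirectory() as tmp:
    demo_dir = Path(tmp) / "app_code_for_slurm"
    demo_dir.mkdir()
    (demo_dir / "main.py").write_text("# stale copy\n")
    removed = remove_synced_code(demo_dir)
    still_exists = demo_dir.exists()
    removed_again = remove_synced_code(demo_dir)
```

Treating a missing directory as success makes the cleanup idempotent, which matters if shutdown runs more than once or another process already removed the copy.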
- app.main.startup_event() → None
Initialize DB schema, ensure status row, and start background jobs.
See also
app.database.ensure_database_schema: Creates missing tables.
app.utils.startup_coordinate_check: Seeds/verifies coordinates.
Notes
Invoked by FastAPI once per process start. Ensures the schema exists, runs coordinate setup checks, and starts APScheduler with all jobs.
Any fatal error is logged and re-raised to abort startup.
- async app.main.trigger_manual_training_endpoint(background_tasks: fastapi.BackgroundTasks) → fastapi.responses.JSONResponse
Manually trigger ML training via background task.
- Parameters:
- background_tasks (fastapi.BackgroundTasks): FastAPI task scheduler used to offload dispatching.
- Returns:
fastapi.responses.JSONResponse: 202 on accepted trigger, 409 if a training run is already in progress.
- Raises:
fastapi.HTTPException: 500 if the database update fails.
See also
app.slurm_job_trigger.create_and_dispatch_training_job: Dispatch routine.
Examples
>>> from fastapi.testclient import TestClient
>>> from app.main import app
>>> client = TestClient(app)
>>> r = client.post("/train")
>>> r.status_code in (202, 409, 500)
True
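The 202/409 decision documented above can be sketched without FastAPI or a database. Everything here is illustrative: try_start_training, the dict-based status record, and the choice to set the flag before the dispatch task is queued are assumptions about how such a guard might work, not a description of the endpoint's code.

```python
import threading
from typing import Dict, Tuple

_status_lock = threading.Lock()


def try_start_training(status: Dict[str, bool]) -> Tuple[int, str]:
    """Return (HTTP status code, message) for a manual training request."""
    with _status_lock:
        if status.get("is_training"):
            # A run is already in progress, so reject the request.
            return 409, "Training is already running."
        # Assumed ordering: mark training as started before dispatching.
        status["is_training"] = True
    return 202, "Training job accepted."


status = {"is_training": False}
first = try_start_training(status)   # accepted
second = try_start_training(status)  # rejected while the first is running
```

Checking and setting the flag under one lock prevents two near-simultaneous requests from both being accepted; with a real database the same effect would need a transaction or row lock.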