app.main module#

FastAPI application entrypoint for the weather ML platform.

This module wires the HTTP interface (pages, JSON APIs, HTMX partials), initializes persistent logging, and coordinates background tasks that ingest data, impute gaps, and train models. It delegates Slurm job dispatch to the app.slurm_job_trigger module and uses SQLAlchemy for persistence.

See Also#

app.slurm_job_trigger.create_and_dispatch_training_job

Dispatch path to Slurm

app.slurm_job_trigger.trigger_slurm_job

sbatch submission helper

app.ml_utils.get_latest_training_logs

Collects latest scores per horizon

app.ml_utils.get_historical_scores

Groups historical scores

app.database.SessionLocal

Request-scoped sync session factory

app.models.TrainingStatus

Tracks training lifecycle state

app.schemas.TrainingStatusSchema

Response schema for training status

app.imputation.run_imputation_cycle

Imputation job implementation

app.weather_ingest.trigger_weather_ingestion_cycle

Weather ingestion job

Notes#

  • Primary role: expose the web UI and APIs; schedule recurring maintenance and training jobs; orchestrate job dispatch to Slurm.

  • Key dependencies: a reachable database (settings.DATABASE_URL), APScheduler in-process scheduler, and a writable shared volume at /data for logs, synchronized app code, and model artifacts.

  • Invariants: persistent logging is configured on import via app.utils.configure_persistent_logging(). Background scheduler runs in process; job IDs are stable and unique.

Examples#

>>> # Start locally with uvicorn (from project root)
>>> # $ cd src && uvicorn app.main:app --reload
app.main.check_and_reset_training_status() None[source]#

Reset training status when no Slurm training jobs are active.

Returns:
None

Updates the training_status record in place when applicable.

See also

app.models.TrainingStatus

Persistent training state record.

app.slurm_job_trigger.create_and_dispatch_training_job

Training dispatch.

Notes

  • Queries the Slurm queue via _is_slurm_training_job_running; if no jobs are present but the DB flag indicates training, the flag is reset.

  • Database failures are caught and logged; no exception is propagated to the scheduler loop to keep it resilient.

async app.main.get_all_coordinates_endpoint() List[CoordinateSchema]#

Return all configured coordinates.

Returns:
list[app.schemas.CoordinateSchema]

All coordinate definitions stored in the database.

Raises:
fastapi.HTTPException

500 if the database query fails.

See also

app.coordinates_manager.get_coordinates

Database retrieval function.

Examples

>>> from fastapi.testclient import TestClient
>>> from app.main import app
>>> client = TestClient(app)
>>> r = client.get("/api/coordinates")
>>> r.status_code in (200, 500)
True
async app.main.get_dashboard_stats() Dict[str, int]#

Return general dashboard statistics.

Returns:
dict[str, int]

Counts for total/actual/imputed observations and total trainings.

See also

app.models.WeatherObservation

Counts observations (actual/imputed).

app.models.TrainingLog

Counts global training runs.

Notes

  • On database errors, the error is logged; response content may be empty depending on failure timing.

Examples

>>> from fastapi.testclient import TestClient
>>> from app.main import app
>>> client = TestClient(app)
>>> r = client.get("/api/stats")
>>> r.status_code in (200, 500)
True
async app.main.get_latest_data_for_coord_endpoint(lat: float, lon: float) WeatherObservationSchema#

Return the latest observation for a given coordinate.

Parameters:
latfloat

Latitude in decimal degrees.

lonfloat

Longitude in decimal degrees.

Returns:
app.schemas.WeatherObservationSchema

Most recent observation at the coordinate.

Raises:
fastapi.HTTPException

404 if no observation exists, 500 on database failure.

See also

app.models.WeatherObservation

Query target model.

Examples

>>> from fastapi.testclient import TestClient
>>> from app.main import app
>>> client = TestClient(app)
>>> r = client.get("/api/latest-data?lat=0&lon=0")
>>> r.status_code in (200, 404, 500)
True
async app.main.get_prediction_data_endpoint() PredictionDataResponse#

Return prediction history for charts.

Returns:
app.schemas.PredictionDataResponse

Historical scores grouped by horizon key.

Raises:
fastapi.HTTPException

500 if fetching historical scores fails.

See also

app.ml_utils.get_historical_scores

Historical score aggregation.

Examples

>>> from fastapi.testclient import TestClient
>>> from app.main import app
>>> client = TestClient(app)
>>> r = client.get("/prediction-data")
>>> r.status_code in (200, 500)
True
async app.main.get_scheduled_jobs_endpoint() fastapi.responses.JSONResponse#

Return currently scheduled APScheduler jobs.

Returns:
fastapi.responses.JSONResponse

A list of job metadata including id, name, trigger, and next run time.

Examples

>>> from fastapi.testclient import TestClient
>>> from app.main import app
>>> client = TestClient(app)
>>> r = client.get("/jobs")
>>> r.status_code in (200, 500)
True
async app.main.get_training_history(horizon: str = '5min') Dict[str, Any]#

Return training history for a specific horizon.

Parameters:
horizonstr, default “5min”

One of {"5min", "1h", "12h", "24h"}.

Returns:
dict

Dictionary with timestamps, sklearn_data, pytorch_data, and data_counts for the requested horizon.

Raises:
fastapi.HTTPException

500 on database failure.

See also

app.models.TrainingLog

Source of historical entries.

Examples

>>> from fastapi.testclient import TestClient
>>> from app.main import app
>>> client = TestClient(app)
>>> r = client.get("/api/training-history?horizon=5min")
>>> r.status_code in (200, 500)
True
async app.main.health_check() fastapi.responses.JSONResponse#

Return service health including job freshness, scheduler state, and uptime.

Returns:
fastapi.responses.JSONResponse

JSON payload with keys: ml_job_data_freshness, scheduler_status, recent_errors (up to 10), and fastapi_uptime.

See also

app.utils.configure_persistent_logging

Ensures log file existence.

Notes

  • Log parsing is best-effort; failures are logged and omitted from output.

Examples

>>> from fastapi.testclient import TestClient
>>> from app.main import app
>>> client = TestClient(app)
>>> r = client.get("/api/health")
>>> r.status_code in (200, 500)
True
async app.main.manual_impute_endpoint(background_tasks: fastapi.BackgroundTasks) fastapi.responses.JSONResponse#

Trigger a manual imputation cycle.

Parameters:
background_tasksfastapi.BackgroundTasks

FastAPI task scheduler to offload imputation.

Returns:
fastapi.responses.JSONResponse

202 if accepted.

Raises:
fastapi.HTTPException

500 on unexpected failure.

See also

app.imputation.run_imputation_cycle

Imputation task implementation.

Examples

>>> from fastapi.testclient import TestClient
>>> from app.main import app
>>> client = TestClient(app)
>>> r = client.post("/impute")
>>> r.status_code in (202, 500)
True
async app.main.render_map(request: fastapi.Request) fastapi.responses.HTMLResponse#

Render a Leaflet map of all coordinates with latest observations.

Parameters:
requestfastapi.Request

Current request object.

Returns:
fastapi.responses.HTMLResponse

Rendered HTML page with a map and per-coordinate metadata.

See also

app.models.Coordinate

Coordinate source records.

app.models.WeatherObservation

Latest observation per coordinate.

Notes

  • On database errors, the map renders with an empty coordinate list.

Examples

>>> from fastapi.testclient import TestClient
>>> from app.main import app
>>> client = TestClient(app)
>>> r = client.get("/map")
>>> r.status_code in (200, 500)
True
async app.main.render_model_performance_partial(request: fastapi.Request) fastapi.responses.HTMLResponse#

Render an HTML partial for model performance status.

Parameters:
requestfastapi.Request

Current request for template rendering.

Returns:
fastapi.responses.HTMLResponse

Rendered partial including recent ML scores.

See also

app.ml_utils.get_latest_training_logs

Collects last run per horizon.

app.utils.build_status_data

Shapes logs for the template.

Notes

  • On errors, returns a small failure fragment with HTTP 500 status.

Examples

>>> from fastapi.testclient import TestClient
>>> from app.main import app
>>> client = TestClient(app)
>>> r = client.get("/status")
>>> r.status_code in (200, 500)
True
async app.main.render_root(request: fastapi.Request) fastapi.responses.HTMLResponse#

Render the homepage with total observation count.

Parameters:
requestfastapi.Request

Current request object for template rendering.

Returns:
fastapi.responses.HTMLResponse

Rendered index template including db_row_count.

See also

app.models.WeatherObservation

ORM model for observations.

Notes

  • On database errors the page still renders with a fallback count of 0.

Examples

>>> from fastapi.testclient import TestClient
>>> from app.main import app
>>> client = TestClient(app)
>>> r = client.get("/")
>>> r.status_code in (200, 500)
True
async app.main.render_training_status_partial(request: fastapi.Request) fastapi.responses.HTMLResponse#

Render an HTML partial for current training status.

Parameters:
requestfastapi.Request

Current request for template rendering.

Returns:
fastapi.responses.HTMLResponse

Partial containing flags like is_training and train_count.

See also

app.schemas.TrainingStatusSchema

Response schema.

app.utils.format_sweden_time

Human-readable timestamp formatting.

Notes

  • On database errors, returns a small failure fragment with HTTP 500.

  • Used by the dashboard via HTMX polling.

Examples

>>> from fastapi.testclient import TestClient
>>> from app.main import app
>>> client = TestClient(app)
>>> r = client.get("/training-status")
>>> r.status_code in (200, 500)
True
app.main.shutdown_event() None#

Stop scheduler and clean up process-scoped resources on shutdown.

See also

app.slurm_job_trigger._sync_app_source

Uses the same target directory.

Notes

  • Shuts down APScheduler if running and removes /data/app_code_for_slurm to avoid stale code on the shared volume.

  • Non-fatal cleanup errors are logged as warnings.

app.main.startup_event() None#

Initialize DB schema, ensure status row, and start background jobs.

See also

app.database.ensure_database_schema

Creates missing tables.

app.utils.startup_coordinate_check

Seeds/verifies coordinates.

Notes

  • Invoked by FastAPI once per process start. Ensures the schema exists, runs coordinate setup checks, and starts APScheduler with all jobs.

  • Any fatal error is logged and re-raised to abort startup.

async app.main.trigger_manual_training_endpoint(background_tasks: fastapi.BackgroundTasks) fastapi.responses.JSONResponse#

Manually trigger ML training via background task.

Parameters:
background_tasksfastapi.BackgroundTasks

FastAPI task scheduler to offload dispatching.

Returns:
fastapi.responses.JSONResponse

202 on accepted trigger, 409 if a training is already running.

Raises:
fastapi.HTTPException

500 if the database update fails.

Examples

>>> from fastapi.testclient import TestClient
>>> from app.main import app
>>> client = TestClient(app)
>>> r = client.post("/train")
>>> r.status_code in (202, 409, 500)
True