# src/app/main.py"""FastAPI application entrypoint for the weather ML platform.This module wires the HTTP interface (pages, JSON APIs, HTMX partials),initializes persistent logging, and coordinates background tasks that ingestdata, impute gaps, and train models. It delegates Slurm job dispatch to the``app.slurm_job_trigger`` module and uses SQLAlchemy for persistence.See Also--------app.slurm_job_trigger.create_and_dispatch_training_job : Dispatch path to Slurmapp.slurm_job_trigger.trigger_slurm_job : sbatch submission helperapp.ml_utils.get_latest_training_logs : Collects latest scores per horizonapp.ml_utils.get_historical_scores : Groups historical scoresapp.database.SessionLocal : Request-scoped sync session factoryapp.models.TrainingStatus : Tracks training lifecycle stateapp.schemas.TrainingStatusSchema : Response schema for training statusapp.imputation.run_imputation_cycle : Imputation job implementationapp.weather_ingest.trigger_weather_ingestion_cycle : Weather ingestion jobNotes------ Primary role: expose the web UI and APIs; schedule recurring maintenance and training jobs; orchestrate job dispatch to Slurm.- Key dependencies: a reachable database (``settings.DATABASE_URL``), APScheduler in-process scheduler, and a writable shared volume at ``/data`` for logs, synchronized app code, and model artifacts.- Invariants: persistent logging is configured on import via :func:`app.utils.configure_persistent_logging`. Background scheduler runs in process; job IDs are stable and unique.Examples-------->>> # Start locally with uvicorn (from project root) # doctest: +SKIP>>> # $ cd src && uvicorn app.main:app --reload # doctest: +SKIP"""importloggingimportreimportshutilfromdatetimeimportdatetime,timezonefrompathlibimportPathfromtypingimportAny,Dict,List,Tuplefromapscheduler.schedulers.backgroundimportBackgroundSchedulerfromdocker.errorsimportDockerExceptionfromfastapiimportBackgroundTasks,FastAPI,HTTPException,Requestfromfastapi.responsesimportHTMLResponse,JSONResponsefromfastapi.templatingimportJinja2Templatesfromsqlalchemy.excimportSQLAlchemyErrorfrom.configimportsettingsfrom.coordinates_managerimportget_coordinatesfrom.databaseimportSessionLocal,ensure_database_schemafrom.imputationimportrun_imputation_cyclefrom.ml_utilsimportget_historical_scores,get_latest_training_logsfrom.modelsimportTrainingStatus,WeatherObservationfrom.schemasimport(CoordinateSchema,PredictionDataResponse,TrainingStatusSchema,WeatherObservationSchema,)from.slurm_job_triggerimport(_execute_in_container,_find_slurm_master_container,create_and_dispatch_training_job,)from.utilsimport(build_status_data,configure_persistent_logging,format_sweden_time,format_sweden_time_iso,startup_coordinate_check,)from.weather_ingestimporttrigger_weather_ingestion_cycle# Configure persistent logging before any logger usageconfigure_persistent_logging()logger=logging.getLogger(__name__)FASTAPI_PROCESS_START_TIME=datetime.now(timezone.utc)SLURM_MASTER_CONTAINER_NAMES:Tuple[str,...]=("slurm-master-test","slurm-master","ml_weather-slurm-master-1",)SHARED_DATA_PATH:Path=Path("/data")SBATCH_SCRIPT_NAME:str="run_ml_training_job.sbatch"RSYNC_COMMAND:str="rsync"RSYNC_ARGS:Tuple[str,...]=("-a","--delete")CP_COMMAND:str="cp"CP_ARGS:Tuple[str,...]=("-rT",)CRON_WEATHER_SCHEDULE:str="*/1"CRON_IMPUTATION_SCHEDULE:str="*/10"CRON_ML_TRAINING_SCHEDULE:str="7,27,47"IMPUTATION_SECOND:str="30"CHECK_INTERVAL_SECONDS:int=30# FastAPI application 

# FastAPI application setup
app = FastAPI()
templates_dir: Path = Path(__file__).parent / "templates"
templates = Jinja2Templates(directory=templates_dir)
scheduler = BackgroundScheduler(timezone="UTC")


def _is_slurm_training_job_running() -> bool:
    """Check whether active ML training jobs exist in the Slurm queue.

    This guard queries the Slurm master container for the current queue using
    ``squeue --noheader`` and inspects the output for the ``ml_training`` job
    marker. It prevents overlapping dispatches when a job is already running.

    Returns
    -------
    bool
        ``True`` if at least one matching job is found; otherwise ``False``.

    Raises
    ------
    None
        All errors are caught and logged; the function returns ``False`` on
        failures or if the Slurm container cannot be located.

    See Also
    --------
    app.slurm_job_trigger._find_slurm_master_container : Container discovery.
    app.slurm_job_trigger._execute_in_container : Command execution helper.
    """
    container = _find_slurm_master_container()
    if not container:
        return False
    try:
        exit_code, stdout, _ = _execute_in_container(container, "squeue --noheader")
        if exit_code == 0 and stdout and "ml_training" in stdout:
            logger.info(f"Training jobs still running: {stdout}")
            return True
    except DockerException as error:
        logger.error(f"Error checking Slurm queue: {error}", exc_info=True)
    logger.info("No active training jobs found.")
    return False


def check_and_reset_training_status() -> None:
    """Reset training status when no Slurm training jobs are active.

    Returns
    -------
    None
        Updates the ``training_status`` record in place when applicable.

    Notes
    -----
    - Queries the Slurm queue via ``_is_slurm_training_job_running``; if no
      jobs are present but the DB flag indicates training, the flag is reset.
    - Database failures are caught and logged; no exception is propagated to
      the scheduler loop to keep it resilient.

    See Also
    --------
    app.models.TrainingStatus : Persistent training state record.
    app.slurm_job_trigger.create_and_dispatch_training_job : Training dispatch.
    """
    try:
        with SessionLocal() as session:
            status = session.query(TrainingStatus).get(1)
            if not status or not status.is_training:
                return
            if not _is_slurm_training_job_running():
                status.is_training = False
                status.current_horizon = "Training status reset - no active jobs"
                session.commit()
                logger.warning("Training status reset due to no active jobs.")
    except SQLAlchemyError as error:
        logger.error(f"Error resetting training status: {error}", exc_info=True)


def _initialize_training_status() -> None:
    """Ensure a default TrainingStatus row exists on startup.

    Creates the singleton ``TrainingStatus`` row if missing, and clears the
    ``is_training`` flag on restart to avoid a stuck "running" state.

    Returns
    -------
    None
        Mutates the database state in place.

    Notes
    -----
    - Errors are logged with stack traces and do not propagate.

    See Also
    --------
    app.models.TrainingStatus : ORM model holding training lifecycle flags.
    """
    try:
        with SessionLocal() as session:
            status = session.query(TrainingStatus).get(1)
            if not status:
                default = TrainingStatus(
                    id=1,
                    is_training=False,
                    train_count=0,
                    current_horizon="Initialized",
                )
                session.add(default)
                session.commit()
                logger.info("Default TrainingStatus created.")
            elif status.is_training:
                status.is_training = False
                status.current_horizon = "System restarted during training"
                session.commit()
                logger.warning("TrainingStatus reset on startup.")
    except SQLAlchemyError as error:
        logger.error(f"Error initializing training status: {error}", exc_info=True)


def _schedule_jobs() -> None:
    """Schedule background jobs with APScheduler.

    Registers four recurring jobs with stable IDs:

    - ``weather_ingest_job`` (cron ``minute=CRON_WEATHER_SCHEDULE``)
    - ``imputation_job`` (cron ``minute=CRON_IMPUTATION_SCHEDULE``,
      ``second=IMPUTATION_SECOND``)
    - ``ml_training_job_scheduler`` (cron ``minute=CRON_ML_TRAINING_SCHEDULE``)
    - ``check_training_status`` (interval ``seconds=CHECK_INTERVAL_SECONDS``)

    Notes
    -----
    - ``replace_existing=True`` ensures idempotent re-scheduling across reloads.
    - ``coalesce=True`` collapses missed runs into a single immediate run.

    See Also
    --------
    app.weather_ingest.trigger_weather_ingestion_cycle : Ingestion job.
    app.imputation.run_imputation_cycle : Imputation job.
    app.slurm_job_trigger.create_and_dispatch_training_job : Training job.
    app.main.check_and_reset_training_status : Status guard job.
    """
    scheduler.add_job(
        trigger_weather_ingestion_cycle,
        "cron",
        minute=CRON_WEATHER_SCHEDULE,
        id="weather_ingest_job",
        misfire_grace_time=60,
        replace_existing=True,
        coalesce=True,
    )
    scheduler.add_job(
        run_imputation_cycle,
        "cron",
        minute=CRON_IMPUTATION_SCHEDULE,
        second=IMPUTATION_SECOND,
        id="imputation_job",
        misfire_grace_time=120,
        replace_existing=True,
        coalesce=True,
    )
    scheduler.add_job(
        create_and_dispatch_training_job,
        "cron",
        minute=CRON_ML_TRAINING_SCHEDULE,
        id="ml_training_job_scheduler",
        misfire_grace_time=300,
        replace_existing=True,
        coalesce=True,
    )
    scheduler.add_job(
        check_and_reset_training_status,
        "interval",
        seconds=CHECK_INTERVAL_SECONDS,
        id="check_training_status",
        replace_existing=True,
    )
    if not scheduler.running:
        scheduler.start()
        logger.info("APScheduler started.")
    else:
        logger.info("APScheduler already running.")


@app.on_event("startup")
def startup_event() -> None:
    """Initialize DB schema, ensure status row, and start background jobs.

    Notes
    -----
    - Invoked by FastAPI once per process start. Ensures the schema exists,
      runs coordinate setup checks, and starts APScheduler with all jobs.
    - Any fatal error is logged and re-raised to abort startup.

    See Also
    --------
    app.database.ensure_database_schema : Creates missing tables.
    app.utils.startup_coordinate_check : Seeds/verifies coordinates.
    """
    try:
        logger.info("FastAPI startup event triggered.")
        ensure_database_schema()
        startup_coordinate_check()
        _initialize_training_status()
        _schedule_jobs()
    except Exception as error:
        logger.critical("Fatal error during FastAPI startup: %s", error, exc_info=True)
        raise


@app.on_event("shutdown")
def shutdown_event() -> None:
    """Stop scheduler and clean up process-scoped resources on shutdown.

    Notes
    -----
    - Shuts down APScheduler if running and removes
      ``/data/app_code_for_slurm`` to avoid stale code on the shared volume.
    - Non-fatal cleanup errors are logged as warnings.

    See Also
    --------
    app.slurm_job_trigger._sync_app_source : Uses the same target directory.
    """
    logger.info("FastAPI shutdown event triggered.")
    if scheduler.running:
        scheduler.shutdown()
        logger.info("APScheduler shut down.")
    cleanup_path = SHARED_DATA_PATH / "app_code_for_slurm"
    if cleanup_path.exists():
        try:
            shutil.rmtree(cleanup_path, ignore_errors=True)
            logger.info("Cleaned up app_code_for_slurm.")
        except OSError as error:
            logger.warning(f"Error cleaning app_code_for_slurm: {error}")
    else:
        logger.info("No app_code_for_slurm to clean.")


@app.get("/", response_class=HTMLResponse)
async def render_root(request: Request) -> HTMLResponse:
    """Render the homepage with total observation count.

    Parameters
    ----------
    request : fastapi.Request
        Current request object for template rendering.

    Returns
    -------
    fastapi.responses.HTMLResponse
        Rendered index template including ``db_row_count``.

    Notes
    -----
    - On database errors the page still renders with a fallback count of 0.

    See Also
    --------
    app.models.WeatherObservation : ORM model for observations.

    Examples
    --------
    >>> from fastapi.testclient import TestClient  # doctest: +SKIP
    >>> from app.main import app  # doctest: +SKIP
    >>> client = TestClient(app)  # doctest: +SKIP
    >>> r = client.get("/")  # doctest: +SKIP
    >>> r.status_code in (200, 500)  # doctest: +SKIP
    True
    """
    row_count: int = 0
    try:
        with SessionLocal() as session:
            row_count = session.query(WeatherObservation).count()
    except SQLAlchemyError as error:
        logger.error(f"Failed to fetch row count: {error}", exc_info=True)
    return templates.TemplateResponse(
        "index.html",
        {
            "request": request,
            "db_row_count": row_count,
            "docs_url": settings.documentation_url,
        },
    )


@app.get("/api/coordinates", response_model=List[CoordinateSchema])
async def get_all_coordinates_endpoint() -> List[CoordinateSchema]:
    """Return all configured coordinates.

    Returns
    -------
    list[app.schemas.CoordinateSchema]
        All coordinate definitions stored in the database.

    Raises
    ------
    fastapi.HTTPException
        500 if the database query fails.

    See Also
    --------
    app.coordinates_manager.get_coordinates : Database retrieval function.

    Examples
    --------
    >>> from fastapi.testclient import TestClient  # doctest: +SKIP
    >>> from app.main import app  # doctest: +SKIP
    >>> client = TestClient(app)  # doctest: +SKIP
    >>> r = client.get("/api/coordinates")  # doctest: +SKIP
    >>> r.status_code in (200, 500)  # doctest: +SKIP
    True
    """
    try:
        with SessionLocal() as session:
            records = get_coordinates(session)
            if not records:
                logger.warning("No coordinates found for /api/coordinates endpoint.")
            return records
    except SQLAlchemyError as error:
        logger.error(f"Error fetching coordinates: {error}", exc_info=True)
        raise HTTPException(status_code=500, detail="Could not fetch coordinates.")


@app.get("/api/latest-data", response_model=WeatherObservationSchema)
async def get_latest_data_for_coord_endpoint(
    lat: float, lon: float
) -> WeatherObservationSchema:
    """Return the latest observation for a given coordinate.

    Parameters
    ----------
    lat : float
        Latitude in decimal degrees.
    lon : float
        Longitude in decimal degrees.

    Returns
    -------
    app.schemas.WeatherObservationSchema
        Most recent observation at the coordinate.

    Raises
    ------
    fastapi.HTTPException
        404 if no observation exists, 500 on database failure.

    See Also
    --------
    app.models.WeatherObservation : Query target model.

    Examples
    --------
    >>> from fastapi.testclient import TestClient  # doctest: +SKIP
    >>> from app.main import app  # doctest: +SKIP
    >>> client = TestClient(app)  # doctest: +SKIP
    >>> r = client.get("/api/latest-data?lat=0&lon=0")  # doctest: +SKIP
    >>> r.status_code in (200, 404, 500)  # doctest: +SKIP
    True
    """
    try:
        with SessionLocal() as session:
            weather_obs = (
                session.query(WeatherObservation)
                .filter(
                    WeatherObservation.latitude == lat,
                    WeatherObservation.longitude == lon,
                )
                .order_by(WeatherObservation.timestamp.desc())
                .first()
            )
            if not weather_obs:
                raise HTTPException(
                    status_code=404, detail="No data found for this coordinate."
                )
            return WeatherObservationSchema.model_validate(weather_obs)
    except SQLAlchemyError as error:
        logger.error(f"Error fetching latest data: {error}", exc_info=True)
        raise HTTPException(status_code=500, detail="Could not fetch latest data.")
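

# Illustrative queries against the JSON endpoints above (host/port and the
# coordinate values are hypothetical, not taken from configuration):
#   curl "http://localhost:8000/api/coordinates"
#   curl "http://localhost:8000/api/latest-data?lat=59.33&lon=18.07"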


@app.get("/status", response_class=HTMLResponse)
async def render_model_performance_partial(request: Request) -> HTMLResponse:
    """Render an HTML partial for model performance status.

    Parameters
    ----------
    request : fastapi.Request
        Current request for template rendering.

    Returns
    -------
    fastapi.responses.HTMLResponse
        Rendered partial including recent ML scores.

    Notes
    -----
    - On errors, returns a small failure fragment with HTTP 500 status.

    See Also
    --------
    app.ml_utils.get_latest_training_logs : Collects last run per horizon.
    app.utils.build_status_data : Shapes logs for the template.

    Examples
    --------
    >>> from fastapi.testclient import TestClient  # doctest: +SKIP
    >>> from app.main import app  # doctest: +SKIP
    >>> client = TestClient(app)  # doctest: +SKIP
    >>> r = client.get("/status")  # doctest: +SKIP
    >>> r.status_code in (200, 500)  # doctest: +SKIP
    True
    """
    try:
        logs = get_latest_training_logs()
        status_data = build_status_data(logs)
        return templates.TemplateResponse(
            "train_status.html",
            {"request": request, "status_data": status_data},
        )
    except Exception as error:
        logger.error(f"Error rendering model performance: {error}", exc_info=True)
        return HTMLResponse(
            content="""
            <div class="alert alert-danger" role="alert">
                Error loading model performance data.
            </div>
            """,
            status_code=500,
        )


@app.get("/training-status", response_class=HTMLResponse)
async def render_training_status_partial(request: Request) -> HTMLResponse:
    """Render an HTML partial for current training status.

    Parameters
    ----------
    request : fastapi.Request
        Current request for template rendering.

    Returns
    -------
    fastapi.responses.HTMLResponse
        Partial containing flags like ``is_training`` and ``train_count``.

    Notes
    -----
    - On database errors, returns a small failure fragment with HTTP 500.
    - Used by the dashboard via HTMX polling.

    See Also
    --------
    app.schemas.TrainingStatusSchema : Response schema.
    app.utils.format_sweden_time : Human-readable timestamp formatting.

    Examples
    --------
    >>> from fastapi.testclient import TestClient  # doctest: +SKIP
    >>> from app.main import app  # doctest: +SKIP
    >>> client = TestClient(app)  # doctest: +SKIP
    >>> r = client.get("/training-status")  # doctest: +SKIP
    >>> r.status_code in (200, 500)  # doctest: +SKIP
    True
    """
    try:
        with SessionLocal() as session:
            training_status_db = session.query(TrainingStatus).get(1)
            if not training_status_db:
                logger.warning(
                    "TrainingStatus row missing for rendering status partial."
                )
                status_obj = TrainingStatusSchema(
                    id=1,
                    is_training=False,
                    train_count=0,
                    last_trained_at=None,
                    current_horizon="Status unavailable",
                )
            else:
                status_obj = TrainingStatusSchema.model_validate(training_status_db)
        context = {
            "request": request,
            "training_status_live": {
                "last_trained_at_str": (
                    format_sweden_time(status_obj.last_trained_at)
                    if status_obj.last_trained_at
                    else "Never"
                ),
                "train_count": status_obj.train_count,
                "is_training": status_obj.is_training,
                "current_horizon": status_obj.current_horizon or "Idle",
            },
        }
        return templates.TemplateResponse("train_status.html", context)
    except SQLAlchemyError as error:
        logger.error(f"Error rendering training status: {error}", exc_info=True)
        return HTMLResponse(
            content="""
            <div class="alert alert-danger" role="alert">
                Error loading training status.
            </div>
            """,
            status_code=500,
        )
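

# The two partials above are intended to be polled by the dashboard via HTMX.
# Illustrative markup only (a sketch; the real templates may differ):
#   <div hx-get="/training-status" hx-trigger="every 10s" hx-swap="innerHTML"></div>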


@app.get("/prediction-data", response_model=PredictionDataResponse)
async def get_prediction_data_endpoint() -> PredictionDataResponse:
    """Return prediction history for charts.

    Returns
    -------
    app.schemas.PredictionDataResponse
        Historical scores grouped by horizon key.

    Raises
    ------
    fastapi.HTTPException
        500 if fetching historical scores fails.

    See Also
    --------
    app.ml_utils.get_historical_scores : Historical score aggregation.

    Examples
    --------
    >>> from fastapi.testclient import TestClient  # doctest: +SKIP
    >>> from app.main import app  # doctest: +SKIP
    >>> client = TestClient(app)  # doctest: +SKIP
    >>> r = client.get("/prediction-data")  # doctest: +SKIP
    >>> r.status_code in (200, 500)  # doctest: +SKIP
    True
    """
    try:
        history = get_historical_scores()
        return PredictionDataResponse(history=history)
    except Exception as error:
        logger.error(f"Error fetching prediction data: {error}", exc_info=True)
        raise HTTPException(status_code=500, detail="Could not fetch prediction data.")


@app.get("/api/training-history")
async def get_training_history(horizon: str = "5min") -> Dict[str, Any]:
    """Return training history for a specific horizon.

    Parameters
    ----------
    horizon : str, default "5min"
        One of ``{"5min", "1h", "12h", "24h"}``.

    Returns
    -------
    dict
        Dictionary with ``timestamps``, ``sklearn_data``, ``pytorch_data``,
        and ``data_counts`` for the requested horizon.

    Raises
    ------
    fastapi.HTTPException
        500 on database failure.

    See Also
    --------
    app.models.TrainingLog : Source of historical entries.

    Examples
    --------
    >>> from fastapi.testclient import TestClient  # doctest: +SKIP
    >>> from app.main import app  # doctest: +SKIP
    >>> client = TestClient(app)  # doctest: +SKIP
    >>> r = client.get("/api/training-history?horizon=5min")  # doctest: +SKIP
    >>> r.status_code in (200, 500)  # doctest: +SKIP
    True
    """
    try:
        with SessionLocal() as session:
            from .models import TrainingLog

            logs = (
                session.query(TrainingLog)
                .filter(TrainingLog.horizon_label == horizon)
                .order_by(TrainingLog.timestamp)
                .all()
            )
            if not logs:
                # Keep the empty response shape consistent with the populated
                # case (the original omitted data_counts and horizon here).
                return {
                    "sklearn_data": [],
                    "pytorch_data": [],
                    "timestamps": [],
                    "data_counts": [],
                    "horizon": horizon,
                }
            timestamps = [format_sweden_time_iso(log.timestamp) for log in logs]
            sklearn_scores = [log.sklearn_score for log in logs]
            pytorch_scores = [log.pytorch_score for log in logs]
            data_counts = [log.data_count for log in logs]
            return {
                "sklearn_data": sklearn_scores,
                "pytorch_data": pytorch_scores,
                "timestamps": timestamps,
                "data_counts": data_counts,
                "horizon": horizon,
            }
    except SQLAlchemyError as error:
        logger.error(f"Error fetching training history: {error}", exc_info=True)
        raise HTTPException(status_code=500, detail="Could not fetch training history.")
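

# Illustrative /api/training-history payload (values are hypothetical):
#   {"timestamps": ["2024-01-01T12:07:00+01:00"], "sklearn_data": [0.91],
#    "pytorch_data": [0.89], "data_counts": [1200], "horizon": "5min"}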


@app.get("/api/stats")
async def get_dashboard_stats() -> Dict[str, int]:
    """Return general dashboard statistics.

    Returns
    -------
    dict[str, int]
        Counts for total/actual/imputed observations and total trainings.

    Raises
    ------
    fastapi.HTTPException
        500 on database failure.

    See Also
    --------
    app.models.WeatherObservation : Counts observations (actual/imputed).
    app.models.TrainingLog : Counts global training runs.

    Examples
    --------
    >>> from fastapi.testclient import TestClient  # doctest: +SKIP
    >>> from app.main import app  # doctest: +SKIP
    >>> client = TestClient(app)  # doctest: +SKIP
    >>> r = client.get("/api/stats")  # doctest: +SKIP
    >>> r.status_code in (200, 500)  # doctest: +SKIP
    True
    """
    try:
        with SessionLocal() as session:
            total_obs = session.query(WeatherObservation).count()
            actual_obs = (
                session.query(WeatherObservation)
                .filter(WeatherObservation.is_imputed.is_(False))
                .count()
            )
            from .models import TrainingLog

            total_trainings = (
                session.query(TrainingLog)
                .filter(
                    TrainingLog.coord_latitude.is_(None),
                    TrainingLog.coord_longitude.is_(None),
                )
                .count()
            )
            return {
                "total_observations": total_obs,
                "actual_observations": actual_obs,
                "imputed_observations": total_obs - actual_obs,
                "total_trainings": total_trainings,
            }
    except SQLAlchemyError as error:
        # The original logged and fell through, returning None and failing
        # response validation; raise an explicit 500 like the sibling endpoints.
        logger.error(f"Error fetching dashboard stats: {error}", exc_info=True)
        raise HTTPException(status_code=500, detail="Could not fetch dashboard stats.")


@app.get("/api/health", response_class=JSONResponse)
async def health_check() -> JSONResponse:
    """Return service health including job freshness, scheduler state, and uptime.

    Returns
    -------
    fastapi.responses.JSONResponse
        JSON payload with keys: ``ml_job_data_freshness``,
        ``scheduler_status``, ``recent_errors`` (up to 10), and
        ``fastapi_uptime``.

    Notes
    -----
    - Log parsing is best-effort; failures are logged and omitted from output.

    See Also
    --------
    app.utils.configure_persistent_logging : Ensures log file existence.

    Examples
    --------
    >>> from fastapi.testclient import TestClient  # doctest: +SKIP
    >>> from app.main import app  # doctest: +SKIP
    >>> client = TestClient(app)  # doctest: +SKIP
    >>> r = client.get("/api/health")  # doctest: +SKIP
    >>> r.status_code in (200, 500)  # doctest: +SKIP
    True
    """
    from .models import TrainingLog

    # 1. ML job data freshness
    latest_trained_at = None
    with SessionLocal() as session:
        # Prefer TrainingStatus.last_trained_at, fall back to the latest
        # TrainingLog.timestamp
        status = session.query(TrainingStatus).get(1)
        if status and status.last_trained_at:
            latest_trained_at = status.last_trained_at
        else:
            latest_log = (
                session.query(TrainingLog)
                .order_by(TrainingLog.timestamp.desc())
                .first()
            )
            if latest_log:
                latest_trained_at = latest_log.timestamp
    ml_job_data_freshness = (
        latest_trained_at.astimezone(timezone.utc).isoformat()
        if latest_trained_at
        else None
    )

    # 2. Scheduler status
    job = scheduler.get_job("ml_training_job_scheduler")
    scheduler_status = {
        "next_run": (
            job.next_run_time.astimezone(timezone.utc).isoformat()
            if job and job.next_run_time
            else None
        ),
        "is_active": scheduler.running,
    }

    # 3. Recent errors (last 10 error/critical log entries)
    LOG_PATH = "/data/logs/fastapi.log"
    error_entries = []
    try:
        with open(LOG_PATH, "r", encoding="utf-8") as log_file:
            # Only scan the last 1000 lines for efficiency
            lines = log_file.readlines()[-1000:]
        error_pattern = re.compile(
            r"^(?P<timestamp>[\d\-T:., ]+) - .+ - (ERROR|CRITICAL) - (?P<message>.+)$"
        )
        for line in reversed(lines):
            match = error_pattern.match(line)
            if match:
                error_entries.append(
                    {
                        "timestamp": match.group("timestamp").strip(),
                        "message": match.group("message").strip(),
                    }
                )
                if len(error_entries) >= 10:
                    break
    except Exception as exc:
        logger.warning(f"Could not read recent errors from log: {exc}")
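
    # The regex above assumes the "<timestamp> - <name> - <LEVEL> - <message>"
    # layout produced by configure_persistent_logging(); an illustrative match:
    #   2024-01-01 12:00:00,000 - app.main - ERROR - something failed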

    # 4. FastAPI uptime
    now = datetime.now(timezone.utc)
    uptime_seconds = int((now - FASTAPI_PROCESS_START_TIME).total_seconds())
    hours, remainder = divmod(uptime_seconds, 3600)
    minutes, seconds = divmod(remainder, 60)
    fastapi_uptime = f"{hours}:{minutes:02}:{seconds:02}"

    return JSONResponse(
        content={
            "ml_job_data_freshness": ml_job_data_freshness,
            "scheduler_status": scheduler_status,
            "recent_errors": error_entries,
            "fastapi_uptime": fastapi_uptime,
        }
    )


@app.post("/train", status_code=202)
async def trigger_manual_training_endpoint(
    background_tasks: BackgroundTasks,
) -> JSONResponse:
    """Manually trigger ML training via background task.

    Parameters
    ----------
    background_tasks : fastapi.BackgroundTasks
        FastAPI task scheduler to offload dispatching.

    Returns
    -------
    fastapi.responses.JSONResponse
        202 on accepted trigger, 409 if a training is already running.

    Raises
    ------
    fastapi.HTTPException
        500 if the database update fails.

    See Also
    --------
    app.slurm_job_trigger.create_and_dispatch_training_job : Dispatch routine.

    Examples
    --------
    >>> from fastapi.testclient import TestClient  # doctest: +SKIP
    >>> from app.main import app  # doctest: +SKIP
    >>> client = TestClient(app)  # doctest: +SKIP
    >>> r = client.post("/train")  # doctest: +SKIP
    >>> r.status_code in (202, 409, 500)  # doctest: +SKIP
    True
    """
    logger.info("Manual training trigger received.")
    try:
        with SessionLocal() as session:
            status = session.query(TrainingStatus).get(1)
            if not status:
                status = TrainingStatus(
                    id=1, is_training=False, train_count=0, current_horizon=None
                )
                session.add(status)
                session.commit()
            if status.is_training:
                message = "Conflict: Training is already in progress."
                logger.warning(message)
                return JSONResponse(status_code=409, content={"message": message})
            status.is_training = True
            status.current_horizon = "Manual trigger: dispatching job..."
            status.train_count = (status.train_count or 0) + 1
            session.commit()
        background_tasks.add_task(create_and_dispatch_training_job)
        return JSONResponse(
            status_code=202,
            content={"message": "Accepted: ML training job initiated."},
        )
    except SQLAlchemyError as error:
        logger.error(f"Error in manual training endpoint: {error}", exc_info=True)
        raise HTTPException(status_code=500, detail="Failed to trigger training.")


@app.post("/impute")
async def manual_impute_endpoint(background_tasks: BackgroundTasks) -> JSONResponse:
    """Trigger a manual imputation cycle.

    Parameters
    ----------
    background_tasks : fastapi.BackgroundTasks
        FastAPI task scheduler to offload imputation.

    Returns
    -------
    fastapi.responses.JSONResponse
        202 if accepted.

    Raises
    ------
    fastapi.HTTPException
        500 on unexpected failure.

    See Also
    --------
    app.imputation.run_imputation_cycle : Imputation task implementation.

    Examples
    --------
    >>> from fastapi.testclient import TestClient  # doctest: +SKIP
    >>> from app.main import app  # doctest: +SKIP
    >>> client = TestClient(app)  # doctest: +SKIP
    >>> r = client.post("/impute")  # doctest: +SKIP
    >>> r.status_code in (202, 500)  # doctest: +SKIP
    True
    """
    try:
        logger.info("Manual imputation trigger received.")
        background_tasks.add_task(run_imputation_cycle)
        return JSONResponse(
            content={"message": "Accepted: Imputation cycle initiated."},
            status_code=202,
        )
    except Exception as error:
        logger.error(f"Failed to trigger imputation: {error}", exc_info=True)
        raise HTTPException(status_code=500, detail="Failed to trigger imputation.")
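

# Illustrative manual triggers (hypothetical host/port):
#   curl -X POST http://localhost:8000/train   # 202 accepted, 409 if running
#   curl -X POST http://localhost:8000/impute  # 202 accepted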


@app.get("/jobs")
async def get_scheduled_jobs_endpoint() -> JSONResponse:
    """Return currently scheduled APScheduler jobs.

    Returns
    -------
    fastapi.responses.JSONResponse
        A list of job metadata including id, name, trigger, and next run time.

    Examples
    --------
    >>> from fastapi.testclient import TestClient  # doctest: +SKIP
    >>> from app.main import app  # doctest: +SKIP
    >>> client = TestClient(app)  # doctest: +SKIP
    >>> r = client.get("/jobs")  # doctest: +SKIP
    >>> r.status_code in (200, 500)  # doctest: +SKIP
    True
    """
    jobs_info: List[Dict[str, str]] = []
    if scheduler.running:
        for job in scheduler.get_jobs():
            jobs_info.append(
                {
                    "id": job.id,
                    "name": job.name,
                    "trigger": str(job.trigger),
                    "next_run_time": (
                        str(job.next_run_time) if job.next_run_time else "N/A"
                    ),
                }
            )
    else:
        logger.warning("APScheduler is not running when retrieving jobs.")
    return JSONResponse(content=jobs_info)


@app.get("/map", response_class=HTMLResponse)
async def render_map(request: Request) -> HTMLResponse:
    """Render a Leaflet map of all coordinates with latest observations.

    Parameters
    ----------
    request : fastapi.Request
        Current request object.

    Returns
    -------
    fastapi.responses.HTMLResponse
        Rendered HTML page with a map and per-coordinate metadata.

    Notes
    -----
    - On database errors, the map renders with an empty coordinate list.

    See Also
    --------
    app.models.Coordinate : Coordinate source records.
    app.models.WeatherObservation : Latest observation per coordinate.

    Examples
    --------
    >>> from fastapi.testclient import TestClient  # doctest: +SKIP
    >>> from app.main import app  # doctest: +SKIP
    >>> client = TestClient(app)  # doctest: +SKIP
    >>> r = client.get("/map")  # doctest: +SKIP
    >>> r.status_code in (200, 500)  # doctest: +SKIP
    True
    """
    from .models import Coordinate

    coordinates_with_data = []
    try:
        with SessionLocal() as session:
            coordinates = session.query(Coordinate).all()
            COORD_TOLERANCE = 1e-4  # match DB precision
            for coordinate in coordinates:
                latest_data = (
                    session.query(WeatherObservation)
                    .filter(
                        WeatherObservation.latitude.between(
                            coordinate.latitude - COORD_TOLERANCE,
                            coordinate.latitude + COORD_TOLERANCE,
                        ),
                        WeatherObservation.longitude.between(
                            coordinate.longitude - COORD_TOLERANCE,
                            coordinate.longitude + COORD_TOLERANCE,
                        ),
                    )
                    .order_by(WeatherObservation.timestamp.desc())
                    .first()
                )
                latest_data_dict = None
                if latest_data:
                    latest_data_dict = WeatherObservationSchema.model_validate(
                        latest_data
                    ).model_dump()
                    # Convert datetime to ISO string for JSON serialization
                    if latest_data_dict.get("timestamp"):
                        latest_data_dict["timestamp"] = latest_data_dict[
                            "timestamp"
                        ].isoformat()
                coordinates_with_data.append(
                    {
                        "latitude": coordinate.latitude,
                        "longitude": coordinate.longitude,
                        "label": coordinate.label,
                        "is_central": coordinate.is_central,
                        "latest_data": latest_data_dict,
                    }
                )
    except SQLAlchemyError as error:
        logger.error(
            f"Error fetching coordinates or latest data: {error}", exc_info=True
        )
        coordinates_with_data = []
    return templates.TemplateResponse(
        "map.html",
        {
            "request": request,
            "coordinates": coordinates_with_data,
        },
    )
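

# Optional local dev entrypoint. A convenience sketch only: the module
# docstring assumes `uvicorn app.main:app --reload`; host/port here are
# illustrative defaults, not read from settings.
if __name__ == "__main__":
    import uvicorn

    uvicorn.run("app.main:app", host="0.0.0.0", port=8000, reload=True)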