app.training_jobs module
Coordinate manual and background training jobs across horizons.
This module orchestrates the end-to-end training flow invoked either from the
web application or from scheduled background tasks. It sets and clears the
TrainingStatus singleton, fetches raw observations as a pandas DataFrame,
iterates over configured horizons, and delegates horizon-specific work to the
training helpers. Errors are logged with stack traces and the database state
is kept consistent by clearing the in-progress flag on failure.
See Also
app.ml_train.HORIZON_SHIFTS
app.training_helpers.get_horizon_shift
app.training_helpers.train_models_for_horizon
app.training_helpers.unpack_training_result
app.database.SessionLocal
Notes
Primary role: orchestrate training across all horizons and maintain the TrainingStatus state while delegating scoring to the training pipeline.
Key dependencies: app.database.SessionLocal for DB access; the ORM tables app.models.TrainingStatus and app.models.WeatherObservation; the horizon mapping from app.ml_train.HORIZON_SHIFTS; and the helpers in app.training_helpers.
Invariants: TrainingStatus is treated as a singleton with id=1, and the database must be reachable.
Examples
>>> from app.training_jobs import launch_training_thread
>>> launch_training_thread()
>>> # Or run synchronously (useful for tests/local debugging)
>>> from app.training_jobs import run_training_and_update_status
>>> run_training_and_update_status()
- app.training_jobs.clear_training_in_progress(session: sqlalchemy.orm.Session, error: bool = False) -> None
Clear the in-progress flag and update completion timestamp.
- Parameters:
  - session : sqlalchemy.orm.Session
    Open SQLAlchemy session.
  - error : bool, optional
    Whether training ended with an error (affects the log level), by default False.
- Raises:
  - sqlalchemy.exc.SQLAlchemyError
    If the update or commit fails.
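To make the flag-clearing step concrete, here is a minimal sketch that uses the standard-library sqlite3 module in place of a SQLAlchemy session so that it runs standalone. The training_status table and its in_progress and finished_at columns are hypothetical stand-ins for the real TrainingStatus model; only the behavior described above (clear the flag, stamp the completion time, choose the log level from error) is taken from this page.

```python
import logging
import sqlite3
from datetime import datetime, timezone

logger = logging.getLogger("training_jobs_sketch")

# Hypothetical schema standing in for the TrainingStatus table.
SCHEMA = """
CREATE TABLE training_status (
    id INTEGER PRIMARY KEY,
    in_progress INTEGER NOT NULL DEFAULT 0,
    finished_at TEXT
)
"""


def clear_training_in_progress(conn: sqlite3.Connection, error: bool = False) -> None:
    """Clear the in-progress flag on the singleton row and stamp the finish time."""
    finished_at = datetime.now(timezone.utc).isoformat()
    # The connection context manager commits on success and rolls back on error.
    with conn:
        conn.execute(
            "UPDATE training_status SET in_progress = 0, finished_at = ? WHERE id = 1",
            (finished_at,),
        )
    # The error flag only changes the log level, as documented above.
    if error:
        logger.error("Training finished with an error at %s", finished_at)
    else:
        logger.info("Training finished at %s", finished_at)


conn = sqlite3.connect(":memory:")
conn.execute(SCHEMA)
conn.execute("INSERT INTO training_status (id, in_progress) VALUES (1, 1)")
conn.commit()
clear_training_in_progress(conn)
print(conn.execute("SELECT in_progress FROM training_status WHERE id = 1").fetchone())  # (0,)
```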
- app.training_jobs.fetch_all_data_points() -> pandas.DataFrame
Load all observations as a timestamp-sorted DataFrame.
- Returns:
  - pandas.DataFrame
    All rows from WeatherObservation, sorted in ascending order by timestamp.
- Raises:
  - AssertionError
    If the session is not bound to an engine.
  - sqlalchemy.exc.SQLAlchemyError
    If the query or read fails due to a database error.
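A minimal sketch of the ordered read follows. It uses sqlite3 instead of SQLAlchemy and returns a list of dicts rather than a pandas.DataFrame so that it needs only the standard library; the weather_observation table and its timestamp and temperature columns are hypothetical. With pandas installed, the same SQL string and connection could be passed to pandas.read_sql_query to obtain a DataFrame instead.

```python
import sqlite3


def fetch_all_data_points(conn: sqlite3.Connection) -> list[dict[str, object]]:
    """Return every observation ordered by timestamp, oldest first."""
    cursor = conn.execute(
        "SELECT timestamp, temperature FROM weather_observation ORDER BY timestamp ASC"
    )
    # cursor.description holds one tuple per column; its first item is the name.
    columns = [desc[0] for desc in cursor.description]
    return [dict(zip(columns, row)) for row in cursor.fetchall()]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE weather_observation (timestamp TEXT, temperature REAL)")
# Rows are inserted out of order to show that the query does the sorting.
conn.executemany(
    "INSERT INTO weather_observation VALUES (?, ?)",
    [("2024-01-02T00:00", 3.5), ("2024-01-01T00:00", 1.0)],
)
rows = fetch_all_data_points(conn)
print([r["timestamp"] for r in rows])  # ['2024-01-01T00:00', '2024-01-02T00:00']
```

ISO 8601 strings in a single format sort lexicographically in chronological order, which is why a plain ORDER BY on a text column works in this sketch.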
- app.training_jobs.get_or_create_training_status(session: sqlalchemy.orm.Session) -> TrainingStatus
Return the singleton TrainingStatus row, creating it if missing.
- Parameters:
  - session : sqlalchemy.orm.Session
    Open SQLAlchemy session bound to the application engine.
- Returns:
  - app.models.TrainingStatus
    The existing or newly created singleton row with id=1.
- Raises:
  - sqlalchemy.exc.SQLAlchemyError
    If the query fails, or if the insert or commit of a missing row fails.
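The get-or-create pattern for the id=1 singleton can be sketched as follows. This is an illustration under assumptions, not the module's code: sqlite3 replaces the SQLAlchemy session, and the training_status table with its in_progress and run_count columns is hypothetical.

```python
import sqlite3

SINGLETON_ID = 1  # The documented invariant: exactly one status row, with id=1.
SELECT_SQL = "SELECT id, in_progress, run_count FROM training_status WHERE id = ?"


def get_or_create_training_status(conn: sqlite3.Connection) -> tuple:
    """Return the singleton status row, inserting it first if it is missing."""
    row = conn.execute(SELECT_SQL, (SINGLETON_ID,)).fetchone()
    if row is None:
        # Commit the insert right away so other connections can see the row.
        with conn:
            conn.execute(
                "INSERT INTO training_status (id, in_progress, run_count) VALUES (?, 0, 0)",
                (SINGLETON_ID,),
            )
        row = conn.execute(SELECT_SQL, (SINGLETON_ID,)).fetchone()
    return row


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE training_status (id INTEGER PRIMARY KEY, "
    "in_progress INTEGER NOT NULL, run_count INTEGER NOT NULL)"
)
first = get_or_create_training_status(conn)   # inserts the row
second = get_or_create_training_status(conn)  # finds the existing row
print(first, second)  # (1, 0, 0) (1, 0, 0)
```

Because id is the primary key, two callers racing to create the row cannot both insert it; in this sketch the loser's INSERT fails with sqlite3.IntegrityError rather than producing a duplicate.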
- app.training_jobs.launch_training_thread(on_complete: Callable[[], None] | None = None) -> None
Launch the training workflow in a daemon thread.
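The thread runs run_training_and_update_status(), and control returns immediately to the caller. The optional on_complete callback is invoked only when training finishes without raising an exception.
- Parameters:
  - on_complete : Callable[[], None] | None, optional
    Callback invoked after a successful training run, by default None.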
Notes
The thread is created with daemon=True, so it may be terminated early if the hosting process exits before training completes.
Examples
>>> from app.training_jobs import launch_training_thread
>>> launch_training_thread()
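The fire-and-forget launch can be sketched with threading.Thread. Two assumptions apply: run_training_and_update_status here is a hypothetical placeholder that just reports success, and, unlike the documented function, this launch_training_thread returns the Thread object so that a caller or test can join it.

```python
import threading
from typing import Callable, Optional


def run_training_and_update_status(on_complete: Optional[Callable[[], None]] = None) -> None:
    """Hypothetical placeholder for the real workflow; it only signals success."""
    if on_complete is not None:
        on_complete()


def launch_training_thread(on_complete: Optional[Callable[[], None]] = None) -> threading.Thread:
    """Start the workflow in a daemon thread and return without waiting."""
    thread = threading.Thread(
        target=run_training_and_update_status,
        kwargs={"on_complete": on_complete},
        daemon=True,  # the thread will not keep the interpreter alive at exit
        name="training-job",
    )
    thread.start()
    return thread


done = threading.Event()
t = launch_training_thread(on_complete=done.set)
t.join(timeout=5)  # only for the demo; a web handler would not wait
print(done.is_set())  # True
```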
- app.training_jobs.run_training_and_update_status(on_complete: Callable[[], None] | None = None) -> None
Run the full training workflow across all horizons.
The function sets the TrainingStatus singleton to in-progress, fetches all observations, iterates through the configured horizons, and delegates per-horizon training. On success, it clears the in-progress flag and then invokes on_complete, if one was given. On any exception, it logs the error and clears the in-progress flag with an error marker.
- Parameters:
  - on_complete : Callable[[], None] | None, optional
    Callback invoked after a successful training run, by default None.
Notes
All exceptions are caught and logged; they do not propagate to the caller. Inspect the logs and the TrainingStatus row for error details.
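The control flow described above (set the flag, train each horizon, clear the flag, call on_complete only on success, swallow and log failures) can be sketched as below. Passing the collaborators in as arguments is purely an assumption for illustration so the sketch runs without a database; the documented function accepts only on_complete. The three-entry HORIZON_SHIFTS mapping is hypothetical.

```python
import logging
from typing import Callable, Dict, Optional

logger = logging.getLogger("training_jobs_sketch")

# Hypothetical horizon table; the real mapping is app.ml_train.HORIZON_SHIFTS.
HORIZON_SHIFTS: Dict[str, int] = {"1h": 1, "6h": 6, "24h": 24}


def run_training_and_update_status(
    fetch_data: Callable[[], list],
    train_horizon: Callable[[list, str], None],
    set_in_progress: Callable[[], None],
    clear_in_progress: Callable[[bool], None],
    on_complete: Optional[Callable[[], None]] = None,
) -> None:
    try:
        set_in_progress()
        data = fetch_data()
        for horizon in HORIZON_SHIFTS:
            train_horizon(data, horizon)
        clear_in_progress(False)  # success: clear the flag without an error
        if on_complete is not None:
            on_complete()
    except Exception:
        # logger.exception records the stack trace; the exception is not re-raised.
        logger.exception("Training run failed")
        clear_in_progress(True)


calls: list = []
cleared: list = []


def failing_train(data: list, horizon: str) -> None:
    if horizon == "6h":
        raise ValueError("boom")
    calls.append(horizon)


run_training_and_update_status(
    fetch_data=lambda: [1, 2, 3],
    train_horizon=failing_train,
    set_in_progress=lambda: None,
    clear_in_progress=cleared.append,
    on_complete=lambda: calls.append("done"),
)
print(calls, cleared)  # ['1h'] [True]
```

The failure at the second horizon stops the loop, skips on_complete, and clears the flag with the error marker, while the caller sees no exception.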
- app.training_jobs.set_training_in_progress(training_status: TrainingStatus, session: sqlalchemy.orm.Session) -> None
Mark training as in-progress and increment the run counter.
- Parameters:
  - training_status : app.models.TrainingStatus
    The status row to update (its id is used in the update query).
  - session : sqlalchemy.orm.Session
    Open SQLAlchemy session.
- Raises:
  - sqlalchemy.exc.SQLAlchemyError
    If the update or commit fails.
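A sketch of the in-progress update, again assuming sqlite3 in place of SQLAlchemy and a hypothetical training_status table with in_progress and run_count columns. For simplicity it takes the row id directly, whereas the documented function takes the TrainingStatus object and reads its id.

```python
import sqlite3


def set_training_in_progress(status_id: int, conn: sqlite3.Connection) -> None:
    """Flag the status row as running and bump its run counter in one statement."""
    with conn:  # commits on success, rolls back if the UPDATE raises
        conn.execute(
            "UPDATE training_status "
            "SET in_progress = 1, run_count = run_count + 1 "
            "WHERE id = ?",
            (status_id,),
        )


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE training_status (id INTEGER PRIMARY KEY, "
    "in_progress INTEGER NOT NULL, run_count INTEGER NOT NULL)"
)
conn.execute("INSERT INTO training_status VALUES (1, 0, 0)")
conn.commit()
set_training_in_progress(1, conn)
set_training_in_progress(1, conn)
print(conn.execute("SELECT in_progress, run_count FROM training_status WHERE id = 1").fetchone())  # (1, 2)
```

Doing the increment inside the UPDATE, rather than reading run_count into Python and writing it back, avoids a lost update when two writers overlap; whether the real function does it this way is not stated on this page.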
- app.training_jobs.train_and_log_for_horizon(data_frame: pandas.DataFrame, horizon: str) -> None
Train for one horizon and log summary metrics.
Delegates to the helpers to obtain the shift value and run training. For backward compatibility with a legacy call signature, the call is retried without the shift argument when a TypeError is raised. Metrics are normalized via app.training_helpers.unpack_training_result() and logged for observability.
- Parameters:
  - data_frame : pandas.DataFrame
    Full dataset used for training across horizons.
  - horizon : str
    The horizon label to train for.
- Raises:
  - Exception
    Any exception raised by the underlying training helpers after the compatibility retry is re-raised to the caller.
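The legacy-signature retry can be sketched as a small wrapper. The name call_with_legacy_fallback, the positional (data, horizon, shift) order, and the two example helpers are all hypothetical; the page only states that the call is retried without the shift argument when a TypeError is raised.

```python
import logging
from typing import Any, Callable

logger = logging.getLogger("training_jobs_sketch")


def call_with_legacy_fallback(
    train: Callable[..., Any], data: Any, horizon: str, shift: int
) -> Any:
    """Call train with a shift argument, retrying without it for legacy helpers."""
    try:
        return train(data, horizon, shift)
    except TypeError:
        # Legacy helpers accept only (data, horizon); errors from the retry propagate.
        logger.debug("Retrying %s without a shift argument", horizon)
        return train(data, horizon)


def new_style(data: Any, horizon: str, shift: int) -> dict:
    return {"horizon": horizon, "shift": shift}


def legacy(data: Any, horizon: str) -> dict:
    return {"horizon": horizon}


print(call_with_legacy_fallback(new_style, [], "6h", 6))  # {'horizon': '6h', 'shift': 6}
print(call_with_legacy_fallback(legacy, [], "6h", 6))     # {'horizon': '6h'}
```

One trade-off of a blanket except TypeError in a sketch like this: it also catches a TypeError raised inside a new-style helper, which is then retried with two arguments and fails with a less informative error. Checking the helper's parameters with inspect.signature before calling is one alternative.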