app.training_jobs module

Coordinate manual and background training jobs across horizons.

This module orchestrates the end-to-end training flow invoked either from the web application or from scheduled background tasks. It sets and clears the TrainingStatus singleton, fetches raw observations as a pandas DataFrame, iterates over the configured horizons, and delegates horizon-specific work to the training helpers. Errors are logged with stack traces, and the database state is kept consistent by clearing the in-progress flag on failure.

See Also

app.ml_train.HORIZON_SHIFTS

app.training_helpers.get_horizon_shift

app.training_helpers.train_models_for_horizon

app.training_helpers.unpack_training_result

app.models.TrainingStatus

app.models.WeatherObservation

app.database.SessionLocal

Notes

  • Primary role: orchestrate training across all horizons and maintain the TrainingStatus state while delegating scoring to the training pipeline.

  • Key dependencies: app.database.SessionLocal for DB access; ORM tables app.models.TrainingStatus and app.models.WeatherObservation; horizon mapping from app.ml_train.HORIZON_SHIFTS; helpers in app.training_helpers.

  • Invariants: TrainingStatus is treated as a singleton with id=1; the database must be reachable.

Examples

>>> from app.training_jobs import launch_training_thread
>>> launch_training_thread()
>>> # Or run synchronously (useful for tests/local debugging)
>>> from app.training_jobs import run_training_and_update_status
>>> run_training_and_update_status()
app.training_jobs.clear_training_in_progress(session: sqlalchemy.orm.Session, error: bool = False) -> None

Clear the in-progress flag and update completion timestamp.

Parameters:
session : sqlalchemy.orm.Session

Open SQLAlchemy session.

error : bool, optional

Whether training ended with an error (affects log level), by default False.

Raises:
sqlalchemy.exc.SQLAlchemyError

If the update or commit fails.

app.training_jobs.fetch_all_data_points() -> pandas.DataFrame

Load all observations as a timestamp-sorted DataFrame.

Returns:
pandas.DataFrame

All rows from WeatherObservation sorted ascending by timestamp.

Raises:
AssertionError

If the session is not bound to an engine.

sqlalchemy.exc.SQLAlchemyError

If the query or read fails due to database errors.

app.training_jobs.get_or_create_training_status(session: sqlalchemy.orm.Session) -> TrainingStatus

Return the singleton TrainingStatus row, creating it if missing.

Parameters:
session : sqlalchemy.orm.Session

Open SQLAlchemy session bound to the application engine.

Returns:
app.models.TrainingStatus

The existing or newly created singleton row with id=1.

Raises:
sqlalchemy.exc.SQLAlchemyError

If the insert/commit of a missing row fails or if the query fails.
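The get-or-create behaviour described above can be sketched as follows. This is a minimal illustration, not the module's implementation: it swaps in the standard-library sqlite3 module for SQLAlchemy so that it runs without extra dependencies, and the table name training_status, the column is_training, and the function name get_or_create_status are all assumptions made for the example.

```python
import sqlite3

SINGLETON_ID = 1  # documented invariant: the status row always has id=1


def get_or_create_status(conn: sqlite3.Connection) -> tuple:
    """Return the singleton status row, inserting it first if it is missing."""
    row = conn.execute(
        "SELECT id, is_training FROM training_status WHERE id = ?",
        (SINGLETON_ID,),
    ).fetchone()
    if row is None:
        # Missing row: create it with a cleared flag and commit.
        conn.execute(
            "INSERT INTO training_status (id, is_training) VALUES (?, 0)",
            (SINGLETON_ID,),
        )
        conn.commit()
        row = (SINGLETON_ID, 0)
    return row


# Hypothetical in-memory schema used only for this demonstration.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE training_status (id INTEGER PRIMARY KEY, is_training INTEGER)"
)
first = get_or_create_status(conn)   # inserts the row
second = get_or_create_status(conn)  # finds the existing row
row_count = conn.execute("SELECT COUNT(*) FROM training_status").fetchone()[0]
```

Calling the function twice leaves exactly one row in the table, which is the property the singleton invariant relies on.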

app.training_jobs.launch_training_thread(on_complete: Callable[[], None] | None = None) -> None

Launch the training workflow in a daemon thread.

The thread runs run_training_and_update_status(), and this function returns to the caller immediately without waiting for training to finish. The optional on_complete callback is invoked only when the training finishes without raising an exception.

Parameters:
on_complete : Callable[[], None] | None, optional

Callback executed after a successful training run, by default None.

Notes

  • The thread is created with daemon=True; it may be terminated early if the hosting process exits before training completes.
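The daemon-thread pattern and the success-only callback can be sketched with the standard library alone. The names launch_in_thread, job, and failing_job are hypothetical, and unlike the documented function (which returns None) this sketch returns the Thread object so the demonstration can wait for it with join().

```python
import threading
from typing import Callable, Optional


def launch_in_thread(
    job: Callable[[], None],
    on_complete: Optional[Callable[[], None]] = None,
) -> threading.Thread:
    """Run job in a daemon thread; call on_complete only if job succeeds."""

    def runner() -> None:
        try:
            job()
        except Exception:
            return  # failure: skip the callback
        if on_complete is not None:
            on_complete()

    # daemon=True: the thread will not keep the process alive on exit.
    thread = threading.Thread(target=runner, daemon=True)
    thread.start()
    return thread


events = []


def failing_job() -> None:
    raise RuntimeError("simulated training failure")


# Successful job: the callback runs after the job.
launch_in_thread(lambda: events.append("trained"), lambda: events.append("done")).join()
# Failing job: the callback is skipped.
launch_in_thread(failing_job, lambda: events.append("never")).join()
```

Because the thread is a daemon, a caller that needs the result before the process exits has to wait for it explicitly, as the join() calls do here.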

Examples

>>> from app.training_jobs import launch_training_thread
>>> launch_training_thread()
app.training_jobs.run_training_and_update_status(on_complete: Callable[[], None] | None = None) -> None

Run the full training workflow across all horizons.

The function toggles the TrainingStatus singleton to in-progress, fetches all observations, iterates through the configured horizons, and delegates per-horizon training. On success, it clears the in-progress flag and optionally invokes on_complete. On any exception, the error is logged, and the in-progress flag is cleared with an error marker.

Parameters:
on_complete : Callable[[], None] | None, optional

Callback executed after a successful run, by default None.

Notes

  • All exceptions are caught and logged; they do not propagate to the caller. Inspect logs and the TrainingStatus row for error details.
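The control flow described above (set the flag, loop over horizons, clear the flag on both success and failure, swallow exceptions) can be sketched as below. This is an illustration under stated assumptions: a plain dict stands in for the TrainingStatus row, and the names run_all_horizons, train_one, and the dict keys are hypothetical.

```python
import logging
from typing import Callable, Dict, Iterable, Optional

logger = logging.getLogger("training_sketch")


def run_all_horizons(
    status: Dict[str, object],
    horizons: Iterable[str],
    train_one: Callable[[str], None],
    on_complete: Optional[Callable[[], None]] = None,
) -> None:
    """Train every horizon; never let an exception reach the caller."""
    status["in_progress"] = True
    status["last_error"] = False
    try:
        for horizon in horizons:
            status["current_horizon"] = horizon
            train_one(horizon)
    except Exception:
        # Log with the stack trace, clear the flag, and record the failure.
        logger.exception("Training failed")
        status["in_progress"] = False
        status["last_error"] = True
        return
    status["in_progress"] = False
    if on_complete is not None:
        on_complete()


calls = []
ok_status: Dict[str, object] = {}
run_all_horizons(ok_status, ["5min", "1h"], calls.append, lambda: calls.append("done"))


def fail(horizon: str) -> None:
    raise ValueError(horizon)


bad_status: Dict[str, object] = {}
run_all_horizons(bad_status, ["5min"], fail, lambda: calls.append("never"))
```

In both runs the in-progress flag ends up cleared, so a failed run cannot leave the status stuck in the in-progress state.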

app.training_jobs.set_training_in_progress(training_status: TrainingStatus, session: sqlalchemy.orm.Session) -> None

Mark training as in-progress and increment the run counter.

Parameters:
training_status : app.models.TrainingStatus

The status row to update (its id is used in the update query).

session : sqlalchemy.orm.Session

Open SQLAlchemy session.

Raises:
sqlalchemy.exc.SQLAlchemyError

If the update or commit fails.
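One way to set the flag and increment the counter in a single update keyed on the row id is sketched below. It uses the standard-library sqlite3 module in place of SQLAlchemy, and the table name training_status and the columns is_training and run_count are assumptions chosen for the example, not the actual schema.

```python
import sqlite3


def mark_in_progress(conn: sqlite3.Connection, status_id: int) -> None:
    """Set the in-progress flag and bump the run counter in one UPDATE."""
    conn.execute(
        "UPDATE training_status "
        "SET is_training = 1, run_count = run_count + 1 "
        "WHERE id = ?",
        (status_id,),
    )
    conn.commit()


# Hypothetical in-memory schema used only for this demonstration.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE training_status ("
    "id INTEGER PRIMARY KEY, "
    "is_training INTEGER NOT NULL, "
    "run_count INTEGER NOT NULL)"
)
conn.execute("INSERT INTO training_status VALUES (1, 0, 0)")

mark_in_progress(conn, 1)
mark_in_progress(conn, 1)
state = conn.execute(
    "SELECT is_training, run_count FROM training_status WHERE id = 1"
).fetchone()
```

Incrementing in SQL (run_count = run_count + 1) rather than reading the value into Python first avoids overwriting a concurrent update with a stale count.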

app.training_jobs.train_and_log_for_horizon(data_frame: pandas.DataFrame, horizon: str) -> None

Train for one horizon and log summary metrics.

Delegates to the helpers to obtain the shift value and run training. For backward compatibility with a legacy helper signature, the call is retried without the shift argument if the first attempt raises a TypeError. Metrics are normalized via app.training_helpers.unpack_training_result() and logged for observability.

Parameters:
data_frame : pandas.DataFrame

Full dataset used for training across horizons.

horizon : str

The horizon label to train for.

Raises:
Exception

Any exception raised by the underlying training helpers after the compatibility retry is re-raised to the caller.
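The compatibility retry can be sketched as a try/except around the helper call. The function and helper names below, and the shift value 12, are hypothetical; only the retry-on-TypeError idea comes from the description above.

```python
from typing import Any, Callable


def train_with_fallback(
    train_fn: Callable[..., Any], data: Any, horizon: str, shift: int
) -> Any:
    """Call train_fn with a shift; retry without it on TypeError."""
    try:
        return train_fn(data, horizon, shift)
    except TypeError:
        # Legacy helpers accept only (data, horizon). Exceptions raised
        # by this second call propagate to the caller unchanged.
        return train_fn(data, horizon)


def modern_helper(data: Any, horizon: str, shift: int) -> tuple:
    return ("modern", horizon, shift)


def legacy_helper(data: Any, horizon: str) -> tuple:
    return ("legacy", horizon)


modern = train_with_fallback(modern_helper, [1, 2, 3], "1h", 12)
legacy = train_with_fallback(legacy_helper, [1, 2, 3], "1h", 12)
```

A caveat of this pattern is that a TypeError raised inside the helper body, rather than by the argument mismatch, also triggers the retry, so training may run twice before the error surfaces.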

app.training_jobs.update_current_horizon(horizon: str) -> None

Persist the current horizon label to TrainingStatus.

Parameters:
horizon : str

Horizon label being trained (for example, "5min" or "1h").

Raises:
sqlalchemy.exc.SQLAlchemyError

If the update or commit fails.