app.ml_train module

Standalone ML training job executed inside the Slurm cluster.

This module encapsulates the horizon-wise training workflow used by the platform’s MLOps pipeline. It loads non-imputed observations for selected coordinates, prepares future targets per forecasting horizon, trains a baseline Scikit-learn model and a simple PyTorch regressor, persists model artifacts under the shared /data volume, and records scores in the database for monitoring and visualization.
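The workflow above turns each horizon label into a target offset measured in rows. Below is a minimal sketch of that conversion. It assumes evenly spaced observations at a 5-minute cadence (the spacing used in the prepare_horizon_data example) and the four labels documented for TrainingLog.horizon_label. `HORIZONS` and `shift_steps_for` are hypothetical names and are not part of app.ml_train.

```python
from datetime import timedelta

# Assumed horizon durations, keyed by the labels documented for
# TrainingLog.horizon_label.
HORIZONS = {
    "5min": timedelta(minutes=5),
    "1h": timedelta(hours=1),
    "12h": timedelta(hours=12),
    "24h": timedelta(hours=24),
}


def shift_steps_for(label: str, cadence: timedelta = timedelta(minutes=5)) -> int:
    """Hypothetical helper: number of rows to shift the target for `label`.

    Assumes observations are evenly spaced at `cadence`.
    """
    horizon = HORIZONS[label]
    if horizon % cadence:
        # A non-zero remainder means the horizon does not align with the cadence.
        raise ValueError(f"{label!r} is not a whole number of {cadence} steps")
    return horizon // cadence  # timedelta // timedelta yields an int


for label in HORIZONS:
    print(label, shift_steps_for(label))
```

Under this assumption "24h" maps to 288 steps. A cadence that does not divide a horizon evenly is rejected rather than silently rounded.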

See Also

app.slurm_job_trigger.create_and_dispatch_training_job

app.slurm_job_trigger.trigger_slurm_job

app.ml_utils.get_latest_training_logs

app.ml_utils.get_historical_scores

app.models.TrainingLog

app.models.TrainingStatus

Notes

  • Primary role: execute training for configured horizons and persist both models and TrainingLog entries, updating the TrainingStatus singleton throughout execution.

  • Key dependencies: a reachable database via DATABASE_URL, a writable shared volume at /data, and environment variables controlling training.

  • Invariants: DATABASE_URL must be set; the shared /data volume must exist and be writable; minimum data thresholds determine training viability.
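The invariants listed above can be checked before any training starts. The sketch below makes several assumptions. `check_invariants` is a hypothetical helper, because the module's actual start-up checks are not documented here. The minimum-data thresholds are left out because their values are not given.

```python
import os
import tempfile
from pathlib import Path


def check_invariants(data_dir: str = "/data") -> str:
    """Hypothetical pre-flight check for the documented invariants.

    Returns the database URL, or raises if an invariant is violated.
    """
    database_url = os.environ.get("DATABASE_URL")
    if not database_url:
        raise ValueError("DATABASE_URL is not set")
    path = Path(data_dir)
    if not path.is_dir():
        raise FileNotFoundError(f"shared volume {path} does not exist")
    if not os.access(path, os.W_OK):
        raise PermissionError(f"shared volume {path} is not writable")
    return database_url


# Demo against a temporary directory standing in for /data.
with tempfile.TemporaryDirectory() as tmp:
    os.environ["DATABASE_URL"] = "sqlite:///:memory:"
    print(check_invariants(tmp))
```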

Examples

>>> # Executed by Slurm via the dispatcher
>>> # $ python3 /data/app_code_for_slurm/ml_train.py
>>> from app.ml_train import main
>>> main()

class app.ml_train.Base(*args: Any, **kwargs: Any)

Bases: DeclarativeBase

Base class for ORM models in this standalone job.

Notes

The standalone training job defines a minimal subset of ORM models to avoid importing the main application’s metadata. These tables mirror enough of the fields in app.models to write training logs and status.

class app.ml_train.SimpleRegressionNet(*args: Any, **kwargs: Any)

Bases: Module

Simple feed-forward network that predicts a single target value per sample.

Parameters:
input_dim : int

Number of input features (must be positive).

Examples

>>> import torch
>>> from app.ml_train import SimpleRegressionNet
>>> net = SimpleRegressionNet(input_dim=4)
>>> x = torch.randn(2, 4)
>>> y = net(x)
>>> y.shape == (2, 1)
True

forward(x: torch.Tensor) → torch.Tensor

Compute predictions for input features.

Parameters:
x : torch.Tensor

Input batch with shape (batch_size, input_dim).

Returns:
torch.Tensor

Predicted target values with shape (batch_size, 1).
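Only the input and output shapes of SimpleRegressionNet are documented. The sketch below shows one plausible architecture that satisfies them. The class name `SimpleRegressionNetSketch`, the hidden width of 64, and the single ReLU hidden layer are all assumptions, not the module's actual layer stack.

```python
import torch
from torch import nn


class SimpleRegressionNetSketch(nn.Module):
    """Hypothetical stand-in for SimpleRegressionNet.

    The hidden width (64) and the single ReLU hidden layer are assumptions.
    """

    def __init__(self, input_dim: int, hidden_dim: int = 64) -> None:
        super().__init__()
        if input_dim <= 0:
            raise ValueError("input_dim must be positive")
        self.layers = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1),  # one regression output per sample
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        # (batch_size, input_dim) -> (batch_size, 1)
        return self.layers(x)


net = SimpleRegressionNetSketch(input_dim=4)
print(net(torch.randn(2, 4)).shape)
```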

class app.ml_train.TrainingLog(*args: Any, **kwargs: Any)

Bases: Base

Log entry for one training run and horizon.

Attributes:
id : str

UUID identifier of the run.

timestamp : datetime

When the run completed (UTC).

horizon : str

Unique horizon key (often "<coord>_<horizon>").

sklearn_score : float

R² score for the scikit-learn (Ridge) model on the test split.

pytorch_score : float

R² score for the PyTorch model on the test split.

data_count : int

Number of samples used for this horizon after preprocessing.

coord_latitude : float | None

Latitude of the coordinate, or None for aggregate runs.

coord_longitude : float | None

Longitude of the coordinate, or None for aggregate runs.

horizon_label : str | None

One of {"5min", "1h", "12h", "24h"}.
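The horizon key and the coordinate identifier are only shown by example: "<coord>_<horizon>" here, and "lat57_7000_lon11_9000" under train_and_save_model. The sketch below assumes the identifier formats each coordinate to four decimal places and replaces the decimal point with an underscore. `format_coord_str` and `horizon_key` are hypothetical helpers.

```python
def format_coord_str(latitude: float, longitude: float) -> str:
    """Hypothetical formatter that reproduces the documented example identifier."""
    lat = f"{latitude:.4f}".replace(".", "_")
    lon = f"{longitude:.4f}".replace(".", "_")
    return f"lat{lat}_lon{lon}"


def horizon_key(latitude: float, longitude: float, horizon_label: str) -> str:
    # "<coord>_<horizon>", the form TrainingLog.horizon often takes.
    return f"{format_coord_str(latitude, longitude)}_{horizon_label}"


print(horizon_key(57.70, 11.90, "1h"))  # lat57_7000_lon11_9000_1h
```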

class app.ml_train.TrainingStatus(*args: Any, **kwargs: Any)

Bases: Base

Singleton table reflecting the current training state.

Attributes:
id : int

Primary key (always 1 in this job flow).

is_training : bool

Whether a training job is currently running.

last_trained_at : datetime | None

Timestamp of the last successful training completion.

train_count : int

Number of training runs since system start or initialization.

current_horizon : str | None

Human-readable status message or horizon marker.

class app.ml_train.WeatherObservation(*args: Any, **kwargs: Any)

Bases: Base

Observational weather record (non-imputed preferred for training).

Attributes:
timestamp : datetime

Unique timestamp for the observation (UTC).

latitude : float

Coordinate latitude in decimal degrees.

longitude : float

Coordinate longitude in decimal degrees.

air_temperature : float | None

Air temperature in degrees Celsius.

wind_speed : float | None

Wind speed in m/s.

wind_direction : float | None

Wind direction in degrees.

cloud_area_fraction : float | None

Cloud cover fraction (0–1), when available.

precipitation_amount : float | None

Precipitation amount in mm for the interval.

is_imputed : bool

Whether the record was imputed (training uses non-imputed).


app.ml_train.get_db_session() → sqlalchemy.orm.Session

Create and return a new SQLAlchemy session.

Returns:
sqlalchemy.orm.Session

Session bound to the engine specified by DATABASE_URL.

Raises:
ValueError

If DATABASE_URL is not defined in the environment.

sqlalchemy.exc.SQLAlchemyError

If the engine cannot be created or the metadata initialization fails.

Examples

>>> import os
>>> os.environ["DATABASE_URL"] = "sqlite:///:memory:"
>>> from app.ml_train import get_db_session
>>> s = get_db_session()
>>> s.close()

app.ml_train.load_training_data(session: sqlalchemy.orm.Session, latitude: float, longitude: float) → pandas.DataFrame

Load non-imputed observations for one coordinate.

Parameters:
session : sqlalchemy.orm.Session

Open SQLAlchemy session.

latitude : float

Coordinate latitude in decimal degrees.

longitude : float

Coordinate longitude in decimal degrees.

Returns:
pandas.DataFrame

DataFrame whose timestamp column is parsed to datetime64[ns]; empty if no non-imputed rows are available.

Examples

>>> # Requires seeded DB with weather_observations
>>> from app.ml_train import load_training_data
>>> df = load_training_data(session, 57.70, 11.90)
>>> isinstance(df.empty, bool)
True

app.ml_train.main() → None

Entry point for the standalone ML training job.

Notes

  • Coordinates are fetched (central coordinates are preferred) and processed one at a time. For each horizon, the data are prepared, both models are trained, the artifacts are saved, and a TrainingLog row is written.

  • Status updates are written to TrainingStatus throughout the run.
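The per-coordinate, per-horizon loop described above can be sketched with each step passed in as a callable. This keeps the control flow visible without a database or model code. The callables, the status messages, and the `min_samples` threshold of 20 are hypothetical. The chronological 80/20 split mirrors the train_and_save_model example.

```python
from typing import Any, Callable, Iterable, List, Sequence, Tuple

Prepared = Tuple[Any, Any, int]  # (X, y, count), as returned by prepare_horizon_data


def run_training_sketch(
    coordinates: Iterable[Tuple[float, float]],
    horizons: Sequence[Tuple[str, int]],  # (horizon_label, shift_steps)
    load: Callable[[float, float], Any],
    prepare: Callable[[Any, str, int], Prepared],
    train: Callable[..., Tuple[float, float]],  # returns (sklearn_r2, pytorch_r2)
    set_status: Callable[[bool, str], None],
    min_samples: int = 20,  # assumed data threshold
    train_fraction: float = 0.8,
) -> List[dict]:
    """Hypothetical orchestration loop mirroring the documented flow of main()."""
    logs = []
    set_status(True, "Job started")
    try:
        for lat, lon in coordinates:
            df = load(lat, lon)
            for label, steps in horizons:
                set_status(True, f"{lat},{lon} {label}")
                X, y, count = prepare(df, label, steps)
                if count < min_samples:
                    continue  # too little data for this horizon
                split = int(count * train_fraction)  # chronological split, no shuffle
                sk, pt = train(X[:split], y[:split], X[split:], y[split:], label)
                logs.append({
                    "horizon_label": label,
                    "coord_latitude": lat,
                    "coord_longitude": lon,
                    "sklearn_score": sk,
                    "pytorch_score": pt,
                    "data_count": count,
                })
    finally:
        set_status(False, "Idle")
    return logs
```

The `finally` clause resets the status flag even when a step raises, so a crashed run does not leave is_training stuck at True. Whether main() behaves the same way is not documented.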

Examples

>>> # Executed inside Slurm job container
>>> from app.ml_train import main
>>> main()

app.ml_train.prepare_horizon_data(df: pandas.DataFrame, horizon_label: str, shift_steps: int) → Tuple[numpy.typing.NDArray[numpy.float64], numpy.typing.NDArray[numpy.float64], int]

Prepare features and targets for a specific horizon.

Parameters:
df : pandas.DataFrame

Input data filtered to a single coordinate and sorted by timestamp.

horizon_label : str

Label for the horizon (e.g., "5min", "1h").

shift_steps : int

Positive number of steps the target is shifted into the future.

Returns:
tuple[numpy.ndarray, numpy.ndarray, int]

Tuple (X, y, count) where count is the number of samples retained after dropping NA rows for the chosen horizon.

Examples

>>> import pandas as pd
>>> from app.ml_train import prepare_horizon_data
>>> ts = pd.date_range("2024-01-01", periods=12, freq="5min")
>>> df = pd.DataFrame({
...     "timestamp": ts,
...     "air_temperature": list(range(12)),
...     "wind_speed": [1.0]*12,
...     "wind_direction": [0.0]*12,
...     "precipitation_amount": [0.0]*12,
... })
>>> X, y, count = prepare_horizon_data(df, "5min", 1)
>>> count >= 10
True
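The sketch below shows how prepare_horizon_data could build its targets. It assumes the target is air_temperature shifted `shift_steps` rows into the future and that the features are the four columns in the example above. The real feature list and target column are not documented here, and `prepare_horizon_data_sketch` is a hypothetical name.

```python
import numpy as np
import pandas as pd

# Assumed feature columns and target, taken from the docstring example.
FEATURES = ["air_temperature", "wind_speed", "wind_direction", "precipitation_amount"]
TARGET = "air_temperature"


def prepare_horizon_data_sketch(df: pd.DataFrame, horizon_label: str, shift_steps: int):
    """Hypothetical version of prepare_horizon_data: returns (X, y, count)."""
    if shift_steps <= 0:
        raise ValueError("shift_steps must be positive")
    data = df.sort_values("timestamp").reset_index(drop=True)
    # The future target is the value `shift_steps` rows ahead of each row.
    target = data[TARGET].shift(-shift_steps).rename(f"target_{horizon_label}")
    # Rows without a future value (or with missing features) are dropped.
    frame = pd.concat([data[FEATURES], target], axis=1).dropna()
    X = frame[FEATURES].to_numpy(dtype=np.float64)
    y = frame[target.name].to_numpy(dtype=np.float64)
    return X, y, len(frame)


ts = pd.date_range("2024-01-01", periods=12, freq="5min")
df = pd.DataFrame({
    "timestamp": ts,
    "air_temperature": list(range(12)),
    "wind_speed": [1.0] * 12,
    "wind_direction": [0.0] * 12,
    "precipitation_amount": [0.0] * 12,
})
X, y, count = prepare_horizon_data_sketch(df, "5min", 1)
print(count)  # 11
```

The last `shift_steps` rows have no future value and are dropped. This is why the 12-row example keeps 11 samples for shift_steps=1.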

app.ml_train.train_and_save_model(X_train: numpy.typing.NDArray[numpy.float64], y_train: numpy.typing.NDArray[numpy.float64], X_test: numpy.typing.NDArray[numpy.float64], y_test: numpy.typing.NDArray[numpy.float64], horizon: str, coord_str: str) → Tuple[float, float]

Train Ridge and PyTorch models, persist them, and return R² scores.

Parameters:
X_train, y_train, X_test, y_test : numpy.ndarray

Training and test splits.

horizon : str

Horizon label used for filenames and logging.

coord_str : str

Coordinate identifier (e.g., "lat57_7000_lon11_9000").

Returns:
tuple[float, float]

(sklearn_r2, pytorch_r2) scores on the test split.

Raises:
OSError

If persisting the model artifacts to disk fails.

Notes

  • Models are saved under /data/models/<coord_str>/ with deterministic filenames per horizon and framework.

Examples

>>> import numpy as np
>>> from app.ml_train import train_and_save_model
>>> X = np.random.rand(100, 4); y = np.random.rand(100)
>>> s = int(len(X)*0.8)
>>> train_and_save_model(X[:s], y[:s], X[s:], y[s:],
...                      "5min", "lat57_7000_lon11_9000")
(..., ...)
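The scikit-learn half of train_and_save_model can be sketched as follows. This assumes default Ridge hyperparameters, joblib for persistence, and a filename of the form sklearn_<horizon>.joblib; the actual deterministic filenames are not documented. `train_and_save_ridge_sketch` and its `models_root` parameter are hypothetical. `models_root` exists only so the example can write to a temporary directory instead of /data/models. The PyTorch half is omitted.

```python
import tempfile
from pathlib import Path

import joblib
import numpy as np
from sklearn.linear_model import Ridge
from sklearn.metrics import r2_score


def train_and_save_ridge_sketch(X_train, y_train, X_test, y_test,
                                horizon: str, coord_str: str,
                                models_root: str = "/data/models") -> float:
    """Hypothetical Ridge half of train_and_save_model.

    Fits Ridge, writes it to <models_root>/<coord_str>/ and returns the test R².
    """
    model = Ridge().fit(X_train, y_train)
    score = float(r2_score(y_test, model.predict(X_test)))
    out_dir = Path(models_root) / coord_str
    out_dir.mkdir(parents=True, exist_ok=True)  # raises OSError if not writable
    # Assumed naming scheme: one deterministic file per horizon and framework.
    joblib.dump(model, out_dir / f"sklearn_{horizon}.joblib")
    return score


rng = np.random.default_rng(0)
X = rng.random((100, 4))
y = X @ np.array([1.0, -2.0, 0.5, 3.0]) + 0.01 * rng.standard_normal(100)
split = int(len(X) * 0.8)  # chronological 80/20 split, as in the example above
with tempfile.TemporaryDirectory() as tmp:
    r2 = train_and_save_ridge_sketch(X[:split], y[:split], X[split:], y[split:],
                                     "5min", "lat57_7000_lon11_9000", models_root=tmp)
    saved = sorted(p.name for p in (Path(tmp) / "lat57_7000_lon11_9000").iterdir())
print(round(r2, 3), saved)
```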

app.ml_train.update_training_status(session: sqlalchemy.orm.Session, is_training: bool, current_status_message: str | None = None, increment_count: bool = False) → None

Update the training_status row with the current state.

Parameters:
session : sqlalchemy.orm.Session

Open SQLAlchemy session.

is_training : bool

Whether training is in progress.

current_status_message : str | None, optional

Optional status message or horizon marker.

increment_count : bool, optional

Whether to increment the total training counter.

Examples

>>> # Within an open SQLAlchemy session
>>> from app.ml_train import update_training_status
>>> update_training_status(session, True, "Job started")