app.ml_train module
Standalone ML training job executed inside the Slurm cluster.
This module encapsulates the horizon-wise training workflow used by the
platform’s MLOps pipeline. It loads non-imputed observations for selected
coordinates, prepares future targets per forecasting horizon, trains a
baseline Scikit-learn model and a simple PyTorch regressor, persists model
artifacts under the shared /data volume, and records scores in the
database for monitoring and visualization.
See Also
app.slurm_job_trigger.create_and_dispatch_training_job
app.slurm_job_trigger.trigger_slurm_job
app.ml_utils.get_latest_training_logs
Notes
- Primary role: execute training for the configured horizons, persist both the models and the TrainingLog entries, and update the TrainingStatus singleton throughout execution.
- Key dependencies: a database reachable via DATABASE_URL, a writable shared volume at /data, and the environment variables that control training.
- Invariants: DATABASE_URL must be set; the shared /data volume must exist and be writable; minimum data thresholds determine whether training is viable.
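The invariants listed above could be checked before any training starts. The following is a minimal sketch under stated assumptions, not the module's actual code: `check_training_preconditions` is a hypothetical helper, and probing the volume with a temporary file is only one possible way to test that it is writable.

```python
import os
import tempfile
from typing import List, Mapping


def check_training_preconditions(env: Mapping[str, str], data_dir: str) -> List[str]:
    """Return a list of problems; an empty list means the checks passed.

    Hypothetical helper for illustration, not part of app.ml_train.
    """
    problems: List[str] = []
    if not env.get("DATABASE_URL"):
        problems.append("DATABASE_URL is not set")
    if not os.path.isdir(data_dir):
        problems.append(f"{data_dir} does not exist")
    else:
        try:
            # Creating (and auto-deleting) a temp file shows the volume is writable.
            with tempfile.NamedTemporaryFile(dir=data_dir):
                pass
        except OSError:
            problems.append(f"{data_dir} is not writable")
    return problems
```

Inside the job this might be called as `check_training_preconditions(os.environ, "/data")`, aborting early if the returned list is not empty.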
Examples
>>> # Executed by Slurm via the dispatcher
>>> # $ python3 /data/app_code_for_slurm/ml_train.py
>>> from app.ml_train import main
>>> main()
- class app.ml_train.Base(*args: Any, **kwargs: Any)
Bases: DeclarativeBase
Base class for ORM models in this standalone job.
Notes
The standalone training job defines a minimal subset of ORM models to avoid importing the main application’s metadata. The tables mirror the fields in app.models closely enough to write logs and status.
- class app.ml_train.SimpleRegressionNet(*args: Any, **kwargs: Any)
Bases: Module
Simple feed-forward regression network for 1-step regression.
- Parameters:
- input_dim
int Number of input features (must be positive).
Examples
>>> import torch
>>> from app.ml_train import SimpleRegressionNet
>>> net = SimpleRegressionNet(input_dim=4)
>>> x = torch.randn(2, 4)
>>> y = net(x)
>>> y.shape == (2, 1)
True
- forward(x: torch.Tensor) → torch.Tensor
Compute predictions for input features.
- Parameters:
- x
torch.Tensor Input batch with shape (batch_size, input_dim).
- Returns:
torch.Tensor Predicted target values with shape (batch_size, 1).
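The shape contract above, (batch_size, input_dim) in and (batch_size, 1) out, can be illustrated without PyTorch. This is a framework-free NumPy sketch only: the single hidden layer, its width of 16, and the ReLU activation are assumptions, since the actual layers of SimpleRegressionNet are not documented here.

```python
import numpy as np


def feed_forward(x, w1, b1, w2, b2):
    """Forward pass of a one-hidden-layer regressor (assumed architecture)."""
    hidden = np.maximum(x @ w1 + b1, 0.0)  # ReLU activation (assumption)
    return hidden @ w2 + b2                # one output value per sample


rng = np.random.default_rng(0)
input_dim, hidden_dim = 4, 16              # hidden_dim is an illustrative choice
w1 = rng.normal(size=(input_dim, hidden_dim))
b1 = np.zeros(hidden_dim)
w2 = rng.normal(size=(hidden_dim, 1))
b2 = np.zeros(1)

x = rng.normal(size=(2, input_dim))        # batch of 2 samples
y = feed_forward(x, w1, b1, w2, b2)        # shape (2, 1)
```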
- class app.ml_train.TrainingLog(*args: Any, **kwargs: Any)
Bases: Base
Log entry for one training run and horizon.
- Attributes:
- id
str UUID identifier of the run.
- timestamp
datetime When the run completed (UTC).
- horizon
str Unique horizon key (often "<coord>_<horizon>").
- sklearn_score
float R² score for the Sklearn model on the test split.
- pytorch_score
float R² score for the PyTorch model on the test split.
- data_count
int Number of samples used for this horizon after preprocessing.
- coord_latitude
float|None Latitude of the coordinate, or None for aggregate runs.
- coord_longitude
float|None Longitude of the coordinate, or None for aggregate runs.
- horizon_label
str|None One of {"5min", "1h", "12h", "24h"}.
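The horizon key format "<coord>_<horizon>" can be sketched as follows. Both helpers are hypothetical, and the coordinate formatting (four decimals, with the decimal point replaced by an underscore) is an assumption inferred from the example identifier "lat57_7000_lon11_9000" used elsewhere on this page. How the real job formats negative coordinates is not covered.

```python
HORIZON_LABELS = {"5min", "1h", "12h", "24h"}


def format_coord(latitude: float, longitude: float) -> str:
    """Build a coordinate identifier such as 'lat57_7000_lon11_9000' (assumed format)."""
    return f"lat{latitude:.4f}_lon{longitude:.4f}".replace(".", "_")


def horizon_key(latitude: float, longitude: float, horizon_label: str) -> str:
    """Combine coordinate and horizon label into '<coord>_<horizon>'."""
    if horizon_label not in HORIZON_LABELS:
        raise ValueError(f"unknown horizon label: {horizon_label!r}")
    return f"{format_coord(latitude, longitude)}_{horizon_label}"
```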
- class app.ml_train.TrainingStatus(*args: Any, **kwargs: Any)
Bases: Base
Singleton table reflecting the current training state.
- Attributes:
- id
int Primary key (always 1 in this job flow).
- is_training
bool Whether a training job is running.
- last_trained_at
datetime|None Timestamp of the last successful training completion.
- train_count
int Number of training runs since system start or initialization.
- current_horizon
str|None Human-readable status message or horizon marker.
- class app.ml_train.WeatherObservation(*args: Any, **kwargs: Any)
Bases: Base
Observational weather record (non-imputed preferred for training).
- Attributes:
- timestamp
datetime Unique timestamp for the observation (UTC).
- latitude
float Coordinate latitude in decimal degrees.
- longitude
float Coordinate longitude in decimal degrees.
- air_temperature
float|None Air temperature in degrees Celsius.
- wind_speed
float|None Wind speed in m/s.
- wind_direction
float|None Wind direction in degrees.
- cloud_area_fraction
float|None Cloud cover fraction (0–1) when available.
- precipitation_amount
float|None Precipitation amount in mm for the interval.
- is_imputed
bool Whether the record was imputed (training uses non-imputed records).
- app.ml_train.get_db_session() → sqlalchemy.orm.Session
Create and return a new SQLAlchemy session.
- Returns:
sqlalchemy.orm.Session Session bound to the engine specified by DATABASE_URL.
- Raises:
ValueError If DATABASE_URL is not defined in the environment.
sqlalchemy.exc.SQLAlchemyError If the engine cannot be created or the metadata initialization fails.
Examples
>>> import os
>>> os.environ["DATABASE_URL"] = "sqlite:///:memory:"
>>> from app.ml_train import get_db_session
>>> s = get_db_session()
>>> s.close()
- app.ml_train.load_training_data(session: sqlalchemy.orm.Session, latitude: float, longitude: float) → pandas.DataFrame
Load non-imputed observations for one coordinate.
- Parameters:
- session
sqlalchemy.orm.Session Open SQLAlchemy session.
- latitude
float Coordinate latitude in decimal degrees.
- longitude
float Coordinate longitude in decimal degrees.
- Returns:
pandas.DataFrame DataFrame with a timestamp column parsed to datetime64[ns]. Empty DataFrame if no non-imputed rows are available.
Examples
>>> # Requires seeded DB with weather_observations
>>> from app.ml_train import load_training_data
>>> df = load_training_data(session, 57.70, 11.90)
>>> isinstance(df.empty, bool)
True
- app.ml_train.main() → None
Entry point for the standalone ML training job.
Notes
Coordinates are fetched (preferentially central ones) and iterated. For each horizon, data are prepared, models trained, artifacts saved, and a TrainingLog row written.
Status updates are written to TrainingStatus throughout the run.
Examples
>>> # Executed inside Slurm job container
>>> from app.ml_train import main
>>> main()
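The per-horizon iteration described in the notes depends on translating each horizon label into a number of shift steps. The sketch below shows one way to do that. It assumes observations arrive every 5 minutes, which the documented examples suggest but do not state, and the `min_samples` threshold is a hypothetical parameter standing in for the module's undocumented minimum data thresholds.

```python
from datetime import timedelta
from typing import List

OBSERVATION_INTERVAL = timedelta(minutes=5)  # assumed sampling cadence

HORIZONS = {
    "5min": timedelta(minutes=5),
    "1h": timedelta(hours=1),
    "12h": timedelta(hours=12),
    "24h": timedelta(hours=24),
}


def shift_steps_for(horizon_label: str,
                    interval: timedelta = OBSERVATION_INTERVAL) -> int:
    """Number of rows the target must be shifted for the given horizon."""
    steps, remainder = divmod(HORIZONS[horizon_label], interval)
    if remainder:
        raise ValueError(f"{horizon_label} is not a multiple of {interval}")
    return steps


def viable_horizons(row_count: int, min_samples: int) -> List[str]:
    """Horizons that still have at least min_samples rows after shifting."""
    return [
        label for label in HORIZONS
        if row_count - shift_steps_for(label) >= min_samples
    ]
```

With 200 rows and a threshold of 50, the 24h horizon would be skipped because shifting by 288 steps leaves no usable rows.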
- app.ml_train.prepare_horizon_data(df: pandas.DataFrame, horizon_label: str, shift_steps: int) → Tuple[numpy.typing.NDArray[numpy.float64], numpy.typing.NDArray[numpy.float64], int]
Prepare features and targets for a specific horizon.
- Parameters:
- df
pandas.DataFrame Input data sorted/filtered per coordinate.
- horizon_label
str Label for the horizon (e.g., "5min", "1h").
- shift_steps
int Positive number of steps the target is shifted into the future.
- Returns:
tuple[numpy.ndarray, numpy.ndarray, int] Tuple (X, y, count), where count is the number of samples retained after dropping NA rows for the chosen horizon.
Examples
>>> import pandas as pd
>>> from app.ml_train import prepare_horizon_data
>>> ts = pd.date_range("2024-01-01", periods=12, freq="5min")
>>> df = pd.DataFrame({
...     "timestamp": ts,
...     "air_temperature": list(range(12)),
...     "wind_speed": [1.0]*12,
...     "wind_direction": [0.0]*12,
...     "precipitation_amount": [0.0]*12,
... })
>>> X, y, count = prepare_horizon_data(df, "5min", 1)
>>> count >= 10
True
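The target shifting described above can be sketched with pandas. This is an illustration under assumptions rather than the function's real body: `shift_target` is a hypothetical name, the feature columns are taken from the example above, and using air_temperature as the prediction target is a guess.

```python
import numpy as np
import pandas as pd

# Assumed feature set (the columns used in the documented example).
FEATURE_COLUMNS = ["air_temperature", "wind_speed",
                   "wind_direction", "precipitation_amount"]
TARGET_COLUMN = "air_temperature"  # assumed target variable


def shift_target(df: pd.DataFrame, shift_steps: int):
    """Pair each row's features with the target value shift_steps rows later."""
    if shift_steps <= 0:
        raise ValueError("shift_steps must be positive")
    frame = df.sort_values("timestamp")[FEATURE_COLUMNS].copy()
    # A negative shift pulls future values up to the current row.
    frame["target"] = frame[TARGET_COLUMN].shift(-shift_steps)
    # The last shift_steps rows have no future value and are dropped,
    # along with any row that has a missing feature.
    frame = frame.dropna()
    X = frame[FEATURE_COLUMNS].to_numpy(dtype=np.float64)
    y = frame["target"].to_numpy(dtype=np.float64)
    return X, y, len(frame)
```

For the 12-row frame in the example, a shift of one step leaves 11 usable samples, which is consistent with the documented `count >= 10`.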
- app.ml_train.train_and_save_model(X_train: numpy.typing.NDArray[numpy.float64], y_train: numpy.typing.NDArray[numpy.float64], X_test: numpy.typing.NDArray[numpy.float64], y_test: numpy.typing.NDArray[numpy.float64], horizon: str, coord_str: str) → Tuple[float, float]
Train Ridge and PyTorch models, persist them, and return R² scores.
- Parameters:
- X_train, y_train, X_test, y_test
numpy.ndarray Training and test splits.
- horizon
str Horizon label used for filenames and logging.
- coord_str
str Coordinate identifier (e.g., "lat57_7000_lon11_9000").
- Returns:
tuple[float, float] R² scores of the Ridge and PyTorch models on the test split.
- Raises:
OSError If persisting the model artifacts to disk fails.
Notes
Models are saved under /data/models/<coord_str>/ with deterministic filenames per horizon and framework.
Examples
>>> import numpy as np
>>> from app.ml_train import train_and_save_model
>>> X = np.random.rand(100, 4); y = np.random.rand(100)
>>> s = int(len(X)*0.8)
>>> train_and_save_model(X[:s], y[:s], X[s:], y[s:],
...                      "5min", "lat57_7000_lon11_9000")
(..., ...)
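The example above splits the data chronologically at 80% and the function reports R² on the held-out part. A minimal NumPy sketch of both steps follows. `chronological_split` and `r_squared` are hypothetical helpers, and the formula is the standard coefficient of determination, which is assumed to match what the job computes.

```python
import numpy as np


def chronological_split(X, y, train_fraction=0.8):
    """Split without shuffling so the test set lies after the training set."""
    split = int(len(X) * train_fraction)
    return X[:split], y[:split], X[split:], y[split:]


def r_squared(y_true, y_pred):
    """Coefficient of determination: 1 - SS_res / SS_tot.

    Assumes y_true is not constant (otherwise SS_tot is zero).
    """
    y_true = np.asarray(y_true, dtype=np.float64)
    y_pred = np.asarray(y_pred, dtype=np.float64)
    ss_res = np.sum((y_true - y_pred) ** 2)          # residual sum of squares
    ss_tot = np.sum((y_true - y_true.mean()) ** 2)   # total sum of squares
    return 1.0 - ss_res / ss_tot
```

A score of 1.0 means perfect predictions, 0.0 means the model does no better than always predicting the mean of the test targets, and negative values mean it does worse.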
- app.ml_train.update_training_status(session: sqlalchemy.orm.Session, is_training: bool, current_status_message: str | None = None, increment_count: bool = False) → None
Update the training_status row with the current state.
- Parameters:
- session
sqlalchemy.orm.Session Open SQLAlchemy session.
- is_training
bool Flag indicating whether a training run is in progress.
- current_status_message
str|None, optional Optional status message or horizon marker.
- increment_count
bool, optional Whether to increment the total training counter.
Examples
>>> # Within an open SQLAlchemy session
>>> from app.ml_train import update_training_status
>>> update_training_status(session, True, "Job started")