Source code for app.imputation

# src/app/imputation.py
"""
Impute missing weather observation values on a fixed five-minute cadence.

This module detects and fills gaps in the time series of weather observations
per coordinate. It aligns data to a regular 5-minute timeline and imputes
values either via a weighted average of the most recent actual observations
or, when there are not enough prior points, via time-based linear
interpolation. The result is a consistent dataset suitable for downstream
training and analytics.

See Also
--------
app.weather_ingest : Acquires raw observations that may require imputation.
app.models.WeatherObservation : ORM entity persisted/created by this module.
app.training_helpers : Utilities used by the model training pipeline.

Notes
-----
- Alignment frequency is ``5T`` (five minutes), controlled by ``TIME_FREQUENCY``.
- Weighted imputation is applied when exactly three prior non-imputed
  observations exist, using weights from ``WEIGHT_ARRAY`` (ordered oldest to
  newest); with fewer prior points, time-based linear interpolation is
  attempted. A worked example appears under Examples below.
- Target features are listed in ``FEATURE_COLUMNS_TO_IMPUTE`` and are imputed
  independently.
- This module depends on pandas, numpy, and SQLAlchemy ORM models.

Examples
--------
>>> # Run a single imputation cycle (database required)   # doctest: +SKIP
>>> from app.imputation import run_imputation_cycle       # doctest: +SKIP
>>> run_imputation_cycle()                                # doctest: +SKIP
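
A self-contained illustration of the weighted-average rule (the values and
weights below are hypothetical; weights are ordered oldest to newest, so the
most recent observation contributes most):

>>> import numpy as np
>>> float(np.average([10.0, 12.0, 14.0], weights=[0.1, 0.3, 0.6]))
13.0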
"""


import logging
from typing import Any, Dict, List, Optional

import numpy as np
import pandas as pd
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Session

from .database import SessionLocal
from .models import WeatherObservation

logger = logging.getLogger(__name__)

FEATURE_COLUMNS_TO_IMPUTE: List[str] = [
    "air_temperature",
    "wind_speed",
    "wind_direction",
    "precipitation_amount",
]

TIME_FREQUENCY: str = "5T"
WEIGHT_ARRAY: np.ndarray[Any, np.dtype[np.floating[Any]]] = np.array([0.1, 0.3, 0.6])
MAX_WEIGHT_OBSERVATIONS: int = 3
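
# WEIGHT_ARRAY is interpreted oldest -> newest by _apply_weighted_imputation:
# with three prior values the most recent contributes 0.6. If NaNs reduce the
# sample below three, the last (most recent) weights are kept and re-normalized
# to sum to 1.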


def run_imputation_cycle() -> None:
    """Execute one full imputation pass over stored observations.

    Loads observations, aligns each coordinate's series to a 5-minute
    timeline, imputes missing rows using a weighted average of recent actual
    values (when sufficient history exists) or time-based linear
    interpolation, and merges the imputed rows back into the database.

    Returns
    -------
    None
        This function produces side effects only (database writes and
        logging).

    Notes
    -----
    - Errors are logged with context and not re-raised to keep scheduled runs
      resilient; callers do not need to wrap this function in try/except.
    - Imputed values are marked using the ``is_imputed`` flag.

    Examples
    --------
    >>> # Requires a configured database and ORM models      # doctest: +SKIP
    >>> from app.imputation import run_imputation_cycle      # doctest: +SKIP
    >>> run_imputation_cycle()                               # doctest: +SKIP
    """
    logger.info("Starting data imputation cycle...")
    try:
        with SessionLocal() as session:
            all_data_df = _load_all_data(session)
            if all_data_df.empty:
                logger.info("No data in database to process for imputation.")
                return
            imputed_records = _process_all_groups(all_data_df)
            if imputed_records:
                _commit_imputed_records(session, imputed_records)
                logger.info(
                    "Imputation cycle completed. Merged %d imputed rows into "
                    "the database.",
                    len(imputed_records),
                )
            else:
                logger.info(
                    "Imputation cycle completed. No new rows were imputed."
                )
    except SQLAlchemyError as db_error:
        logger.error(
            "Database error during imputation cycle: %s", db_error, exc_info=True
        )
    except Exception:
        logger.exception("Unexpected error during imputation cycle.")
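

# A minimal sketch of how a caller might schedule this cycle (illustrative;
# APScheduler is an assumption here, not a dependency of this module):
#
#     from apscheduler.schedulers.background import BackgroundScheduler
#
#     scheduler = BackgroundScheduler()
#     scheduler.add_job(run_imputation_cycle, "interval", minutes=5)
#     scheduler.start()
#
# Because run_imputation_cycle() logs and swallows its own errors, the job
# needs no additional try/except wrapping.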


def _load_all_data(session: Session) -> pd.DataFrame:
    """Load all weather observations into a pandas DataFrame.

    Parameters
    ----------
    session : sqlalchemy.orm.Session
        Open SQLAlchemy session bound to an engine.

    Returns
    -------
    pandas.DataFrame
        DataFrame sorted by latitude, longitude, and timestamp. The
        ``timestamp`` column is converted to pandas ``datetime64[ns]``. The
        ``is_imputed`` column is ensured to be boolean; if missing, it is
        created and set to ``False`` for all rows.

    Raises
    ------
    SQLAlchemyError
        If the underlying SQL query fails.

    Notes
    -----
    - Uses ``pandas.read_sql`` over a SQLAlchemy query for convenience.
    - The ORM model is :class:`app.models.WeatherObservation`.
    """
    assert session is not None, "Session must not be None."
    db_bind = session.bind
    assert db_bind is not None, "Database session is not bound to an engine."
    query = session.query(WeatherObservation).order_by(
        WeatherObservation.latitude,
        WeatherObservation.longitude,
        WeatherObservation.timestamp,
    )
    data_frame = pd.read_sql(query.statement, db_bind)
    data_frame["timestamp"] = pd.to_datetime(data_frame["timestamp"])
    if "is_imputed" in data_frame.columns:
        data_frame["is_imputed"] = data_frame["is_imputed"].astype(bool)
    else:
        logger.warning(
            "'is_imputed' column not found in DataFrame. Assuming False for "
            "all existing rows."
        )
        data_frame["is_imputed"] = False
    return data_frame


def _process_all_groups(all_data_df: pd.DataFrame) -> List[WeatherObservation]:
    """Process imputation for each coordinate group.

    Parameters
    ----------
    all_data_df : pandas.DataFrame
        DataFrame containing all observations across all coordinates.

    Returns
    -------
    list[app.models.WeatherObservation]
        ORM instances representing imputed rows to be merged into the DB.

    Notes
    -----
    - Groups by ``(latitude, longitude)``. Within each group, data is sorted
      by timestamp and imputation is applied independently.
    """
    imputed_records: List[WeatherObservation] = []
    for (latitude, longitude), group_df in all_data_df.groupby(
        ["latitude", "longitude"]
    ):
        logger.debug(
            "Processing imputation for coordinate (%s, %s).", latitude, longitude
        )
        sorted_group = group_df.sort_values("timestamp").set_index("timestamp")
        if sorted_group.empty:
            continue
        imputed_records.extend(_impute_for_group(sorted_group, latitude, longitude))
    return imputed_records


def _impute_for_group(
    group_df: pd.DataFrame, latitude: float, longitude: float
) -> List[WeatherObservation]:
    """Impute missing observations for a single coordinate group.

    Parameters
    ----------
    group_df : pandas.DataFrame
        Grouped DataFrame indexed by ``timestamp`` for one coordinate.
    latitude : float
        Latitude of the group.
    longitude : float
        Longitude of the group.

    Returns
    -------
    list[app.models.WeatherObservation]
        Imputed ORM rows for the missing timestamps in this group.

    Notes
    -----
    - Only the features in ``FEATURE_COLUMNS_TO_IMPUTE`` are imputed.
    - When no relevant feature columns exist, the group is skipped.
    """
    min_time = group_df.index.min()
    max_time = group_df.index.max()
    full_time_index = pd.date_range(start=min_time, end=max_time, freq=TIME_FREQUENCY)
    # Guard on the feature columns specifically: indexing below uses the first
    # *present* feature column, avoiding a KeyError when only some of the
    # configured features exist in the data.
    feature_cols_present = [
        col for col in FEATURE_COLUMNS_TO_IMPUTE if col in group_df.columns
    ]
    if not feature_cols_present:
        logger.warning(
            "No relevant columns for imputation found for (%s, %s). "
            "Skipping group.",
            latitude,
            longitude,
        )
        return []
    cols_to_reindex = feature_cols_present + ["is_imputed"]
    resampled_group = group_df[cols_to_reindex].reindex(full_time_index)
    # A row is missing when its first feature is NaN and the slot was not
    # already filled by a previous imputation run.
    missing_index = resampled_group[
        resampled_group[feature_cols_present[0]].isna()
        & (~resampled_group["is_imputed"].fillna(False))
    ].index
    imputed_records: List[WeatherObservation] = []
    for missing_timestamp in missing_index:
        record = _impute_single_timestamp(
            group_df, latitude, longitude, missing_timestamp
        )
        if record:
            imputed_records.append(record)
            for col_name, value in record.__dict__.items():
                if col_name in resampled_group.columns:
                    resampled_group.loc[missing_timestamp, col_name] = value
    return imputed_records
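

# Illustrative sketch of the alignment step in _impute_for_group (standalone,
# not executed; the timestamps below are hypothetical): reindexing onto a
# fixed 5-minute grid surfaces gaps as NaN rows.
#
#     import pandas as pd
#
#     idx = pd.to_datetime(["2024-01-01 00:00", "2024-01-01 00:10"])
#     s = pd.Series([1.0, 2.0], index=idx)
#     grid = pd.date_range(idx.min(), idx.max(), freq="5T")
#     s.reindex(grid)  # NaN at 00:05 marks the missing 5-minute slot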


def _get_prior_actual(
    group_df: pd.DataFrame, missing_timestamp: pd.Timestamp
) -> pd.DataFrame:
    """Return the most recent non-imputed observations before a timestamp.

    Parameters
    ----------
    group_df : pandas.DataFrame
        DataFrame indexed by timestamp for a single coordinate.
    missing_timestamp : pandas.Timestamp
        Timestamp to impute.

    Returns
    -------
    pandas.DataFrame
        Up to ``MAX_WEIGHT_OBSERVATIONS`` rows, sorted descending by
        timestamp.
    """
    prior = group_df[(group_df.index < missing_timestamp) & (~group_df["is_imputed"])]
    return prior.sort_index(ascending=False).head(MAX_WEIGHT_OBSERVATIONS)


def _apply_weighted_imputation(
    recent_actual: pd.DataFrame, imputed_values: Dict[str, Any]
) -> bool:
    """Apply weighted-average imputation on configured features.

    Parameters
    ----------
    recent_actual : pandas.DataFrame
        The most recent non-imputed observations (typically exactly three),
        sorted descending by timestamp.
    imputed_values : dict[str, Any]
        Mutable mapping updated in place with imputed feature values.

    Returns
    -------
    bool
        ``True`` if at least one feature received an imputed value,
        otherwise ``False``.

    Notes
    -----
    - Uses weights from ``WEIGHT_ARRAY`` truncated to the available sample
      length and normalized to sum to 1.
    - The caller is responsible for ensuring the correct history length; in
      this module, weighted imputation is used only when exactly three prior
      actual observations exist.
    """
    success = False
    for feature in FEATURE_COLUMNS_TO_IMPUTE:
        if feature not in recent_actual.columns:
            continue
        # Sort oldest -> newest so that WEIGHT_ARRAY (ordered oldest to
        # newest) assigns its largest weight to the most recent value;
        # recent_actual arrives sorted descending from _get_prior_actual.
        valid_series = recent_actual[feature].dropna().sort_index()
        if not valid_series.empty:
            values = valid_series.to_numpy(dtype=float)
            weights = WEIGHT_ARRAY[-len(values) :]
            normalized = weights / weights.sum()
            imputed_values[feature] = float(np.average(values, weights=normalized))
            success = True
        else:
            imputed_values[feature] = None
    return success


def _apply_interpolated_imputation(
    group_df: pd.DataFrame,
    missing_timestamp: pd.Timestamp,
    imputed_values: Dict[str, Any],
) -> bool:
    """Apply time-based linear interpolation on configured features.

    Parameters
    ----------
    group_df : pandas.DataFrame
        DataFrame for a single coordinate, indexed by timestamp.
    missing_timestamp : pandas.Timestamp
        Timestamp at which to interpolate.
    imputed_values : dict[str, Any]
        Mutable mapping updated in place with imputed feature values.

    Returns
    -------
    bool
        ``True`` if at least one feature received an interpolated value,
        otherwise ``False``.
    """
    success = False
    for feature in FEATURE_COLUMNS_TO_IMPUTE:
        if feature not in group_df.columns:
            continue
        series = group_df[feature].copy()
        if missing_timestamp not in series.index:
            series.at[missing_timestamp] = np.nan
        interpolated = series.sort_index().interpolate(method="time")
        value = interpolated.get(missing_timestamp)
        if pd.notna(value):
            imputed_values[feature] = float(value)
            success = True
        else:
            imputed_values[feature] = None
    return success
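

# Illustrative sketch of time-based interpolation (standalone, not executed;
# values below are hypothetical): unlike positional linear interpolation,
# ``method="time"`` weights by elapsed time, so an unevenly spaced gap is
# filled proportionally.
#
#     import numpy as np
#     import pandas as pd
#
#     idx = pd.to_datetime(["2024-01-01 00:00", "2024-01-01 00:05",
#                           "2024-01-01 00:15"])
#     s = pd.Series([10.0, np.nan, 16.0], index=idx)
#     s.interpolate(method="time").iloc[1]  # 12.0 (5 of 15 minutes elapsed)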
""" success = False for feature in FEATURE_COLUMNS_TO_IMPUTE: if feature not in group_df.columns: continue series = group_df[feature].copy() if missing_timestamp not in series.index: series.at[missing_timestamp] = np.nan interpolated = series.sort_index().interpolate(method="time") value = interpolated.get(missing_timestamp) if pd.notna(value): imputed_values[feature] = float(value) success = True else: imputed_values[feature] = None return success def _impute_single_timestamp( group_df: pd.DataFrame, latitude: float, longitude: float, missing_timestamp: pd.Timestamp, ) -> Optional[WeatherObservation]: """Impute values for a single missing timestamp. Parameters ---------- group_df : pandas.DataFrame DataFrame indexed by timestamp for the coordinate. latitude : float Latitude for the new observation. longitude : float Longitude for the new observation. missing_timestamp : pandas.Timestamp Timestamp to fill. Returns ------- app.models.WeatherObservation | None ORM instance when imputation succeeds; ``None`` if no strategy could produce a value for any configured feature. Notes ----- - Uses weighted imputation when exactly ``MAX_WEIGHT_OBSERVATIONS`` prior actual rows exist; otherwise falls back to linear interpolation if at least one prior actual row is present. - All NumPy scalars are converted to native Python types for ORM safety. """ recent_actual = _get_prior_actual(group_df, missing_timestamp) imputed_values: Dict[str, Any] = { "latitude": latitude, "longitude": longitude, "timestamp": missing_timestamp, "is_imputed": True, } if len(recent_actual) == MAX_WEIGHT_OBSERVATIONS: success = _apply_weighted_imputation(recent_actual, imputed_values) elif not recent_actual.empty: logger.debug( f"Less than {MAX_WEIGHT_OBSERVATIONS} observations for " f"({latitude},{longitude}) at {missing_timestamp}. Using linear " "interpolation." ) success = _apply_interpolated_imputation( group_df, missing_timestamp, imputed_values ) else: logger.debug( f"No prior observations for ({latitude}, {longitude}) at " f"{missing_timestamp}." ) success = False if success: # Convert all numpy types in imputed_values to native Python types for key, value in imputed_values.items(): if hasattr(value, "item"): imputed_values[key] = value.item() elif isinstance(value, (np.generic, np.ndarray)): imputed_values[key] = value.tolist() return WeatherObservation(**imputed_values) return None def _commit_imputed_records( session: Session, records: List[WeatherObservation] ) -> None: """Merge and commit imputed records to the database. Parameters ---------- session : sqlalchemy.orm.Session Open SQLAlchemy session bound to an engine. records : list[app.models.WeatherObservation] Imputed ORM instances to be merged (upsert-like semantics). Returns ------- None Raises ------ SQLAlchemyError If the commit fails. Notes ----- - Uses ``Session.merge`` to upsert by primary key (timestamp, latitude, longitude) followed by a single commit for the batch. """ assert session is not None, "Session must not be None." for record in records: session.merge(record) session.commit()