# src/app/imputation.py
"""
Impute missing weather observation values on a fixed five-minute cadence.
This module detects and fills gaps in the time series of weather observations
per coordinate. It aligns data to a regular 5-minute timeline and imputes
values either via a weighted average of the most recent actual observations
or, when there are not enough prior points, via time-based linear
interpolation. The result is a consistent dataset suitable for downstream
training and analytics.
See Also
--------
app.weather_ingest : Acquires raw observations that may require imputation.
app.models.WeatherObservation : ORM entity persisted/created by this module.
app.training_helpers : Utilities used by the model training pipeline.
Notes
-----
- Alignment frequency is ``5min`` (five minutes), controlled by ``TIME_FREQUENCY``.
- Weighted imputation relies on at most three prior non-imputed observations
using weights from ``WEIGHT_ARRAY``; otherwise linear interpolation is attempted.
- Target features are listed in ``FEATURE_COLUMNS_TO_IMPUTE`` and are imputed
independently.
- This module depends on pandas, numpy, and SQLAlchemy ORM models.
Examples
--------
>>> # Run a single imputation cycle (database required) # doctest: +SKIP
>>> from app.imputation import run_imputation_cycle # doctest: +SKIP
>>> run_imputation_cycle() # doctest: +SKIP
"""
import logging
from typing import Any, Dict, List, Optional
import numpy as np
import pandas as pd
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Session
from .database import SessionLocal
from .models import WeatherObservation
logger = logging.getLogger(__name__)
FEATURE_COLUMNS_TO_IMPUTE: List[str] = [
"air_temperature",
"wind_speed",
"wind_direction",
"precipitation_amount",
]
TIME_FREQUENCY: str = "5min"  # pandas deprecated the "T" alias in favor of "min".
# Weights apply oldest -> newest, so the most recent observation dominates.
WEIGHT_ARRAY: np.ndarray[Any, np.dtype[np.floating[Any]]] = np.array([0.1, 0.3, 0.6])
MAX_WEIGHT_OBSERVATIONS: int = 3
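# Example: with a full three-point history [10.0, 12.0, 14.0] ordered oldest ->
# newest, the weighted estimate is 0.1 * 10.0 + 0.3 * 12.0 + 0.6 * 14.0 = 13.0;
# the weights already sum to 1, so renormalization only matters when fewer
# valid points are available.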
def run_imputation_cycle() -> None:
"""Execute one full imputation pass over stored observations.
Loads observations, aligns each coordinate's series to a 5-minute timeline,
imputes missing rows using a weighted average of recent actual values (when
sufficient history exists) or time-based linear interpolation, and merges
the imputed rows back into the database.
Returns
-------
None
This function produces side effects only (database writes and logging).
Notes
-----
- Errors are logged with context and not re-raised to keep scheduled runs
resilient; callers do not need to wrap this function in try/except.
- Imputed values are marked using the ``is_imputed`` flag.
Examples
--------
>>> # Requires a configured database and ORM models # doctest: +SKIP
>>> from app.imputation import run_imputation_cycle # doctest: +SKIP
>>> run_imputation_cycle() # doctest: +SKIP
"""
logger.info("Starting data imputation cycle...")
try:
with SessionLocal() as session:
all_data_df = _load_all_data(session)
if all_data_df.empty:
logger.info("No data in database to process for imputation.")
return
imputed_records = _process_all_groups(all_data_df)
if imputed_records:
_commit_imputed_records(session, imputed_records)
logger.info(
f"Imputation cycle completed. Merged {len(imputed_records)} "
"imputed rows into the database."
)
else:
logger.info("Imputation cycle completed. No new rows were imputed.")
except SQLAlchemyError as db_error:
logger.error(
"Database error during imputation cycle: %s", db_error, exc_info=True
)
except Exception:
logger.exception("Unexpected error during imputation cycle.")
def _load_all_data(session: Session) -> pd.DataFrame:
"""Load all weather observations into a pandas DataFrame.
Parameters
----------
session : sqlalchemy.orm.Session
Open SQLAlchemy session bound to an engine.
Returns
-------
pandas.DataFrame
DataFrame sorted by latitude, longitude, and timestamp. The
``timestamp`` column is converted to pandas ``datetime64[ns]``. The
``is_imputed`` column is ensured to be boolean; if missing, it is
created and set to ``False`` for all rows.
Raises
------
SQLAlchemyError
If the underlying SQL query fails.
Notes
-----
- Uses ``pandas.read_sql`` over a SQLAlchemy query for convenience.
- The ORM model is :class:`app.models.WeatherObservation`.
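    Examples
    --------
    A usage sketch (requires a configured database, hence skipped under
    doctest):
    >>> with SessionLocal() as session:  # doctest: +SKIP
    ...     all_data_df = _load_all_data(session)  # doctest: +SKIP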
"""
assert session is not None, "Session must not be None."
db_bind = session.bind
assert db_bind is not None, "Database session is not bound to an engine."
query = session.query(WeatherObservation).order_by(
WeatherObservation.latitude,
WeatherObservation.longitude,
WeatherObservation.timestamp,
)
data_frame = pd.read_sql(query.statement, db_bind)
data_frame["timestamp"] = pd.to_datetime(data_frame["timestamp"])
if "is_imputed" in data_frame.columns:
data_frame["is_imputed"] = data_frame["is_imputed"].astype(bool)
else:
logger.warning(
"'is_imputed' column not found in DataFrame. Assuming False for all "
"existing rows."
)
data_frame["is_imputed"] = False
return data_frame
def _process_all_groups(all_data_df: pd.DataFrame) -> List[WeatherObservation]:
"""Process imputation for each coordinate group.
Parameters
----------
all_data_df : pandas.DataFrame
DataFrame containing all observations across all coordinates.
Returns
-------
list[app.models.WeatherObservation]
ORM instances representing imputed rows to be merged into the DB.
Notes
-----
- Groups by ``(latitude, longitude)``. Within each group, data is sorted
by timestamp and imputation is applied independently.
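    Examples
    --------
    A minimal sketch of the grouping step on hypothetical coordinates:
    >>> import pandas as pd
    >>> df = pd.DataFrame(
    ...     {
    ...         "latitude": [59.9, 59.9],
    ...         "longitude": [10.7, 10.7],
    ...         "timestamp": pd.to_datetime(["2024-01-01 00:00", "2024-01-01 00:10"]),
    ...     }
    ... )
    >>> len(list(df.groupby(["latitude", "longitude"])))
    1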
"""
imputed_records: List[WeatherObservation] = []
for (latitude, longitude), group_df in all_data_df.groupby(
["latitude", "longitude"]
):
logger.debug(f"Processing imputation for coordinate ({latitude}, {longitude}).")
sorted_group = group_df.sort_values("timestamp").set_index("timestamp")
if sorted_group.empty:
continue
imputed_records.extend(_impute_for_group(sorted_group, latitude, longitude))
return imputed_records
def _impute_for_group(
group_df: pd.DataFrame, latitude: float, longitude: float
) -> List[WeatherObservation]:
"""Impute missing observations for a single coordinate group.
Parameters
----------
group_df : pandas.DataFrame
Grouped DataFrame indexed by ``timestamp`` for one coordinate.
latitude : float
Latitude of the group.
longitude : float
Longitude of the group.
Returns
-------
list[app.models.WeatherObservation]
Imputed ORM rows for the missing timestamps in this group.
Notes
-----
- Only the features in ``FEATURE_COLUMNS_TO_IMPUTE`` are imputed.
- When no relevant feature columns exist, the group is skipped.
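    Examples
    --------
    A minimal, self-contained sketch of the alignment step: a 10-minute span
    reindexed to the 5-minute grid exposes one gap row.
    >>> import pandas as pd
    >>> idx = pd.to_datetime(["2024-01-01 00:00", "2024-01-01 00:10"])
    >>> df = pd.DataFrame({"air_temperature": [1.5, 2.5]}, index=idx)
    >>> full = pd.date_range(idx.min(), idx.max(), freq="5min")
    >>> int(df.reindex(full)["air_temperature"].isna().sum())
    1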
"""
min_time = group_df.index.min()
max_time = group_df.index.max()
full_time_index = pd.date_range(start=min_time, end=max_time, freq=TIME_FREQUENCY)
    relevant_cols = FEATURE_COLUMNS_TO_IMPUTE + ["is_imputed"]
    cols_to_reindex = [col for col in relevant_cols if col in group_df.columns]
    feature_cols = [col for col in FEATURE_COLUMNS_TO_IMPUTE if col in group_df.columns]
    if not feature_cols:
        logger.warning(
            f"No feature columns available for imputation for ({latitude}, "
            f"{longitude}). Skipping group."
        )
        return []
    resampled_group = group_df[cols_to_reindex].reindex(full_time_index)
    # Gap candidates: rows on the aligned grid where the first available feature
    # is missing and the row is not already marked as imputed.
    missing_index = resampled_group[
        resampled_group[feature_cols[0]].isna()
        & (~resampled_group["is_imputed"].fillna(False))
    ].index
imputed_records: List[WeatherObservation] = []
for missing_timestamp in missing_index:
record = _impute_single_timestamp(
group_df, latitude, longitude, missing_timestamp
)
        if record:
            imputed_records.append(record)
    return imputed_records
def _get_prior_actual(
group_df: pd.DataFrame, missing_timestamp: pd.Timestamp
) -> pd.DataFrame:
"""Return the most recent non-imputed observations before a timestamp.
Parameters
----------
group_df : pandas.DataFrame
DataFrame indexed by timestamp for a single coordinate.
missing_timestamp : pandas.Timestamp
Timestamp to impute.
Returns
-------
pandas.DataFrame
Up to ``MAX_WEIGHT_OBSERVATIONS`` rows, sorted descending by timestamp.
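    Examples
    --------
    A minimal sketch of the lookback filter: already-imputed rows are excluded
    before taking the most recent entries.
    >>> import pandas as pd
    >>> idx = pd.to_datetime(
    ...     ["2024-01-01 00:00", "2024-01-01 00:05", "2024-01-01 00:10"]
    ... )
    >>> df = pd.DataFrame({"is_imputed": [False, True, False]}, index=idx)
    >>> cutoff = pd.Timestamp("2024-01-01 00:10")
    >>> len(df[(df.index < cutoff) & (~df["is_imputed"])])
    1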
"""
prior = group_df[(group_df.index < missing_timestamp) & (~group_df["is_imputed"])]
return prior.sort_index(ascending=False).head(MAX_WEIGHT_OBSERVATIONS)
def _apply_weighted_imputation(
recent_actual: pd.DataFrame, imputed_values: Dict[str, Any]
) -> bool:
"""Apply weighted-average imputation on configured features.
Parameters
----------
recent_actual : pandas.DataFrame
The most recent non-imputed observations (typically exactly three).
imputed_values : dict[str, Any]
Mutable mapping updated in place with imputed feature values.
Returns
-------
bool
``True`` if at least one feature received an imputed value, otherwise
``False``.
Notes
-----
    - Uses weights from ``WEIGHT_ARRAY`` truncated to the available sample
      length and normalized to sum to 1.
    - Values are ordered oldest to newest so that the most recent observation
      receives the largest weight.
    - The caller is responsible for ensuring the correct history length; in
      this module, weighted imputation is used only when exactly three prior
      actual observations exist.
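    Examples
    --------
    The core calculation, shown on three hypothetical temperature values
    ordered oldest to newest:
    >>> import numpy as np
    >>> values = np.array([10.0, 12.0, 14.0])
    >>> weights = np.array([0.1, 0.3, 0.6])
    >>> float(np.average(values, weights=weights / weights.sum()))
    13.0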
"""
success = False
for feature in FEATURE_COLUMNS_TO_IMPUTE:
if feature not in recent_actual.columns:
continue
        # Sort chronologically so WEIGHT_ARRAY (oldest -> newest) lines up and
        # the most recent observation receives the largest weight.
        valid_series = recent_actual[feature].dropna().sort_index()
        if not valid_series.empty:
            values = valid_series.to_numpy(dtype=float)
            weights = WEIGHT_ARRAY[-len(values) :]
            normalized = weights / weights.sum()
imputed_values[feature] = float(np.average(values, weights=normalized))
success = True
else:
imputed_values[feature] = None
return success
def _apply_interpolated_imputation(
group_df: pd.DataFrame,
missing_timestamp: pd.Timestamp,
imputed_values: Dict[str, Any],
) -> bool:
"""Apply time-based linear interpolation on configured features.
Parameters
----------
group_df : pandas.DataFrame
DataFrame for a single coordinate, indexed by timestamp.
missing_timestamp : pandas.Timestamp
Timestamp at which to interpolate.
imputed_values : dict[str, Any]
Mutable mapping updated in place with imputed feature values.
Returns
-------
bool
``True`` if at least one feature received an interpolated value,
otherwise ``False``.
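    Examples
    --------
    The underlying pandas call on a hypothetical series with one gap; time-based
    interpolation recovers the midpoint value.
    >>> import numpy as np
    >>> import pandas as pd
    >>> idx = pd.to_datetime(
    ...     ["2024-01-01 00:00", "2024-01-01 00:05", "2024-01-01 00:10"]
    ... )
    >>> s = pd.Series([1.0, np.nan, 3.0], index=idx)
    >>> float(s.interpolate(method="time").iloc[1])
    2.0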
"""
success = False
for feature in FEATURE_COLUMNS_TO_IMPUTE:
if feature not in group_df.columns:
continue
series = group_df[feature].copy()
if missing_timestamp not in series.index:
series.at[missing_timestamp] = np.nan
interpolated = series.sort_index().interpolate(method="time")
value = interpolated.get(missing_timestamp)
if pd.notna(value):
imputed_values[feature] = float(value)
success = True
else:
imputed_values[feature] = None
return success
def _impute_single_timestamp(
group_df: pd.DataFrame,
latitude: float,
longitude: float,
missing_timestamp: pd.Timestamp,
) -> Optional[WeatherObservation]:
"""Impute values for a single missing timestamp.
Parameters
----------
group_df : pandas.DataFrame
DataFrame indexed by timestamp for the coordinate.
latitude : float
Latitude for the new observation.
longitude : float
Longitude for the new observation.
missing_timestamp : pandas.Timestamp
Timestamp to fill.
Returns
-------
app.models.WeatherObservation | None
ORM instance when imputation succeeds; ``None`` if no strategy could
produce a value for any configured feature.
Notes
-----
- Uses weighted imputation when exactly ``MAX_WEIGHT_OBSERVATIONS`` prior
actual rows exist; otherwise falls back to linear interpolation if at
least one prior actual row is present.
- All NumPy scalars are converted to native Python types for ORM safety.
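    Examples
    --------
    A usage sketch on a hypothetical coordinate and gap timestamp (requires the
    ORM model, hence skipped under doctest):
    >>> ts = pd.Timestamp("2024-01-01 00:05")  # doctest: +SKIP
    >>> record = _impute_single_timestamp(group_df, 59.9, 10.7, ts)  # doctest: +SKIP
    >>> record.is_imputed  # doctest: +SKIP
    True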
"""
recent_actual = _get_prior_actual(group_df, missing_timestamp)
imputed_values: Dict[str, Any] = {
"latitude": latitude,
"longitude": longitude,
"timestamp": missing_timestamp,
"is_imputed": True,
}
if len(recent_actual) == MAX_WEIGHT_OBSERVATIONS:
success = _apply_weighted_imputation(recent_actual, imputed_values)
elif not recent_actual.empty:
logger.debug(
f"Less than {MAX_WEIGHT_OBSERVATIONS} observations for "
f"({latitude},{longitude}) at {missing_timestamp}. Using linear "
"interpolation."
)
success = _apply_interpolated_imputation(
group_df, missing_timestamp, imputed_values
)
else:
logger.debug(
f"No prior observations for ({latitude}, {longitude}) at "
f"{missing_timestamp}."
)
success = False
if success:
        # Convert NumPy scalars and arrays in imputed_values to native Python
        # types so the ORM layer receives plain floats, ints, and lists.
        for key, value in imputed_values.items():
            if isinstance(value, np.generic):
                imputed_values[key] = value.item()
            elif isinstance(value, np.ndarray):
                imputed_values[key] = value.tolist()
return WeatherObservation(**imputed_values)
return None
def _commit_imputed_records(
session: Session, records: List[WeatherObservation]
) -> None:
"""Merge and commit imputed records to the database.
Parameters
----------
session : sqlalchemy.orm.Session
Open SQLAlchemy session bound to an engine.
records : list[app.models.WeatherObservation]
Imputed ORM instances to be merged (upsert-like semantics).
Returns
-------
None
Raises
------
SQLAlchemyError
If the commit fails.
Notes
-----
- Uses ``Session.merge`` to upsert by primary key (timestamp, latitude,
longitude) followed by a single commit for the batch.
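    Examples
    --------
    A usage sketch (requires a configured database, hence skipped under
    doctest):
    >>> with SessionLocal() as session:  # doctest: +SKIP
    ...     _commit_imputed_records(session, imputed_records)  # doctest: +SKIP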
"""
assert session is not None, "Session must not be None."
for record in records:
session.merge(record)
session.commit()