# src/app/imputation.py
"""Impute missing weather observation values on a fixed five-minute cadence.

This module detects and fills gaps in the time series of weather observations
per coordinate. It aligns data to a regular 5-minute timeline and imputes
values either via a weighted average of the most recent actual observations
or, when there are not enough prior points, via time-based linear
interpolation. The result is a consistent dataset suitable for downstream
training and analytics.

See Also
--------
app.weather_ingest : Acquires raw observations that may require imputation.
app.models.WeatherObservation : ORM entity persisted/created by this module.
app.training_helpers : Utilities used by the model training pipeline.

Notes
-----
- Alignment frequency is ``5T`` (five minutes), controlled by
  ``TIME_FREQUENCY``.
- Weighted imputation relies on at most three prior non-imputed observations
  using weights from ``WEIGHT_ARRAY``; otherwise linear interpolation is
  attempted.
- Target features are listed in ``FEATURE_COLUMNS_TO_IMPUTE`` and are imputed
  independently.
- This module depends on pandas, numpy, and SQLAlchemy ORM models.

Examples
--------
>>> # Run a single imputation cycle (database required)  # doctest: +SKIP
>>> from app.imputation import run_imputation_cycle  # doctest: +SKIP
>>> run_imputation_cycle()  # doctest: +SKIP
"""

import logging
from typing import Any, Dict, List, Optional

import numpy as np
import pandas as pd
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Session

from .database import SessionLocal
from .models import WeatherObservation

logger = logging.getLogger(__name__)

# Features imputed independently of one another.
FEATURE_COLUMNS_TO_IMPUTE: List[str] = [
    "air_temperature",
    "wind_speed",
    "wind_direction",
    "precipitation_amount",
]
# Cadence of the aligned timeline ("5T" == five minutes).
TIME_FREQUENCY: str = "5T"
# Weights in chronological order (oldest -> newest): the most recent actual
# observation contributes the most (0.6) to the weighted average.
WEIGHT_ARRAY: np.ndarray[Any, np.dtype[np.floating[Any]]] = np.array([0.1, 0.3, 0.6])
MAX_WEIGHT_OBSERVATIONS: int = 3


def run_imputation_cycle() -> None:
    """Execute one full imputation pass over stored observations.

    Loads observations, aligns each coordinate's series to a 5-minute
    timeline, imputes missing rows using a weighted average of recent actual
    values (when sufficient history exists) or time-based linear
    interpolation, and merges the imputed rows back into the database.

    Returns
    -------
    None
        This function produces side effects only (database writes and
        logging).

    Notes
    -----
    - Errors are logged with context and not re-raised to keep scheduled runs
      resilient; callers do not need to wrap this function in try/except.
    - Imputed values are marked using the ``is_imputed`` flag.

    Examples
    --------
    >>> # Requires a configured database and ORM models  # doctest: +SKIP
    >>> from app.imputation import run_imputation_cycle  # doctest: +SKIP
    >>> run_imputation_cycle()  # doctest: +SKIP
    """
    logger.info("Starting data imputation cycle...")
    try:
        with SessionLocal() as session:
            all_data_df = _load_all_data(session)
            if all_data_df.empty:
                logger.info("No data in database to process for imputation.")
                return
            imputed_records = _process_all_groups(all_data_df)
            if imputed_records:
                _commit_imputed_records(session, imputed_records)
                logger.info(
                    f"Imputation cycle completed. Merged {len(imputed_records)} "
                    "imputed rows into the database."
                )
            else:
                logger.info(
                    "Imputation cycle completed. No new rows were imputed."
                )
    except SQLAlchemyError as db_error:
        logger.error(
            "Database error during imputation cycle: %s", db_error, exc_info=True
        )
    except Exception:
        logger.exception("Unexpected error during imputation cycle.")


def _load_all_data(session: Session) -> pd.DataFrame:
    """Load all weather observations into a pandas DataFrame.

    Parameters
    ----------
    session : sqlalchemy.orm.Session
        Open SQLAlchemy session bound to an engine.

    Returns
    -------
    pandas.DataFrame
        DataFrame sorted by latitude, longitude, and timestamp. The
        ``timestamp`` column is converted to pandas ``datetime64[ns]``. The
        ``is_imputed`` column is ensured to be boolean; if missing, it is
        created and set to ``False`` for all rows.

    Raises
    ------
    ValueError
        If ``session`` is ``None``.
    RuntimeError
        If the session is not bound to an engine.
    SQLAlchemyError
        If the underlying SQL query fails.

    Notes
    -----
    - Uses ``pandas.read_sql`` over a SQLAlchemy query for convenience.
    - The ORM model is :class:`app.models.WeatherObservation`.
    """
    # Explicit raises instead of ``assert`` so the checks survive ``python -O``.
    if session is None:
        raise ValueError("Session must not be None.")
    db_bind = session.bind
    if db_bind is None:
        raise RuntimeError("Database session is not bound to an engine.")
    query = session.query(WeatherObservation).order_by(
        WeatherObservation.latitude,
        WeatherObservation.longitude,
        WeatherObservation.timestamp,
    )
    data_frame = pd.read_sql(query.statement, db_bind)
    data_frame["timestamp"] = pd.to_datetime(data_frame["timestamp"])
    if "is_imputed" in data_frame.columns:
        data_frame["is_imputed"] = data_frame["is_imputed"].astype(bool)
    else:
        logger.warning(
            "'is_imputed' column not found in DataFrame. Assuming False for all "
            "existing rows."
        )
        data_frame["is_imputed"] = False
    return data_frame


def _process_all_groups(all_data_df: pd.DataFrame) -> List[WeatherObservation]:
    """Process imputation for each coordinate group.

    Parameters
    ----------
    all_data_df : pandas.DataFrame
        DataFrame containing all observations across all coordinates.

    Returns
    -------
    list[app.models.WeatherObservation]
        ORM instances representing imputed rows to be merged into the DB.

    Notes
    -----
    - Groups by ``(latitude, longitude)``. Within each group, data is sorted
      by timestamp and imputation is applied independently.
    """
    imputed_records: List[WeatherObservation] = []
    for (latitude, longitude), group_df in all_data_df.groupby(
        ["latitude", "longitude"]
    ):
        logger.debug(
            f"Processing imputation for coordinate ({latitude}, {longitude})."
        )
        sorted_group = group_df.sort_values("timestamp").set_index("timestamp")
        if sorted_group.empty:
            continue
        imputed_records.extend(_impute_for_group(sorted_group, latitude, longitude))
    return imputed_records


def _impute_for_group(
    group_df: pd.DataFrame, latitude: float, longitude: float
) -> List[WeatherObservation]:
    """Impute missing observations for a single coordinate group.

    Parameters
    ----------
    group_df : pandas.DataFrame
        Grouped DataFrame indexed by ``timestamp`` for one coordinate.
    latitude : float
        Latitude of the group.
    longitude : float
        Longitude of the group.

    Returns
    -------
    list[app.models.WeatherObservation]
        Imputed ORM rows for the missing timestamps in this group.

    Notes
    -----
    - Only the features in ``FEATURE_COLUMNS_TO_IMPUTE`` are imputed.
    - When none of the configured feature columns exist, the group is skipped.
    - Missing slots are detected via the first configured feature column that
      is present in the group (a "sentinel"), so a group lacking the first
      configured feature no longer raises ``KeyError``.
    - Newly imputed rows are intentionally NOT fed back into the history used
      by later timestamps; only non-imputed rows serve as sources.
    """
    min_time = group_df.index.min()
    max_time = group_df.index.max()
    # Regular 5-minute grid spanning the group's observed time range.
    full_time_index = pd.date_range(start=min_time, end=max_time, freq=TIME_FREQUENCY)

    feature_cols = [c for c in FEATURE_COLUMNS_TO_IMPUTE if c in group_df.columns]
    if not feature_cols:
        logger.warning(
            f"No relevant columns for imputation found for ({latitude}, {longitude}). "
            "Skipping group."
        )
        return []

    # ``is_imputed`` is guaranteed by _load_all_data.
    resampled_group = group_df[feature_cols + ["is_imputed"]].reindex(full_time_index)

    # A slot is missing when the sentinel feature is NaN and the slot was not
    # already filled by a previous imputation run.
    sentinel = feature_cols[0]
    missing_index = resampled_group[
        resampled_group[sentinel].isna()
        & (~resampled_group["is_imputed"].fillna(False))
    ].index

    imputed_records: List[WeatherObservation] = []
    for missing_timestamp in missing_index:
        record = _impute_single_timestamp(
            group_df, latitude, longitude, missing_timestamp
        )
        if record:
            imputed_records.append(record)
    return imputed_records


def _get_prior_actual(
    group_df: pd.DataFrame, missing_timestamp: pd.Timestamp
) -> pd.DataFrame:
    """Return the most recent non-imputed observations before a timestamp.

    Parameters
    ----------
    group_df : pandas.DataFrame
        DataFrame indexed by timestamp for a single coordinate.
    missing_timestamp : pandas.Timestamp
        Timestamp to impute.

    Returns
    -------
    pandas.DataFrame
        Up to ``MAX_WEIGHT_OBSERVATIONS`` rows, sorted descending by
        timestamp (most recent first).
    """
    prior = group_df[
        (group_df.index < missing_timestamp) & (~group_df["is_imputed"])
    ]
    return prior.sort_index(ascending=False).head(MAX_WEIGHT_OBSERVATIONS)


def _apply_weighted_imputation(
    recent_actual: pd.DataFrame, imputed_values: Dict[str, Any]
) -> bool:
    """Apply weighted-average imputation on configured features.

    Parameters
    ----------
    recent_actual : pandas.DataFrame
        The most recent non-imputed observations, sorted descending by
        timestamp (typically exactly three rows).
    imputed_values : dict[str, Any]
        Mutable mapping updated in place with imputed feature values.

    Returns
    -------
    bool
        ``True`` if at least one feature received an imputed value,
        otherwise ``False``.

    Notes
    -----
    - ``recent_actual`` arrives newest-first; values are reversed into
      chronological order so that the largest weight in ``WEIGHT_ARRAY``
      (the last entry, 0.6) is applied to the most recent observation.
    - Weights are truncated to the available sample length and normalized to
      sum to 1. When NaNs are dropped mid-window, the remaining values keep
      the trailing weights (recency preference is preserved approximately).
    - The caller is responsible for ensuring the correct history length; in
      this module, weighted imputation is used only when exactly three prior
      actual observations exist.
    """
    success = False
    for feature in FEATURE_COLUMNS_TO_IMPUTE:
        if feature not in recent_actual.columns:
            continue
        valid_series = recent_actual[feature].dropna()
        if not valid_series.empty:
            # BUGFIX: reverse newest-first input into chronological order so
            # the most recent value receives the largest weight (0.6), not
            # the smallest (0.1).
            values = valid_series.iloc[::-1].to_numpy(dtype=float)
            weights = WEIGHT_ARRAY[-len(values):]
            normalized = weights / weights.sum()
            imputed_values[feature] = float(np.average(values, weights=normalized))
            success = True
        else:
            imputed_values[feature] = None
    return success


def _apply_interpolated_imputation(
    group_df: pd.DataFrame,
    missing_timestamp: pd.Timestamp,
    imputed_values: Dict[str, Any],
) -> bool:
    """Apply time-based linear interpolation on configured features.

    Parameters
    ----------
    group_df : pandas.DataFrame
        DataFrame for a single coordinate, indexed by timestamp.
    missing_timestamp : pandas.Timestamp
        Timestamp at which to interpolate.
    imputed_values : dict[str, Any]
        Mutable mapping updated in place with imputed feature values.

    Returns
    -------
    bool
        ``True`` if at least one feature received an interpolated value,
        otherwise ``False``.
    """
    success = False
    for feature in FEATURE_COLUMNS_TO_IMPUTE:
        if feature not in group_df.columns:
            continue
        # Work on a copy so the caller's frame is never mutated.
        series = group_df[feature].copy()
        if missing_timestamp not in series.index:
            series.at[missing_timestamp] = np.nan
        interpolated = series.sort_index().interpolate(method="time")
        value = interpolated.get(missing_timestamp)
        if pd.notna(value):
            imputed_values[feature] = float(value)
            success = True
        else:
            imputed_values[feature] = None
    return success


def _impute_single_timestamp(
    group_df: pd.DataFrame,
    latitude: float,
    longitude: float,
    missing_timestamp: pd.Timestamp,
) -> Optional[WeatherObservation]:
    """Impute values for a single missing timestamp.

    Parameters
    ----------
    group_df : pandas.DataFrame
        DataFrame indexed by timestamp for the coordinate.
    latitude : float
        Latitude for the new observation.
    longitude : float
        Longitude for the new observation.
    missing_timestamp : pandas.Timestamp
        Timestamp to fill.

    Returns
    -------
    app.models.WeatherObservation | None
        ORM instance when imputation succeeds; ``None`` if no strategy could
        produce a value for any configured feature.

    Notes
    -----
    - Uses weighted imputation when exactly ``MAX_WEIGHT_OBSERVATIONS`` prior
      actual rows exist; otherwise falls back to linear interpolation if at
      least one prior actual row is present.
    - All NumPy scalars are converted to native Python types for ORM safety.
    """
    recent_actual = _get_prior_actual(group_df, missing_timestamp)
    imputed_values: Dict[str, Any] = {
        "latitude": latitude,
        "longitude": longitude,
        "timestamp": missing_timestamp,
        "is_imputed": True,
    }
    if len(recent_actual) == MAX_WEIGHT_OBSERVATIONS:
        success = _apply_weighted_imputation(recent_actual, imputed_values)
    elif not recent_actual.empty:
        logger.debug(
            f"Less than {MAX_WEIGHT_OBSERVATIONS} observations for "
            f"({latitude},{longitude}) at {missing_timestamp}. Using linear "
            "interpolation."
        )
        success = _apply_interpolated_imputation(
            group_df, missing_timestamp, imputed_values
        )
    else:
        logger.debug(
            f"No prior observations for ({latitude}, {longitude}) at "
            f"{missing_timestamp}."
        )
        success = False

    if success:
        # Convert all numpy types in imputed_values to native Python types
        for key, value in imputed_values.items():
            if hasattr(value, "item"):
                imputed_values[key] = value.item()
            elif isinstance(value, (np.generic, np.ndarray)):
                imputed_values[key] = value.tolist()
        return WeatherObservation(**imputed_values)
    return None


def _commit_imputed_records(
    session: Session, records: List[WeatherObservation]
) -> None:
    """Merge and commit imputed records to the database.

    Parameters
    ----------
    session : sqlalchemy.orm.Session
        Open SQLAlchemy session bound to an engine.
    records : list[app.models.WeatherObservation]
        Imputed ORM instances to be merged (upsert-like semantics).

    Returns
    -------
    None

    Raises
    ------
    ValueError
        If ``session`` is ``None``.
    SQLAlchemyError
        If the commit fails.

    Notes
    -----
    - Uses ``Session.merge`` to upsert by primary key (timestamp, latitude,
      longitude) followed by a single commit for the batch.
    """
    # Explicit raise instead of ``assert`` so the check survives ``python -O``.
    if session is None:
        raise ValueError("Session must not be None.")
    for record in records:
        session.merge(record)
    session.commit()