Source code for app.weather_ingest

"""
Asynchronous ingestion of nowcast weather observations from MET Norway.

This module fetches current Nowcast 2.0 data for all configured coordinates
using ``aiohttp``, parses the first timeseries entry, and upserts
``WeatherObservation`` rows in the database. It is designed to be triggered by
the in-process scheduler in :mod:`app.main` and commits changes once per
ingestion cycle for efficiency.

See Also
--------
app.models.WeatherObservation : ORM model persisted by this module.
app.models.Coordinate : Source of coordinates to fetch.
app.database.SessionLocal : Session factory used for DB access.
app.imputation.run_imputation_cycle : Fills gaps after ingestion.
app.main : Schedules periodic ingestion via APScheduler.

Notes
-----
- Primary role: retrieve per-coordinate nowcast payloads and stage inserts or
  updates for :class:`app.models.WeatherObservation`.
- Key dependencies: external API at ``BASE_URL``, ``aiohttp``, a configured
  SQLAlchemy session factory (:data:`app.database.SessionLocal`), and a valid
  ``User-Agent`` string in :data:`app.config.settings.user_agent`.
- Invariants: coordinates must already exist in the database for ingestion to
  run. Only imputed observations are overwritten, so real measurements are
  never replaced.

Examples
--------
>>> # Trigger one ingestion cycle (DB + network required)     # doctest: +SKIP
>>> from app.weather_ingest import trigger_weather_ingestion_cycle  # doctest: +SKIP
>>> trigger_weather_ingestion_cycle()                          # doctest: +SKIP
"""

import asyncio
import logging
from datetime import datetime
from typing import Any, Optional, Sequence, Tuple, cast

import aiohttp
from aiohttp import ClientError, ClientResponseError
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Session as DBSession

from .config import settings
from .database import SessionLocal
from .models import Coordinate, WeatherObservation

logger = logging.getLogger(__name__)

BASE_URL: str = "https://api.met.no/weatherapi/nowcast/2.0/complete"
USER_AGENT: str = settings.user_agent
TIMESERIES_FIRST_INDEX: int = 0
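
# Example request URL (illustrative): fetch_weather_for_coord formats BASE_URL
# with four-decimal coordinates, e.g. for Oslo (59.9139, 10.7522):
#
#     https://api.met.no/weatherapi/nowcast/2.0/complete?lat=59.9139&lon=10.7522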


async def fetch_weather_for_coord(
    http_session: aiohttp.ClientSession, lat: float, lon: float
) -> Optional[dict[str, Any]]:
    """Fetch nowcast JSON payload for one coordinate.

    Parameters
    ----------
    http_session : aiohttp.ClientSession
        Open client session used to perform the request.
    lat : float
        Coordinate latitude in decimal degrees.
    lon : float
        Coordinate longitude in decimal degrees.

    Returns
    -------
    dict[str, Any] | None
        Parsed JSON payload on success; ``None`` on HTTP, network, or JSON
        parsing errors.

    Raises
    ------
    AssertionError
        If ``lat`` or ``lon`` are not floats.

    Notes
    -----
    - Adds the configured ``User-Agent`` header to comply with api.met.no
      policy.
    """
    assert isinstance(lat, float), f"Latitude must be float, got {type(lat)}"
    assert isinstance(lon, float), f"Longitude must be float, got {type(lon)}"

    url = f"{BASE_URL}?lat={lat:.4f}&lon={lon:.4f}"
    try:
        async with http_session.get(
            url, headers={"User-Agent": USER_AGENT}
        ) as response:
            response.raise_for_status()
            return cast(dict[str, Any], await response.json())
    except ClientResponseError as e:
        logger.warning(
            f"HTTP error fetching data for {lat},{lon}. "
            f"Status: {e.status}, Message: {e.message}"
        )
        return None
    except ClientError as error:
        logger.error(
            f"Network error fetching data for {lat},{lon}: {error}", exc_info=True
        )
        return None
    except ValueError as error:
        logger.error(f"JSON parsing error for {lat},{lon}: {error}", exc_info=True)
        return None
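
# Usage sketch (illustrative, not executed at import time): fetch a single
# coordinate's payload in a short-lived session. The Oslo coordinates are
# placeholders.
#
#     async def _fetch_once() -> None:
#         async with aiohttp.ClientSession() as http_session:
#             payload = await fetch_weather_for_coord(http_session, 59.9139, 10.7522)
#             if payload is not None:
#                 print(payload["properties"]["timeseries"][0]["time"])
#
#     asyncio.run(_fetch_once())  # doctest: +SKIP
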
async def gather_weather_data(
    coordinates: Sequence[Coordinate],
) -> list[Optional[dict[str, Any]]]:
    """Fetch payloads for all coordinates concurrently.

    Parameters
    ----------
    coordinates : Sequence[app.models.Coordinate]
        Coordinates to fetch.

    Returns
    -------
    list[dict[str, Any] | None]
        One entry per input coordinate. Each entry is ``None`` if fetching
        failed for that coordinate.

    Notes
    -----
    - Uses :func:`fetch_weather_for_coord` under ``asyncio.gather`` with
      ``return_exceptions=False``; errors are handled inside the helper and
      materialize as ``None`` results.
    """
    assert all(
        isinstance(coord, Coordinate) for coord in coordinates
    ), "All items in coordinates must be Coordinate instances."

    async with aiohttp.ClientSession(
        headers={"User-Agent": USER_AGENT}
    ) as http_session:
        tasks = [
            fetch_weather_for_coord(http_session, coord.latitude, coord.longitude)
            for coord in coordinates
        ]
        results: list[Optional[dict[str, Any]]] = await asyncio.gather(
            *tasks, return_exceptions=False
        )
        return results
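
# Usage sketch (illustrative): fetch payloads for all stored coordinates in a
# fresh event loop; assumes Coordinate rows already exist in the database.
#
#     with SessionLocal() as db_session:
#         coords = db_session.query(Coordinate).all()
#     payloads = asyncio.run(gather_weather_data(coords))  # doctest: +SKIP
#     assert len(payloads) == len(coords)  # one dict-or-None per coordinate
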
def process_weather_payload(
    db_session: DBSession,
    coordinate: Coordinate,
    payload: Optional[dict[str, Any]],
) -> Tuple[int, int]:
    """Parse a payload and stage a new or updated observation.

    This function extracts the first timeseries entry (if present),
    normalizes field names, and either updates an existing imputed row or
    adds a new one to the session. It does not commit.

    Parameters
    ----------
    db_session : sqlalchemy.orm.Session
        Open session used to query and stage changes.
    coordinate : app.models.Coordinate
        Coordinate corresponding to the payload.
    payload : dict[str, Any] | None
        Parsed payload, or ``None`` if fetching failed.

    Returns
    -------
    tuple[int, int]
        Pair ``(new_count, updated_count)`` indicating staged inserts/updates.

    Raises
    ------
    SQLAlchemyError
        If ORM operations fail (e.g., during query execution).

    Notes
    -----
    - Only the first timeseries item is used (index ``TIMESERIES_FIRST_INDEX``).
    - Existing non-imputed rows are never modified; only imputed rows are
      overwritten to preserve real measurements.
    - NumPy scalars are converted to native Python types for ORM safety.
    """
    assert db_session is not None, "db_session must not be None."
    assert isinstance(
        coordinate, Coordinate
    ), f"coordinate must be Coordinate instance, got {type(coordinate)}"

    if payload is None:
        return 0, 0

    properties = payload.get("properties")
    timeseries = properties.get("timeseries") if properties else None
    if not timeseries:
        logger.debug(
            f"No timeseries data for coordinate "
            f"({coordinate.latitude}, {coordinate.longitude})."
        )
        return 0, 0

    try:
        ts_entry = timeseries[TIMESERIES_FIRST_INDEX]
        timestamp_str: str = ts_entry["time"]
        if timestamp_str.endswith("Z"):
            timestamp_str = timestamp_str[:-1] + "+00:00"
        observation_time = datetime.fromisoformat(timestamp_str)

        details = ts_entry["data"]["instant"]["details"]
        observation_mapping: dict[str, Any] = {
            "timestamp": observation_time,
            "latitude": payload["geometry"]["coordinates"][1],
            "longitude": payload["geometry"]["coordinates"][0],
            "air_temperature": details.get("air_temperature"),
            "wind_speed": details.get("wind_speed"),
            "wind_from_direction": details.get("wind_from_direction"),
            "cloud_area_fraction": details.get("cloud_area_fraction"),
            "precipitation_amount": ts_entry.get("data", {})
            .get("next_1_hours", {})
            .get("details", {})
            .get("precipitation_amount"),
            "is_imputed": False,
        }

        # Normalize wind direction field name to match the ORM column.
        wind_direction = observation_mapping.pop("wind_from_direction", None)
        observation_mapping["wind_direction"] = wind_direction

        existing_observation = (
            db_session.query(WeatherObservation)
            .filter_by(
                timestamp=observation_mapping["timestamp"],
                latitude=observation_mapping["latitude"],
                longitude=observation_mapping["longitude"],
            )
            .first()
        )

        new_count = 0
        updated_count = 0
        if existing_observation:
            if existing_observation.is_imputed:
                for attr, value in observation_mapping.items():
                    setattr(existing_observation, attr, value)
                updated_count = 1
                logger.debug(
                    f"Overwriting imputed observation at {observation_time} for "
                    f"{coordinate.latitude},{coordinate.longitude}."
                )
        else:
            # Convert all numpy types in observation_mapping to native Python types
            for key, value in observation_mapping.items():
                if hasattr(value, "item"):
                    observation_mapping[key] = value.item()
                elif isinstance(value, (list, tuple)):
                    observation_mapping[key] = [
                        v.item() if hasattr(v, "item") else v for v in value
                    ]
            new_observation = WeatherObservation(**observation_mapping)
            db_session.add(new_observation)
            new_count = 1

        return new_count, updated_count
    except (KeyError, IndexError, TypeError) as parse_error:
        logger.warning(
            f"Failed to parse weather payload for coordinate "
            f"({coordinate.latitude}, {coordinate.longitude}): {parse_error}. "
            f"Payload snippet: {str(payload)[:200]}"
        )
        return 0, 0
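
# Minimal payload shape consumed by process_weather_payload, derived from the
# parsing code above (real Nowcast 2.0 responses carry many more fields; the
# values here are placeholders):
#
#     {
#         "geometry": {"coordinates": [10.7522, 59.9139]},  # [lon, lat] order
#         "properties": {
#             "timeseries": [
#                 {
#                     "time": "2024-01-01T12:00:00Z",
#                     "data": {
#                         "instant": {"details": {"air_temperature": -2.1}},
#                         "next_1_hours": {"details": {"precipitation_amount": 0.0}},
#                     },
#                 }
#             ]
#         },
#     }
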
async def run_weather_ingestion_cycle_async() -> None:
    """Run a full ingestion pass for all stored coordinates.

    Coordinates are read from the database, payloads are fetched
    concurrently, and each payload is processed and staged. A single commit
    persists any changes.

    Returns
    -------
    None

    Raises
    ------
    Exception
        Unexpected errors are propagated after being logged. Known HTTP and
        database errors are caught and logged without raising.

    Notes
    -----
    - Commits only when there is at least one new or updated observation.
    - Designed for periodic execution via APScheduler.

    See Also
    --------
    app.weather_ingest.gather_weather_data : Concurrent fetching helper.
    app.weather_ingest.process_weather_payload : Payload parsing/staging.
    """
    logger.info("Running data ingestion cycle from weather API...")
    try:
        with SessionLocal() as db_session:
            coordinates = db_session.query(Coordinate).all()
            if not coordinates:
                logger.warning(
                    "No coordinates in database. Skipping weather data ingestion."
                )
                return

            payloads = await gather_weather_data(coordinates)

            total_new = 0
            total_updated = 0
            for coordinate, payload in zip(coordinates, payloads):
                new_count, updated_count = process_weather_payload(
                    db_session, coordinate, payload
                )
                total_new += new_count
                total_updated += updated_count

            if total_new > 0 or total_updated > 0:
                db_session.commit()
                logger.info(
                    f"Ingestion complete. Added: {total_new}, "
                    f"Updated: {total_updated} observations."
                )
            else:
                logger.info("Ingestion complete. No new or updated observations.")
    except ClientError as http_error:
        logger.error(f"HTTP error during ingestion cycle: {http_error}", exc_info=True)
    except SQLAlchemyError as db_error:
        logger.error(
            f"Database error during ingestion cycle: {db_error}", exc_info=True
        )
    except Exception as unexpected_error:
        logger.error(
            f"Unexpected error during ingestion cycle: {unexpected_error}",
            exc_info=True,
        )
        raise
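
# Usage sketch (illustrative): run one ingestion pass from inside an existing
# event loop (e.g. an async test), bypassing the synchronous wrapper:
#
#     await run_weather_ingestion_cycle_async()  # doctest: +SKIP
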
def trigger_weather_ingestion_cycle() -> None:
    """Synchronous wrapper around the asynchronous ingestion cycle.

    Intended for use by in-process schedulers such as APScheduler.

    Returns
    -------
    None

    Raises
    ------
    Exception
        Propagates unexpected exceptions raised by the asynchronous cycle.

    Examples
    --------
    >>> from app.weather_ingest import trigger_weather_ingestion_cycle  # doctest: +SKIP
    >>> trigger_weather_ingestion_cycle()  # doctest: +SKIP
    """
    logger.info("APScheduler triggering weather ingestion cycle.")
    asyncio.run(run_weather_ingestion_cycle_async())
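
# Scheduling sketch: one way app.main could wire this wrapper into APScheduler.
# The 5-minute interval is an assumption for illustration, not the project's
# actual configuration.
#
#     from apscheduler.schedulers.background import BackgroundScheduler
#
#     scheduler = BackgroundScheduler()
#     scheduler.add_job(trigger_weather_ingestion_cycle, "interval", minutes=5)
#     scheduler.start()  # doctest: +SKIP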