weather_ingest.py#

"""
Asynchronous ingestion of nowcast weather observations from MET Norway.

This module fetches current Nowcast 2.0 data for all configured coordinates
using ``aiohttp``, parses the first timeseries entry, and upserts
``WeatherObservation`` rows in the database. It is designed to be triggered by
the in-process scheduler in :mod:`app.main` and commits changes once per
ingestion cycle for efficiency.

See Also
--------
app.models.WeatherObservation : ORM model persisted by this module.
app.models.Coordinate : Source of coordinates to fetch.
app.database.SessionLocal : Session factory used for DB access.
app.imputation.run_imputation_cycle : Fills gaps after ingestion.
app.main : Schedules periodic ingestion via APScheduler.

Notes
-----
- Primary role: retrieve per-coordinate nowcast payloads and stage inserts or
  updates for :class:`app.models.WeatherObservation`.
- Key dependencies: external API at ``BASE_URL``, ``aiohttp``, a configured
  SQLAlchemy session factory (:data:`app.database.SessionLocal`), and a valid
  ``User-Agent`` string in :data:`app.config.settings.user_agent`.
- Invariants: coordinates must exist to ingest. Only imputed observations are
  overwritten to preserve real measurements.

Examples
--------
>>> # Trigger one ingestion cycle (DB + network required)     # doctest: +SKIP
>>> from app.weather_ingest import trigger_weather_ingestion_cycle  # doctest: +SKIP
>>> trigger_weather_ingestion_cycle()                          # doctest: +SKIP
"""

import asyncio
import logging
from datetime import datetime
from typing import Any, Optional, Sequence, Tuple, cast

import aiohttp
from aiohttp import ClientError, ClientResponseError
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Session as DBSession

from .config import settings
from .database import SessionLocal
from .models import Coordinate, WeatherObservation

logger = logging.getLogger(__name__)

BASE_URL: str = "https://api.met.no/weatherapi/nowcast/2.0/complete"
USER_AGENT: str = settings.user_agent
TIMESERIES_FIRST_INDEX: int = 0


async def fetch_weather_for_coord(
    http_session: aiohttp.ClientSession, lat: float, lon: float
) -> Optional[dict[str, Any]]:
    """Fetch nowcast JSON payload for one coordinate.

    Parameters
    ----------
    http_session : aiohttp.ClientSession
        Open client session used to perform the request.
    lat : float
        Coordinate latitude in decimal degrees.
    lon : float
        Coordinate longitude in decimal degrees.

    Returns
    -------
    dict[str, Any] | None
        Parsed JSON payload on success; ``None`` on HTTP, network, or JSON
        parsing errors.

    Raises
    ------
    AssertionError
        If ``lat`` or ``lon`` are not floats.

    Notes
    -----
    - Adds the configured ``User-Agent`` header to comply with api.met.no
      policy.
    """
    assert isinstance(lat, float), f"Latitude must be float, got {type(lat)}"
    assert isinstance(lon, float), f"Longitude must be float, got {type(lon)}"
    url = f"{BASE_URL}?lat={lat:.4f}&lon={lon:.4f}"
    try:
        async with http_session.get(
            url, headers={"User-Agent": USER_AGENT}
        ) as response:
            response.raise_for_status()
            return cast(dict[str, Any], await response.json())
    except ClientResponseError as e:
        logger.warning(
            f"HTTP error fetching data for {lat},{lon}. Status: {e.status}, Message: {e.message}"
        )
        return None
    except ClientError as error:
        logger.error(
            f"Network error fetching data for {lat},{lon}: {error}", exc_info=True
        )
        return None
    except ValueError as error:
        logger.error(f"JSON parsing error for {lat},{lon}: {error}", exc_info=True)
        return None


async def gather_weather_data(
    coordinates: Sequence[Coordinate],
) -> list[Optional[dict[str, Any]]]:
    """Fetch payloads for all coordinates concurrently.

    Parameters
    ----------
    coordinates : Sequence[app.models.Coordinate]
        Coordinates to fetch.

    Returns
    -------
    list[dict[str, Any] | None]
        One entry per input coordinate. Each entry is ``None`` if fetching
        failed for that coordinate.

    Notes
    -----
    - Uses :func:`fetch_weather_for_coord` under ``asyncio.gather`` with
      ``return_exceptions=False``; errors are handled inside the helper and
      materialize as ``None`` results.
    """
    assert all(
        isinstance(coord, Coordinate) for coord in coordinates
    ), "All items in coordinates must be Coordinate instances."
    async with aiohttp.ClientSession(
        headers={"User-Agent": USER_AGENT}
    ) as http_session:
        tasks = [
            fetch_weather_for_coord(http_session, coord.latitude, coord.longitude)
            for coord in coordinates
        ]
        results: list[Optional[dict[str, Any]]] = await asyncio.gather(
            *tasks, return_exceptions=False
        )
        return results


def process_weather_payload(
    db_session: DBSession,
    coordinate: Coordinate,
    payload: Optional[dict[str, Any]],
) -> Tuple[int, int]:
    """Parse a payload and stage a new or updated observation.

    This function extracts the first timeseries entry (if present), normalizes
    field names, and either updates an existing imputed row or adds a new one
    to the session. It does not commit.

    Parameters
    ----------
    db_session : sqlalchemy.orm.Session
        Open session used to query and stage changes.
    coordinate : app.models.Coordinate
        Coordinate corresponding to the payload.
    payload : dict[str, Any] | None
        Parsed payload, or ``None`` if fetching failed.

    Returns
    -------
    tuple[int, int]
        Pair ``(new_count, updated_count)`` indicating staged inserts/updates.

    Raises
    ------
    SQLAlchemyError
        If ORM operations fail (e.g., during query execution).

    Notes
    -----
    - Only the first timeseries item is used (index
      ``TIMESERIES_FIRST_INDEX``).
    - Existing non‑imputed rows are never modified; only imputed rows are
      overwritten to preserve real measurements.
    - NumPy scalars are converted to native Python types for ORM safety.
    """
    assert db_session is not None, "db_session must not be None."
    assert isinstance(
        coordinate, Coordinate
    ), f"coordinate must be Coordinate instance, got {type(coordinate)}"
    if payload is None:
        return 0, 0

    properties = payload.get("properties")
    timeseries = properties.get("timeseries") if properties else None
    if not timeseries:
        logger.debug(
            f"No timeseries data for coordinate ({coordinate.latitude}, {coordinate.longitude})."
        )
        return 0, 0

    try:
        ts_entry = timeseries[TIMESERIES_FIRST_INDEX]
        timestamp_str: str = ts_entry["time"]
        if timestamp_str.endswith("Z"):
            timestamp_str = timestamp_str[:-1] + "+00:00"
        observation_time = datetime.fromisoformat(timestamp_str)
        details = ts_entry["data"]["instant"]["details"]
        observation_mapping: dict[str, Any] = {
            "timestamp": observation_time,
            "latitude": payload["geometry"]["coordinates"][1],
            "longitude": payload["geometry"]["coordinates"][0],
            "air_temperature": details.get("air_temperature"),
            "wind_speed": details.get("wind_speed"),
            "wind_from_direction": details.get("wind_from_direction"),
            "cloud_area_fraction": details.get("cloud_area_fraction"),
            "precipitation_amount": ts_entry.get("data", {})
            .get("next_1_hours", {})
            .get("details", {})
            .get("precipitation_amount"),
            "is_imputed": False,
        }
        # Normalize wind direction field
        wind_direction = observation_mapping.pop("wind_from_direction", None)
        observation_mapping["wind_direction"] = wind_direction

        existing_observation = (
            db_session.query(WeatherObservation)
            .filter_by(
                timestamp=observation_mapping["timestamp"],
                latitude=observation_mapping["latitude"],
                longitude=observation_mapping["longitude"],
            )
            .first()
        )
        new_count = 0
        updated_count = 0
        if existing_observation:
            if existing_observation.is_imputed:
                for attr, value in observation_mapping.items():
                    setattr(existing_observation, attr, value)
                updated_count = 1
                logger.debug(
                    f"Overwriting imputed observation at {observation_time} for "
                    f"{coordinate.latitude},{coordinate.longitude}."
                )
        else:
            # Convert all numpy types in observation_mapping to native Python types
            for key, value in observation_mapping.items():
                if hasattr(value, "item"):
                    observation_mapping[key] = value.item()
                elif isinstance(value, (list, tuple)):
                    observation_mapping[key] = [
                        v.item() if hasattr(v, "item") else v for v in value
                    ]
            new_observation = WeatherObservation(**observation_mapping)
            db_session.add(new_observation)
            new_count = 1

        return new_count, updated_count
    except (KeyError, IndexError, TypeError) as parse_error:
        logger.warning(
            f"Failed to parse weather payload for coordinate "
            f"({coordinate.latitude}, {coordinate.longitude}): {parse_error}. "
            f"Payload snippet: {str(payload)[:200]}"
        )
        return 0, 0


async def run_weather_ingestion_cycle_async() -> None:
    """Run a full ingestion pass for all stored coordinates.

    Coordinates are read from the database, payloads are fetched concurrently,
    and each payload is processed and staged. A single commit persists any
    changes.

    Returns
    -------
    None

    Raises
    ------
    Exception
        Unexpected errors are propagated after being logged. Known HTTP and
        database errors are caught and logged without raising.

    Notes
    -----
    - Commits only when there is at least one new or updated observation.
    - Designed for periodic execution via APScheduler.

    See Also
    --------
    app.weather_ingest.gather_weather_data : Concurrent fetching helper.
    app.weather_ingest.process_weather_payload : Payload parsing/staging.
    """
    logger.info("Running data ingestion cycle from weather API...")
    try:
        with SessionLocal() as db_session:
            coordinates = db_session.query(Coordinate).all()
            if not coordinates:
                logger.warning(
                    "No coordinates in database. Skipping weather data ingestion."
                )
                return

            payloads = await gather_weather_data(coordinates)
            total_new = 0
            total_updated = 0
            for coordinate, payload in zip(coordinates, payloads):
                new_count, updated_count = process_weather_payload(
                    db_session, coordinate, payload
                )
                total_new += new_count
                total_updated += updated_count

            if total_new > 0 or total_updated > 0:
                db_session.commit()
                logger.info(
                    f"Ingestion complete. Added: {total_new}, "
                    f"Updated: {total_updated} observations."
                )
            else:
                logger.info("Ingestion complete. No new or updated observations.")
    except ClientError as http_error:
        logger.error(f"HTTP error during ingestion cycle: {http_error}", exc_info=True)
    except SQLAlchemyError as db_error:
        logger.error(
            f"Database error during ingestion cycle: {db_error}", exc_info=True
        )
    except Exception as unexpected_error:
        logger.error(
            f"Unexpected error during ingestion cycle: {unexpected_error}",
            exc_info=True,
        )
        raise


def trigger_weather_ingestion_cycle() -> None:
    """Synchronous wrapper around the asynchronous ingestion cycle.

    Intended for use by in‑process schedulers such as APScheduler.

    Returns
    -------
    None

    Raises
    ------
    Exception
        Propagates unexpected exceptions raised by the asynchronous cycle.

    Examples
    --------
    >>> from app.weather_ingest import trigger_weather_ingestion_cycle  # doctest: +SKIP
    >>> trigger_weather_ingestion_cycle()  # doctest: +SKIP
    """
    logger.info("APScheduler triggering weather ingestion cycle.")
    asyncio.run(run_weather_ingestion_cycle_async())