"""
Asynchronous ingestion of nowcast weather observations from MET Norway.
This module fetches current Nowcast 2.0 data for all configured coordinates
using ``aiohttp``, parses the first timeseries entry, and upserts
``WeatherObservation`` rows in the database. It is designed to be triggered by
the in-process scheduler in :mod:`app.main` and commits changes once per
ingestion cycle for efficiency.
See Also
--------
app.models.WeatherObservation : ORM model persisted by this module.
app.models.Coordinate : Source of coordinates to fetch.
app.database.SessionLocal : Session factory used for DB access.
app.imputation.run_imputation_cycle : Fills gaps after ingestion.
app.main : Schedules periodic ingestion via APScheduler.
Notes
-----
- Primary role: retrieve per-coordinate nowcast payloads and stage inserts or
updates for :class:`app.models.WeatherObservation`.
- Key dependencies: external API at ``BASE_URL``, ``aiohttp``, a configured
SQLAlchemy session factory (:data:`app.database.SessionLocal`), and a valid
``User-Agent`` string in :data:`app.config.settings.user_agent`.
- Invariants: coordinates must exist to ingest. Only imputed observations are
overwritten to preserve real measurements.
Examples
--------
>>> # Trigger one ingestion cycle (DB + network required) # doctest: +SKIP
>>> from app.weather_ingest import trigger_weather_ingestion_cycle # doctest: +SKIP
>>> trigger_weather_ingestion_cycle() # doctest: +SKIP
"""
import asyncio
import logging
from datetime import datetime
from typing import Any, Optional, Sequence, Tuple, cast
import aiohttp
from aiohttp import ClientError, ClientResponseError
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Session as DBSession
from .config import settings
from .database import SessionLocal
from .models import Coordinate, WeatherObservation
logger = logging.getLogger(__name__)
BASE_URL: str = "https://api.met.no/weatherapi/nowcast/2.0/complete"
USER_AGENT: str = settings.user_agent
TIMESERIES_FIRST_INDEX: int = 0
[docs]
async def fetch_weather_for_coord(
http_session: aiohttp.ClientSession, lat: float, lon: float
) -> Optional[dict[str, Any]]:
"""Fetch nowcast JSON payload for one coordinate.
Parameters
----------
http_session : aiohttp.ClientSession
Open client session used to perform the request.
lat : float
Coordinate latitude in decimal degrees.
lon : float
Coordinate longitude in decimal degrees.
Returns
-------
dict[str, Any] | None
Parsed JSON payload on success; ``None`` on HTTP, network, or JSON
parsing errors.
Raises
------
AssertionError
If ``lat`` or ``lon`` are not floats.
Notes
-----
- Adds the configured ``User-Agent`` header to comply with api.met.no
policy.
"""
assert isinstance(lat, float), f"Latitude must be float, got {type(lat)}"
assert isinstance(lon, float), f"Longitude must be float, got {type(lon)}"
url = f"{BASE_URL}?lat={lat:.4f}&lon={lon:.4f}"
try:
async with http_session.get(
url, headers={"User-Agent": USER_AGENT}
) as response:
response.raise_for_status()
return cast(dict[str, Any], await response.json())
except ClientResponseError as e:
logger.warning(
f"HTTP error fetching data for {lat},{lon}. Status: {e.status}, Message: {e.message}"
)
return None
except ClientError as error:
logger.error(
f"Network error fetching data for {lat},{lon}: {error}", exc_info=True
)
return None
except ValueError as error:
logger.error(f"JSON parsing error for {lat},{lon}: {error}", exc_info=True)
return None
[docs]
async def gather_weather_data(
coordinates: Sequence[Coordinate],
) -> list[Optional[dict[str, Any]]]:
"""Fetch payloads for all coordinates concurrently.
Parameters
----------
coordinates : Sequence[app.models.Coordinate]
Coordinates to fetch.
Returns
-------
list[dict[str, Any] | None]
One entry per input coordinate. Each entry is ``None`` if fetching
failed for that coordinate.
Notes
-----
- Uses :func:`fetch_weather_for_coord` under ``asyncio.gather`` with
``return_exceptions=False``; errors are handled inside the helper and
materialize as ``None`` results.
"""
assert all(
isinstance(coord, Coordinate) for coord in coordinates
), "All items in coordinates must be Coordinate instances."
async with aiohttp.ClientSession(
headers={"User-Agent": USER_AGENT}
) as http_session:
tasks = [
fetch_weather_for_coord(http_session, coord.latitude, coord.longitude)
for coord in coordinates
]
results: list[Optional[dict[str, Any]]] = await asyncio.gather(
*tasks, return_exceptions=False
)
return results
[docs]
def process_weather_payload(
db_session: DBSession,
coordinate: Coordinate,
payload: Optional[dict[str, Any]],
) -> Tuple[int, int]:
"""Parse a payload and stage a new or updated observation.
This function extracts the first timeseries entry (if present), normalizes
field names, and either updates an existing imputed row or adds a new one
to the session. It does not commit.
Parameters
----------
db_session : sqlalchemy.orm.Session
Open session used to query and stage changes.
coordinate : app.models.Coordinate
Coordinate corresponding to the payload.
payload : dict[str, Any] | None
Parsed payload, or ``None`` if fetching failed.
Returns
-------
tuple[int, int]
Pair ``(new_count, updated_count)`` indicating staged inserts/updates.
Raises
------
SQLAlchemyError
If ORM operations fail (e.g., during query execution).
Notes
-----
- Only the first timeseries item is used (index
``TIMESERIES_FIRST_INDEX``).
- Existing non‑imputed rows are never modified; only imputed rows are
overwritten to preserve real measurements.
- NumPy scalars are converted to native Python types for ORM safety.
"""
assert db_session is not None, "db_session must not be None."
assert isinstance(
coordinate, Coordinate
), f"coordinate must be Coordinate instance, got {type(coordinate)}"
if payload is None:
return 0, 0
properties = payload.get("properties")
timeseries = properties.get("timeseries") if properties else None
if not timeseries:
logger.debug(
f"No timeseries data for coordinate ({coordinate.latitude}, {coordinate.longitude})."
)
return 0, 0
try:
ts_entry = timeseries[TIMESERIES_FIRST_INDEX]
timestamp_str: str = ts_entry["time"]
if timestamp_str.endswith("Z"):
timestamp_str = timestamp_str[:-1] + "+00:00"
observation_time = datetime.fromisoformat(timestamp_str)
details = ts_entry["data"]["instant"]["details"]
observation_mapping: dict[str, Any] = {
"timestamp": observation_time,
"latitude": payload["geometry"]["coordinates"][1],
"longitude": payload["geometry"]["coordinates"][0],
"air_temperature": details.get("air_temperature"),
"wind_speed": details.get("wind_speed"),
"wind_from_direction": details.get("wind_from_direction"),
"cloud_area_fraction": details.get("cloud_area_fraction"),
"precipitation_amount": ts_entry.get("data", {})
.get("next_1_hours", {})
.get("details", {})
.get("precipitation_amount"),
"is_imputed": False,
}
# Normalize wind direction field
wind_direction = observation_mapping.pop("wind_from_direction", None)
observation_mapping["wind_direction"] = wind_direction
existing_observation = (
db_session.query(WeatherObservation)
.filter_by(
timestamp=observation_mapping["timestamp"],
latitude=observation_mapping["latitude"],
longitude=observation_mapping["longitude"],
)
.first()
)
new_count = 0
updated_count = 0
if existing_observation:
if existing_observation.is_imputed:
for attr, value in observation_mapping.items():
setattr(existing_observation, attr, value)
updated_count = 1
logger.debug(
f"Overwriting imputed observation at {observation_time} for "
f"{coordinate.latitude},{coordinate.longitude}."
)
else:
# Convert all numpy types in observation_mapping to native Python types
for key, value in observation_mapping.items():
if hasattr(value, "item"):
observation_mapping[key] = value.item()
elif isinstance(value, (list, tuple)):
observation_mapping[key] = [
v.item() if hasattr(v, "item") else v for v in value
]
new_observation = WeatherObservation(**observation_mapping)
db_session.add(new_observation)
new_count = 1
return new_count, updated_count
except (KeyError, IndexError, TypeError) as parse_error:
logger.warning(
f"Failed to parse weather payload for coordinate "
f"({coordinate.latitude}, {coordinate.longitude}): {parse_error}. "
f"Payload snippet: {str(payload)[:200]}"
)
return 0, 0
[docs]
async def run_weather_ingestion_cycle_async() -> None:
"""Run a full ingestion pass for all stored coordinates.
Coordinates are read from the database, payloads are fetched concurrently,
and each payload is processed and staged. A single commit persists any
changes.
Returns
-------
None
Raises
------
Exception
Unexpected errors are propagated after being logged. Known HTTP and
database errors are caught and logged without raising.
Notes
-----
- Commits only when there is at least one new or updated observation.
- Designed for periodic execution via APScheduler.
See Also
--------
app.weather_ingest.gather_weather_data : Concurrent fetching helper.
app.weather_ingest.process_weather_payload : Payload parsing/staging.
"""
logger.info("Running data ingestion cycle from weather API...")
try:
with SessionLocal() as db_session:
coordinates = db_session.query(Coordinate).all()
if not coordinates:
logger.warning(
"No coordinates in database. Skipping weather data ingestion."
)
return
payloads = await gather_weather_data(coordinates)
total_new = 0
total_updated = 0
for coordinate, payload in zip(coordinates, payloads):
new_count, updated_count = process_weather_payload(
db_session, coordinate, payload
)
total_new += new_count
total_updated += updated_count
if total_new > 0 or total_updated > 0:
db_session.commit()
logger.info(
f"Ingestion complete. Added: {total_new}, "
f"Updated: {total_updated} observations."
)
else:
logger.info("Ingestion complete. No new or updated observations.")
except ClientError as http_error:
logger.error(f"HTTP error during ingestion cycle: {http_error}", exc_info=True)
except SQLAlchemyError as db_error:
logger.error(
f"Database error during ingestion cycle: {db_error}", exc_info=True
)
except Exception as unexpected_error:
logger.error(
f"Unexpected error during ingestion cycle: {unexpected_error}",
exc_info=True,
)
raise
[docs]
def trigger_weather_ingestion_cycle() -> None:
"""Synchronous wrapper around the asynchronous ingestion cycle.
Intended for use by in‑process schedulers such as APScheduler.
Returns
-------
None
Raises
------
Exception
Propagates unexpected exceptions raised by the asynchronous cycle.
Examples
--------
>>> from app.weather_ingest import trigger_weather_ingestion_cycle # doctest: +SKIP
>>> trigger_weather_ingestion_cycle() # doctest: +SKIP
"""
logger.info("APScheduler triggering weather ingestion cycle.")
asyncio.run(run_weather_ingestion_cycle_async())