app.weather_ingest module

Asynchronous ingestion of nowcast weather observations from MET Norway.

This module fetches current Nowcast 2.0 data for all configured coordinates using aiohttp, parses the first timeseries entry, and upserts WeatherObservation rows in the database. It is designed to be triggered by the in-process scheduler in app.main and commits changes once per ingestion cycle for efficiency.

See Also

app.models.WeatherObservation

ORM model persisted by this module.

app.models.Coordinate

Source of coordinates to fetch.

app.database.SessionLocal

Session factory used for DB access.

app.imputation.run_imputation_cycle

Fills gaps after ingestion.

app.main

Schedules periodic ingestion via APScheduler.

Notes

  • Primary role: retrieve per-coordinate nowcast payloads and stage inserts or updates for app.models.WeatherObservation.

  • Key dependencies: external API at BASE_URL, aiohttp, a configured SQLAlchemy session factory (app.database.SessionLocal), and a valid User-Agent string in app.config.settings.user_agent.

  • Invariants: at least one coordinate must exist in the database for ingestion to do any work. An existing observation is overwritten only if it is imputed, so real measurements are never replaced.

Examples

>>> # Trigger one ingestion cycle (DB + network required)
>>> from app.weather_ingest import trigger_weather_ingestion_cycle
>>> trigger_weather_ingestion_cycle()
async app.weather_ingest.fetch_weather_for_coord(http_session: ClientSession, lat: float, lon: float) → dict[str, Any] | None

Fetch nowcast JSON payload for one coordinate.

Parameters:
http_session : aiohttp.ClientSession

Open client session used to perform the request.

lat : float

Coordinate latitude in decimal degrees.

lon : float

Coordinate longitude in decimal degrees.

Returns:
dict[str, Any] | None

Parsed JSON payload on success; None on HTTP, network, or JSON parsing errors.

Raises:
AssertionError

If lat or lon is not a float.

Notes

  • Adds the configured User-Agent header to comply with api.met.no policy.

async app.weather_ingest.gather_weather_data(coordinates: Sequence[Coordinate]) → list[dict[str, Any] | None]

Fetch payloads for all coordinates concurrently.

Parameters:
coordinates : Sequence[app.models.Coordinate]

Coordinates to fetch.

Returns:
list[dict[str, Any] | None]

One entry per input coordinate. Each entry is None if fetching failed for that coordinate.

Notes

  • Uses fetch_weather_for_coord() under asyncio.gather with return_exceptions=False; errors are handled inside the helper and materialize as None results.
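The pattern described in the note above can be sketched without any network access. This is a minimal illustration, not the module's implementation: `fake_fetch` is a hypothetical stand-in for fetch_weather_for_coord(), and the out-of-range latitude check is an invented way to simulate a failed request. It shows why `return_exceptions=False` is safe when the helper converts its own errors to None, and that results come back in input order.

```python
import asyncio
from typing import Any, Optional


async def fake_fetch(lat: float, lon: float) -> Optional[dict[str, Any]]:
    """Hypothetical stand-in for fetch_weather_for_coord (no network)."""
    try:
        if not -90.0 <= lat <= 90.0:
            # Simulated failure; the real helper would hit HTTP/JSON errors.
            raise ValueError("latitude out of range")
        await asyncio.sleep(0)  # yield to the event loop like real I/O would
        return {"lat": lat, "lon": lon}
    except ValueError:
        # Errors are swallowed here, so gather() never sees an exception.
        return None


async def gather_all(
    coords: list[tuple[float, float]],
) -> list[Optional[dict[str, Any]]]:
    tasks = [fake_fetch(lat, lon) for lat, lon in coords]
    # asyncio.gather returns results in the same order as its arguments.
    return await asyncio.gather(*tasks, return_exceptions=False)


results = asyncio.run(gather_all([(59.91, 10.75), (123.0, 10.0), (60.39, 5.32)]))
print(results)
```

Because each result lines up with its input coordinate, a caller can pair them with `zip(coordinates, results)` and skip the None entries.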

app.weather_ingest.process_weather_payload(db_session: sqlalchemy.orm.Session, coordinate: Coordinate, payload: dict[str, Any] | None) → Tuple[int, int]

Parse a payload and stage a new or updated observation.

This function extracts the first timeseries entry (if present), normalizes field names, and either updates an existing imputed row or adds a new one to the session. It does not commit.

Parameters:
db_session : sqlalchemy.orm.Session

Open session used to query and stage changes.

coordinate : app.models.Coordinate

Coordinate corresponding to the payload.

payload : dict[str, Any] | None

Parsed payload, or None if fetching failed.

Returns:
tuple[int, int]

Pair (new_count, updated_count) indicating staged inserts/updates.

Raises:
SQLAlchemyError

If ORM operations fail (e.g., during query execution).

Notes

  • Only the first timeseries item is used (index TIMESERIES_FIRST_INDEX).

  • Existing non‑imputed rows are never modified; only imputed rows are overwritten to preserve real measurements.

  • NumPy scalars are converted to native Python types for ORM safety.
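The parsing and overwrite rules above can be illustrated with plain dictionaries in place of ORM rows. This is a sketch under stated assumptions: the `properties.timeseries[...].data.instant.details` layout is assumed from the public MET Norway JSON format, `TIMESERIES_FIRST_INDEX = 0` is an assumed value for the module constant, and `extract_first_entry`, `stage`, and the `is_imputed` key are hypothetical names, not part of this module.

```python
from typing import Any, Optional

TIMESERIES_FIRST_INDEX = 0  # assumed value of the module constant


def extract_first_entry(
    payload: Optional[dict[str, Any]],
) -> Optional[dict[str, Any]]:
    """Return the first timeseries entry flattened to one dict, or None."""
    if not payload:
        return None
    series = payload.get("properties", {}).get("timeseries", [])
    if len(series) <= TIMESERIES_FIRST_INDEX:
        return None
    entry = series[TIMESERIES_FIRST_INDEX]
    details = entry.get("data", {}).get("instant", {}).get("details", {})
    return {"time": entry.get("time"), **details}


def stage(
    existing: Optional[dict[str, Any]], parsed: dict[str, Any]
) -> tuple[int, int]:
    """Return (new_count, updated_count) for one parsed observation."""
    if existing is None:
        return (1, 0)  # no row yet: stage an insert
    if existing.get("is_imputed"):
        existing.update(parsed)  # imputed row: overwrite with real data
        existing["is_imputed"] = False
        return (0, 1)
    return (0, 0)  # real measurement: leave untouched


payload = {
    "properties": {
        "timeseries": [
            {
                "time": "2024-01-01T12:00:00Z",
                "data": {"instant": {"details": {"air_temperature": 3.5}}},
            }
        ]
    }
}
parsed = extract_first_entry(payload)
print(parsed)
```

In the real function the `existing` lookup would be a query through db_session, and nothing is committed at this stage.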

async app.weather_ingest.run_weather_ingestion_cycle_async() → None

Run a full ingestion pass for all stored coordinates.

Coordinates are read from the database, payloads are fetched concurrently, and each payload is processed and staged. A single commit persists any changes.

Returns:
None
Raises:
Exception

Unexpected errors are propagated after being logged. Known HTTP and database errors are caught and logged without raising.

See also

app.weather_ingest.gather_weather_data

Concurrent fetching helper.

app.weather_ingest.process_weather_payload

Payload parsing/staging.

Notes

  • Commits only when there is at least one new or updated observation.

  • Designed for periodic execution via APScheduler.
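The control flow of a cycle (fetch everything, process each payload, commit once and only if something changed) might look roughly like the following. All names here are hypothetical stand-ins: `FakeSession` replaces the SQLAlchemy session, `fetch_all` replaces gather_weather_data(), and `process` replaces process_weather_payload(). Unlike the documented function, which returns None, this sketch returns the totals so the behavior is easy to inspect.

```python
import asyncio
from typing import Any, Optional


class FakeSession:
    """Hypothetical session stub that only counts commits."""

    def __init__(self) -> None:
        self.commits = 0

    def commit(self) -> None:
        self.commits += 1


async def fetch_all(coords: list[str]) -> list[Optional[dict[str, Any]]]:
    # Stand-in for concurrent fetching; "bad" simulates a failed request.
    return [None if c == "bad" else {"coord": c} for c in coords]


def process(
    session: FakeSession, coord: str, payload: Optional[dict[str, Any]]
) -> tuple[int, int]:
    # Stand-in for payload staging: every successful payload is a new row.
    return (1, 0) if payload else (0, 0)


async def run_cycle(session: FakeSession, coords: list[str]) -> tuple[int, int]:
    payloads = await fetch_all(coords)
    new_total = updated_total = 0
    for coord, payload in zip(coords, payloads):
        new, updated = process(session, coord, payload)
        new_total += new
        updated_total += updated
    # Single commit per cycle, skipped entirely when nothing was staged.
    if new_total or updated_total:
        session.commit()
    return new_total, updated_total


session = FakeSession()
totals = asyncio.run(run_cycle(session, ["oslo", "bad", "bergen"]))
print(totals, session.commits)
```

Committing once per cycle keeps the transaction count independent of the number of coordinates.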

app.weather_ingest.trigger_weather_ingestion_cycle() → None

Synchronous wrapper around the asynchronous ingestion cycle.

Intended for use by in‑process schedulers such as APScheduler.

Returns:
None
Raises:
Exception

Propagates unexpected exceptions raised by the asynchronous cycle.

Examples

>>> from app.weather_ingest import trigger_weather_ingestion_cycle
>>> trigger_weather_ingestion_cycle()
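A synchronous wrapper of this kind is often a thin `asyncio.run` call. The sketch below assumes that approach; the source does not state how the wrapper is implemented, and `cycle_async`, `trigger`, and `calls` are hypothetical names used only for illustration.

```python
import asyncio

calls: list[str] = []  # records invocations so the effect is observable


async def cycle_async() -> None:
    """Hypothetical stand-in for the asynchronous ingestion cycle."""
    await asyncio.sleep(0)
    calls.append("ran")


def trigger() -> None:
    """Run the async cycle to completion from synchronous code."""
    # asyncio.run creates a fresh event loop, runs the coroutine, and closes
    # the loop. Exceptions raised by the coroutine propagate to the caller.
    asyncio.run(cycle_async())


trigger()
print(calls)
```

Note that `asyncio.run` raises RuntimeError when called from a thread that already has a running event loop, so a wrapper built this way suits schedulers that execute jobs in worker threads.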