app.weather_ingest module#
Asynchronous ingestion of nowcast weather observations from MET Norway.
This module fetches current Nowcast 2.0 data for all configured coordinates
using aiohttp, parses the first timeseries entry, and upserts
WeatherObservation rows in the database. It is designed to be triggered by
the in-process scheduler in app.main and commits changes once per
ingestion cycle for efficiency.
See Also#
app.models.WeatherObservationORM model persisted by this module.
app.models.CoordinateSource of coordinates to fetch.
app.database.SessionLocalSession factory used for DB access.
app.imputation.run_imputation_cycleFills gaps after ingestion.
app.mainSchedules periodic ingestion via APScheduler.
Notes#
Primary role: retrieve per-coordinate nowcast payloads and stage inserts or updates for
app.models.WeatherObservation.Key dependencies: external API at
BASE_URL,aiohttp, a configured SQLAlchemy session factory (app.database.SessionLocal), and a validUser-Agentstring inapp.config.settings.user_agent.Invariants: coordinates must exist to ingest. Only imputed observations are overwritten to preserve real measurements.
Examples#
>>> # Trigger one ingestion cycle (DB + network required)
>>> from app.weather_ingest import trigger_weather_ingestion_cycle
>>> trigger_weather_ingestion_cycle()
- async app.weather_ingest.fetch_weather_for_coord(http_session: ClientSession, lat: float, lon: float) dict[str, Any] | None[source]#
Fetch nowcast JSON payload for one coordinate.
- Parameters:
- Returns:
- Raises:
AssertionErrorIf
latorlonare not floats.
Notes
Adds the configured
User-Agentheader to comply with api.met.no policy.
- async app.weather_ingest.gather_weather_data(coordinates: Sequence[Coordinate]) list[dict[str, Any] | None][source]#
Fetch payloads for all coordinates concurrently.
- Parameters:
- coordinates
Sequence[app.models.Coordinate] Coordinates to fetch.
- coordinates
- Returns:
Notes
Uses
fetch_weather_for_coord()underasyncio.gatherwithreturn_exceptions=False; errors are handled inside the helper and materialize asNoneresults.
- app.weather_ingest.process_weather_payload(db_session: sqlalchemy.orm.Session, coordinate: Coordinate, payload: dict[str, Any] | None) Tuple[int, int][source]#
Parse a payload and stage a new or updated observation.
This function extracts the first timeseries entry (if present), normalizes field names, and either updates an existing imputed row or adds a new one to the session. It does not commit.
- Parameters:
- db_session
sqlalchemy.orm.Session Open session used to query and stage changes.
- coordinate
app.models.Coordinate Coordinate corresponding to the payload.
- payload
dict[str,Any] |None Parsed payload, or
Noneif fetching failed.
- db_session
- Returns:
- Raises:
SQLAlchemyErrorIf ORM operations fail (e.g., during query execution).
Notes
Only the first timeseries item is used (index
TIMESERIES_FIRST_INDEX).Existing non‑imputed rows are never modified; only imputed rows are overwritten to preserve real measurements.
NumPy scalars are converted to native Python types for ORM safety.
- async app.weather_ingest.run_weather_ingestion_cycle_async() None[source]#
Run a full ingestion pass for all stored coordinates.
Coordinates are read from the database, payloads are fetched concurrently, and each payload is processed and staged. A single commit persists any changes.
- Returns:
- Raises:
ExceptionUnexpected errors are propagated after being logged. Known HTTP and database errors are caught and logged without raising.
See also
app.weather_ingest.gather_weather_dataConcurrent fetching helper.
app.weather_ingest.process_weather_payloadPayload parsing/staging.
Notes
Commits only when there is at least one new or updated observation.
Designed for periodic execution via APScheduler.
- app.weather_ingest.trigger_weather_ingestion_cycle() None[source]#
Synchronous wrapper around the asynchronous ingestion cycle.
Intended for use by in‑process schedulers such as APScheduler.
Examples
>>> from app.weather_ingest import trigger_weather_ingestion_cycle >>> trigger_weather_ingestion_cycle()