"""Asynchronous ingestion of nowcast weather observations from MET Norway.

This module fetches current Nowcast 2.0 data for all configured coordinates
using ``aiohttp``, parses the first timeseries entry, and upserts
``WeatherObservation`` rows in the database. It is designed to be triggered by
the in-process scheduler in :mod:`app.main` and commits changes once per
ingestion cycle for efficiency.

See Also
--------
app.models.WeatherObservation : ORM model persisted by this module.
app.models.Coordinate : Source of coordinates to fetch.
app.database.SessionLocal : Session factory used for DB access.
app.imputation.run_imputation_cycle : Fills gaps after ingestion.
app.main : Schedules periodic ingestion via APScheduler.

Notes
-----
- Primary role: retrieve per-coordinate nowcast payloads and stage inserts or
  updates for :class:`app.models.WeatherObservation`.
- Key dependencies: external API at ``BASE_URL``, ``aiohttp``, a configured
  SQLAlchemy session factory (:data:`app.database.SessionLocal`), and a valid
  ``User-Agent`` string in :data:`app.config.settings.user_agent`.
- Invariants: coordinates must exist to ingest. Only imputed observations are
  overwritten to preserve real measurements.
- Failure isolation: a failed fetch (HTTP error, network error, timeout, or
  invalid JSON) or a malformed payload for one coordinate is logged and
  skipped; it does not abort the cycle for the remaining coordinates.

Examples
--------
>>> # Trigger one ingestion cycle (DB + network required)  # doctest: +SKIP
>>> from app.weather_ingest import trigger_weather_ingestion_cycle  # doctest: +SKIP
>>> trigger_weather_ingestion_cycle()  # doctest: +SKIP
"""

import asyncio
import logging
from datetime import datetime
from typing import Any, Optional, Sequence, Tuple, cast

import aiohttp
from aiohttp import ClientError, ClientResponseError
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Session as DBSession

from .config import settings
from .database import SessionLocal
from .models import Coordinate, WeatherObservation

logger = logging.getLogger(__name__)

BASE_URL: str = "https://api.met.no/weatherapi/nowcast/2.0/complete"
USER_AGENT: str = settings.user_agent
TIMESERIES_FIRST_INDEX: int = 0

# Errors that indicate a structurally malformed payload (missing keys, wrong
# container/value types, unparseable timestamp). They are logged and the
# coordinate is skipped instead of aborting the whole ingestion cycle.
_PAYLOAD_PARSE_ERRORS = (KeyError, IndexError, TypeError, AttributeError, ValueError)


async def fetch_weather_for_coord(
    http_session: aiohttp.ClientSession, lat: float, lon: float
) -> Optional[dict[str, Any]]:
    """Fetch the nowcast JSON payload for one coordinate.

    Parameters
    ----------
    http_session : aiohttp.ClientSession
        Open client session used to perform the request.
    lat : float
        Coordinate latitude in decimal degrees.
    lon : float
        Coordinate longitude in decimal degrees.

    Returns
    -------
    dict[str, Any] | None
        Parsed JSON payload on success; ``None`` on HTTP, network, timeout,
        or JSON parsing errors.

    Raises
    ------
    AssertionError
        If ``lat`` or ``lon`` are not floats.

    Notes
    -----
    - Adds the configured ``User-Agent`` header to comply with api.met.no
      policy.
    - Coordinates are formatted with four decimals in the query string.
    - Every expected failure is converted to ``None`` so that callers using
      ``asyncio.gather(..., return_exceptions=False)`` are not aborted by a
      single failing coordinate.
    """
    assert isinstance(lat, float), f"Latitude must be float, got {type(lat)}"
    assert isinstance(lon, float), f"Longitude must be float, got {type(lon)}"

    url = f"{BASE_URL}?lat={lat:.4f}&lon={lon:.4f}"
    try:
        async with http_session.get(url, headers={"User-Agent": USER_AGENT}) as response:
            response.raise_for_status()
            return cast(dict[str, Any], await response.json())
    except ClientResponseError as e:
        # Includes non-2xx statuses and ContentTypeError (non-JSON body).
        logger.warning(
            "HTTP error fetching data for %s,%s. Status: %s, Message: %s",
            lat,
            lon,
            e.status,
            e.message,
        )
        return None
    except ClientError as error:
        logger.exception("Network error fetching data for %s,%s: %s", lat, lon, error)
        return None
    except asyncio.TimeoutError:
        # aiohttp signals an exceeded request timeout with asyncio.TimeoutError,
        # which is not necessarily a ClientError; without this branch one slow
        # coordinate would propagate through asyncio.gather and abort the cycle.
        logger.warning("Timed out fetching data for %s,%s.", lat, lon)
        return None
    except ValueError as error:
        # json.JSONDecodeError is a ValueError subclass.
        logger.exception("JSON parsing error for %s,%s: %s", lat, lon, error)
        return None


async def gather_weather_data(
    coordinates: Sequence[Coordinate],
) -> list[Optional[dict[str, Any]]]:
    """Fetch payloads for all coordinates concurrently.

    Parameters
    ----------
    coordinates : Sequence[app.models.Coordinate]
        Coordinates to fetch.

    Returns
    -------
    list[dict[str, Any] | None]
        One entry per input coordinate, in input order. Each entry is ``None``
        if fetching failed for that coordinate.

    Notes
    -----
    - Uses :func:`fetch_weather_for_coord` under ``asyncio.gather`` with
      ``return_exceptions=False``; expected errors are handled inside the
      helper and materialize as ``None`` results.
    - All requests share one ``aiohttp.ClientSession``, which is closed before
      returning.
    """
    assert all(
        isinstance(coord, Coordinate) for coord in coordinates
    ), "All items in coordinates must be Coordinate instances."

    async with aiohttp.ClientSession(headers={"User-Agent": USER_AGENT}) as http_session:
        tasks = [
            fetch_weather_for_coord(http_session, coord.latitude, coord.longitude)
            for coord in coordinates
        ]
        results: list[Optional[dict[str, Any]]] = await asyncio.gather(
            *tasks, return_exceptions=False
        )
    return results


def _to_native(value: Any) -> Any:
    """Convert NumPy-style scalars (anything exposing ``.item()``) to native values.

    Lists and tuples are converted element-wise and returned as lists; every
    other value is returned unchanged.
    """
    if hasattr(value, "item"):
        return value.item()
    if isinstance(value, (list, tuple)):
        return [v.item() if hasattr(v, "item") else v for v in value]
    return value


def _parse_timestamp(raw: str) -> datetime:
    """Parse an ISO-8601 timestamp, accepting a trailing ``Z`` as UTC."""
    # Older datetime.fromisoformat versions reject the "Z" suffix.
    if raw.endswith("Z"):
        raw = raw[:-1] + "+00:00"
    return datetime.fromisoformat(raw)


def _build_observation_mapping(
    payload: dict[str, Any], ts_entry: dict[str, Any]
) -> dict[str, Any]:
    """Map one nowcast timeseries entry to ``WeatherObservation`` column values.

    Raises one of the payload parse errors (KeyError, IndexError, TypeError,
    AttributeError, ValueError) when the entry is structurally malformed.
    """
    details = ts_entry["data"]["instant"]["details"]
    # ``next_1_hours`` may be absent or null; treat both as "no precipitation data".
    next_hour = ts_entry.get("data", {}).get("next_1_hours") or {}
    next_hour_details = next_hour.get("details") or {}
    # GeoJSON point coordinates are ordered [longitude, latitude, ...].
    point = payload["geometry"]["coordinates"]
    return {
        "timestamp": _parse_timestamp(ts_entry["time"]),
        "latitude": point[1],
        "longitude": point[0],
        "air_temperature": details.get("air_temperature"),
        "wind_speed": details.get("wind_speed"),
        # API field ``wind_from_direction`` is stored in the ``wind_direction`` column.
        "wind_direction": details.get("wind_from_direction"),
        "cloud_area_fraction": details.get("cloud_area_fraction"),
        "precipitation_amount": next_hour_details.get("precipitation_amount"),
        "is_imputed": False,
    }


def process_weather_payload(
    db_session: DBSession,
    coordinate: Coordinate,
    payload: Optional[dict[str, Any]],
) -> Tuple[int, int]:
    """Parse a payload and stage a new or updated observation.

    This function extracts the first timeseries entry (if present), normalizes
    field names, and either updates an existing imputed row or adds a new one
    to the session. It does not commit.

    Parameters
    ----------
    db_session : sqlalchemy.orm.Session
        Open session used to query and stage changes.
    coordinate : app.models.Coordinate
        Coordinate corresponding to the payload (used for logging).
    payload : dict[str, Any] | None
        Parsed payload, or ``None`` if fetching failed.

    Returns
    -------
    tuple[int, int]
        Pair ``(new_count, updated_count)`` indicating staged inserts/updates.
        ``(0, 0)`` when the payload is missing, has no timeseries, is
        malformed, or matches an existing non-imputed row.

    Raises
    ------
    SQLAlchemyError
        If ORM operations fail (e.g., during query execution).

    Notes
    -----
    - Only the first timeseries item is used (index ``TIMESERIES_FIRST_INDEX``).
    - Latitude/longitude are taken from the payload's ``geometry`` and used
      together with the timestamp to look up an existing row.
    - Existing non-imputed rows are never modified; only imputed rows are
      overwritten to preserve real measurements.
    - NumPy scalars are converted to native Python types for ORM safety on
      both the insert and update paths.
    """
    assert db_session is not None, "db_session must not be None."
    assert isinstance(
        coordinate, Coordinate
    ), f"coordinate must be Coordinate instance, got {type(coordinate)}"

    if payload is None:
        return 0, 0

    try:
        properties = payload.get("properties")
        timeseries = properties.get("timeseries") if properties else None
        if not timeseries:
            logger.debug(
                "No timeseries data for coordinate (%s, %s).",
                coordinate.latitude,
                coordinate.longitude,
            )
            return 0, 0
        raw_mapping = _build_observation_mapping(payload, timeseries[TIMESERIES_FIRST_INDEX])
    except _PAYLOAD_PARSE_ERRORS as parse_error:
        logger.warning(
            "Failed to parse weather payload for coordinate (%s, %s): %s. "
            "Payload snippet: %s",
            coordinate.latitude,
            coordinate.longitude,
            parse_error,
            str(payload)[:200],
        )
        return 0, 0

    observation_mapping = {key: _to_native(value) for key, value in raw_mapping.items()}

    existing_observation = (
        db_session.query(WeatherObservation)
        .filter_by(
            timestamp=observation_mapping["timestamp"],
            latitude=observation_mapping["latitude"],
            longitude=observation_mapping["longitude"],
        )
        .first()
    )

    if existing_observation is None:
        db_session.add(WeatherObservation(**observation_mapping))
        return 1, 0

    if not existing_observation.is_imputed:
        # A real measurement is already stored; never overwrite it.
        return 0, 0

    for attr, value in observation_mapping.items():
        setattr(existing_observation, attr, value)
    logger.debug(
        "Overwriting imputed observation at %s for %s,%s.",
        observation_mapping["timestamp"],
        coordinate.latitude,
        coordinate.longitude,
    )
    return 0, 1


async def run_weather_ingestion_cycle_async() -> None:
    """Run a full ingestion pass for all stored coordinates.

    Coordinates are read from the database, payloads are fetched concurrently,
    and each payload is processed and staged. A single commit persists any
    changes.

    Returns
    -------
    None

    Raises
    ------
    Exception
        Unexpected errors are propagated after being logged. Known HTTP and
        database errors are caught and logged without raising.

    Notes
    -----
    - Commits only when there is at least one new or updated observation.
    - The session is opened via ``with SessionLocal()``, so it is closed
      (discarding uncommitted changes) on every exit path.
    - Designed for periodic execution via APScheduler.

    See Also
    --------
    app.weather_ingest.gather_weather_data : Concurrent fetching helper.
    app.weather_ingest.process_weather_payload : Payload parsing/staging.
    """
    logger.info("Running data ingestion cycle from weather API...")
    try:
        with SessionLocal() as db_session:
            coordinates = db_session.query(Coordinate).all()
            if not coordinates:
                logger.warning("No coordinates in database. Skipping weather data ingestion.")
                return

            payloads = await gather_weather_data(coordinates)

            total_new = 0
            total_updated = 0
            # gather_weather_data preserves input order, so zip pairs correctly.
            for coordinate, payload in zip(coordinates, payloads):
                new_count, updated_count = process_weather_payload(
                    db_session, coordinate, payload
                )
                total_new += new_count
                total_updated += updated_count

            if total_new > 0 or total_updated > 0:
                db_session.commit()
                logger.info(
                    "Ingestion complete. Added: %s, Updated: %s observations.",
                    total_new,
                    total_updated,
                )
            else:
                logger.info("Ingestion complete. No new or updated observations.")
    except ClientError as http_error:
        logger.exception("HTTP error during ingestion cycle: %s", http_error)
    except SQLAlchemyError as db_error:
        logger.exception("Database error during ingestion cycle: %s", db_error)
    except Exception as unexpected_error:
        logger.exception("Unexpected error during ingestion cycle: %s", unexpected_error)
        raise


def trigger_weather_ingestion_cycle() -> None:
    """Synchronous wrapper around the asynchronous ingestion cycle.

    Intended for use by in-process schedulers such as APScheduler. It starts a
    fresh event loop with ``asyncio.run``, so it must not be called from a
    thread that already has a running event loop.

    Returns
    -------
    None

    Raises
    ------
    Exception
        Propagates unexpected exceptions raised by the asynchronous cycle.

    Examples
    --------
    >>> from app.weather_ingest import trigger_weather_ingestion_cycle  # doctest: +SKIP
    >>> trigger_weather_ingestion_cycle()  # doctest: +SKIP
    """
    logger.info("APScheduler triggering weather ingestion cycle.")
    asyncio.run(run_weather_ingestion_cycle_async())