diff --git a/vercye_ops/examples/run_config_example.yaml b/vercye_ops/examples/run_config_example.yaml index adae592..54ffd5b 100644 --- a/vercye_ops/examples/run_config_example.yaml +++ b/vercye_ops/examples/run_config_example.yaml @@ -87,6 +87,7 @@ apsim_params: era5_cache_dir: 'era5_cache' # Path to your directory to cache ERA5 data. Avoids fetching data from GEE multuple times. Should be global, outside of the basefolder, as this is shared with all studies. ee_project: '' # Earth Engine project name. Only required to be set if met_source is 'ERA5' as this data is fetched from Google Earth Engine. align_np_grid: True # Only when using NasaPower: Snap coordinates for which to query NasaPower met data to the NasaPower grid (0.5x0.625 deg resolution). Might lead to slight differences in radiation. Should be activated when running at field scale to avoid querying the same met data pixel multiple times. + use_nrt_forecast: True # Use ECMWF near real time (14days) forecast. If there are missing dates in the met_start-met_end will try to fill as many forecast days as possible up until met_end. Otherwise will add forecast from end_date. sowing_date_col: 'sowing_date' # Optional: If the shapefile/geojsons have a column containing the sowing date as YYYY-MM-DD, use this in the simulation. 
Set to null or '' if not using @@ -239,3 +240,4 @@ scripts: generate_interactive_vizualization: '../reporting/generate_interactive_visualization.py' agg_preds_multiyear: '../reporting/aggregate_multiyear_predictions.py' save_matched_sims: '../reporting/save_simulations.py' + nrt_forecast: '../met_data/fetch_ecmwf_nrt_forecast.py' diff --git a/vercye_ops/examples/run_config_template.yaml b/vercye_ops/examples/run_config_template.yaml index 47657fd..a0f68fe 100644 --- a/vercye_ops/examples/run_config_template.yaml +++ b/vercye_ops/examples/run_config_template.yaml @@ -224,6 +224,7 @@ scripts: generate_multiyear_lai_report: '../reporting/generate_lai_report.py' agg_preds_multiyear: '../reporting/aggregate_multiyear_predictions.py' save_matched_sims: '../reporting/save_simulations.py' + nrt_forecast: '../met_data/fetch_ecmwf_nrt_forecast.py' ######################################## # Other (Typically dont change) diff --git a/vercye_ops/met_data/fetch_ecmwf_nrt_forecast.py b/vercye_ops/met_data/fetch_ecmwf_nrt_forecast.py new file mode 100644 index 0000000..742021b --- /dev/null +++ b/vercye_ops/met_data/fetch_ecmwf_nrt_forecast.py @@ -0,0 +1,221 @@ +from pathlib import Path + +import click +import ee +import pandas as pd + +from vercye_ops.met_data.fetch_era5 import ( + build_ee_geometry, + clean_era5, + error_checking_function, + postprocess_for_apsim, + write_met_data_to_csv, +) +from vercye_ops.utils.init_logger import get_logger + +logger = get_logger() + + +def fetch_nrt_forecast(forecasting_date, lon, lat, polygon_path): + year = forecasting_date.year + month = forecasting_date.month + day = forecasting_date.day + logger.info(f"Fetching forecast from {year}, {month}, {day}") + + geometry, geo_geotype = build_ee_geometry(lon, lat, polygon_path) + + era5 = ( + ee.ImageCollection("ECMWF/NRT_FORECAST/IFS/OPER") + .filter(ee.filter.Filter.eq("creation_year", year)) + .filter(ee.filter.Filter.eq("creation_month", month)) + .filter(ee.filter.Filter.eq("creation_day", 
day)) + .filterBounds(geometry) + .select( + [ + "total_precipitation_sfc", + "temperature_2m_sfc", + "surface_solar_radiation_downwards_sfc", + "u_component_of_wind_10m_sfc", + "v_component_of_wind_10m_sfc", + ] + ) + .sort("forecast_hours") + ) + + def aggregate_daily(imgcol): + # min/max temps + temp_min = imgcol.select("temperature_2m_sfc").reduce(ee.Reducer.min()) + temp_max = imgcol.select("temperature_2m_sfc").reduce(ee.Reducer.max()) + + # wind means + u_mean = imgcol.select("u_component_of_wind_10m_sfc").mean() + v_mean = imgcol.select("v_component_of_wind_10m_sfc").mean() + + # cumulative -> daily diff needed + precip = imgcol.select("total_precipitation_sfc") + precip_diff = precip.max().subtract(precip.min()) + + solar = imgcol.select("surface_solar_radiation_downwards_sfc") + solar_diff = solar.max().subtract(solar.min()) + + return ( + temp_min.rename("temperature_2m_min") + .addBands(temp_max.rename("temperature_2m_max")) + .addBands(u_mean.rename("u_component_of_wind_10m")) + .addBands(v_mean.rename("v_component_of_wind_10m")) + .addBands(precip_diff.rename("total_precipitation_sum")) + .addBands(solar_diff.rename("surface_solar_radiation_downwards_sum")) + ) + + # make sequence in steps of 24 for daily + total_hours = era5.aggregate_max("forecast_hours").getInfo() + hour_steps = ee.List.sequence(0, total_hours - 1, 24) + + # build daily aggregates in required format + def daily_chunk(offset): + offset = ee.Number(offset) # ensure it's an ee.Number + img = aggregate_daily(era5.filter(ee.Filter.rangeContains("forecast_hours", offset, offset.add(23)))) + return img.set("forecast_offset", offset) + + daily_imgs = hour_steps.map(daily_chunk) + daily_ic = ee.ImageCollection(daily_imgs) + + def extract(image): + # compute valid date = forecasting_date + forecast_offset (in hours) + offset_hours = ee.Number(image.get("forecast_offset")) + valid_date = ee.Date(forecasting_date).advance(offset_hours, "hour") + date_str = valid_date.format("YYYY-MM-dd") + + 
reducer = ee.Reducer.first() if geo_geotype == "point" else ee.Reducer.mean() + values = image.reduceRegion(reducer, geometry, 1000) + return ee.Feature(None, values.set("date", date_str)) + + features = daily_ic.map(extract) + feature_collection = ee.FeatureCollection(features) + result = feature_collection.getInfo() + records = [f["properties"] for f in result["features"]] + + df = pd.DataFrame(records) + df["date"] = pd.to_datetime(df["date"]) + df["forecast"] = True + + return df + + +@click.command() +@click.option( + "--forecast_date", + type=click.DateTime(formats=["%Y-%m-%d"]), + help="Date to issue the forecast from, in YYYY-MM-DD format.", +) +@click.option( + "--forecast_date_from_existing", + is_flag=True, + help="Instead of using the fixed forecast_date, derive the forecast date as the last date in an existing output_file.", +) +@click.option( + "--lon", + type=float, + required=False, + help="Longitude of the location. Must be provided if --polygon_path is not set and met_agg_method is centroid.", +) +@click.option( + "--lat", + type=float, + required=False, + help="Latitude of the location. Must be provided if --polygon_path is not set and met_agg_method is centroid.", +) +@click.option( + "--polygon_path", + type=click.Path(file_okay=True, dir_okay=False), + required=False, + help="Path to the regions polygon if using mean aggregation method.", + default=None, +) +@click.option( + "--met_agg_method", + type=click.Choice(["mean", "centroid"], case_sensitive=False), + help="Method to aggregate meteorological data in a ROI.", +) +@click.option( + "--ee_project", + type=str, + required=False, + help="Name of the Earth Engine Project in which to run the ERA5 processing. Only required when using --met_source era5.", +) +@click.option( + "--output_file", + type=click.Path(file_okay=True, dir_okay=False, exists=True), + required=True, + help="File where the forecasts should be saved.
Will append by default if existing.", +) +@click.option( + "--overwrite", is_flag=True, help="If the output_file already exists, data will not be appended but overwritten." +) +@click.option("--verbose", is_flag=True, help="Enable verbose logging.") +def cli( + forecast_date, + forecast_date_from_existing, + lon, + lat, + polygon_path, + met_agg_method, + ee_project, + output_file, + overwrite, + verbose, +): + """Fetches / appends ECMWF near real time meteorological forecasts in the required APSIM format.""" + if verbose: + logger.setLevel("INFO") + + if ee_project is None: + raise Exception("Setting --ee_project required when using ERA5 as the meteorological data source.") + + if (not forecast_date and not forecast_date_from_existing) or (forecast_date_from_existing and forecast_date): + raise Exception("Must EITHER set forecast_date or forecast_date_from_existing.") + + if forecast_date_from_existing and not Path(output_file).exists(): + raise Exception( + f"Forecast_date_from_existing chosen, but no file to derive the last date exists under {output_file}."
+ ) + + logger.info("Initializing google earth engine.") + ee.Initialize(project=ee_project) + + # Identify date from which to do the forecast + df_existing = None + if Path(output_file).exists() and not overwrite: + # Choose last valid date to do the forecast from + df_existing = pd.read_csv(output_file, index_col=0, parse_dates=True) + forecast_date = df_existing.index.max() + + if met_agg_method == "mean": + if not polygon_path: + raise ValueError("Must provide polygon_path when using mean aggregation.") + df = fetch_nrt_forecast(forecasting_date=forecast_date, lon=None, lat=None, polygon_path=polygon_path) + elif met_agg_method == "centroid": + if not lon or not lat: + raise ValueError("Must provide lat and lon when using centroid aggregation.") + df = fetch_nrt_forecast(forecasting_date=forecast_date, lon=lon, lat=lat, polygon_path=None) + else: + raise Exception("Invalid agg method provided") + + df = postprocess_for_apsim(df, end_date=None, is_forecast=True) + + # Load existing and append + if df_existing is not None: + df = df[df.index > forecast_date] + df = pd.concat([df_existing, df]) + + # Clean the data + df = clean_era5(df) + error_checking_function(df) + + # Save to csv + write_met_data_to_csv(df, output_file) + logger.info("Data successfully written to %s", output_file) + + +if __name__ == "__main__": + cli() diff --git a/vercye_ops/met_data/fetch_era5.py b/vercye_ops/met_data/fetch_era5.py index 12bf653..667d64a 100644 --- a/vercye_ops/met_data/fetch_era5.py +++ b/vercye_ops/met_data/fetch_era5.py @@ -14,17 +14,6 @@ logger = get_logger() -# Valid climate variables for the NASA POWER API -VALID_CLIMATE_VARIABLES = ["ALLSKY_SFC_SW_DWN", "T2M_MAX", "T2M_MIN", "T2M", "PRECTOTCORR", "WS2M"] -DEFAULT_CLIMATE_VARIABLES = [ - "ALLSKY_SFC_SW_DWN", - "T2M_MAX", - "T2M_MIN", - "T2M", - "PRECTOTCORR", - "WS2M", -] - def error_checking_function(df): """ @@ -124,19 +113,86 @@ def get_consecutive_date_chunks(dates): return chunks -def fetch_era5_data(start_date,
end_date, ee_project, lon=None, lat=None, polygon_path=None): - """ - Fetch meteorological data from ECMWF ERA5. Adjust outputs to align with NasaPower feature names. - """ - logger.info("Fetching meteorological data from ERA5 trough google earth engine.") - logger.info("Initializing google earth engine.") - ee.Initialize(project=ee_project) +def postprocess_for_apsim(met_data, end_date, is_forecast=False): + df = met_data.rename( + columns={ + "total_precipitation_sum": "PRECTOTCORR", + "temperature_2m_max": "T2M_MAX", + "temperature_2m_min": "T2M_MIN", + "surface_solar_radiation_downwards_sum": "ALLSKY_SFC_SW_DWN", + "u_component_of_wind_10m": "u10", + "v_component_of_wind_10m": "v10", + } + ) - logger.info("Querying data.") + # Convert temperatures from Kelvin to Celsius + # Only convert from Kelvin if data is not forecast + if not is_forecast: + df["T2M_MAX"] = df["T2M_MAX"] - 273.15 + df["T2M_MIN"] = df["T2M_MIN"] - 273.15 + + # Convert solar radiation from J/m² to MJ/m² + df["ALLSKY_SFC_SW_DWN"] = df["ALLSKY_SFC_SW_DWN"] / 1_000_000 + # Convert rain from meters to millimeters + df["PRECTOTCORR"] = df["PRECTOTCORR"] * 1000 + + # Calculate wind speed from u and v components + df["u10"] = df["u10"].astype(float) + df["v10"] = df["v10"].astype(float) + df["WS2M"] = np.sqrt(df["u10"] ** 2 + df["v10"] ** 2) + + # Compute mean temperate + df["T2M"] = (df["T2M_MAX"] + df["T2M_MIN"]) / 2 + + # Keep only required columns for APSIM + df = df[["date", "ALLSKY_SFC_SW_DWN", "T2M_MAX", "T2M_MIN", "T2M", "PRECTOTCORR", "WS2M"]] + df.fillna({"ALLSKY_SFC_SW_DWN": 0, "T2M": 0, "T2M_MAX": 0, "T2M_MIN": 0, "PRECTOTCORR": 0, "WS2M": 0}, inplace=True) + + # Sanity CheckEnsure that we have continous data for every day from start_date to end_date + if end_date is not None: + expected_dates = pd.date_range(df["date"].min(), end_date, freq="D") + missing_dates = expected_dates.difference(df["date"]) + + if not missing_dates.empty: + logger.warning(f"Missing dates in the data: 
{len(missing_dates)}") + + # sort missing dates ascending + missing_dates = missing_dates.sort_values() + # check if it is the newest dates that are missing + if missing_dates[0] > df["date"].max(): + logger.warning("Missing dates are at the end of the data. Not filling them.") + else: + raise Exception("Missing dates - Not yet handled, this shouldnt occur.") + + # set date as index + df.set_index("date", inplace=True) + df.index = pd.to_datetime(df.index) + df = df.sort_index() + + # check for duplicates + if df.index.duplicated().any(): + raise ValueError("Duplicate dates found in the data.") + + return df + + +def has_missing_dates(met_data_df, last_date): + return met_data_df["date"].max() < last_date + + +def get_era5_for_apsim(start_date, end_date, lon=None, lat=None, polygon_path=None): + met_data = fetch_era5_data(start_date, end_date, lon, lat, polygon_path) + met_data_normalized = postprocess_for_apsim(met_data, end_date) + + return met_data_normalized + + +def build_ee_geometry(lon=None, lat=None, polygon_path=None): if polygon_path is None: if lat is None or lon is None: raise ValueError("Must provide either lat/lon or a polygon.") + geometry = ee.Geometry.Point([lon, lat]) geo_type = "point" else: @@ -152,14 +208,26 @@ def fetch_era5_data(start_date, end_date, ee_project, lon=None, lat=None, polygo geometry = ee.Geometry(geojson_dict) geo_type = "polygon" + return geometry, geo_type + + +def fetch_era5_data(start_date, end_date, lon=None, lat=None, polygon_path=None): + """ + Fetch meteorological data from ECMWF ERA5. Adjust outputs to align with NasaPower feature names. 
+ """ + logger.info("Fetching meteorological data from ERA5 trough google earth engine.") + + logger.info("Querying data.") + + geometry, geo_type = build_ee_geometry(lon, lat, polygon_path) all_records = [] def split_date_range(start_date, end_date, chunk_years=10): - start = start_date - end = end_date + relativedelta(days=1) + start = pd.to_datetime(start_date) + end = pd.to_datetime(end_date) + pd.Timedelta(days=1) while start < end: next_start = min(start + relativedelta(years=chunk_years), end) - yield start.strftime("%Y-%m-%d"), next_start.strftime("%Y-%m-%d") + yield start, next_start start = next_start def extract(image): @@ -173,7 +241,7 @@ def extract(image): try: era5 = ( ee.ImageCollection("ECMWF/ERA5_LAND/DAILY_AGGR") - .filterDate(chunk_start, chunk_end) + .filterDate(ee.Date(chunk_start), ee.Date(chunk_end)) .filterBounds(geometry) .select( [ @@ -201,74 +269,12 @@ def extract(image): logger.info("Processing ERA5 data to required format.") - df["date"] = pd.to_datetime(df["date"]) - - df = df.rename( - columns={ - "total_precipitation_sum": "PRECTOTCORR", - "temperature_2m_max": "T2M_MAX", - "temperature_2m_min": "T2M_MIN", - "surface_solar_radiation_downwards_sum": "ALLSKY_SFC_SW_DWN", - "u_component_of_wind_10m": "u10", - "v_component_of_wind_10m": "v10", - } - ) - - # Convert temperatures from Kelvin to Celsius - df["T2M_MAX"] = df["T2M_MAX"] - 273.15 - df["T2M_MIN"] = df["T2M_MIN"] - 273.15 - - # Convert solar radiation from J/m² to MJ/m² - df["ALLSKY_SFC_SW_DWN"] = df["ALLSKY_SFC_SW_DWN"] / 1_000_000 - - # Convert rain from meters to millimeters - df["PRECTOTCORR"] = df["PRECTOTCORR"] * 1000 - - # Calculate wind speed from u and v components - df["u10"] = df["u10"].astype(float) - df["v10"] = df["v10"].astype(float) - df["WS2M"] = np.sqrt(df["u10"] ** 2 + df["v10"] ** 2) - - # Compute mean temperate - df["T2M"] = (df["T2M_MAX"] + df["T2M_MIN"]) / 2 - - # Keep only required columns for APSIM - df = df[["date", "ALLSKY_SFC_SW_DWN", "T2M_MAX", 
"T2M_MIN", "T2M", "PRECTOTCORR", "WS2M"]] - df.fillna( - {"ALLSKY_SFC_SW_DWN": 0, "T2M": 0, "T2M_MAX": 0, "T2M_MIN": 0, "PRECTOTCORR": 0, "WS2M": 0}, - inplace=True, - ) - - # Ensure that we have continous data for every day from start_date to end_date - expected_dates = pd.date_range(df["date"].min(), end_date, freq="D") - missing_dates = expected_dates.difference(df["date"]) - - if not missing_dates.empty: - logger.warning(f"Missing dates in the data: {len(missing_dates)}") - - # sort missing dates ascending - missing_dates = missing_dates.sort_values() - # check if it is the newest dates that are missing - if missing_dates[0] > df["date"].max(): - logger.warning("Missing dates are at the end of the data. Not filling them.") - else: - raise Exception("Missing dates - Not yet handled, this shouldnt occur.") - - # set date as index - df.set_index("date", inplace=True) - df.index = pd.to_datetime(df.index) - df = df.sort_index() - - # check for duplicates - if df.index.duplicated().any(): - raise ValueError("Duplicate dates found in the data.") - return df -def fetch_from_cache(start_date, end_date, lon, lat, polygon_path, ee_project, cache_fpath): +def fetch_era5_from_cache(start_date, end_date, cache_fpath): # Read existing data - df_existing = pd.read_csv(cache_fpath, index_col=0, parse_dates=True) + df_existing = pd.read_csv(cache_fpath, parse_dates=True, index_col=0) # Determine the date range to fetch existing_dates = df_existing.index @@ -276,11 +282,15 @@ def fetch_from_cache(start_date, end_date, lon, lat, polygon_path, ee_project, c missing_dates = all_dates.difference(existing_dates) # Identify blocks of missing dates - if not missing_dates.empty: + if missing_dates.empty: logger.info("No missing dates found. 
Using existing data.") df_filtered = df_existing.loc[start_date:end_date] - return df_filtered + return df_filtered, pd.DatetimeIndex([]) + + return df_existing, missing_dates + +def fill_missing_cache_dates(df_existing, missing_dates, lon, lat, polygon_path, cache_fpath): # Fetch data for the missing dates from_missing = missing_dates.min() to_missing = missing_dates.max() @@ -298,7 +308,7 @@ def fetch_from_cache(start_date, end_date, lon, lat, polygon_path, ee_project, c chunk[-1].date(), ) - chunk_data = fetch_era5_data(chunk[0], chunk[-1], ee_project, lon, lat, polygon_path) + chunk_data = get_era5_for_apsim(chunk[0], chunk[-1], lon, lat, polygon_path) missing_data.append(chunk_data) # Combine existing data with newly fetched data and bring in correct order @@ -311,10 +321,7 @@ def fetch_from_cache(start_date, end_date, lon, lat, polygon_path, ee_project, c logger.info("Updating cache file: %s", cache_fpath) write_met_data_to_csv(df_combined, cache_fpath) - # Return only the data from the requested date range - df_filtered = df_combined.loc[start_date:end_date] - - return df_filtered + return df_combined def validate_aggregation_options(met_agg_method): @@ -429,23 +436,26 @@ def cli( if cache_dir is not None: cache_region = f"{lon:.4f}_{lat:.4f}".replace(".", "_") - cache_fpath = Path(cache_dir) / f"{cache_region}_{met_agg_method}_nasapower.csv" + cache_fpath = Path(cache_dir) / f"{cache_region}_{met_agg_method}_era5.csv" if ee_project is None: raise Exception("Setting --ee_project required when using ERA5 as the meteorological data source.") + logger.info("Initializing google earth engine.") + ee.Initialize(project=ee_project) + if cache_dir is not None and Path(cache_fpath).exists() and not overwrite_cache: - logger.info( - "Cache File found for ERA5 region. 
Will fetch and append only missing dates to: \n%s", - cache_fpath, - ) - df = fetch_from_cache(start_date, end_date, lon, lat, polygon_path, ee_project, cache_fpath) + logger.info("Cache File found for ERA5 region. Will fetch and append only missing dates to: \n%s", cache_fpath) + df, missing_dates = fetch_era5_from_cache(start_date, end_date, cache_fpath) + if not missing_dates.empty: + df = fill_missing_cache_dates(df, missing_dates, lon, lat, polygon_path, cache_fpath) + df = df.loc[start_date:end_date] # Keep only the data from requested date range else: # Get mean or centroid data if met_agg_method == "mean": - df = fetch_era5_data(start_date, end_date, ee_project, None, None, polygon_path) + df = get_era5_for_apsim(start_date, end_date, None, None, polygon_path) elif met_agg_method == "centroid": - df = fetch_era5_data(start_date, end_date, ee_project, lon, lat, None) + df = get_era5_for_apsim(start_date, end_date, lon, lat, None) else: raise ValueError(f"Unsupported met_agg_method: {met_agg_method}") diff --git a/vercye_ops/snakemake/Snakefile b/vercye_ops/snakemake/Snakefile index fd2317b..c29bda2 100644 --- a/vercye_ops/snakemake/Snakefile +++ b/vercye_ops/snakemake/Snakefile @@ -218,7 +218,7 @@ rule fill_era5_cache: rule fetch_met_data: input: geometry = op.join(config['sim_study_head_dir'], '{year}', '{timepoint}', '{region}', '{region}.geojson'), - np_cached_data = op.join(config['sim_study_head_dir'], '{region}_np_cache_filled.txt') if config['apsim_params']['met_source'].lower() == 'nasapower' else [], + np_cached_data = op.join(config['sim_study_head_dir'], '{region}_np_cache_filled.txt') if config['apsim_params']['met_source'].lower() == 'nasa_power' else [], era5_cached_data = op.join(config['sim_study_head_dir'], '{region}_era5_cache_filled.txt') if config['apsim_params']['met_source'].lower() == 'era5' else [], params: met_start_date = lambda wildcards: 
config['apsim_params']['time_bounds'][int(wildcards.year)][wildcards.timepoint]['met_start_date'], @@ -237,7 +237,8 @@ rule fetch_met_data: log: 'logs_fetch_met_data/{year}_{timepoint}_{region}.log', output: - csv = op.join(config['sim_study_head_dir'], '{year}', '{timepoint}', '{region}', '{region}_met.csv'), + # Use intermediate path when using forecasts to allow appending to these and updating these + csv = op.join(config['sim_study_head_dir'], '{year}', '{timepoint}', '{region}', '{region}_met.csv') if not config['apsim_params']['use_nrt_forecast'] else op.join(config['sim_study_head_dir'], '{year}', '{timepoint}', '{region}', '{region}_non_forecasted_met.csv'), resources: era5_calls = 1 if config['apsim_params']['met_source'].lower() == 'era5' else 0, nasa_power_calls = 1 if config['apsim_params']['met_source'].lower() == 'nasa_power' else 0, @@ -287,6 +288,52 @@ rule fetch_met_data: fi """ +rule append_ecmwf_nrt_forecast: + input: + geometry = op.join(config['sim_study_head_dir'], '{year}', '{timepoint}', '{region}', '{region}.geojson'), + non_forecasted_met_data = op.join(config['sim_study_head_dir'], '{year}', '{timepoint}', '{region}', '{region}_non_forecasted_met.csv'), + params: + scrip_path_nrt_forecast = config['scripts']['nrt_forecast'], + head_dir = config['sim_study_head_dir'], + jq_load_statement = 'module load jq' if config['platform'] == 'umd' else '', + region_name = lambda wildcards: wildcards.region, + ee_project = config['apsim_params']['ee_project'], + met_agg_method = config['apsim_params']['met_agg_method'], + platform = config['platform'], + use_nrt_forecast = config['apsim_params']['use_nrt_forecast'] + log: + 'logs_fetch_met_data/{year}_{timepoint}_{region}_forecast.log', + output: + csv = op.join(config['sim_study_head_dir'], '{year}', '{timepoint}', '{region}', '{region}_met.csv') if config['apsim_params']['use_nrt_forecast'] else [], + resources: + era5_calls = 1 if config['apsim_params']['met_source'].lower() == 'era5' else 0, + 
retries: 5 if config['apsim_params']['met_source'].lower() == 'era5' else 2, # ERA5 on GEE can be flaky + shell: + """ + # Load geometry centroid + {params.jq_load_statement} + read LON LAT <<< $(cat {input.geometry} | jq -r '.features[].properties.centroid' | awk '{{gsub(/POINT \\(/, ""); gsub(/\\)/, ""); print $1, $2}}') + + # Set polygon path command only for mean aggregation + POLYGON_PATH="" + if [ "{params.met_agg_method}" = "mean" ]; then + POLYGON_PATH="--polygon_path {input.geometry}" + fi + + # Append forecast data + python3 {params.scrip_path_nrt_forecast} \ + --forecast_date_from_existing \ + --met_agg_method {params.met_agg_method} \ + $POLYGON_PATH \ + --lon $LON \ + --lat $LAT \ + --ee_project {params.ee_project} \ + --output_file {input.non_forecasted_met_data} + + # Move file to proper name to be picked up by snakemake + mv {input.non_forecasted_met_data} {output.csv} + """ + # Rule to generate .met files from weather CSVs rule construct_plot_met_files: input: