diff --git a/parm/config/gfs/config.aeroanl b/parm/config/gfs/config.aeroanl
index 5ac03bd7ee..591d9490ee 100644
--- a/parm/config/gfs/config.aeroanl
+++ b/parm/config/gfs/config.aeroanl
@@ -7,10 +7,10 @@ echo "BEGIN: config.aeroanl"

 # define analysis resolution based on deterministic res
 case ${CASE} in
-    "C1152" | "C768" | "C384" | "C192")
-        CASE_ANL="C192"
+    "C1152" | "C768" | "C384")
+        CASE_ANL="C384"
     ;;
-    "C96" | "C48")
+    "C192" | "C96" | "C48")
         CASE_ANL=${CASE}
     ;;
     *)
@@ -18,14 +18,16 @@ case ${CASE} in
     exit 4
 esac
 export CASE_ANL
-export OBS_LIST="${PARMgfs}/gdas/aero/obs/lists/gdas_aero.yaml.j2"
+export JCB_ALGO_YAML_VAR=${PARMgfs}/gdas/aero/jcb-prototype_3dvar.yaml.j2

 export STATICB_TYPE='diffusion'
-export BERROR_YAML="${PARMgfs}/gdas/aero/berror/staticb_${STATICB_TYPE}.yaml.j2"
+export BERROR_YAML="aero_background_error_static_${STATICB_TYPE}"
 export BERROR_DATA_DIR="${FIXgfs}/gdas/aero/clim_b"

 export CRTM_FIX_YAML="${PARMgfs}/gdas/aero_crtm_coeff.yaml.j2"
 export JEDI_FIX_YAML="${PARMgfs}/gdas/aero_jedi_fix.yaml.j2"
+export JEDI_CONFIG_YAML="${PARMgfs}/gdas/aeroanl_jedi_config.yaml.j2"
+
 export AERO_STAGE_VARIATIONAL_TMPL="${PARMgfs}/gdas/aero_stage_variational.yaml.j2"
 export AERO_FINALIZE_VARIATIONAL_TMPL="${PARMgfs}/gdas/aero_finalize_variational.yaml.j2"
@@ -33,15 +35,11 @@ export io_layout_x=@IO_LAYOUT_X@
 export io_layout_y=@IO_LAYOUT_Y@

 export JEDIEXE="${EXECgfs}/gdas.x"
-export BMATEXE="${EXECgfs}/gdasapp_chem_diagb.x"
-export DIFFUSIONEXE="${EXECgfs}/gdas_fv3jedi_error_covariance_toolbox.x"

 if [[ "${DOIAU}" == "YES" ]]; then
     export aero_bkg_times="3,6,9"
-    export JEDIYAML="${PARMgfs}/gdas/aero/variational/3dvar_fgat_gfs_aero.yaml.j2"
 else
     export aero_bkg_times="6,"  # Trailing comma is necessary so this is treated as a list
-    export JEDIYAML="${PARMgfs}/gdas/aero/variational/3dvar_gfs_aero.yaml.j2"
 fi

 echo "END: config.aeroanl"
diff --git a/parm/config/gfs/config.aeroanlgenb b/parm/config/gfs/config.aeroanlgenb
index b41b22a524..d1f0ed10bd 100644
--- a/parm/config/gfs/config.aeroanlgenb
+++ b/parm/config/gfs/config.aeroanlgenb
@@ -8,9 +8,8 @@ echo "BEGIN: config.aeroanlgenb"
 # Get task specific resources
 source "${EXPDIR}/config.resources" aeroanlgenb

-export BMATYAML="${PARMgfs}/gdas/aero/berror/aero_diagb.yaml.j2"
-export DIFFUSIONYAML="${PARMgfs}/gdas/aero/berror/aero_diffusionparm.yaml.j2"
-export INTERPYAML="${PARMgfs}/gdas/aero/berror/aero_interp.yaml.j2"
+export JEDI_CONFIG_YAML="${PARMgfs}/gdas/aero_bmat_jedi_config.yaml.j2"
+export JCB_BASE_YAML="${PARMgfs}/gdas/aero/jcb-base.yaml.j2"
 export AERO_BMATRIX_STAGE_TMPL="${PARMgfs}/gdas/aero_stage_bmatrix_bkg.yaml.j2"
 export AERO_BMATRIX_FINALIZE_TMPL="${PARMgfs}/gdas/aero_finalize_bmatrix_bkg.yaml.j2"
 export aero_diffusion_iter=10
diff --git a/parm/gdas/aero_bmat_jedi_config.yaml.j2 b/parm/gdas/aero_bmat_jedi_config.yaml.j2
new file mode 100644
index 0000000000..4f0a779335
--- /dev/null
+++ b/parm/gdas/aero_bmat_jedi_config.yaml.j2
@@ -0,0 +1,19 @@
+aero_interpbkg:
+  rundir: '{{ DATA }}'
+  exe_src: '{{ EXECgfs }}/gdas.x'
+  jedi_args: ['fv3jedi', 'convertstate']
+  mpi_cmd: '{{ APRUN_AEROANLGENB }}'
+  jcb_base_yaml: '{{ PARMgfs }}/gdas/aero/jcb-base.yaml.j2'
+  jcb_algo: aero_convert_background
+aero_diagb:
+  rundir: '{{ DATA }}'
+  exe_src: '{{ EXECgfs }}/gdasapp_chem_diagb.x'
+  mpi_cmd: '{{ APRUN_AEROANLGENB }}'
+  jcb_base_yaml: '{{ PARMgfs }}/gdas/aero/jcb-base.yaml.j2'
+  jcb_algo: aero_gen_bmatrix_diagb
+aero_diffusion:
+  rundir: '{{ DATA }}'
+  exe_src: '{{ EXECgfs }}/gdas_fv3jedi_error_covariance_toolbox.x'
+  mpi_cmd: '{{ APRUN_AEROANLGENB }}'
+  jcb_base_yaml: '{{ PARMgfs }}/gdas/aero/jcb-base.yaml.j2'
+  jcb_algo: aero_gen_bmatrix_diffusion
diff --git a/parm/gdas/aero_finalize_bmatrix_bkg.yaml.j2 b/parm/gdas/aero_finalize_bmatrix_bkg.yaml.j2
index b33f280945..d7addee30d 100644
--- a/parm/gdas/aero_finalize_bmatrix_bkg.yaml.j2
+++ b/parm/gdas/aero_finalize_bmatrix_bkg.yaml.j2
@@ -4,9 +4,9 @@
 {% set background_time = current_cycle | add_to_datetime(offset_td) %}
 copy:
 ### copy YAMLs used
-{% set yaml_list = ['chem_diagb.yaml', 'chem_diffusion.yaml'] %}
+{% set yaml_list = ['aero_diagb.yaml', 'aero_diffusion.yaml', 'aero_interpbkg.yaml'] %}
 {% for fname in yaml_list %}
-- ["{{ DATA }}/{{ HEAD }}{{ fname }}", "{{ COMOUT_CHEM_BMAT }}/{{ HEAD }}{{ fname }}"]
+- ["{{ DATA }}/{{ fname }}", "{{ COMOUT_CHEM_BMAT }}/{{ HEAD }}{{ fname }}"]
 {% endfor %}
 ### copy stddev files to ROTDIR
 {% for tile in range(1, ntiles+1) %}
diff --git a/parm/gdas/aero_finalize_variational.yaml.j2 b/parm/gdas/aero_finalize_variational.yaml.j2
index 7dadd36291..67deb41301 100644
--- a/parm/gdas/aero_finalize_variational.yaml.j2
+++ b/parm/gdas/aero_finalize_variational.yaml.j2
@@ -12,7 +12,7 @@ mkdir:
 - "{{ COMOUT_ATMOS_RESTART }}"
 copy:
 ## copy variational YAML to ROTDIR
-- ["{{ DATA }}/{{ APREFIX }}aerovar.yaml", "{{ COMOUT_CHEM_ANALYSIS }}/{{ APREFIX }}aerovar.yaml"]
+- ["{{ DATA }}/aeroanlvar.yaml", "{{ COMOUT_CHEM_ANALYSIS }}/{{ APREFIX }}aerovar.yaml"]
 ## copy increments
 {% for tile in range(1,ntiles+1) %}
 - ["{{ DATA }}/anl/aeroinc.{{ current_cycle | to_fv3time }}.fv_tracer.res.tile{{ tile }}.nc", "{{ COMOUT_CHEM_ANALYSIS }}/aeroinc.{{ current_cycle | to_fv3time }}.fv_tracer.res.tile{{ tile }}.nc"]
diff --git a/parm/gdas/aero_stage_bmatrix_bkg.yaml.j2 b/parm/gdas/aero_stage_bmatrix_bkg.yaml.j2
index 9005b9ff12..18f5598bd8 100644
--- a/parm/gdas/aero_stage_bmatrix_bkg.yaml.j2
+++ b/parm/gdas/aero_stage_bmatrix_bkg.yaml.j2
@@ -12,6 +12,7 @@ mkdir:
 - "{{ DATA }}/stddev"
 - "{{ DATA }}/clm_stddev"
 - "{{ DATA }}/diffusion"
+- "{{ DATA }}/rescale"
 copy:
 ######################################
 # copy deterministic background files
@@ -30,9 +31,15 @@ copy:
 - ["{{ COM_ATMOS_RESTART_TMPL | replace_tmpl(tmpl_dict) }}/{{ background_time | to_fv3time }}.{{ ftype }}.tile{{ tile }}.nc", "{{ DATA }}/bkg/{{ background_time | to_fv3time }}.{{ ftype }}.tile{{ tile }}.nc"]
 {% endfor %}
 {% endfor %}
+######################################
 # copy climatological stddev files
 ######################################
 {% for tile in range(1, ntiles+1) %}
 - ["{{ BERROR_DATA_DIR }}/stddev.fv_tracer.res.tile{{ tile }}.nc", "{{ DATA }}/clm_stddev/stddev.fv_tracer.res.tile{{ tile }}.nc"]
 {% endfor %}
-
+######################################
+# copy stddev rescaling factor files
+######################################
+{% for tile in range(1, ntiles+1) %}
+- ["{{ BERROR_DATA_DIR }}/rescale.fv_tracer.res.tile{{ tile }}.nc", "{{ DATA }}/rescale/rescale.fv_tracer.res.tile{{ tile }}.nc"]
+{% endfor %}
diff --git a/parm/gdas/aeroanl_jedi_config.yaml.j2 b/parm/gdas/aeroanl_jedi_config.yaml.j2
new file mode 100644
index 0000000000..4171f94676
--- /dev/null
+++ b/parm/gdas/aeroanl_jedi_config.yaml.j2
@@ -0,0 +1,7 @@
+aeroanlvar:
+  rundir: '{{ DATA }}'
+  exe_src: '{{ EXECgfs }}/gdas.x'
+  mpi_cmd: '{{ APRUN_AEROANL }}'
+  jedi_args: ['fv3jedi', 'variational']
+  jcb_base_yaml: '{{ PARMgfs }}/gdas/aero/jcb-base.yaml.j2'
+  jcb_algo_yaml: '{{ JCB_ALGO_YAML_VAR }}'
diff --git a/scripts/exgdas_aero_analysis_generate_bmatrix.py b/scripts/exgdas_aero_analysis_generate_bmatrix.py
index 0d8389c40d..124ca4e413 100755
--- a/scripts/exgdas_aero_analysis_generate_bmatrix.py
+++ b/scripts/exgdas_aero_analysis_generate_bmatrix.py
@@ -18,10 +18,8 @@
     # Take configuration from environment and cast it as python dictionary
     config = cast_strdict_as_dtypedict(os.environ)

-    # Instantiate the aerosol variance and diffusion correlation tasks
-    AeroB = AerosolBMatrix(config)
-    AeroB.initialize()
-    AeroB.interpBackground()
-    AeroB.computeVariance()
-    AeroB.computeDiffusion()
-    AeroB.finalize()
+    # Create an instance of the AerosolBMatrix task
+    aeroBMat = AerosolBMatrix(config)
+    aeroBMat.initialize()
+    aeroBMat.execute()
+    aeroBMat.finalize()
diff --git a/scripts/exglobal_aero_analysis_finalize.py b/scripts/exglobal_aero_analysis_finalize.py
index e9464b47e5..19adcc4c38 100755
--- a/scripts/exglobal_aero_analysis_finalize.py
+++ b/scripts/exglobal_aero_analysis_finalize.py
@@ -21,4 +21,6 @@

     # Instantiate the aerosol analysis task
     AeroAnl = AerosolAnalysis(config)
+
+    # Finalize JEDI aerosol variational analysis
     AeroAnl.finalize()
diff --git a/scripts/exglobal_aero_analysis_initialize.py b/scripts/exglobal_aero_analysis_initialize.py
index 3a57dc8401..e3699f23e5 100755
--- a/scripts/exglobal_aero_analysis_initialize.py
+++ b/scripts/exglobal_aero_analysis_initialize.py
@@ -21,4 +21,6 @@

     # Instantiate the aerosol analysis task
     AeroAnl = AerosolAnalysis(config)
+
+    # Initialize JEDI aerosol variational analysis
     AeroAnl.initialize()
diff --git a/scripts/exglobal_aero_analysis_variational.py b/scripts/exglobal_aero_analysis_variational.py
index dd5bb4f65a..1a99d3c5d7 100755
--- a/scripts/exglobal_aero_analysis_variational.py
+++ b/scripts/exglobal_aero_analysis_variational.py
@@ -19,4 +19,6 @@

     # Instantiate the aerosol analysis task
     AeroAnl = AerosolAnalysis(config)
-    AeroAnl.variational()
+
+    # Execute JEDI variational analysis
+    AeroAnl.execute('aeroanlvar')
diff --git a/sorc/gdas.cd b/sorc/gdas.cd
index d9b40c9bde..a2ea3770ae 160000
--- a/sorc/gdas.cd
+++ b/sorc/gdas.cd
@@ -1 +1 @@
-Subproject commit d9b40c9bde1e5e70451f309a5e010267aaadba00
+Subproject commit a2ea3770aeb9d4308bde51bb1d8c9c94cc9534c8
diff --git a/ush/python/pygfs/__init__.py b/ush/python/pygfs/__init__.py
index 9f290fafd3..6b7b7eb4c9 100644
--- a/ush/python/pygfs/__init__.py
+++ b/ush/python/pygfs/__init__.py
@@ -2,7 +2,6 @@
 import os

 from .task.analysis import Analysis
-from .task.bmatrix import BMatrix
 from .task.aero_emissions import AerosolEmissions
 from .task.aero_analysis import AerosolAnalysis
 from .task.aero_bmatrix import AerosolBMatrix
diff --git a/ush/python/pygfs/task/aero_analysis.py b/ush/python/pygfs/task/aero_analysis.py
index 0389e109a1..de9360a9ac 100644
--- a/ush/python/pygfs/task/aero_analysis.py
+++ b/ush/python/pygfs/task/aero_analysis.py
@@ -5,34 +5,49 @@
 import gzip
 import tarfile
 from logging import getLogger
-from typing import Dict, List, Any
+from pprint import pformat
+from netCDF4 import Dataset
+from typing import Dict, List

 from wxflow import (AttrDict, FileHandler, add_to_datetime, to_fv3time, to_timedelta,
-                    chdir, to_fv3time,
-                    YAMLFile, parse_j2yaml, save_as_yaml,
-                    logit,
-                    Executable,
-                    WorkflowException)
-from pygfs.task.analysis import Analysis
+                    Task,
+                    YAMLFile, parse_j2yaml,
+                    logit)
+from pygfs.jedi import Jedi

 logger = getLogger(__name__.split('.')[-1])


-class AerosolAnalysis(Analysis):
+class AerosolAnalysis(Task):
     """
-    Class for global aerosol analysis tasks
+    Class for JEDI-based global aerosol analysis tasks
     """
     @logit(logger, name="AerosolAnalysis")
     def __init__(self, config):
+        """Constructor for the global aero analysis task
+
+        This method will construct a global aero analysis task.
+        This includes:
+        - extending the task_config attribute AttrDict to include parameters required for this task
+        - instantiating the Jedi attribute object
+
+        Parameters
+        ----------
+        config: Dict
+            dictionary object containing task configuration
+
+        Returns
+        ----------
+        None
+        """
         super().__init__(config)

         _res = int(self.task_config['CASE'][1:])
         _res_anl = int(self.task_config['CASE_ANL'][1:])

         _window_begin = add_to_datetime(self.task_config.current_cycle, -to_timedelta(f"{self.task_config['assim_freq']}H") / 2)
-        _jedi_yaml = os.path.join(self.task_config.DATA, f"{self.task_config.RUN}.t{self.task_config['cyc']:02d}z.aerovar.yaml")

         # Create a local dictionary that is repeatedly used across this class
         local_dict = AttrDict(
@@ -50,72 +65,94 @@ def __init__(self, config):
             'OPREFIX': f"{self.task_config.RUN}.t{self.task_config.cyc:02d}z.",
             'APREFIX': f"{self.task_config.RUN}.t{self.task_config.cyc:02d}z.",
             'GPREFIX': f"gdas.t{self.task_config.previous_cycle.hour:02d}z.",
-            'jedi_yaml': _jedi_yaml,
+            'aero_obsdatain_path': f"{self.task_config.DATA}/obs/",
+            'aero_obsdataout_path': f"{self.task_config.DATA}/diags/",
+            'BKG_TSTEP': "PT3H"  # FGAT
             }
         )

         # Extend task_config with local_dict
         self.task_config = AttrDict(**self.task_config, **local_dict)

+        # Create dictionary of Jedi objects
+        expected_keys = ['aeroanlvar']
+        self.jedi_dict = Jedi.get_jedi_dict(self.task_config.JEDI_CONFIG_YAML, self.task_config, expected_keys)
+
     @logit(logger)
-    def initialize(self: Analysis) -> None:
+    def initialize(self) -> None:
         """Initialize a global aerosol analysis

         This method will initialize a global aerosol analysis using JEDI.
         This includes:
+        - initializing JEDI applications
+        - staging observation files
+        - staging bias correction files
         - staging CRTM fix files
         - staging FV3-JEDI fix files
         - staging B error files
         - staging model backgrounds
-        - generating a YAML file for the JEDI executable
         - creating output directories
         """
-        super().initialize()
+
+        # initialize JEDI variational application
+        logger.info(f"Initializing JEDI variational DA application")
+        self.jedi_dict['aeroanlvar'].initialize(self.task_config)
+
+        # stage observations
+        logger.info(f"Staging list of observation files generated from JEDI config")
+        obs_dict = self.jedi_dict['aeroanlvar'].render_jcb(self.task_config, 'aero_obs_staging')
+        FileHandler(obs_dict).sync()
+        logger.debug(f"Observation files:\n{pformat(obs_dict)}")
+
+        # # stage bias corrections
+        # logger.info(f"Staging list of bias correction files")
+        # bias_dict = self.jedi_dict['aeroanlvar'].render_jcb(self.task_config, 'aero_bias_staging')
+        # if bias_dict['copy'] is None:
+        #     logger.info(f"No bias correction files to stage")
+        # else:
+        #     bias_dict['copy'] = Jedi.remove_redundant(bias_dict['copy'])
+        #     FileHandler(bias_dict).sync()
+        #     logger.debug(f"Bias correction files:\n{pformat(bias_dict)}")
+
+        # # extract bias corrections
+        # Jedi.extract_tar_from_filehandler_dict(bias_dict)

         # stage CRTM fix files
         logger.info(f"Staging CRTM fix files from {self.task_config.CRTM_FIX_YAML}")
-        crtm_fix_list = parse_j2yaml(self.task_config.CRTM_FIX_YAML, self.task_config)
-        FileHandler(crtm_fix_list).sync()
+        crtm_fix_dict = parse_j2yaml(self.task_config.CRTM_FIX_YAML, self.task_config)
+        FileHandler(crtm_fix_dict).sync()
+        logger.debug(f"CRTM fix files:\n{pformat(crtm_fix_dict)}")

         # stage fix files
         logger.info(f"Staging JEDI fix files from {self.task_config.JEDI_FIX_YAML}")
-        jedi_fix_list = parse_j2yaml(self.task_config.JEDI_FIX_YAML, self.task_config)
-        FileHandler(jedi_fix_list).sync()
+        jedi_fix_dict = parse_j2yaml(self.task_config.JEDI_FIX_YAML, self.task_config)
+        FileHandler(jedi_fix_dict).sync()
+        logger.debug(f"JEDI fix files:\n{pformat(jedi_fix_dict)}")

         # stage files from COM and create working directories
         logger.info(f"Staging files prescribed from {self.task_config.AERO_STAGE_VARIATIONAL_TMPL}")
-        aero_var_stage_list = parse_j2yaml(self.task_config.AERO_STAGE_VARIATIONAL_TMPL, self.task_config)
-        FileHandler(aero_var_stage_list).sync()
-
-        # generate variational YAML file
-        logger.debug(f"Generate variational YAML file: {self.task_config.jedi_yaml}")
-        save_as_yaml(self.task_config.jedi_config, self.task_config.jedi_yaml)
-        logger.info(f"Wrote variational YAML to: {self.task_config.jedi_yaml}")
+        aero_var_stage_dict = parse_j2yaml(self.task_config.AERO_STAGE_VARIATIONAL_TMPL, self.task_config)
+        FileHandler(aero_var_stage_dict).sync()
+        logger.debug(f"Staging from COM:\n{pformat(aero_var_stage_dict)}")

     @logit(logger)
-    def variational(self: Analysis) -> None:
+    def execute(self, jedi_dict_key: str) -> None:
+        """Execute JEDI application of aero analysis

-        chdir(self.task_config.DATA)
-
-        exec_cmd = Executable(self.task_config.APRUN_AEROANL)
-        exec_name = os.path.join(self.task_config.DATA, 'gdas.x')
-        exec_cmd.add_default_arg(exec_name)
-        exec_cmd.add_default_arg('fv3jedi')
-        exec_cmd.add_default_arg('variational')
-        exec_cmd.add_default_arg(self.task_config.jedi_yaml)
+        Parameters
+        ----------
+        jedi_dict_key
+            key specifying particular Jedi object in self.jedi_dict

-        try:
-            logger.debug(f"Executing {exec_cmd}")
-            exec_cmd()
-        except OSError:
-            raise OSError(f"Failed to execute {exec_cmd}")
-        except Exception:
-            raise WorkflowException(f"An error occured during execution of {exec_cmd}")
+        Returns
+        ----------
+        None
+        """

-        pass
+        self.jedi_dict[jedi_dict_key].execute()

     @logit(logger)
-    def finalize(self: Analysis) -> None:
+    def finalize(self) -> None:
         """Finalize a global aerosol analysis

         This method will finalize a global aerosol analysis using JEDI.
@@ -145,6 +182,9 @@ def finalize(self: Analysis) -> None:
         logger.info('Adding increments to RESTART files')
         self._add_fms_cube_sphere_increments()

+        # tar up bias correction files
+        # NOTE TODO
+
         # copy files back to COM
         logger.info(f"Copying files to COM based on {self.task_config.AERO_FINALIZE_VARIATIONAL_TMPL}")
         aero_var_final_list = parse_j2yaml(self.task_config.AERO_FINALIZE_VARIATIONAL_TMPL, self.task_config)
@@ -161,7 +201,7 @@ def clean(self):
         super().clean()

     @logit(logger)
-    def _add_fms_cube_sphere_increments(self: Analysis) -> None:
+    def _add_fms_cube_sphere_increments(self) -> None:
         """This method adds increments to RESTART files to get an analysis
         """
         if self.task_config.DOIAU:
@@ -176,46 +216,32 @@ def _add_fms_cube_sphere_increments(self: Analysis) -> None:
         # get list of increment vars
         incvars_list_path = os.path.join(self.task_config['PARMgfs'], 'gdas', 'aeroanl_inc_vars.yaml')
         incvars = YAMLFile(path=incvars_list_path)['incvars']
-        super().add_fv3_increments(inc_template, bkg_template, incvars)
+        self.add_fv3_increments(inc_template, bkg_template, incvars)

     @logit(logger)
-    def get_bkg_dict(self, task_config: Dict[str, Any]) -> Dict[str, List[str]]:
-        """Compile a dictionary of model background files to copy
-
-        This method constructs a dictionary of FV3 RESTART files (coupler, core, tracer)
-        that are needed for global aerosol DA and returns said dictionary for use by the FileHandler class.
+    def add_fv3_increments(self, inc_file_tmpl: str, bkg_file_tmpl: str, incvars: List) -> None:
+        """Add cubed-sphere increments to cubed-sphere backgrounds

         Parameters
         ----------
-        task_config: Dict
-            a dictionary containing all of the configuration needed for the task
-
-        Returns
-        ----------
-        bkg_dict: Dict
-            a dictionary containing the list of model background files to copy for FileHandler
+        inc_file_tmpl : str
+            template of the FV3 increment file of the form: 'filetype.tile{tilenum}.nc'
+        bkg_file_tmpl : str
+            template of the FV3 background file of the form: 'filetype.tile{tilenum}.nc'
+        incvars : List
+            List of increment variables to add to the background
         """
-        bkg_dict = {}
-        return bkg_dict
-
-    @logit(logger)
-    def get_berror_dict(self, config: Dict[str, Any]) -> Dict[str, List[str]]:
-        """Compile a dictionary of background error files to copy
-        This method will construct a dictionary of BUMP background error files
-        for global aerosol DA and return said dictionary for use by the FileHandler class.
-        This dictionary contains coupler and fv_tracer files
-        for correlation and standard deviation as well as NICAS localization.
-
-        Parameters
-        ----------
-        config: Dict
-            a dictionary containing all of the configuration needed
-
-        Returns
-        ----------
-        berror_dict: Dict
-            a dictionary containing the list of background error files to copy for FileHandler
-        """
-        berror_dict = {}
-        return berror_dict
+        for itile in range(1, self.task_config.ntiles + 1):
+            inc_path = inc_file_tmpl.format(tilenum=itile)
+            bkg_path = bkg_file_tmpl.format(tilenum=itile)
+            with Dataset(inc_path, mode='r') as incfile, Dataset(bkg_path, mode='a') as rstfile:
+                for vname in incvars:
+                    increment = incfile.variables[vname][:]
+                    bkg = rstfile.variables[vname][:]
+                    anl = bkg + increment
+                    rstfile.variables[vname][:] = anl[:]
+                    try:
+                        rstfile.variables[vname].delncattr('checksum')  # remove the checksum so fv3 does not complain
+                    except (AttributeError, RuntimeError):
+                        pass  # checksum is missing, move on
diff --git a/ush/python/pygfs/task/aero_bmatrix.py b/ush/python/pygfs/task/aero_bmatrix.py
index c652bad558..572eaaf9dc 100644
--- a/ush/python/pygfs/task/aero_bmatrix.py
+++ b/ush/python/pygfs/task/aero_bmatrix.py
@@ -2,31 +2,43 @@
 import os
 from logging import getLogger
-from typing import List, Dict, Any, Union
+from typing import List, Dict

-from wxflow import (AttrDict, FileHandler, rm_p,
-                    add_to_datetime, to_fv3time, to_timedelta,
-                    to_fv3time, chdir, Executable, WorkflowException,
-                    parse_j2yaml, save_as_yaml, logit)
-from pygfs.task.bmatrix import BMatrix
+from wxflow import (AttrDict, FileHandler,
+                    add_to_datetime, to_timedelta,
+                    parse_j2yaml, logit, Task)
+from pygfs.jedi import Jedi

 logger = getLogger(__name__.split('.')[-1])


-class AerosolBMatrix(BMatrix):
+class AerosolBMatrix(Task):
     """
     Class for global aerosol BMatrix tasks
     """
     @logit(logger, name="AerosolBMatrix")
-    def __init__(self, config: Dict[str, Any]) -> None:
+    def __init__(self, config):
+        """Constructor for the global aero bmatrix task
+
+        This method will construct a global aero bmatrix task object.
+        This includes:
+        - extending the task_config attribute AttrDict to include parameters required for this task
+        - instantiating the Jedi attribute object
+
+        Parameters
+        ----------
+        config: Dict
+            dictionary object containing task configuration
+
+        Returns
+        ----------
+        None
+        """
         super().__init__(config)

         _res = int(self.task_config['CASE'][1:])
         _res_anl = int(self.task_config['CASE_ANL'][1:])
-
-        _bmat_yaml = os.path.join(self.task_config.DATA, f"{self.task_config.RUN}.t{self.task_config['cyc']:02d}z.chem_diagb.yaml")
-        _diffusion_yaml = os.path.join(self.task_config.DATA, f"{self.task_config.RUN}.t{self.task_config['cyc']:02d}z.chem_diffusion.yaml")
-        _convertstate_yaml = os.path.join(self.task_config.DATA, f"{self.task_config.RUN}.t{self.task_config['cyc']:02d}z.chem_convertstate.yaml")
+        _window_begin = add_to_datetime(self.task_config.current_cycle, -to_timedelta(f"{self.task_config['assim_freq']}H") / 2)

         # Create a local dictionary that is repeatedly used across this class
         local_dict = AttrDict(
@@ -38,257 +50,95 @@ def __init__(self, config: Dict[str, Any]) -> None:
             'npx_anl': _res_anl + 1,
             'npy_anl': _res_anl + 1,
             'npz_anl': self.task_config['LEVS'] - 1,
+            'AERO_WINDOW_BEGIN': _window_begin,
+            'AERO_WINDOW_LENGTH': f"PT{self.task_config['assim_freq']}H",
             'aero_bkg_fhr': map(int, str(self.task_config['aero_bkg_times']).split(',')),
             'OPREFIX': f"{self.task_config.RUN}.t{self.task_config.cyc:02d}z.",
             'APREFIX': f"{self.task_config.RUN}.t{self.task_config.cyc:02d}z.",
             'GPREFIX': f"gdas.t{self.task_config.previous_cycle.hour:02d}z.",
-            'bmat_yaml': _bmat_yaml,
-            'diffusion_yaml': _diffusion_yaml,
-            'convertstate_yaml': _convertstate_yaml,
+            'aero_obsdatain_path': f"{self.task_config.DATA}/obs/",
+            'aero_obsdataout_path': f"{self.task_config.DATA}/diags/",
             }
         )

         # task_config is everything that this task should need
         self.task_config = AttrDict(**self.task_config, **local_dict)

-    @logit(logger)
-    def initialize(self: BMatrix) -> None:
-        super().initialize()
-        # stage fix files
-        logger.info(f"Staging JEDI fix files from {self.task_config.JEDI_FIX_YAML}")
-        jedi_fix_list = parse_j2yaml(self.task_config.JEDI_FIX_YAML, self.task_config)
-        FileHandler(jedi_fix_list).sync()
-
-        # stage backgrounds
-        logger.info(f"Staging backgrounds prescribed from {self.task_config.AERO_BMATRIX_STAGE_TMPL}")
-        aero_bmat_stage_list = parse_j2yaml(self.task_config.AERO_BMATRIX_STAGE_TMPL, self.task_config)
-        FileHandler(aero_bmat_stage_list).sync()
-
-        # generate convert state YAML file
-        logger.info(f"Generate convert state YAML file: {self.task_config.convertstate_yaml}")
-        self.task_config.convertstate_config = parse_j2yaml(self.task_config.INTERPYAML,
-                                                            self.task_config,
-                                                            searchpath=self.gdasapp_j2tmpl_dir)
-        save_as_yaml(self.task_config.convertstate_config, self.task_config.convertstate_yaml)
-        logger.info(f"Wrote convert state YAML to: {self.task_config.convertstate_yaml}")
-
-        # generate diagb YAML file
-        logger.info(f"Generate bmat YAML file: {self.task_config.bmat_yaml}")
-        self.task_config.bmat_config = parse_j2yaml(self.task_config.BMATYAML,
-                                                    self.task_config,
-                                                    searchpath=self.gdasapp_j2tmpl_dir)
-        save_as_yaml(self.task_config.bmat_config, self.task_config.bmat_yaml)
-        logger.info(f"Wrote bmat YAML to: {self.task_config.bmat_yaml}")
-
-        # generate diffusion parameters YAML file
-        logger.info(f"Generate diffusion YAML file: {self.task_config.diffusion_yaml}")
-        self.task_config.diffusion_config = parse_j2yaml(self.task_config.DIFFUSIONYAML,
-                                                         self.task_config,
-                                                         searchpath=self.gdasapp_j2tmpl_dir)
-        save_as_yaml(self.task_config.diffusion_config, self.task_config.diffusion_yaml)
-        logger.info(f"Wrote diffusion YAML to: {self.task_config.diffusion_yaml}")
-
-        # link executable to run directory
-        self.link_bmatexe()
-        self.link_diffusion_exe()
-        self.link_jediexe()
-
-    @logit(logger)
-    def interpBackground(self) -> None:
-        chdir(self.task_config.DATA)
-
-        exec_cmd = Executable(self.task_config.APRUN_AEROANLGENB)
-        exec_name = os.path.join(self.task_config.DATA, 'gdas.x')
-        exec_cmd.add_default_arg(exec_name)
-        exec_cmd.add_default_arg('fv3jedi')
-        exec_cmd.add_default_arg('convertstate')
-        exec_cmd.add_default_arg(self.task_config.convertstate_yaml)
-
-        try:
-            logger.debug(f"Executing {exec_cmd}")
-            exec_cmd()
-        except OSError:
-            raise OSError(f"Failed to execute {exec_cmd}")
-        except Exception:
-            raise WorkflowException(f"An error occured during execution of {exec_cmd}")
-
-        pass
-
-    @logit(logger)
-    def computeVariance(self) -> None:
-
-        chdir(self.task_config.DATA)
-
-        exec_cmd = Executable(self.task_config.APRUN_AEROANLGENB)
-        exec_name = os.path.join(self.task_config.DATA, 'gdasapp_chem_diagb.x')
-        exec_cmd.add_default_arg(exec_name)
-        exec_cmd.add_default_arg(self.task_config.bmat_yaml)
-
-        try:
-            logger.debug(f"Executing {exec_cmd}")
-            exec_cmd()
-        except OSError:
-            raise OSError(f"Failed to execute {exec_cmd}")
-        except Exception:
-            raise WorkflowException(f"An error occured during execution of {exec_cmd}")
-
-        pass
+        # Create dictionary of Jedi objects
+        expected_keys = ['aero_interpbkg', 'aero_diagb', 'aero_diffusion']
+        self.jedi_dict = Jedi.get_jedi_dict(self.task_config.JEDI_CONFIG_YAML, self.task_config, expected_keys)

     @logit(logger)
-    def computeDiffusion(self) -> None:
-
-        chdir(self.task_config.DATA)
-
-        exec_cmd_diffusion = Executable(self.task_config.APRUN_AEROANLGENB)
-        exec_name_diffusion = os.path.join(self.task_config.DATA, 'gdas_fv3jedi_error_covariance_toolbox.x')
-        exec_cmd_diffusion.add_default_arg(exec_name_diffusion)
-        exec_cmd_diffusion.add_default_arg(self.task_config.diffusion_yaml)
+    def initialize(self: Task) -> None:
+        """Initialize a global aerosol B-matrix

-        try:
-            logger.debug(f"Executing {exec_cmd_diffusion}")
-            exec_cmd_diffusion()
-        except OSError:
-            raise OSError(f"Failed to execute {exec_cmd_diffusion}")
-        except Exception:
-            raise WorkflowException(f"An error occured during execution of {exec_cmd_diffusion}")
-
-        pass
-
-    @logit(logger)
-    def finalize(self) -> None:
-        super().finalize()
-        # save files to COMOUT
-        logger.info(f"Saving files to COMOUT based on {self.task_config.AERO_BMATRIX_FINALIZE_TMPL}")
-        aero_bmat_finalize_list = parse_j2yaml(self.task_config.AERO_BMATRIX_FINALIZE_TMPL, self.task_config)
-        FileHandler(aero_bmat_finalize_list).sync()
-
-    @logit(logger)
-    def link_jediexe(self) -> None:
-        """
-
-        This method links a JEDI executable to the run directory
+        This method will initialize a global aerosol B-Matrix.
+        This includes:
+        - staging the deterministic backgrounds
+        - staging fix files
+        - initializing the JEDI applications

         Parameters
         ----------
-        Task: GDAS task
-
-        Returns
-        ----------
         None
-        """
-        exe_src = self.task_config.JEDIEXE
-
-        # TODO: linking is not permitted per EE2. Needs work in JEDI to be able to copy the exec.
-        logger.info(f"Link executable {exe_src} to DATA/")
-        logger.warn("Linking is not permitted per EE2.")
-        exe_dest = os.path.join(self.task_config.DATA, os.path.basename(exe_src))
-        if os.path.exists(exe_dest):
-            rm_p(exe_dest)
-        os.symlink(exe_src, exe_dest)
-
-        return exe_dest
-
-    @logit(logger)
-    def link_bmatexe(self) -> None:
-        """
-
-        This method links a JEDI executable to the run directory
-
-        Parameters
-        ----------
-        Task: GDAS task

         Returns
         ----------
         None
         """
-        exe_src = self.task_config.BMATEXE
-        # TODO: linking is not permitted per EE2. Needs work in JEDI to be able to copy the exec.
-        logger.info(f"Link executable {exe_src} to DATA/")
-        logger.warn("Linking is not permitted per EE2.")
-        exe_dest = os.path.join(self.task_config.DATA, os.path.basename(exe_src))
-        if os.path.exists(exe_dest):
-            rm_p(exe_dest)
-        os.symlink(exe_src, exe_dest)
+        # stage fix files
+        logger.info(f"Staging JEDI fix files from {self.task_config.JEDI_FIX_YAML}")
+        jedi_fix_list = parse_j2yaml(self.task_config.JEDI_FIX_YAML, self.task_config)
+        FileHandler(jedi_fix_list).sync()
+
+        # stage backgrounds
+        logger.info(f"Staging backgrounds prescribed from {self.task_config.AERO_BMATRIX_STAGE_TMPL}")
+        aero_bmat_stage_list = parse_j2yaml(self.task_config.AERO_BMATRIX_STAGE_TMPL, self.task_config)
+        FileHandler(aero_bmat_stage_list).sync()

-        return
+        # initialize JEDI applications
+        self.jedi_dict['aero_interpbkg'].initialize(self.task_config)
+        self.jedi_dict['aero_diagb'].initialize(self.task_config)
+        self.jedi_dict['aero_diffusion'].initialize(self.task_config)

     @logit(logger)
-    def link_diffusion_exe(self) -> None:
-        """
+    def execute(self) -> None:
+        """Generate the full B-matrix

-        This method links a JEDI (fv3jedi_error_covariance_toolbox.x)
-        executable to the run directory
+        This method will generate the full B-matrix according to the configuration.
+        This includes:
+        - running all JEDI applications required to generate the B-matrix

         Parameters
         ----------
-        Task: GDAS task
+        None

         Returns
         ----------
         None
         """
-        exe_src_diffusion = self.task_config.DIFFUSIONEXE
+        # interpolate backgrounds to analysis resolution
+        self.jedi_dict['aero_interpbkg'].execute()

-        # TODO: linking is not permitted per EE2. Needs work in JEDI to be able to copy the exec.
-        logger.info(f"Link executable {exe_src_diffusion} to DATA/")
-        logger.warn("Linking is not permitted per EE2.")
-        exe_dest_diffusion = os.path.join(self.task_config.DATA, os.path.basename(exe_src_diffusion))
-        if os.path.exists(exe_dest_diffusion):
-            rm_p(exe_dest_diffusion)
-        os.symlink(exe_src_diffusion, exe_dest_diffusion)
+        # variance partitioning
+        self.jedi_dict['aero_diagb'].execute()

-        return
+        # diffusion
+        self.jedi_dict['aero_diffusion'].execute()

     @logit(logger)
-    def get_bkg_dict(self, task_config: Dict[str, Any]) -> Dict[str, List[str]]:
-        """Compile a dictionary of model background files to copy
+    def finalize(self) -> None:
+        """Finalize a global aerosol bmatrix

-        This method constructs a dictionary of FV3 RESTART files (coupler, core, tracer)
-        that are needed for global aerosol DA and returns said dictionary for use by the FileHandler class.
+        This method will finalize a global aerosol bmatrix using JEDI.
+        This includes:
+        - copying the bmatrix files to COM
+        - copying YAMLs to COM

-        Parameters
-        ----------
-        task_config: Dict
-            a dictionary containing all of the configuration needed for the task
-
-        Returns
-        ----------
-        bkg_dict: Dict
-            a dictionary containing the list of model background files to copy for FileHandler
         """
-        # NOTE for now this is FV3 RESTART files and just assumed to be fh006
-
-        # get FV3 RESTART files, this will be a lot simpler when using history files
-        rst_dir = task_config.COM_ATMOS_RESTART_PREV
-        run_dir = os.path.join(task_config['DATA'], 'bkg')
-
-        # Start accumulating list of background files to copy
-        bkglist = []
-
-        # if using IAU, we can use FGAT
-        bkgtimes = []
-        begintime = task_config.previous_cycle
-        for fcsthr in task_config.aero_bkg_fhr:
-            bkgtimes.append(add_to_datetime(begintime, to_timedelta(f"{fcsthr}H")))
-
-        # now loop over background times
-        for bkgtime in bkgtimes:
-            # aerosol DA needs coupler
-            basename = f'{to_fv3time(bkgtime)}.coupler.res'
-            bkglist.append([os.path.join(rst_dir, basename), os.path.join(run_dir, basename)])
-
-            # aerosol DA only needs core/tracer
-            for ftype in ['core', 'tracer']:
-                template = f'{to_fv3time(bkgtime)}.fv_{ftype}.res.tile{{tilenum}}.nc'
-                for itile in range(1, task_config.ntiles + 1):
-                    basename = template.format(tilenum=itile)
-                    bkglist.append([os.path.join(rst_dir, basename), os.path.join(run_dir, basename)])
-
-        bkg_dict = {
-            'mkdir': [run_dir],
-            'copy': bkglist,
-        }
-        return bkg_dict
+        # save files to COMOUT
+        logger.info(f"Saving files to COMOUT based on {self.task_config.AERO_BMATRIX_FINALIZE_TMPL}")
+        aero_bmat_finalize_list = parse_j2yaml(self.task_config.AERO_BMATRIX_FINALIZE_TMPL, self.task_config)
+        FileHandler(aero_bmat_finalize_list).sync()
diff --git a/ush/python/pygfs/task/bmatrix.py b/ush/python/pygfs/task/bmatrix.py
deleted file mode 100644
index d0edba2358..0000000000
--- a/ush/python/pygfs/task/bmatrix.py
+++ /dev/null
@@ -1,28 +0,0 @@
-#!/usr/bin/env python3
-
-import os
-from logging import getLogger
-from typing import List, Dict, Any, Union
-
-from wxflow import (parse_j2yaml, FileHandler, logit,
-                    Task, Executable, WorkflowException)
-
-logger = getLogger(__name__.split('.')[-1])
-
-
-class BMatrix(Task):
-    """Parent class for GDAS BMatrix tasks
-
-    The BMatrix class is the parent class for all
-    Global Data Assimilation System (GDAS) BMatrix tasks
-    """
-    def __init__(self, config: Dict[str, Any]) -> None:
-        super().__init__(config)
-        # Store location of GDASApp jinja2 templates
-        self.gdasapp_j2tmpl_dir = os.path.join(self.task_config.PARMgfs, 'gdas')
-
-    def initialize(self) -> None:
-        super().initialize()
-
-    def finalize(self) -> None:
-        super().finalize()