Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 102 additions & 0 deletions aiida_flexpart/workflows/child_meteo_workflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
# -*- coding: utf-8 -*-
"""Prepare meteorological data for
Flexpart simulstions."""

from aiida import engine, orm
from aiida_shell import launch_shell_job
from aiida_flexpart.utils import get_simulation_period

#possible models
cosmo_models = ['cosmo7', 'cosmo1', 'kenda1']
ECMWF_models = ['IFS_GL_05', 'IFS_GL_1', 'IFS_EU_02', 'IFS_EU_01']

class TransferMeteoWorkflow(engine.WorkChain):
"""Multi-dates workflow for transfering the necessary
Meteorological data for the subsequent Flexpart
simulations."""

@classmethod
def define(cls, spec):
"""Specify inputs and outputs."""
super().define(spec)

spec.input('check_meteo_cosmo_code', valid_type=orm.AbstractCode)
spec.input('check_meteo_ifs_code', valid_type=orm.AbstractCode)
spec.input('simulation_dates',
valid_type=orm.List,
help='A list of the starting dates of the simulations')
spec.input('gribdir', valid_type=orm.Str, required=True)

spec.input('model', valid_type=orm.List, required=True)
spec.input('model_offline', valid_type=orm.List, required=True)
spec.input('offline_integration_time', valid_type=orm.Int)
spec.input('integration_time', valid_type=orm.Int)
spec.input('command', valid_type=orm.Dict)

# Workflow outline
spec.outline(
cls.setup,
engine.while_(cls.condition)(
cls.prepare_meteo_folder,
engine.if_(cls.offline)(
cls.prepare_meteo_folder
),
cls.add_index
),
)

def setup(self):
self.ctx.index = 0
self.ctx.command = self.inputs.command
self.ctx.simulation_dates = self.inputs.simulation_dates

def condition(self):
self.ctx.offline = False
return self.ctx.index < len(self.ctx.simulation_dates)

def offline(self):
if self.inputs.offline_integration_time > 0:
self.ctx.offline = True
return True
return False

def add_index(self):
self.ctx.index += 1

def prepare_meteo_folder(self):
"""prepare meteo folder"""
model_list = self.inputs.model
code_ = self.inputs.check_meteo_cosmo_code
age_class_ = self.inputs.integration_time.value * 3600

if all(mod in ECMWF_models
for mod in self.inputs.model) and self.inputs.model:
code_ = self.inputs.check_meteo_ifs_code

if self.ctx.offline:
age_class_ = self.inputs.offline_integration_time.value * 3600
model_list = self.inputs.model_offline
code_ = self.inputs.check_meteo_ifs_code

e_date, s_date = get_simulation_period(
self.ctx.simulation_dates[self.ctx.index], age_class_,
self.ctx.command.get_dict()['release_duration'],
self.ctx.command.get_dict()['simulation_direction'])
self.report(f'preparing meteo from {s_date} to {e_date}')

node_list = []
for mod in model_list:
self.report(f'transfering {mod} meteo')
_, node = launch_shell_job(
code_,
arguments=' -s {sdate} -e {edate} -g {gribdir} -m {model} -a',
nodes={
'sdate': orm.Str(s_date),
'edate': orm.Str(e_date),
'gribdir': self.inputs.gribdir,
'model': orm.Str(mod)
})
node_list.append(node)

if all(node.is_finished_ok for node in node_list):
self.report('ALL meteo OK')
290 changes: 290 additions & 0 deletions aiida_flexpart/workflows/child_sim_workflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,290 @@
# -*- coding: utf-8 -*-
"""Flexpart multi-dates WorkChain."""
from aiida import engine, plugins, orm
from aiida_flexpart.workflows.child_meteo_workflow import TransferMeteoWorkflow

# plugins
FlexpartCosmoCalculation = plugins.CalculationFactory("flexpart.cosmo")
FlexpartIfsCalculation = plugins.CalculationFactory("flexpart.ifs")
FlexpartPostCalculation = plugins.CalculationFactory("flexpart.post")

# possible models
cosmo_models = ["cosmo7", "cosmo1", "kenda1"]
ECMWF_models = ["IFS_GL_05", "IFS_GL_1", "IFS_EU_02", "IFS_EU_01"]


class FlexpartSimWorkflow(engine.WorkChain):
"""Flexpart multi-dates workflow"""

@classmethod
def define(cls, spec):
"""Specify inputs and outputs."""
super().define(spec)

# Codes
spec.input("fcosmo_code", valid_type=orm.AbstractCode)
spec.input("fifs_code", valid_type=orm.AbstractCode)
spec.input("post_processing_code", valid_type=orm.AbstractCode)

#extras
spec.input('name', valid_type=str, non_db=True, required=False)

# Basic Inputs
spec.expose_inputs(TransferMeteoWorkflow)
spec.input("date", valid_type=orm.Int, required=False)

# Model settings
# Command is exposed form TransferMeteoWorkflow
spec.input("input_phy", valid_type=orm.Dict)
spec.input("release_settings", valid_type=orm.Dict)
spec.input("locations", valid_type=orm.Dict,
help="Dictionary of locations properties."
)

# Meteo related inputs
spec.input(
"meteo_inputs",
valid_type=orm.Dict,
required=False,
help="Meteo models input params.",
)
spec.input(
"meteo_inputs_offline",
valid_type=orm.Dict,
required=False,
help="Meteo models input params.",
)
spec.input(
"meteo_path",
valid_type=orm.List,
required=False,
help="Path to the folder containing the meteorological input data.",
)
spec.input(
"meteo_path_offline",
valid_type=orm.List,
required=False,
help="Path to the folder containing the meteorological input data.",
)

# Others
spec.input("outgrid", valid_type=orm.Dict)
spec.input("outgrid_nest", valid_type=orm.Dict, required=False)
spec.input("species", valid_type=orm.RemoteData, required=True)
spec.input_namespace(
"land_use",
valid_type=orm.RemoteData,
required=False,
dynamic=True,
help="#TODO",
)
spec.input_namespace(
"land_use_ifs", valid_type=orm.RemoteData, required=False, dynamic=True
)
spec.expose_inputs(
FlexpartCosmoCalculation,
include=["metadata.options"],
namespace="flexpartcosmo",
)
spec.expose_inputs(
FlexpartIfsCalculation,
include=["metadata.options"],
namespace="flexpartifs",
)
spec.expose_inputs(
FlexpartPostCalculation,
include=["metadata.options"],
namespace="flexpartpost",
)
spec.outputs.dynamic = True

#exit codes
spec.exit_code(400, 'ERROR_CALCULATION_FAILED',
'the previous calculation did not finish successfully')

spec.outline(
cls.setup,

engine.if_(cls.run_cosmo)(
cls.run_cosmo_simulation,
cls.inspect_calculation
),
engine.if_(cls.run_ifs)(
cls.run_ifs_simulation,
cls.inspect_calculation
),

cls.post_processing,
cls.results,
)

def run_cosmo(self):
"""run cosmo simulation"""
if all(mod in cosmo_models for mod in self.inputs.model) and self.inputs.model:
return True
return False

def run_ifs(self):
"""run ifs simulation"""
if (
all(mod in ECMWF_models for mod in self.inputs.model)
or all(mod in ECMWF_models for mod in self.inputs.model_offline)
and self.inputs.model
and self.inputs.model_offline
):
return True
return False

def inspect_calculation(self):
if not self.ctx.calculations[-1].is_finished_ok:
self.report('ERROR calculation did not finish ok')
return self.exit_codes.ERROR_CALCULATION_FAILED
self.report('calculation successfull')

def setup(self):

self.ctx.simulation_date = self.inputs.simulation_dates[self.inputs.date.value]
self.ctx.integration_time = self.inputs.integration_time
self.ctx.offline_integration_time = self.inputs.offline_integration_time

# model settings
self.ctx.release_settings = self.inputs.release_settings
self.ctx.command = self.inputs.command
self.ctx.input_phy = self.inputs.input_phy
self.ctx.locations = self.inputs.locations

# others
self.ctx.outgrid = self.inputs.outgrid
self.ctx.species = self.inputs.species
self.ctx.land_use = self.inputs.land_use
if 'name' in self.inputs:
out_n = 'None'
if 'outgrid_nest' in self.inputs:
out_n = self.inputs.outgrid_nest.get_dict()
self.node.base.extras.set(
self.inputs.name, {
'command': self.ctx.command.get_dict(),
'input_phy': self.ctx.input_phy.get_dict(),
'release': self.ctx.release_settings.get_dict(),
'locations': self.ctx.locations.get_dict(),
'integration_time': self.ctx.integration_time.value,
'offline_integration_time':
self.ctx.offline_integration_time.value,
'model': self.inputs.model,
'model_offline': self.inputs.model_offline,
'outgrid': self.inputs.outgrid.get_dict(),
'outgrid_nest': out_n
})

def post_processing(self):
"""post processing"""
self.report("starting post-processsing")
builder = FlexpartPostCalculation.get_builder()
builder.code = self.inputs.post_processing_code
builder.input_dir = self.ctx.calculations[-1].outputs.remote_folder

if self.ctx.offline_integration_time > 0:

builder.input_dir = self.ctx.calculations[-2].outputs.remote_folder
builder.input_offline_dir = self.ctx.calculations[-1].outputs.remote_folder

builder.metadata.options = self.inputs.flexpartpost.metadata.options

running = self.submit(builder)
self.to_context(calculations=engine.append_(running))

def run_cosmo_simulation(self):
"""Run calculations for equation of state."""

self.report(f"starting flexpart cosmo {self.ctx.simulation_date}")

builder = FlexpartCosmoCalculation.get_builder()
builder.code = self.inputs.fcosmo_code

# update command file
new_dict = self.ctx.command.get_dict()
new_dict["simulation_date"] = self.ctx.simulation_date
new_dict["age_class"] = self.inputs.integration_time * 3600
new_dict.update(self.inputs.meteo_inputs)

# model settings
builder.model_settings = {
"release_settings": self.ctx.release_settings,
"locations": self.ctx.locations,
"command": orm.Dict(dict=new_dict),
"input_phy": self.ctx.input_phy,
}

builder.outgrid = orm.Dict(
list(self.ctx.outgrid.get_dict().values())[0])
if 'outgrid_nest' in self.inputs:
builder.outgrid_nest = orm.Dict(
list(self.inputs.outgrid_nest.get_dict().values())[0])
builder.species = self.ctx.species
builder.land_use = self.ctx.land_use
builder.meteo_path = self.inputs.meteo_path

# Walltime, memory, and resources.
builder.metadata.description = "Test workflow to submit a flexpart calculation"
builder.metadata.options = self.inputs.flexpartcosmo.metadata.options

# Ask the workflow to continue when the results are ready and store them in the context
running = self.submit(builder)
self.to_context(calculations=engine.append_(running))

def run_ifs_simulation(self):
"""Run calculations for equation of state."""
# Set up calculation.
self.report(f"running flexpart ifs for {self.ctx.simulation_date}")
builder = FlexpartIfsCalculation.get_builder()
builder.code = self.inputs.fifs_code

# changes in the command file
new_dict = self.ctx.command.get_dict()
new_dict["simulation_date"] = self.ctx.simulation_date

if self.ctx.offline_integration_time > 0:
new_dict["age_class"] = self.ctx.offline_integration_time * 3600
new_dict["dumped_particle_data"] = True

self.ctx.parent_calc_folder = self.ctx.calculations[
-1
].outputs.remote_folder
builder.parent_calc_folder = self.ctx.parent_calc_folder
self.report(f"starting from: {self.ctx.parent_calc_folder}")

builder.meteo_path = self.inputs.meteo_path_offline
new_dict.update(self.inputs.meteo_inputs_offline)

else:
new_dict["age_class"] = self.inputs.integration_time * 3600
builder.meteo_path = self.inputs.meteo_path
new_dict.update(self.inputs.meteo_inputs)

# model settings
builder.model_settings = {
"release_settings": self.ctx.release_settings,
"locations": self.ctx.locations,
"command": orm.Dict(dict=new_dict),
}

builder.outgrid = orm.Dict(
list(self.ctx.outgrid.get_dict().values())[0])
if 'outgrid_nest' in self.inputs:
builder.outgrid_nest = orm.Dict(
list(self.inputs.outgrid_nest.get_dict().values())[0])
builder.species = self.ctx.species
builder.land_use = self.inputs.land_use_ifs

# Walltime, memory, and resources.
builder.metadata.description = "Test workflow to submit a flexpart calculation"
builder.metadata.options = self.inputs.flexpartifs.metadata.options

# Ask the workflow to continue when the results are ready and store them in the context
running = self.submit(builder)
self.to_context(calculations=engine.append_(running))

def results(self):
"""Process results."""
for indx, calculation in enumerate(self.ctx.calculations):
self.out(f"calculation_{indx}_output_file", calculation.outputs.output_file)
Loading