diff --git a/.github/ci-config.yml b/.github/ci-config.yml index 73fc5eb..8c766b7 100644 --- a/.github/ci-config.yml +++ b/.github/ci-config.yml @@ -3,6 +3,7 @@ dependencies: | ecmwf/eckit ecmwf/fckit ecmwf/atlas + ecmwf/eccodes dependency_cmake_options: | ecmwf/atlas: "-DENABLE_FORTRAN=ON" dependency_branch: develop diff --git a/.github/ci-hpc-config.yml b/.github/ci-hpc-config.yml index 08e6baa..586856f 100644 --- a/.github/ci-hpc-config.yml +++ b/.github/ci-hpc-config.yml @@ -1,11 +1,41 @@ -build: - modules: - - ninja - dependencies: - - ecmwf/ecbuild@develop - - ecmwf/eckit@develop - - ecmwf/fckit@develop - - ecmwf/atlas@develop - dependency_cmake_options: - - ecmwf/atlas:-DENABLE_FORTRAN=ON - parallel: 64 +matrix: + - mpi_on + - mpi_off + +mpi_on: + build: + modules: + - ninja + - openmpi + modules_package: + - atlas:openmpi + - eckit:openmpi + dependencies: + - ecmwf/ecbuild@develop + - ecmwf/eckit@develop + - ecmwf/fckit@develop + - ecmwf/atlas@develop + - ecmwf/eccodes@develop + dependency_cmake_options: + - ecmwf/atlas:-DENABLE_FORTRAN=ON + parallel: 64 + ntasks: 16 + env: + - CTEST_PARALLEL_LEVEL=1 + - OMPI_MCA_rmaps_base_oversubscribe=1 + - ECCODES_SAMPLES_PATH=$ECCODES_DIR/share/eccodes/samples + - ECCODES_DEFINITION_PATH=$ECCODES_DIR/share/eccodes/definitions + +mpi_off: + build: + modules: + - ninja + dependencies: + - ecmwf/ecbuild@develop + - ecmwf/eckit@develop + - ecmwf/fckit@develop + - ecmwf/atlas@develop + - ecmwf/eccodes@develop + dependency_cmake_options: + - ecmwf/atlas:-DENABLE_FORTRAN=ON + parallel: 64 diff --git a/CMakeLists.txt b/CMakeLists.txt index d80ae06..a8629b3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -20,6 +20,7 @@ set(CMAKE_CXX_STANDARD_REQUIRED ON) ### dependencies and options ecbuild_find_package( NAME eckit VERSION 1.18 REQUIRED ) ecbuild_find_package( NAME atlas ) +ecbuild_find_package( NAME eccodes ) ecbuild_add_option( FEATURE BUILD_TOOLS DEFAULT ON @@ -47,6 +48,15 @@ ecbuild_info("FCKIT_FOUND ${fckit_FOUND}") ecbuild_info("FCKIT_LIBRARIES ${FCKIT_LIBRARIES}") ecbuild_info("FCKIT_INCLUDE_DIRS ${FCKIT_INCLUDE_DIRS}") +############## PRECISION +ecbuild_add_option( FEATURE NWP_EMULATOR_SINGLE_PRECISION + DESCRIPTION "Single precision Atlas fields" + DEFAULT OFF ) + +if( HAVE_NWP_EMULATOR_SINGLE_PRECISION ) + list(APPEND NWP_EMULATOR_DEFINITIONS WITH_NWP_EMULATOR_SINGLE_PRECISION ) +endif() + add_subdirectory( src ) add_subdirectory( tests ) add_subdirectory( examples ) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index ab7dca1..b043758 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -8,3 +8,5 @@ # does it submit to any jurisdiction. add_subdirectory(plume) + +add_subdirectory(nwp_emulator) diff --git a/src/nwp_emulator/CMakeLists.txt b/src/nwp_emulator/CMakeLists.txt new file mode 100644 index 0000000..c2de63d --- /dev/null +++ b/src/nwp_emulator/CMakeLists.txt @@ -0,0 +1,43 @@ +# (C) Copyright 2025- ECMWF. +# +# This software is licensed under the terms of the Apache Licence Version 2.0 +# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. +# +# In applying this licence, ECMWF does not waive the privileges and immunities +# granted to it by virtue of its status as an intergovernmental organisation nor +# does it submit to any jurisdiction. +ecbuild_add_library( + TARGET plume_nwp_emulator + INSTALL_HEADERS LISTED + HEADER_DESTINATION + ${INSTALL_INCLUDE_DIR}/nwp_emulator + SOURCES + data_reader.h + grib_file_reader.h + config_reader.h + nwp_definitions.h + nwp_data_provider.h + nwp_data_provider.cc + grib_file_reader.cc + config_reader.cc + config_reader_funcs.cc + PUBLIC_INCLUDES + $ + $ + PRIVATE_INCLUDES + "${MPI_INCLUDE_DIRS}" + DEFINITIONS + ${NWP_EMULATOR_DEFINITIONS} + PUBLIC_LIBS + plume_plugin + plume_plugin_manager + eccodes + eckit +) + +ecbuild_add_executable( TARGET nwp_emulator_run.x + SOURCES + nwp_emulator.cc + LIBS + plume_nwp_emulator +) diff --git a/src/nwp_emulator/README.md b/src/nwp_emulator/README.md new file mode 100644 index 0000000..3e38685 --- /dev/null +++ b/src/nwp_emulator/README.md @@ -0,0 +1,106 @@ +# Plume Numerical Weather Prediction Model Emulator + +This tool is here to help you with your Plume plugins development. +You can use it to emulate model runs with Plume loading the plugins from your configuration. + +## Features + +- Dry run: The NWP emulator can run without Plume to develop new features, or to test your data generation to ensure it fits the purpose of your plugin test. +- Plume run: The NWP emulator runs Plume with the passed configuration. It can be used to test the Plume framweork, or test Plume plugins. +- GRIB as input: The NWP emulator can either take a directory containing GRIB files or a GRIB file as input source. It will attempt to serve the data in each file as if it were a single model step. The number of emulator steps run depend on the number of GRIB files in the source folder, or a user-defined maximum number of steps. This approach allows a developer to test a plugin with real data. *Note:* the emulator is not looking recursively into the source folder, any subdirectory will be ignored. +- Config as input: The NWP emulator can take a configuration file that sets its run parameters (number of steps, grid, number of levels, fields...). The data is synthetically generated by a set of available functions, see [below](#configuration-source) for more details. + +## Usage + +Once installed (check out [Plume install guide](../../README.md/#installation) for more info), the emulator tool can be used as follows: +```bash +mpirun -np 2 nwp_emulator_run [--grib-src= | --config-src=] [--plume-cfg=] [OPTION]... [--help] +``` +To make a dry run, omit the `--plume-cfg` flag. + +## Quick start +### GRIB source + +The GRIB files are served alphabetically so your source folder should be organised to ensure the data you expect is served in the order you expect. Validation is run on all the source files to make sure they all contain the same fields and levels on the same grid. Check out the [MARS documentation](https://confluence.ecmwf.int/display/UDOC/MARS+user+documentation) if you need a hand to retrieve historical data. + +### Configuration source + +Currently, the emulator supports five functions for data generation: +- Vortex Rollup (provided by the [Atlas library](https://confluence.ecmwf.int/display/ATLAS)) +- Random (uniform or normal distributions) +- Step +- Cardinal Sine +- Gaussian + +All functions accept a focus area which can be translated between each time step by a user-defined factor. +Validation is run on the configuration file to ensure options are valid, but it lies with the user to make sure the selected options achieve their purpose. + +Here is an example of YAML configuration file you can use to generate model data, which demonstrate the options that each generation function accept: + +```yaml +emulator: + n_steps: 5 + grid_identifier: "N80" + vertical_levels: 5 + # can provide area here if all population methods should use a non global area by default + # range for lon is assumed [-180,180] and lat [-90,90] + fields: + 100u: # use the field short name + levtype: "sfc" + apply: + vortex_rollup: + area: [71.5, -25, 34.5, 45] + time_variation: 1.1 + 100v: + levtype: "sfc" + apply: + vortex_rollup: + time_variation: 1.1 + u: # no specified levtype means not surface + apply: # functions applied in the given order + levels: + "1": + random: # no specified area means global + distribution: "uniform" + step: + area: [71.5, -25, 34.5, 45] # rectangle represented by NW and SE (lat,lon) coordinates + value: 10.0 + variation: 1.0 + translation: [1.0, 1.0] # degrees of translation of the area per time step (lat, lon) + "2": + sinc: + modes: 3 # number of peaks or sinks in the generation area + min: -1.0 + max: 10.0 + spread: 10.0 + sink: false + "3:": + gaussian: + modes: 2 + min: 0.0 + max: 1.0 + max_stddev: 3.0 + sink: true + v: "u" + t: + apply: + levels: # this key should not be used for sfc + "2": # when using levels, they should range from 1- + random: + area: [71.5, -25, 34.5, 45] + distribution: "uniform" + min: 10.0 + max: 35.0 + "1,3": + step: + probability: 0.1 + value: 45.0 + "4:": + random: + distribution: "normal" + mean: 20.0 + stddev: 20.0 +``` + +## License +© 2025 ECMWF. All rights reserved. \ No newline at end of file diff --git a/src/nwp_emulator/config_reader.cc b/src/nwp_emulator/config_reader.cc new file mode 100644 index 0000000..c32f57f --- /dev/null +++ b/src/nwp_emulator/config_reader.cc @@ -0,0 +1,446 @@ +/* + * (C) Copyright 2025- ECMWF. + * + * This software is licensed under the terms of the Apache Licence Version 2.0 + * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. + * + * In applying this licence, ECMWF does not waive the privileges and immunities + * granted to it by virtue of its status as an intergovernmental organisation nor + * does it submit to any jurisdiction. + */ +#include +#include +#include + +#include "eckit/config/LocalConfiguration.h" +#include "eckit/config/YAMLConfiguration.h" +#include "eckit/exception/Exceptions.h" + +#include "config_reader.h" +#include "nwp_definitions.h" + +namespace nwp_emulator { +ConfigReader::ConfigReader(const eckit::PathName& inputPath, size_t rank, size_t root, int stepCountLimit) : + rank_(rank), root_(root), DataReader(stepCountLimit) { + if (inputPath.extension() != ".yml" && inputPath.extension() != ".yaml") { + eckit::Log::error() << "Source '" << inputPath << "'should be a YAML file, exit..." << std::endl; + eckit::mpi::comm().abort(1); + } + eckit::YAMLConfiguration config(inputPath); + config.get("emulator", readerConfig_); + gridName_ = readerConfig_.getString("grid_identifier"); + modelLevels_ = 1; + try { + stepCountLimit_ = readerConfig_.getInt("n_steps"); + } + catch (eckit::Exception&) { + eckit::Log::warning() << "No steps found in config, will use default (" << stepCountLimit_ << ")..." + << std::endl; + } + try { + modelLevels_ = readerConfig_.getInt("vertical_levels"); + } + catch (eckit::Exception&) { + eckit::Log::warning() << "No vertical levels number found in config, will use 1 as default..." << std::endl; + } + + if (!readerConfig_.has("fields") || !readerConfig_.isSubConfiguration("fields")) { + eckit::Log::error() << "No fields configured, exit..." << std::endl; + eckit::mpi::comm().abort(1); + } + eckit::LocalConfiguration fields = readerConfig_.getSubConfiguration("fields"); + for (const std::string& fieldName : fields.keys()) { + eckit::LocalConfiguration fieldConfig; + if (fields.isSubConfiguration(fieldName)) { + fields.get(fieldName, fieldConfig); + } + else if (fields.isString(fieldName) && fields.isSubConfiguration(fields.getString(fieldName))) { + // The field uses the same configuration as the field specified by its key (short name) + fields.get(fields.getString(fieldName), fieldConfig); + } + else { + eckit::Log::error() << "Field '" << fieldName << "' must be either a subconfiguration or a valid " + << "field with subconfiguration, exit..." << std::endl; + eckit::mpi::comm().abort(1); + } + if (!fieldConfig.has("levtype")) { + for (int level = 0; level < modelLevels_; level++) { + params_.push_back(fieldName + ",ml," + std::to_string(level + 1)); + } + } + else { + params_.push_back(fieldName + ",sfc,0"); + } + } + if (rank_ == root_ && !validateConfig()) { // validation only done by root + eckit::mpi::comm().abort(1); + } +} + +void ConfigReader::setReaderArea(const atlas::Field& lonlat) { + readerArea_ = lonlat.clone(); +} + +bool ConfigReader::done() { + return step_ >= stepCountLimit_; +} + +bool ConfigReader::nextMessage(std::string& shortName, std::string& levtype, std::string& level, + std::vector& data) { + if (!readerArea_) { + eckit::Log::error() << "The reader " << rank_ << " area is not set, please set before requesting data" + << ", exit..." << std::endl; + eckit::mpi::comm().abort(1); + } + if (index_ == params_.size() || done()) { + index_ = 0; + ++step_; + return false; + } + std::stringstream ss(params_[index_]); + std::getline(ss, shortName, ','); + std::getline(ss, levtype, ','); + std::getline(ss, level, ','); + + data.resize(0); + data.resize(readerArea_.shape(0)); + + std::string fieldConfigStr = "fields."; + eckit::LocalConfiguration fieldConfig; + if (readerConfig_.isString("fields." + shortName)) { + fieldConfigStr += readerConfig_.getString("fields." + shortName) + ".apply"; + } + else { + fieldConfigStr += shortName + ".apply"; + } + if (readerConfig_.has(fieldConfigStr + ".levels")) { + std::string levelKey = getLevelConfigKey(level, fieldConfigStr + ".levels"); + fieldConfigStr += ".levels." + levelKey; + } + readerConfig_.get(fieldConfigStr, fieldConfig); + for (const std::string& func : fieldConfig.keys()) { + // no check for key existence as func should be validated at this point + functionTable[func](fieldConfig.getSubConfiguration(func), data); + } + + ++index_; + return true; +} + +std::string ConfigReader::getLevelConfigKey(const std::string& level, const std::string& levelConfigStr) { + // level keys can be as follow: "1", "1,3,5", "1:3", "3:", ":4" + int nlevel = std::stoi(level); + eckit::LocalConfiguration levelConfig; + readerConfig_.get(levelConfigStr, levelConfig); + for (const std::string& key : levelConfig.keys()) { + if (key.find(',') != std::string::npos) { // key is a sequence of levels + std::stringstream ss(key); + std::string keyLevel; + while (std::getline(ss, keyLevel, ',')) { + if (std::stoi(keyLevel) == nlevel) { + return key; + } + } + } + else if (key.find(':') != std::string::npos) { // key is a range + size_t colonPos = key.find(':'); + int lowerBound = 1; // level 0 is reserved for surface + if (colonPos != 0) { // range is not open at the beginning + lowerBound = std::stoi(key.substr(0, colonPos)); + } + if (nlevel >= lowerBound) { + if (colonPos == key.length() - 1) { + return key; // key is an open range at the end + } + int upperBound = std::stoi(key.substr(colonPos + 1)); + if (nlevel <= upperBound) { + return key; // key is a closed range + } + } + } + else if (nlevel == std::stoi(key)) { // key represents a single level + return key; + } + } + return ""; // should never happen if the config has been properly validated +} + +//---------------------------------------------------------------------------------------------------------------------- +// Validation methods & utils +//---------------------------------------------------------------------------------------------------------------------- +bool ConfigReader::validateConfig() { + if (readerConfig_.has("area") && !validateArea(readerConfig_.getDoubleVector("area"))) { + eckit::Log::error() << "Provided focus area must respect the format [lat, lon, lat, lon] " + << " representing its Northwest and Southeast corners, exit..." << std::endl; + return false; + } + + eckit::LocalConfiguration fields = readerConfig_.getSubConfiguration("fields"); + for (const std::string& fieldName : fields.keys()) { + if (!fields.isSubConfiguration(fieldName)) { + continue; // only validate actual subconfigurations + } + eckit::LocalConfiguration fieldConfig = fields.getSubConfiguration(fieldName); + // 1. General checks + if (!fieldConfig.has("apply") || !fieldConfig.isSubConfiguration("apply")) { + eckit::Log::error() << "Field '" << fieldName << "' has no 'apply' configuration, exit..." << std::endl; + return false; + } + if (fieldConfig.has("levtype") && fieldConfig.has("apply.levels")) { + eckit::Log::error() << "Field '" << fieldName + << " is a surface field, it cannot have a 'levels' key, exit..." << std::endl; + return false; + } + if (fieldConfig.has("apply.levels") && modelLevels_ == 1) { + eckit::Log::error() << "Field '" << fieldName << " cannot use the levels key because the emulator " + << "has a single level, exit..." << std::endl; + return false; + } + std::vector functionConfigs; + if (!fieldConfig.has("apply.levels")) { + functionConfigs.push_back(fieldConfig.getSubConfiguration("apply")); + } + else { + if (!fieldConfig.isSubConfiguration("apply.levels")) { + eckit::Log::error() << "Field '" << fieldName << "' does not have a configuration for levels, exit..." + << std::endl; + return false; + } + if (!validateLevelKeys(fieldConfig.getSubConfiguration("apply.levels"))) { + eckit::Log::error() << "Field '" << fieldName << "' has invalid level keys." + << " Make sure to use single level, sequence or range, " + << "and that each level is covered by a single key, exit..." << std::endl; + return false; + } + for (const std::string& level : fieldConfig.getSubConfiguration("apply.levels").keys()) { + functionConfigs.push_back(fieldConfig.getSubConfiguration("apply.levels." + level)); + } + } + + // 2. Function specific checks + for (const auto& functionConfig : functionConfigs) { + for (const std::string& functionName : functionConfig.keys()) { + // Cross functions + if (functionConfig.has(functionName + ".area") && + !validateArea(functionConfig.getDoubleVector(functionName + ".area"))) { + eckit::Log::error() << "Area provided for '" << functionName << "' for field '" << fieldName + << "' is invalid, exit..." << std::endl; + return false; + } + if (functionConfig.has(functionName + ".translation")) { + if (!functionConfig.isFloatingPointList(functionName + ".translation") || + functionConfig.getDoubleVector(functionName + ".translation").size() != 2) { + eckit::Log::error() << "The 'translation' key in function '" << functionName << "' of field '" + << fieldName << "' does not respect the format [lat_trans, lon_trans], " + << "exit..." << std::endl; + return false; + } + } + // Function specific + if (functionTable.find(functionName) == functionTable.end() || + validationTable.find(functionName) == validationTable.end()) { + eckit::Log::error() << "Function name '" << functionName << "' is invalid, valid names are " + << "[vortex_rollup, random, step, sinc, gaussian], exit..." << std::endl; + return false; + } + if (!validationTable[functionName](functionConfig.getSubConfiguration(functionName))) { + eckit::Log::error() << "Options provided for '" << functionName << "' for field '" << fieldName + << "' are invalid, exit..." << std::endl; + return false; + } + } + } + } + eckit::Log::info() << "The emulator has accepted the provided configuration, proceeding with run..." << std::endl; + return true; +} + +bool ConfigReader::validateArea(const std::vector& area) { + if (area.size() != 4) { + return false; + } + // The lats and lons are in the expected ranges of [-90,90] and [-180,180] + for (int lon = 1; lon < 4; lon += 2) { + if (area[lon] < -180.0 || area[lon] > 180.0) { + return false; + } + } + for (int lat = 0; lat < 4; lat += 2) { + if (area[lat] < -90.0 || area[lat] > 90.0) { + return false; + } + } + // The points are provided from North to South + if (area[0] < area[2]) { + return false; + } + return true; +} + +bool ConfigReader::validateLevelKeys(const eckit::LocalConfiguration& config) { + std::unordered_set levelsCovered; // Keep track of levels represented by the level keys + std::regex singleLevelRegex("^(\\d+)$"); // Keys: "1" "12" + std::regex levelSequenceRegex("^\\d+(?:,\\d+)+$"); // Keys: "1,2,10" + std::regex levelRangeRegex("^(\\d+):(\\d+)$|^(\\d+):$|^:(\\d+)$"); // Keys: ":5" "2:5" "5:" + for (const std::string& level : config.keys()) { + std::smatch matches; + if (std::regex_search(level, matches, singleLevelRegex)) { + int nlevel = std::stoi(matches.str(0)); + if (nlevel > modelLevels_ || nlevel <= 0 || !levelsCovered.insert(nlevel).second) { + return false; + } + } + else if (std::regex_search(level, matches, levelSequenceRegex)) { + std::stringstream ss(level); + std::string number; + while (std::getline(ss, number, ',')) { + int nlevel = std::stoi(number); + if (nlevel > modelLevels_ || nlevel <= 0 || !levelsCovered.insert(nlevel).second) { + return false; + } + } + } + else if (std::regex_search(level, matches, levelRangeRegex)) { + if (!matches.str(1).empty()) { // it is a closed range + int lowerBound = std::stoi(matches.str(1)); + int upperBound = std::stoi(matches.str(2)); + if (lowerBound >= upperBound || lowerBound <= 0 || upperBound > modelLevels_) { + return false; + } + for (size_t i = lowerBound; i <= upperBound; i++) { + if (!levelsCovered.insert(i).second) { + return false; + } + } + } + else { // it is an open range + int bound = matches.str(3).empty() ? std::stoi(matches.str(4)) : std::stoi(matches.str(3)); + if (bound > modelLevels_ || bound <= 0) { + return false; + } + if (matches.str(3).empty()) { // it is a begin open range + for (int i = 1; i <= bound; i++) { + if (!levelsCovered.insert(i).second) { + return false; + } + } + } + else { + for (int i = bound; i <= modelLevels_; i++) { + if (!levelsCovered.insert(i).second) { + return false; + } + } + } + } + } + else { // the level key should match at least one of the three regular expressions + return false; + } + } + return true; +} + +bool ConfigReader::validateVortexRollupOpts(const eckit::LocalConfiguration& options) { + if (!options.isFloatingPoint("time_variation")) { + return false; + } + return true; +} + +bool ConfigReader::validateRandomOpts(const eckit::LocalConfiguration& options) { + if (options.empty()) { // Random has only optional options + return true; + } + if (options.has("distribution") && !options.isString("distribution")) { + return false; + } + auto distribution = options.getString("distribution", "uniform"); + if (distribution == "uniform") { + if ((options.has("min") && !options.isFloatingPoint("min")) || + (options.has("max") && !options.isFloatingPoint("max"))) { + return false; + } + auto min = options.getDouble("min", 0.0); + auto max = options.getDouble("max", 1.0); + if (max < min) { // min should be inferior to max + return false; + } + } + else if (distribution == "bernoulli") { + if ((options.has("probability") && !options.isFloatingPoint("probability")) || + (options.has("value") && !options.isFloatingPoint("value"))) { + return false; + } + auto probability = options.getDouble("probability", 0.5); + if (probability < 1e-6 || probability > 1.0) { // probability must belong to [0,1] + return false; + } + } + else if (distribution == "normal") { + if ((options.has("mean") && !options.isFloatingPoint("mean")) || + (options.has("stddev") && !options.isFloatingPoint("stddev"))) { + return false; + } + auto stddev = options.getDouble("stddev", 1.0); + if (stddev < 1e-6) { // standard deviation must be positive + return false; + } + } + else { + eckit::Log::error() << "Distribution '" << distribution << "' is not supported, pick between 'bernoulli', " + << "'uniform', or 'normal', exit..." << std::endl; + return false; + } + return true; +} + +bool ConfigReader::validateStepOpts(const eckit::LocalConfiguration& options) { + if (!options.isFloatingPoint("value")) { // checks both for existence and type + return false; + } + if (options.has("probability") && !options.isFloatingPoint("probability")) { + return false; + } + if (options.has("probability") && + (options.getDouble("probability") < 1e-6 || options.getDouble("probability") > 1.0)) { + return false; // probability must belong to [0,1] + } + if (options.has("variation") && !options.isFloatingPoint("variation")) { + return false; + } + return true; +} + +bool ConfigReader::validateCardinalSineOpts(const eckit::LocalConfiguration& options) { + if (!options.isIntegral("modes") || options.getInt("modes") < 1) { + return false; + } + if (options.has("sink") && !options.isBoolean("sink")) { + return false; + } + if ((options.has("spread") && !options.isFloatingPoint("spread")) || + (options.has("min") && !options.isFloatingPoint("min")) || + (options.has("max") && !options.isFloatingPoint("max"))) { + return false; + } + auto spread = options.getDouble("spread", 10.0); + auto min = options.getDouble("min", 0.0); + auto max = options.getDouble("max", 1.0); + if (max < min || spread < 1e-6) { + return false; + } + return true; +} + +bool ConfigReader::validateGaussianOpts(const eckit::LocalConfiguration& options) { + if ((options.has("max_stddev") && !options.isFloatingPoint("max_stddev"))) { + return false; + } + auto stddev = options.getDouble("max_stddev", 1.0); + eckit::LocalConfiguration tmpConfig(options); + tmpConfig.set("spread", stddev); + return validateCardinalSineOpts(tmpConfig); +} +//---------------------------------------------------------------------------------------------------------------------- +} // namespace nwp_emulator \ No newline at end of file diff --git a/src/nwp_emulator/config_reader.h b/src/nwp_emulator/config_reader.h new file mode 100644 index 0000000..c4877dc --- /dev/null +++ b/src/nwp_emulator/config_reader.h @@ -0,0 +1,245 @@ +/* + * (C) Copyright 2025- ECMWF. + * + * This software is licensed under the terms of the Apache Licence Version 2.0 + * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. + * + * In applying this licence, ECMWF does not waive the privileges and immunities + * granted to it by virtue of its status as an intergovernmental organisation nor + * does it submit to any jurisdiction. + */ +#pragma once + +#include +#include +#include +#include + +#include "atlas/field/Field.h" +#include "atlas/util/Point.h" +#include "eckit/config/LocalConfiguration.h" +#include "eckit/filesystem/PathName.h" +#include "eckit/log/Log.h" +#include "eckit/mpi/Comm.h" + +#include "data_reader.h" + +namespace nwp_emulator { + +/** + * @brief A structure that represents a rectangular area on the world map. + * + * This struct holds the coordinates of 2 points lon,lat points. It assumes the values passed describe a + * valid rectangle. + */ +struct FocusArea { + double west, east, south, north; + + /** + * @brief Constructs a focus area. + * + * The list initialisation is incompatible with the use of unique pointers, hence the need for a constructor. + * + * @param west The west longitude of the area box. + * @param east The east longitude of the area box. + * @param south The south latitude of the area box. + * @param north The north latitude of the area box. + */ + FocusArea(double west, double east, double south, double north) : + west(west), east(east), south(south), north(north) {} + + /** + * @brief Determines whether a certain point belongs to the rectangle described by the area. + * + * This method supports an area containing the longitude 0/360 so it is possible that the west longitude + * is not smaller than the east longitude. + * + * @param p The atlas point. It is assumed that latitudes range between [-90,90], and longitudes [0,360]. + */ + bool contains(const atlas::PointLonLat& p) { + if (west < east) { + return (p.lat() >= south && p.lat() <= north && p.lon() >= west && p.lon() <= east); + } + return (p.lat() >= south && p.lat() <= north && ((p.lon() >= west && p.lon() <= 360.0) || (p.lon() <= east))); + } +}; + +/** + * @class ConfigReader + * @brief Reads a configuration and generates data based on it that it serves as Atlas fields. + * + * Supports parallel environments. Each process is responsible for generating its own share of the data. + * This class provides inputs for a NWP emulator generated from a set of functions that can be broaden as needed. + * It currently supports five generation functions: vortex rollup, random (uniform or normal), step, cardinal sine, + * and gaussian. The configuration file is used at construction to determine the emulator parameters : + * number of emulator steps, grid, vertical levels, parameters offered by the model, and how each field should + * be populated. An example of configuration file can be found in the README. + */ +class ConfigReader final : public DataReader { +private: + eckit::LocalConfiguration readerConfig_; + atlas::Field readerArea_; + int modelLevels_; + + size_t rank_; + size_t root_; + + /** + * @brief Finds the subconfiguration key that matches a particular level. + * + * This method is used by the reader to resolve in the configuration where the instructions are to populate + * a given level for a given field. + * + * @param level The model level to find the key for. + * @param levelConfigStr The path in the configuration where the possible level keys are. + * + * @return The configuration key covering to the level. + */ + std::string getLevelConfigKey(const std::string& level, const std::string& levelConfigStr); + + /** + * @brief Randomly sample (lon,lat) points based on provided options. + * + * Only the root reader does the sampling, and broadcasts the result so every process uses a consistent + * set of centres for a given time step. Options contain a number of centres, and an area. + * + * @param options The configuration containing information for the sampling. + * @param[out] centres The vector of sampled lon,lat points. + */ + void sampleNCentres(const eckit::LocalConfiguration& options, std::vector& centres); + + /** + * @brief Sets a focus area on the map based on the provided options. + * + * Options can contain an area which is assumed to be formatted as [NW_lat, NW_lon, SE_lat, SE_lon], + * and a translation vector if the area should be displaced at every time step. The struct returned has + * a convenience method to determine belonging. + * + * @param options The configuration containing information about the area. + * + * @return The rectangular area of the map or a null pointer if the whole globe should be considered. + */ + std::unique_ptr setFocusArea(const eckit::LocalConfiguration& options); + + /// Fills the values vector with a vortex rollup parameterised by the given options. + void applyVortexRollup(const eckit::LocalConfiguration& options, std::vector& values); + + /// Fills the values vector with random values parameterised by the given options. + void applyRandom(const eckit::LocalConfiguration& options, std::vector& values); + + /// Fills the values vector with a step value parameterised by the given options. + void applyStep(const eckit::LocalConfiguration& options, std::vector& values); + + /// Fills the values vector with a cardinal sine parameterised by the given options. + void applyCardinalSine(const eckit::LocalConfiguration& options, std::vector& values); + + /// Fills the values vector with a gaussian parameterised by the given options. + void applyGaussian(const eckit::LocalConfiguration& options, std::vector& values); + + /** + * @brief Validates the passed emulator configuration. + * + * Runs a deeper validation than the one run directly in the constructor. Checks the the fields are configured + * properly. Some checks include (but are not limited too, see the code for detailed comments): + * - 'apply' has at least one key and all keys are valid functions. + * - All levels do not require to have an 'apply' key, but at least one should (other levels are 0s). + * - All level keys are valid (0 is invalid, ml keys should cover 1- at most). + * - The 'levels' key usage is correct (incompatible with surface fields or emulators with one model level). + * - Each function has the options it needs & they are valid (value/types). + * - Distributions asked are all supported. + * - Areas are well defined (NW,SE and lat [-90,90] and lon [-180,180]) + * - ... + * + * @return True if the configuration can be used by the reader to setup and generate data for the emulator. + * False otherwise. + */ + bool validateConfig(); + + /// Validates that areas configured respect the [NW_lat, NW_lon, SE_lat, SE_lon] format and ranges. + bool validateArea(const std::vector& area); + + /// Validates that level keys are either single levels ("1"), sequences ("1,2,5"), or ranges (":3", "2:5", "2:") + bool validateLevelKeys(const eckit::LocalConfiguration& config); + + /// Validates options provided for 'vortex_rollup' apply function. + bool validateVortexRollupOpts(const eckit::LocalConfiguration& options); + + /// Validates options provided for 'random' apply function. + bool validateRandomOpts(const eckit::LocalConfiguration& options); + + /// Validates options provided for 'step' apply function. + bool validateStepOpts(const eckit::LocalConfiguration& options); + + /// Validates options provided for 'sinc' apply function. + bool validateCardinalSineOpts(const eckit::LocalConfiguration& options); + + /// Validates options provided for 'gaussian' apply function. + bool validateGaussianOpts(const eckit::LocalConfiguration& options); + + /// Lookup tables for functions and validation functions + std::unordered_map&)>> + functionTable = { + {"step", std::bind(&ConfigReader::applyStep, this, std::placeholders::_1, std::placeholders::_2)}, + {"random", std::bind(&ConfigReader::applyRandom, this, std::placeholders::_1, std::placeholders::_2)}, + {"vortex_rollup", + std::bind(&ConfigReader::applyVortexRollup, this, std::placeholders::_1, std::placeholders::_2)}, + {"sinc", std::bind(&ConfigReader::applyCardinalSine, this, std::placeholders::_1, std::placeholders::_2)}, + {"gaussian", std::bind(&ConfigReader::applyGaussian, this, std::placeholders::_1, std::placeholders::_2)}}; + + std::unordered_map> validationTable = { + {"step", std::bind(&ConfigReader::validateStepOpts, this, std::placeholders::_1)}, + {"random", std::bind(&ConfigReader::validateRandomOpts, this, std::placeholders::_1)}, + {"vortex_rollup", std::bind(&ConfigReader::validateVortexRollupOpts, this, std::placeholders::_1)}, + {"sinc", std::bind(&ConfigReader::validateCardinalSineOpts, this, std::placeholders::_1)}, + {"gaussian", std::bind(&ConfigReader::validateGaussianOpts, this, std::placeholders::_1)}}; + +public: + /** + * @brief Constructs a ConfigReader object and sets up emulator params. + * + * Reads or derives emulator options from the configuration, which should be a YAML file. + * All readers follow a parameter representation as follows: ",,". + * + * @param inputPath The path to the emulator configuration file. + * @param rank The rank of the current process. + * @param root The rank of the root process. + * @param stepCountLimit The number of maximum steps to run in the emulator. Defaults to 100. + * + * @note The constructor aborts the execution in case the configuration is invalid. Relevant information + * to correct the configuration should be output. + */ + ConfigReader(const eckit::PathName& inputPath, size_t rank, size_t root, int stepCountLimit = 100); + + /** + * @brief Generates data for the next field and level based on its configuration. + * + * This method generates data for the next parameter in the `params_` vector and increases the index. + * Once all the parameters have had data generated, it increases the model step and resets the index. + * Depending on the generation function, limited communication may be required between the processes, + * but they are all responsible for generating data for their own partition. + * + * @param[out] shortName The name of the parameter that data is generated for for the caller use. + * @param[out] levtype The levtype of the parameter for the caller use. + * @param[out] level The level of the parameter for the caller use. + * @param[out] data The vector that contains the raw values for the emulator field. + * + * @return true if the generation is successful, false otherwise or when the model step is complete. + */ + bool nextMessage(std::string& shortName, std::string& levtype, std::string& level, + std::vector& data) override; + + /// Returns true when the reader has generated data for all the steps. + bool done() override; + + /** + * @brief Stores the lon,lat field for which the current partition is responsible. (Setter) + * + * This field is used by the generation functions to generate the expected number of data points, + * and for direct use if functions image depend on the longitude and latitude. + * + * @param lonlat The lonlat field to clone (partitions keep the same area throughout the execution). + */ + void setReaderArea(const atlas::Field& lonlat) override; +}; +} // namespace nwp_emulator \ No newline at end of file diff --git a/src/nwp_emulator/config_reader_funcs.cc b/src/nwp_emulator/config_reader_funcs.cc new file mode 100644 index 0000000..1560b0f --- /dev/null +++ b/src/nwp_emulator/config_reader_funcs.cc @@ -0,0 +1,312 @@ +/* + * (C) Copyright 2025- ECMWF. + * + * This software is licensed under the terms of the Apache Licence Version 2.0 + * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. + * + * In applying this licence, ECMWF does not waive the privileges and immunities + * granted to it by virtue of its status as an intergovernmental organisation nor + * does it submit to any jurisdiction. + */ +#include +#include +#include + +#include "atlas/array/ArrayView.h" +#include "atlas/util/CoordinateEnums.h" +#include "atlas/util/function/VortexRollup.h" + +#include "config_reader.h" +#include "nwp_definitions.h" + +/** + * @brief Computes the cardinal sine of a 2D lon,lat point. + * + * @param x The point repreented by its longitude and latitude. + * @param centre The point where the peak of the function is located. + * @param spread The spread of the function blob (the bigger spread the tighter the blob). + * + * @return The sinc value at the given point as double or float. + */ +FIELD_TYPE_REAL sinc2d(atlas::PointLonLat x, atlas::PointLonLat centre, double spread) { + double sphericalDist = std::acos(std::sin(x.lat() * M_PI / 180.0) * std::sin(centre.lat() * M_PI / 180.0) + + std::cos(x.lat() * M_PI / 180.0) * std::cos(centre.lat() * M_PI / 180.0) * + std::cos((x.lon() - centre.lon()) * M_PI / 180.0)); + if (sphericalDist <= 1e-6) { // sinc(0) = 1 + return 1.0; + } + return std::sin(M_PI * sphericalDist * spread) / (M_PI * sphericalDist * spread); +} + +/** + * @brief Computes the gaussian of a 2D lon,lat point. + * + * @param x The point repreented by its longitude and latitude. + * @param mu The point where the peak of the function is located. + * @param sigma The spread of the function blob. + * + * @return The gaussian value at the given point as double or float. + */ +FIELD_TYPE_REAL gaussian2d(atlas::PointLonLat x, atlas::PointLonLat mu, std::pair sigma) { + return std::exp(-(0.5 * std::pow((x.lat() - mu.lat()) / sigma.second, 2) + + 0.5 * std::pow((x.lon() - mu.lon()) / sigma.first, 2))); +} + +namespace nwp_emulator { +//---------------------------------------------------------------------------------------------------------------------- +// Data generation functions & utils +//---------------------------------------------------------------------------------------------------------------------- +void ConfigReader::sampleNCentres(const eckit::LocalConfiguration& options, std::vector& centres) { + auto nmodes = options.getInt("modes"); + std::vector tmp(nmodes * 2); // Use a temporary centres storage as PointLonLat can't be broadcast + if (rank_ == root_) { // Only the root does the random sampling so it's consistent across partitions + std::random_device rd; + std::mt19937 generator(rd()); + double minLat = -90.0, maxLat = 90.0, minLon = 0.0, maxLon = 360.0; + std::vector areaCoords; + if (options.has("area")) { + areaCoords = options.getDoubleVector("area"); + } + else if (readerConfig_.has("area")) { + areaCoords = readerConfig_.getDoubleVector("area"); + } + if (!areaCoords.empty()) { + // Assumes the user defined area does not spread over the wraping 0/360 longitude + // Would need two lon ranges to support spreading over lon 0 + minLon = areaCoords[1] + 180.0; // west + maxLon = areaCoords[3] + 180.0; // east + minLat = areaCoords[2]; // south + maxLat = areaCoords[0]; // north + } + std::uniform_real_distribution distLat(minLat, maxLat); + std::uniform_real_distribution distLon(minLon, maxLon); + for (int i = 0; i < nmodes; i++) { + tmp[2 * i] = distLon(generator); + tmp[2 * i + 1] = distLat(generator); + } + } + eckit::mpi::comm().broadcast(tmp, root_); // Broadcast the sampled centres + for (int i = 0; i < nmodes; i++) { + centres.push_back(atlas::PointLonLat{tmp[2 * i], tmp[2 * i + 1]}); + } +} + +std::unique_ptr ConfigReader::setFocusArea(const eckit::LocalConfiguration& options) { + std::vector translation{0.0, 0.0}; + if (options.has("translation")) { + translation = options.getDoubleVector("translation"); + translation[0] *= step_; + translation[1] *= step_; + } + std::vector areaCoords; + std::unique_ptr focusArea = nullptr; + if (options.has("area")) { + areaCoords = options.getDoubleVector("area"); + } + else if (readerConfig_.has("area")) { + areaCoords = readerConfig_.getDoubleVector("area"); + } + if (!areaCoords.empty()) { + // clamp latitudes to avoid misinterpreting the area when it touches a pole + double north = std::max(-90.0, std::min(90.0, areaCoords[0] + translation[0])); + double south = std::max(-90.0, std::min(90.0, areaCoords[2] + translation[0])); + // normalise longitudes between [0, 360] + double west = std::fmod(areaCoords[1] + translation[1] + 180.0, 360.0); + double east = std::fmod(areaCoords[3] + translation[1] + 180.0, 360.0); + if (west < 1e-6) { + west += 360.0; + } + if (east < 1e-6) { + east += 360.0; + } + focusArea = std::make_unique(west, east, south, north); + } + return focusArea; +} + +void ConfigReader::applyVortexRollup(const eckit::LocalConfiguration& options, std::vector& values) { + // options are time_variation + area (optional) + auto timeVariation = options.getDouble("time_variation") * step_; + auto lonlat = atlas::array::make_view(readerArea_); + std::unique_ptr focusArea = setFocusArea(options); + for (size_t i_pt = 0; i_pt < values.size(); i_pt++) { + atlas::PointLonLat p{lonlat(i_pt, atlas::LON), lonlat(i_pt, atlas::LAT)}; + if (focusArea && !focusArea->contains(p)) { + continue; + } + // populate only the values if (lon,lat) belongs to the focus area or there is no mask + values[i_pt] = atlas::util::function::vortex_rollup(p.lon(), p.lat(), timeVariation); + } +} + +void ConfigReader::applyRandom(const eckit::LocalConfiguration& options, std::vector& values) { + // options are all optional: area, distribution, distribution specific options + auto lonlat = atlas::array::make_view(readerArea_); + std::unique_ptr focusArea = options.getBool("use_area", true) ? setFocusArea(options) : nullptr; + + std::random_device rd; + std::mt19937 generator(rd()); + auto distribution_type = options.getString("distribution", "uniform"); + std::vector randomValues(values.size()); + // 1. Generate random values for the whole field covered by the partition + if (distribution_type == "bernoulli") { // This one is used for random step + auto probability = options.getDouble("probability", 0.5); + FIELD_TYPE_REAL successValue = options.getDouble("value", 1.0); + std::bernoulli_distribution distribution(probability); + std::generate(randomValues.begin(), randomValues.end(), + [&]() { return distribution(generator) ? successValue : 0.0; }); + } + else if (distribution_type == "uniform") { + auto min = options.getDouble("min", 0.0); + auto max = options.getDouble("max", 1.0); + std::uniform_real_distribution distribution(min, max); + std::generate(randomValues.begin(), randomValues.end(), + [&]() { return static_cast(distribution(generator)); }); + } + else if (distribution_type == "normal") { + auto mean = options.getDouble("mean", 0.5); + auto stddev = options.getDouble("stddev", 1.0); + std::normal_distribution distribution(mean, stddev); + std::generate(randomValues.begin(), randomValues.end(), + [&]() { return static_cast(distribution(generator)); }); + } + + // Apply randomly generated values only on the desired area + for (size_t i_pt = 0; i_pt < values.size(); i_pt++) { + atlas::PointLonLat p{lonlat(i_pt, atlas::LON), lonlat(i_pt, atlas::LAT)}; + if (focusArea && !focusArea->contains(p)) { + continue; + } + values[i_pt] = randomValues[i_pt]; + } +} + +void ConfigReader::applyStep(const eckit::LocalConfiguration& options, std::vector& values) { + // options are value, area (optional), if area then translation (optional) + // probability (optional), if set only a proportion of points in the area will receive the step + FIELD_TYPE_REAL stepValue = static_cast(options.getDouble("value")); + FIELD_TYPE_REAL variation = static_cast(options.getDouble("variation", 0.0)); + auto lonlat = atlas::array::make_view(readerArea_); + std::unique_ptr focusArea = setFocusArea(options); + + std::vector randomValues(values.size()); + if (options.has("probability")) { + // use a separate field of values to make sure no undesirable changes are applied to the final field + eckit::LocalConfiguration randomOpts(options); + randomOpts.set("distribution", "bernoulli"); + randomOpts.set("use_area", false); + applyRandom(randomOpts, randomValues); + } + + for (size_t i_pt = 0; i_pt < values.size(); i_pt++) { + atlas::PointLonLat p{lonlat(i_pt, atlas::LON), lonlat(i_pt, atlas::LAT)}; + if (focusArea && !focusArea->contains(p)) { + continue; + } + // Point should be assigned a value + if (options.has("probability")) { + values[i_pt] = std::abs(randomValues[i_pt]) > 1e-6 ? randomValues[i_pt] + variation * step_ : 0.0; + } + else { + values[i_pt] = stepValue + variation * step_; + } + } +} + +void ConfigReader::applyCardinalSine(const eckit::LocalConfiguration& options, std::vector& values) { + // options are modes, min value (defaults to 0), and max value (defaults to 1) + // sink (true if the blobs should not be peaks), spread is how big the sinc blobs can be + auto lonlat = atlas::array::make_view(readerArea_); + std::unique_ptr focusArea = setFocusArea(options); + + auto nmodes = options.getInt("modes"); + auto spread = options.getDouble("spread", 10.0); + auto sink = options.getBool("sink", false); + FIELD_TYPE_REAL min = options.getDouble("min", 0.0); + FIELD_TYPE_REAL max = options.getDouble("max", 1.0); + + // 1. Randomly determine N (nmodes) centres + std::vector modesLonLat; + sampleNCentres(options, modesLonLat); + + // 2. Randomly sample N spreads + std::vector spreads(nmodes); + if (rank_ == root_) { + std::random_device rd; + std::mt19937 generator(rd()); + // The blob size is inverse proportional to the spread + std::uniform_real_distribution distribution(90.0 / spread, 180.0 / spread); + for (size_t i = 0; i < nmodes; i++) { + spreads[i] = distribution(generator); + } + } + eckit::mpi::comm().broadcast(spreads, root_); + + // 3. Compute sinc + for (size_t i_pt = 0; i_pt < values.size(); i_pt++) { + atlas::PointLonLat p{lonlat(i_pt, atlas::LON), lonlat(i_pt, atlas::LAT)}; + if (focusArea && !focusArea->contains(p)) { + continue; + } + FIELD_TYPE_REAL value = 0.0; + for (size_t i = 0; i < nmodes; i++) { + value += sinc2d(p, modesLonLat[i], spreads[i]); + } + if (sink) { // scale and sign the result + values[i_pt] = min + (nmodes - value) * (max - min) / (1.21723 * nmodes); + } + else { + values[i_pt] = min + ((value + 0.21723 * nmodes) * (max - min)) / (1.21723 * nmodes); + } + } +} +void ConfigReader::applyGaussian(const eckit::LocalConfiguration& options, std::vector& values) { + // options are modes, max standard deviation (defaults to 1), sink (true if the blobs should not be peaks) + // min value (defaults to 0), and max value (defaults to 1) + auto lonlat = atlas::array::make_view(readerArea_); + std::unique_ptr focusArea = setFocusArea(options); + + auto nmodes = options.getInt("modes"); + auto sink = options.getBool("sink", false); + FIELD_TYPE_REAL min = options.getDouble("min", 0.0); + FIELD_TYPE_REAL max = options.getDouble("max", 1.0); + auto max_stddev = options.getDouble("max_stddev", 1.0); + + // 1. Randomly determine N (nmodes) centres + std::vector modesLonLat; + sampleNCentres(options, modesLonLat); + + // 2. Randomly sample N standard deviations + std::vector stddevs(2 * nmodes); + if (rank_ == root_) { + std::random_device rd; + std::mt19937 generator(rd()); + std::uniform_real_distribution distribution(1.0, max_stddev); + for (size_t i = 0; i < nmodes; i++) { + stddevs[2 * i] = distribution(generator); + stddevs[2 * i + 1] = distribution(generator); + } + } + eckit::mpi::comm().broadcast(stddevs, root_); + + // 3. Compute Gaussian + FIELD_TYPE_REAL value = 0.0; + for (size_t i_pt = 0; i_pt < values.size(); i_pt++) { + atlas::PointLonLat p{lonlat(i_pt, atlas::LON), lonlat(i_pt, atlas::LAT)}; + if (focusArea && !focusArea->contains(p)) { + continue; + } + value = 0.0; + for (size_t i = 0; i < nmodes; i++) { + value += gaussian2d(p, modesLonLat[i], {stddevs[2 * i], stddevs[2 * i + 1]}); + } + if (sink) { // scale and sign the result + values[i_pt] = min + (nmodes - value) * (max - min) / nmodes; + } + else { + values[i_pt] = min + value * (max - min) / nmodes; + } + } +} +//---------------------------------------------------------------------------------------------------------------------- +} // namespace nwp_emulator \ No newline at end of file diff --git a/src/nwp_emulator/data_reader.h b/src/nwp_emulator/data_reader.h new file mode 100644 index 0000000..3af6a7a --- /dev/null +++ b/src/nwp_emulator/data_reader.h @@ -0,0 +1,73 @@ +/* + * (C) Copyright 2025- ECMWF. + * + * This software is licensed under the terms of the Apache Licence Version 2.0 + * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. + * + * In applying this licence, ECMWF does not waive the privileges and immunities + * granted to it by virtue of its status as an intergovernmental organisation nor + * does it submit to any jurisdiction. + */ +#pragma once + +#include +#include + +#include "atlas/field/Field.h" +#include "eckit/exception/Exceptions.h" +#include "nwp_definitions.h" + +namespace nwp_emulator { + +/** + * @class DataReader + * @brief Base class for handling different types of data sources for the emulator. + */ +class DataReader { +protected: + std::string gridName_; + std::vector params_; + + int stepCountLimit_; ///< Limitation on the number of steps the emulator can run + int step_{0}; ///< Number of model steps + int count_{0}; ///< Number of messages for the current step + int index_{0}; ///< Index of the considered message at the current step + +public: + DataReader(int stepCountLimit) : stepCountLimit_(stepCountLimit) {} + virtual ~DataReader() = default; + + /** + * @brief Sets the reader lon,lat area if necessary for data generation. + * + * @param lonlat The lonlat field defininf the partition grid points. + */ + virtual void setReaderArea(const atlas::Field& lonlat) { + throw eckit::NotImplemented("This data reader does not support area definition", Here()); + } + + /** + * @brief Provides the raw values for a single levelin the next message from the data source. + * + * Each of the inheriting readers will have a different way to generate raw model data for the provider, + * which can use the parameters to map the raw values to a specific field, levtype and level. + * + * @param[out] shortName The name of the parameter represented by the raw values for the caller use. + * @param[out] levtype The levtype of the parameter for the caller use. + * @param[out] level The level of the levtype for the caller use. + * @param[out] data The vector that contains the raw values for the field. + * + * @return true if the data generation is successful, false otherwise. + */ + virtual bool nextMessage(std::string& shortName, std::string& levtype, std::string& level, + std::vector& data) = 0; + + /// Returns true when the reader has read model data for all the steps. + virtual bool done() = 0; + + /// Getters + const std::string& getGridName() const { return gridName_; } + int getStep() const { return step_; } + const std::vector& getParams() const { return params_; } +}; +} // namespace nwp_emulator \ No newline at end of file diff --git a/src/nwp_emulator/grib_file_reader.cc b/src/nwp_emulator/grib_file_reader.cc new file mode 100644 index 0000000..0396e00 --- /dev/null +++ b/src/nwp_emulator/grib_file_reader.cc @@ -0,0 +1,335 @@ +/* + * (C) Copyright 2025- ECMWF. + * + * This software is licensed under the terms of the Apache Licence Version 2.0 + * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. + * + * In applying this licence, ECMWF does not waive the privileges and immunities + * granted to it by virtue of its status as an intergovernmental organisation nor + * does it submit to any jurisdiction. + */ +#include +#include +#include +#include +#include + +#include "eckit/exception/Exceptions.h" +#include "eckit/log/Log.h" +#include "eckit/mpi/Comm.h" + +#include "grib_file_reader.h" +#include "nwp_definitions.h" + +namespace nwp_emulator { +void codesHandleDeleter::operator()(codes_handle* h) { + if (h) { + codes_handle_delete(h); + } +} + +void fileDeleter::operator()(FILE* f) { + if (f) { + fclose(f); + } +} + +GRIBFileReader::GRIBFileReader(const eckit::PathName& inputPath, size_t rank, size_t root, int stepCountLimit) : + rank_(rank), root_(root), DataReader(stepCountLimit) { + if (rank_ != root_) { + gridName_ = ""; + params_ = {}; + srcFilenames_ = {}; + } + else { + setupReader(inputPath, rank_ == root_); + validateSrcFiles(rank_ == root_); + } + // Broadcast emulator source params from main reader (root) + // 1. Grid identifier + size_t gridNameSize = gridName_.size(); + eckit::mpi::comm().broadcast(gridNameSize, root_); + if (rank_ != root_) { + gridName_.resize(gridNameSize); + } + eckit::mpi::comm().broadcast(gridName_.begin(), gridName_.end(), root_); + // 2. Field names & metadata (parameters) + std::string paramBuffer; + if (rank_ == root_) { + for (const auto& param : params_) { + paramBuffer += param + ";"; + } + paramBuffer.pop_back(); // remove last ';' separator + } + size_t paramSize = paramBuffer.size(); + eckit::mpi::comm().broadcast(paramSize, root_); + if (rank_ != root_) { + paramBuffer.resize(paramSize); + } + // broadcast a single string to limit communication + eckit::mpi::comm().broadcast(paramBuffer.begin(), paramBuffer.end(), root_); + if (rank_ != root_) { + std::stringstream ss(paramBuffer); + std::string param; + while (std::getline(ss, param, ';')) { + params_.push_back(param); + } + } + // 3. Number of model steps (files in the source folder) + size_t fileCount = srcFilenames_.size(); + eckit::mpi::comm().broadcast(fileCount, root_); + if (rank_ != root_) { + srcFilenames_.resize(fileCount); + } +} + +void GRIBFileReader::setupReader(const eckit::PathName& inputPath, bool isRoot) { + if (!isRoot) { + eckit::Log::warning() << "Only root process is allowed to read, skipping..." << std::endl; + return; + } + eckit::PathName srcPath; + std::vector skippedDirs; + if (!inputPath.exists()) { + eckit::Log::error() << inputPath << " does not exist" << std::endl; + eckit::mpi::comm().abort(1); + } + std::vector listFilenames; + // Open source directory and store source file names + // Each file is assumed to store data for a single model step + // (regardless of the actual time in the messages) + if (!inputPath.isDir()) { + inputPath.dirName().children(listFilenames, skippedDirs); // Use the dirname as dir and file as source + } + else { + inputPath.children(listFilenames, skippedDirs); // input path is a dir + } + // Remove any file that is not GRIB + for (size_t i = 0; i < listFilenames.size(); ++i) { + if ((listFilenames[i].extension() != ".grib") && (listFilenames[i].extension() != ".grib1") && + (listFilenames[i].extension() != ".grib2")) { + continue; + } + srcFilenames_.push_back(listFilenames[i]); + } + if (srcFilenames_.empty()) { + eckit::Log::error() << "No GRIB files remained in the source folder after filtering, exit..." << std::endl; + eckit::mpi::comm().abort(1); + } + // Steps should be stored in alphabetical order + std::sort(srcFilenames_.begin(), srcFilenames_.end()); + srcPath = eckit::PathName(srcFilenames_[0]); + if (srcFilenames_.size() > stepCountLimit_) { + eckit::Log::warning() << "The emulator is limited to " << stepCountLimit_ << " steps, " + << srcFilenames_.size() - stepCountLimit_ + << " steps from the source directory will be skipped" << std::endl; + } + else { + eckit::Log::info() << "The emulator will attempt to run " << srcFilenames_.size() << " steps" << std::endl; + } + if (!skippedDirs.empty()) { + eckit::Log::info() << "Any file in children directories will be ignored..." << std::endl; + } + // For later consistency checks & emulator setup : + openGribFile(srcPath, true); + eckit::Log::info() << "Number of messages : " << std::to_string(count_) << std::endl; + char buffer[64]; + size_t size = sizeof(buffer); + // 1. Lock the grid name from the passed GRIB file + if (codes_get_string(grib(), "gridName", buffer, &size) == GRIB_NOT_FOUND) { + eckit::Log::error() << "Grid type unsupported at the moment, exit" << std::endl; + eckit::mpi::comm().abort(1); + } + eckit::Log::info() << "Grid identifier : " << std::string(buffer) << std::endl; + gridName_ = std::string(buffer); + // 2. Lock the parameter names + std::string paramMd, _; + eckit::Log::info() << "Params (name,levtype,level) : "; + for (int i = 0; i < count_ - 1; ++i) { + readMsgMetadata(_, paramMd); + params_.push_back(paramMd); + eckit::Log::info() << params_[i] << "; "; + if (!resetGribHandle()) { + eckit::Log::error() << " Check source file " << srcPath << std::endl; + eckit::mpi::comm().abort(1); + } + } + readMsgMetadata(_, paramMd); + params_.push_back(paramMd); + eckit::Log::info() << paramMd; + std::set paramSet(params_.begin(), params_.end()); + if (paramSet.size() != params_.size()) { + eckit::Log::error() << "(param, levtype, level) combinations must be unique in each source file as they " + << " represent a single time step, exit..." << std::endl; + eckit::mpi::comm().abort(1); + } + eckit::Log::info() << std::endl; + closeGribFile(); +} + +void GRIBFileReader::validateSrcFiles(bool isRoot) { + if (!isRoot) { + eckit::Log::warning() << "Only root process is allowed to read, skipping..." << std::endl; + return; + } + std::string gridName, paramMd; + // Do not sort the `params_` directly as levels should be provided as they are encoded in the source file + std::vector sortedParams = params_; + std::sort(sortedParams.begin(), sortedParams.end()); + std::vector fileParams; + for (const auto& filename : srcFilenames_) { + fileParams.clear(); + openGribFile(filename, true); + for (int i = 0; i < count_ - 1; ++i) { + readMsgMetadata(gridName, paramMd); + if (gridName != gridName_) { + eckit::Log::error() << "Grid in " << filename << " is different from setup (" << gridName_ + << "), exit..." << std::endl; + eckit::mpi::comm().abort(1); + } + fileParams.push_back(paramMd); + if (!resetGribHandle()) { + eckit::Log::error() << " Check source file " << filename << std::endl; + eckit::mpi::comm().abort(1); + } + } + readMsgMetadata(gridName, paramMd); // read the last message in the file + if (gridName != gridName_) { + eckit::Log::error() << "Grid in " << filename << " is different from setup (" << gridName_ << "), exit..." + << std::endl; + eckit::mpi::comm().abort(1); + } + fileParams.push_back(paramMd); + + std::sort(fileParams.begin(), fileParams.end()); + if (fileParams != sortedParams) { + eckit::Log::error() << "Parameters and levels in " << filename + << " are inconsistent with other source files, exit..." << std::endl; + eckit::mpi::comm().abort(1); + } + closeGribFile(); + } +} + +void GRIBFileReader::readMsgMetadata(std::string& gridName, std::string& paramMd) { + char buffer[64]; + size_t size = sizeof(buffer); + gridName = ""; + int gridStatus = codes_get_string(grib(), "gridName", buffer, &size); + if (gridStatus != GRIB_NOT_FOUND) { + gridName = std::string(buffer); + } + paramMd = ""; + std::vector keywords = {"shortName", "levtype", "level"}; + for (const auto& keyword : keywords) { + std::memset(buffer, 0, 64); + size = sizeof(buffer); + codes_get_string(grib(), keyword, buffer, &size); + paramMd.append(buffer); + paramMd.append(","); // separator + } + paramMd.pop_back(); // remove last separator +} + +GRIBFileReader::~GRIBFileReader() { + closeGribFile(); +} + +bool GRIBFileReader::openGribFile(const eckit::PathName& path, bool exit) { + // No check for existence here because the paths passed belong to the src dir + // It is assumed they will not be deleted during runtime + // No check for file format as checked during construction + currentFile_.reset(fopen(path.asString().c_str(), "rb")); + if (!currentFile_) { + eckit::Log::error() << "Could not open file " << path << std::endl; + if (exit) { + eckit::mpi::comm().abort(1); + } + return false; // defer error handling + } + index_ = 0; + codes_count_in_file(nullptr, currentFile(), &count_); + if (!resetGribHandle()) { + if (exit) { + eckit::mpi::comm().abort(1); + } + return false; + } + return true; +} + +bool GRIBFileReader::resetGribHandle() { + int err; + // This calls the deleter of the previous handle first + grib_.reset(codes_handle_new_from_file(nullptr, currentFile(), PRODUCT_GRIB, &err)); + if (!grib_) { + eckit::Log::error() << "Could not create grib handle. Error: " << err << std::endl; + return false; + } + return true; +} + +void GRIBFileReader::closeGribFile() { + count_ = 0; + index_ = 0; + // Calls the custom destructors + currentFile_.reset(nullptr); + grib_.reset(nullptr); +} + +bool GRIBFileReader::done() { + return (step_ >= srcFilenames_.size() || step_ >= stepCountLimit_); +} + +bool GRIBFileReader::nextMessage(std::string& shortName, std::string& levtype, std::string& level, + std::vector& data) { + if ((index_ == count_ && index_ != 0) || done()) { + // All messages in the current file have been decoded, increase step + // or all files in source dir have been read, return + closeGribFile(); + ++step_; + return false; + } + // Open next file if beginning of the step + int result = 1; + if (rank_ == root_ && !currentFile_) { + if (!openGribFile(srcFilenames_[step_], false)) { + eckit::Log::error() << "An error occurred while reading: " << srcFilenames_[step_] << std::endl; + result = 0; + } + } + eckit::mpi::comm().broadcast(result, root_); // use int as bool can't be broadcast by eckit mpi + if (!result) { + closeGribFile(); + ++step_; + return false; + } + // Decode the next message metadata and raw values + if (rank_ == root_) { + std::string _, paramMd; + readMsgMetadata(_, paramMd); + std::stringstream ss(paramMd); + std::getline(ss, shortName, ','); + std::getline(ss, levtype, ','); + std::getline(ss, level, ','); + + size_t nbOfValues; + codes_get_size(grib(), "values", &nbOfValues); + data.resize(nbOfValues); + if constexpr (std::is_same_v) { + ASSERT((std::is_same_v)); + codes_get_double_array(grib(), "values", reinterpret_cast(data.data()), &nbOfValues); + } + else { + ASSERT((std::is_same_v)); + codes_get_float_array(grib(), "values", reinterpret_cast(data.data()), &nbOfValues); + } + } + ++index_; + eckit::mpi::comm().broadcast(count_, root_); + if (rank_ == root_ && index_ < count_) { + return resetGribHandle(); // Load the next message + } + return true; +} +} // namespace nwp_emulator \ No newline at end of file diff --git a/src/nwp_emulator/grib_file_reader.h b/src/nwp_emulator/grib_file_reader.h new file mode 100644 index 0000000..529c47a --- /dev/null +++ b/src/nwp_emulator/grib_file_reader.h @@ -0,0 +1,161 @@ +/* + * (C) Copyright 2025- ECMWF. + * + * This software is licensed under the terms of the Apache Licence Version 2.0 + * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. + * + * In applying this licence, ECMWF does not waive the privileges and immunities + * granted to it by virtue of its status as an intergovernmental organisation nor + * does it submit to any jurisdiction. + */ +#pragma once + +#include +#include +#include + +#include "eccodes.h" +#include "eckit/filesystem/PathName.h" + +#include "data_reader.h" + +namespace nwp_emulator { + +/** + * @brief A structure for a custom deleter to manage codes handles. + * + * This deleter allows to use codes handles pointers as unique pointers for safer memory usage. + */ +struct codesHandleDeleter { + void operator()(codes_handle* h); +}; + +/** + * @brief A structure for a custom deleter to manage files. + * + * This deleter allows to use C-style file pointers as unique pointers for safer memory usage. + * `std::fstream` is not compatible with eccodes, hence the need to stick to `FILE`. + */ +struct fileDeleter { + void operator()(FILE* f); +}; + +/** + * @class GRIBFileReader + * @brief Handles reading of GRIB data and serves them as Atlas fields. Supports parallel environments. + * + * This class provides inputs for a NWP emulator sourced from historical data stored in GRIB format. + * In parallel, only the root process is responsible for doing the reading to limit IO and unstability + * due to concurrent access to source files. The main reader construction determines the emulator + * parameters : folder from which to source the GRIB files (each file is considered as a single model step), + * number of emulator steps (smallest between file count and default/user limit), parameters offered by the + * model & grid (derived from the first file in the source folder content). + */ +class GRIBFileReader final : public DataReader { +private: + std::vector srcFilenames_; + std::unique_ptr currentFile_ = nullptr; + std::unique_ptr grib_ = nullptr; + + size_t rank_; + size_t root_; + + /** + * @brief Opens the current file, and loads the first GRIB message. + * + * This method stores the number of messages in the currently open file in count_. + * + * @param path The path of the file to open. + * @param exit Stops the execution if the to true and an error occurs. + * + * @return true if opening and decoding of first message successful, false otherwise. + */ + bool openGribFile(const eckit::PathName& path, bool exit); + + /// Closes current file, and resets pointers. + void closeGribFile(); + + /// Gets the grib handle pointer. + codes_handle* grib() { return grib_.get(); } + + /// Gets the file pointer. + FILE* currentFile() { return currentFile_.get(); } + + /** + * @brief Resets the GRIB handle. Decodes the next message in the current file. + * + * This method first resets the handle pointer using the deleter, making sure memory is properly deallocated. + * + * @return True if successful, false if there was an error loading the next message or if the current file + * has had all its messages decoded already. + */ + bool resetGribHandle(); + + /// Called from the constructor by the main reader + void setupReader(const eckit::PathName& path, bool isRoot); + + /** + * @brief Checks that all the source files comply with emulator requirements. + * + * Checks that all source files contain the same grid, parameters, level types and levels. + * + * @param isRoot Only the root process can run the validation. + * + * @note Exits the execution if the input is not valid. + */ + void validateSrcFiles(bool isRoot); + + /** + * @brief Decodes GRIB message metadata relevant to the emulator. + * + * Parameters are represented by a string built as follow: ,,. + * Each string must be unique as each file source is expected to represent a single time step. + * + * @param[out] gridName String to decode the grid name into. + * @param[out] paramMd Metadata relevant to emulator options for the parameter stored in the message. + */ + void readMsgMetadata(std::string& gridName, std::string& paramMd); + +public: + /** + * @brief Constructs a GRIBFileReader object and sets up emulator params if it is the main reader. + * + * The secondary readers, which only receive their share of the data from the provider, + * initialise the emulator options to empty defaults and wait for the main reader to broadcast + * the necessary params. + * + * @param path The path to the source folder (or any file within it). + * @param rank The rank of the current process. + * @param root The rank of the root process. + * @param stepCountLimit The number of maximum steps to run in the emulator. Defaults to 100. + * Any source file beyond that number will be skipped. + * + * @note The constructor aborts the execution in case the path passed does not allow to properly + * perform the emulator setup. + */ + GRIBFileReader(const eckit::PathName& inputPath, size_t rank, size_t root, int stepCountLimit = 100); + + /// Makes sure open files have been closed, and pointers not dereferenced already before destroying. + ~GRIBFileReader(); + + /** + * @brief Decodes the GRIB data from the next message in the current source file. + * + * This method reads the next GRIB file in the source folder if there isn't one currently open, + * The step_ and index_ counters are updated for all processes (main and secondary) to ensure termination. + * + * @param[out] shortName The name of the parameter decoded in the message for the caller use. + * @param[out] levtype The levtype of the message for the caller use. + * @param[out] level The level of the message for the caller use. + * @param[out] data The vector that contains the raw values for the decoded field. + * + * @return true if the decoding is successful, false otherwise. + */ + bool nextMessage(std::string& shortName, std::string& levtype, std::string& level, + std::vector& data) override; + + /// Returns true when the GRIB file reader is done or has reached its limit + bool done() override; +}; + +} // namespace nwp_emulator diff --git a/src/nwp_emulator/nwp_data_provider.cc b/src/nwp_emulator/nwp_data_provider.cc new file mode 100644 index 0000000..908f0b0 --- /dev/null +++ b/src/nwp_emulator/nwp_data_provider.cc @@ -0,0 +1,178 @@ +/* + * (C) Copyright 2025- ECMWF. + * + * This software is licensed under the terms of the Apache Licence Version 2.0 + * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. + * + * In applying this licence, ECMWF does not waive the privileges and immunities + * granted to it by virtue of its status as an intergovernmental organisation nor + * does it submit to any jurisdiction. + */ +#include + +#include "nwp_data_provider.h" + +namespace nwp_emulator { +NWPDataProvider::NWPDataProvider(const DataSourceType& sourceType, const eckit::PathName& inputPath, size_t rank, + size_t root, size_t nprocs) : + sourceType_(sourceType), rank_(rank), root_(root), nprocs_(nprocs) { + switch (sourceType_) { + case DataSourceType::GRIB: + eckit::Log::info() << "Emulator will use GRIB files as data source from " << inputPath << std::endl; + dataReader = std::make_unique(inputPath, rank, root); + break; + case DataSourceType::CONFIG: + eckit::Log::info() << "Emulator will use config as source type from" << inputPath << std::endl; + dataReader = std::make_unique(inputPath, rank, root); + break; + default: + eckit::Log::error() << "Emulator source type '" << static_cast(sourceType_) << "' not supported, exit..." << std::endl; + eckit::mpi::comm().abort(1); + } + + // Some logging for sanity + eckit::Log::info() << "Process " << rank << " has grid name " << dataReader->getGridName() << std::endl; + eckit::Log::info() << "Process " << rank << " has params " << dataReader->getParams() << std::endl; + + gridName_ = dataReader->getGridName(); + // Parse parameters into a dictionary structure + for (const auto& paramMd : dataReader->getParams()) { + std::stringstream ss(paramMd); + std::string shortName, levtype, level; + + // Read each part separated by comma + std::getline(ss, shortName, ','); + std::getline(ss, levtype, ','); + std::getline(ss, level, ','); + + params_[shortName][levtype].push_back(level); + } + + // Find the maximum number of levels across all params + size_t nlevels = 1; + for (const auto& levtypePair : params_) { + size_t paramlevels = 0; + for (const auto& levelPair : levtypePair.second) { + paramlevels += levelPair.second.size(); + } + nlevels = std::max(nlevels, paramlevels); + } + std::map fieldsMd; + for (const auto& levtypePair : params_) { + fieldsMd[levtypePair.first] = 1; // 2D fields receive 1 level + if (levtypePair.second.size() > 1 || levtypePair.second.begin()->second.size() > 1) { + // All 3D fields receive the same number of levels even if some levels remain empty for some fields + fieldsMd[levtypePair.first] = nlevels; + } + } + + // Atlas grid & function space + eckit::mpi::comm().barrier(); // Make sure all processes start creating their function space together + atlas::Grid grid(gridName_); + // Use the default partitioner for the given grid type + atlas::grid::Distribution distribution(grid, atlas::util::Config("type", grid.partitioner().getString("type")) | + atlas::util::Config("bands", nprocs_)); + fs_ = atlas::functionspace::StructuredColumns(grid, distribution, atlas::util::Config("levels", nlevels)); + if (sourceType_ == DataSourceType::CONFIG) { + dataReader->setReaderArea(fs_.lonlat()); + } + + // Create model fields + for (const auto& fieldMd : fieldsMd) { + auto field = modelFieldSet_.add(fs_.createField(atlas::option::name(fieldMd.first) | + atlas::option::levels(fieldMd.second))); + std::vector levelOrder; + for (const auto& levelPair : params_[field.name()]) { + levelOrder.push_back(levelPair.first); // preferred order should be sfc, ml then other types + } + std::sort(levelOrder.begin(), levelOrder.end(), [](const std::string& a, const std::string& b) { + // Priority for "sfc" and "ml", sort other levtypes alphabetically + if (a == "sfc") { + return true; + } + if (a == "ml") { + if (b == "sfc") { + return false; + } + return true; + } + return a < b; + }); + // This piece of metadata is used by the provider to populate the right levels, it should not be used by plugins + field.metadata().set("levelOrder", levelOrder); + std::cout << "Process " << rank_ << " created field '" << fieldMd.first << "' with shape " << field.shape() + << std::endl; + } +} + +bool NWPDataProvider::getStepData() { + if (dataReader->done()) { + return false; + } + switch (sourceType_) { + case DataSourceType::GRIB: + return GlobalStepData(); + case DataSourceType::CONFIG: + return LocalStepData(); + } + return false; +} + +bool NWPDataProvider::GlobalStepData() { + // Create a temporary global field set to populate all the data and gather model fields into it + atlas::FieldSet tmpGlobalFieldSet; + for (size_t i = 0; i < modelFieldSet_.size(); ++i) { + auto tmpField = tmpGlobalFieldSet.add(fs_.createField( + atlas::option::name(modelFieldSet_[i].name()) | atlas::option::datatype(modelFieldSet_[i].datatype()) | + atlas::option::levels(modelFieldSet_[i].levels()) | + atlas::option::variables(modelFieldSet_[i].variables()) | atlas::option::global(root_))); + tmpField.metadata().set("levelOrder", modelFieldSet_[i].metadata().getStringVector("levelOrder")); + } + fs_.gather(modelFieldSet_, tmpGlobalFieldSet); + + // Populate the tmp global field set with data from the reader + std::string shortName, levtype, level; + std::vector values; + while (dataReader->nextMessage(shortName, levtype, level, values)) { + // We don't verify what params are passed because the setup step in the reader allows the execution only + // if all source files have exactly the same set of fields on the same grid + if (rank_ == root_) { + auto source = atlas::array::make_view(tmpGlobalFieldSet.field(shortName)); + int level_idx = findLevelIndex(tmpGlobalFieldSet.field(shortName), levtype, level); + for (atlas::idx_t i_pt = 0; i_pt < tmpGlobalFieldSet.field(shortName).shape(0); i_pt++) { + source(i_pt, level_idx) = values[i_pt]; + } + } + } + fs_.scatter(tmpGlobalFieldSet, modelFieldSet_); + return true; +} + +bool NWPDataProvider::LocalStepData() { + std::string shortName, levtype, level; + std::vector values; + while (dataReader->nextMessage(shortName, levtype, level, values)) { + auto source = atlas::array::make_view(modelFieldSet_.field(shortName)); + int level_idx = findLevelIndex(modelFieldSet_.field(shortName), levtype, level); + for (atlas::idx_t i_pt = 0; i_pt < modelFieldSet_.field(shortName).shape(0); i_pt++) { + source(i_pt, level_idx) = values[i_pt]; + } + } + return true; +} + +int NWPDataProvider::findLevelIndex(atlas::Field& field, std::string levtype, std::string level) { + int level_idx = + std::distance(params_[field.name()][levtype].begin(), + std::find(params_[field.name()][levtype].begin(), params_[field.name()][levtype].end(), level)); + // level_idx + size of previous levtypes in levelOrder + std::vector levelOrder = field.metadata().getStringVector("levelOrder"); + for (int type_idx = 0; type_idx < levelOrder.size(); type_idx++) { + if (levelOrder[type_idx] == levtype) { + break; + } + level_idx += params_[field.name()][levelOrder[type_idx]].size(); + } + return level_idx; +} +} // namespace nwp_emulator \ No newline at end of file diff --git a/src/nwp_emulator/nwp_data_provider.h b/src/nwp_emulator/nwp_data_provider.h new file mode 100644 index 0000000..2d88a94 --- /dev/null +++ b/src/nwp_emulator/nwp_data_provider.h @@ -0,0 +1,135 @@ +/* + * (C) Copyright 2025- ECMWF. + * + * This software is licensed under the terms of the Apache Licence Version 2.0 + * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. + * + * In applying this licence, ECMWF does not waive the privileges and immunities + * granted to it by virtue of its status as an intergovernmental organisation nor + * does it submit to any jurisdiction. + */ +#pragma once +#include + +#include "eckit/config/LocalConfiguration.h" +#include "eckit/config/YAMLConfiguration.h" +#include "eckit/filesystem/PathName.h" +#include "eckit/log/Log.h" + +#include "atlas/field/FieldSet.h" +#include "atlas/functionspace/StructuredColumns.h" +#include "atlas/grid.h" +#include "atlas/grid/Distribution.h" + +#include "data_reader.h" +#include "config_reader.h" +#include "grib_file_reader.h" + +namespace nwp_emulator { + +/** + * @enum DataSourceType + * @brief Enumeration of the supported types of data source for the emulator. + */ +enum class DataSourceType +{ + GRIB, + CONFIG, + INVALID +}; + +/** + * @class NWPDataProvider + * @brief Requests model data from a data reader object, and provides it to the emulator as Atlas fields. + * + * Atlas is used to reproduce the partionioned environment of a NWP model run, as it has utilities to + * scatter data across processes, and it is the data structure used to run Plume. + * The parameters that the emulator will propose at each time step are locked at setup and live in params_ + * which stores information about the paramaters, level types and levels like so : + * params_ = {"u" : {"sfc" : ["0"], "ml" : ["1","137"], "pl": ["850"]}} + * In this example, the u component of the wind field has 4 levels in total in the emulator. + */ +class NWPDataProvider final { +private: + const DataSourceType sourceType_; + std::unique_ptr dataReader = nullptr; + std::unordered_map>> params_; + + std::string gridName_; + atlas::functionspace::StructuredColumns fs_; + atlas::FieldSet modelFieldSet_; + + size_t rank_; + size_t root_; + size_t nprocs_; + + /** + * @brief Requests that each reader generates its own share of the model data. + * + * This method can be used only with compatible data sources. Having all processes reading and extracting + * data from files like the GRIB reader is doing may be unstable or suboptimal. Conversely, the config reader + * might be able to leverage other workers as the data is generated from a function. + * + * @return true if model data has been successfully generated for the step, false otherwise. + */ + bool LocalStepData(); + + /** + * @brief Requests to the root reader to generate and scatter the global model data. + * + * This method can be used with all data sources, but might be performing less efficiently than the data + * source type may allow, because a single process is doing the actual data generation work, and there are + * extra steps of gathering and scattering data across the partitioned fields. + * + * @return true if model data has been successfully generated for the step, false otherwise. + */ + bool GlobalStepData(); + + /** + * @brief Utility method to find where the raw values passed by the reader should be stored. + * + * It is assumed that the source data can have multiple level types and levels. Ideally, the surface level + * should always have index 0, then next levels should be model levels (even though a 1:1 mapping cannot be + * assumed because we may have ml 1,5,10 which will result in a 3-model levels emulator), then other levels + * come in alphabetical order. In the above example for params_, the levels map to these indices : + * sfc,0 -> 0 ; ml,1 -> 1 ; ml,137 -> 2 ; pl,850 -> 3 + * + * @param field The field for which to find the level index. + * @param levtype The name of the level type. + * @param level The name of the level. + * + * @return int The index of the second shape of the Atlas field where the data should go. + */ + int findLevelIndex(atlas::Field& field, std::string levtype, std::string level); + +public: + /** + * @brief Constructs a NWPDataProvider object. + * + * Maintains an instance of a reader corresponding to the user provided setup. Instantiates all + * the model Atlas function space, and fields. + * + * @param sourceType The type of data source to use for this emulator run. + * @param inputPath The path to the config or source directory for the data reader. + * @param rank The rank of the current process. + * @param root The rank of the root process. + * @param nprocs The total number of processors that run the emulator. + * + * @note The constructor aborts the execution unless its setup and the data reader setups are successful. + */ + NWPDataProvider(const DataSourceType& sourceType, const eckit::PathName& inputPath, size_t rank, size_t root, + size_t nprocs); + + /** + * @brief Populates Atlas fields with data for the next emulator step. + * + * @return bool True if there is still more emulator steps to run, false once all the data has been provided + * for the current emulator run. + */ + bool getStepData(); + + /// Getters + atlas::FieldSet& getModelFieldSet() { return modelFieldSet_; } + int getStep() const { return dataReader->getStep(); } +}; +} // namespace nwp_emulator \ No newline at end of file diff --git a/src/nwp_emulator/nwp_definitions.h b/src/nwp_emulator/nwp_definitions.h new file mode 100644 index 0000000..8129a40 --- /dev/null +++ b/src/nwp_emulator/nwp_definitions.h @@ -0,0 +1,18 @@ +/* + * (C) Copyright 2025- ECMWF. + * + * This software is licensed under the terms of the Apache Licence Version 2.0 + * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. + * + * In applying this licence, ECMWF does not waive the privileges and immunities + * granted to it by virtue of its status as an intergovernmental organisation nor + * does it submit to any jurisdiction. + */ +#pragma once + + +#ifdef WITH_NWP_EMULATOR_SINGLE_PRECISION +#define FIELD_TYPE_REAL float +#else +#define FIELD_TYPE_REAL double +#endif \ No newline at end of file diff --git a/src/nwp_emulator/nwp_emulator.cc b/src/nwp_emulator/nwp_emulator.cc new file mode 100644 index 0000000..d54b3f3 --- /dev/null +++ b/src/nwp_emulator/nwp_emulator.cc @@ -0,0 +1,166 @@ +/* + * (C) Copyright 2025- ECMWF. + * + * This software is licensed under the terms of the Apache Licence Version 2.0 + * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. + * + * In applying this licence, ECMWF does not waive the privileges and immunities + * granted to it by virtue of its status as an intergovernmental organisation nor + * does it submit to any jurisdiction. + */ +#include +#include + +#include "eckit/config/YAMLConfiguration.h" +#include "eckit/filesystem/PathName.h" +#include "eckit/mpi/Comm.h" + +#include "atlas/library.h" +#include "atlas/option/Options.h" +#include "atlas/parallel/mpi/mpi.h" +#include "atlas/runtime/AtlasTool.h" + +#include "plume/Manager.h" +#include "plume/data/ModelData.h" +#include "plume/data/ParameterCatalogue.h" + +#include "nwp_data_provider.h" +#include "nwp_definitions.h" + +using namespace nwp_emulator; + +/** + * @class NWPEmulator + * @brief Emulates a model run and makes data available at each time step to facilitate Plume and plugins testing. + */ +class NWPEmulator final : public atlas::AtlasTool { + + int execute(const Args& args) override; + std::string briefDescription() override { return "NWP model emulator to facilitate Plume & plugins development"; } + std::string usage() override { + return name() + + " [--grib-src= | --config-src=] [--plume-cfg=] [OPTION]... [--help]\n" + " --plume-cfg is optional, pass it to run Plume, else the emulator will do a dry run\n"; + } + + int numberOfPositionalArguments() override { return -1; } + int minimumPositionalArguments() override { return 0; } + +public: + NWPEmulator(int argc, char** argv) : dataSourceType_(DataSourceType::INVALID), atlas::AtlasTool(argc, argv) { + add_option(new SimpleOption("grib-src", "Path to GRIB files source")); + add_option(new SimpleOption("config-src", "Path to emulator config file")); + add_option(new SimpleOption("plume-cfg", "Path to Plume configuration")); + } + +private: + std::string dataSourcePath_; + DataSourceType dataSourceType_; + + /// Plume members if needed + std::string plumeConfigPath_; + plume::data::ModelData plumeData_; + + /** + * @brief Sets up Plume framework and load plugins compatible with model params. + * + * @param dataProvider The object that provides the data for the Atlas fields offered by the emulated model. + * + * @return true if Plume configuration, negotiation and feeding are successful, false otherwise. + */ + bool setupPlume(NWPDataProvider& dataProvider); + + /** + * @brief Update necessary parameters offered to Plume other than Atlas fields, and run the plugins for the step. + * + * @param step The internal model step number. + */ + void runPlume(int step); +}; + +int NWPEmulator::execute(const Args& args) { + // Emulator configuration + std::string gribSrcArg; + if (args.get("grib-src", gribSrcArg)) { + dataSourcePath_ = gribSrcArg; + dataSourceType_ = DataSourceType::GRIB; + } + std::string configSrcPath; + if (args.get("config-src", configSrcPath)) { + if (dataSourceType_ != DataSourceType::INVALID) { + eckit::Log::error() << "Usage : " << usage() << std::endl; + return 1; + } + dataSourcePath_ = configSrcPath; + dataSourceType_ = DataSourceType::CONFIG; + } + if (dataSourcePath_.empty()) { + eckit::Log::error() << "Usage : " << usage() << std::endl; + return 1; + } + args.get("plume-cfg", plumeConfigPath_); + + size_t root = 0; + size_t nprocs = eckit::mpi::comm().size(); + size_t rank = eckit::mpi::comm().rank(); + + NWPDataProvider dataProvider(dataSourceType_, eckit::PathName{dataSourcePath_}, rank, root, nprocs); + + // Plume loading if a Plume configuration file has been passed, emulator dry run otherwise + if (!plumeConfigPath_.empty()) { + eckit::Log::info() << "The emulator will run Plume with configuration '" << plumeConfigPath_ << "'" + << std::endl; + setupPlume(dataProvider); + } + + // Run the emulator + while (dataProvider.getStepData()) { + // This is a model step + if (!plumeConfigPath_.empty()) { + runPlume(dataProvider.getStep()); + } + eckit::mpi::comm().barrier(); + } + + // Tear down where appropriate and wait for all processes before finishing + if (!plumeConfigPath_.empty()) { + plume::Manager::teardown(); + } + + eckit::mpi::comm().barrier(); + std::cout << "Process " << rank << " finished..." << std::endl; + eckit::Log::info() << "Emulator run completed..." << std::endl; + return 0; +} + +bool NWPEmulator::setupPlume(NWPDataProvider& dataProvider) { + plume::Manager::configure(eckit::YAMLConfiguration(eckit::PathName(plumeConfigPath_))); + + plume::Protocol offers; /// Define data offered by Plume + offers.offerInt("NSTEP", "always", "Simulation Step"); + auto fields = dataProvider.getModelFieldSet(); + for (const auto& field: fields) { + offers.offerAtlasField(field.name(), "on-request", field.name()); + } + plume::Manager::negotiate(offers); + plumeData_.createInt("NSTEP", 0); /// Initialise parameters + for (auto& field: fields) { + if (plume::Manager::isParamRequested(field.name())) { + plumeData_.provideAtlasFieldShared(field.name(), field.get()); + } + } + + plume::Manager::feedPlugins(plumeData_); + return true; +} + +void NWPEmulator::runPlume(int step) { + plumeData_.updateInt("NSTEP", step); + plume::Manager::run(); +} + +int main(int argc, char** argv) { + setenv("ATLAS_LOG_FILE", "true", 1); + NWPEmulator emulator(argc, argv); + return emulator.start(); +} \ No newline at end of file diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 909b748..df3b438 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -7,5 +7,10 @@ # granted to it by virtue of its status as an intergovernmental organisation nor # does it submit to any jurisdiction. +if( NOT DEFINED MPI_SLOTS ) + set( MPI_SLOTS 9999 ) +endif() + add_subdirectory(core) -add_subdirectory(api) \ No newline at end of file +add_subdirectory(api) +add_subdirectory(nwp_emulator) \ No newline at end of file diff --git a/tests/nwp_emulator/CMakeLists.txt b/tests/nwp_emulator/CMakeLists.txt new file mode 100644 index 0000000..6b6d0be --- /dev/null +++ b/tests/nwp_emulator/CMakeLists.txt @@ -0,0 +1,63 @@ +# (C) Copyright 2025- ECMWF. +# +# This software is licensed under the terms of the Apache Licence Version 2.0 +# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. +# +# In applying this licence, ECMWF does not waive the privileges and immunities +# granted to it by virtue of its status as an intergovernmental organisation nor +# does it submit to any jurisdiction. +# Test files repository on Nexus +set(ECBUILD_DOWNLOAD_BASE_URL https://get.ecmwf.int/repository/plume-test-data) + +set(test_nwp_emulator_files_dir ${CMAKE_BINARY_DIR}/tests/nwp_emulator/data) +add_custom_target( + make_nwp_emulator_test_data_dir ALL COMMAND ${CMAKE_COMMAND} -E make_directory ${test_nwp_emulator_files_dir} +) + +ecbuild_get_test_multidata( + TARGET test_nwp_emulator_files + NAMES + model_data_1.grib + model_data_2.grib + DIRNAME + nwp_emulator + DIRLOCAL + ${test_nwp_emulator_files_dir} + NOCHECK +) + +ecbuild_add_test( + TARGET plume_test_nwp_grib + SOURCES test_grib_reader.cc + LIBS plume_nwp_emulator + ENVIRONMENT TEST_DATA_DIR=${test_nwp_emulator_files_dir} + MPI 3 + CONDITION eckit_HAVE_MPI + TEST_DEPENDS test_nwp_emulator_files +) + +ecbuild_add_test( + TARGET plume_test_nwp_config + SOURCES test_config_reader.cc + LIBS plume_nwp_emulator + ENVIRONMENT TEST_DATA_DIR=${CMAKE_CURRENT_SOURCE_DIR}/data/ + MPI 3 + CONDITION eckit_HAVE_MPI +) + +ecbuild_add_library( + TARGET nwp_emulator_test_plugin + SOURCES + nwp_emulator_plugin.h + nwp_emulator_plugin.cc + PRIVATE_LIBS + plume_plugin +) + +ecbuild_add_test( + TARGET plume_test_nwp_tool + LIBS simple_plugin nwp_emulator_test_plugin + COMMAND nwp_emulator_run.x + ARGS --config-src=${CMAKE_CURRENT_SOURCE_DIR}/data/valid_config.yml + --plume-cfg=${CMAKE_CURRENT_SOURCE_DIR}/data/plume_config.yml +) \ No newline at end of file diff --git a/tests/nwp_emulator/data/invalid_config.yml b/tests/nwp_emulator/data/invalid_config.yml new file mode 100644 index 0000000..83742c9 --- /dev/null +++ b/tests/nwp_emulator/data/invalid_config.yml @@ -0,0 +1 @@ +no_emulator_key: -1 \ No newline at end of file diff --git a/tests/nwp_emulator/data/plume_config.yml b/tests/nwp_emulator/data/plume_config.yml new file mode 100644 index 0000000..544090d --- /dev/null +++ b/tests/nwp_emulator/data/plume_config.yml @@ -0,0 +1,14 @@ +{ + "plugins": [ + { + "name": "SimplePlugin", + "lib": "simple_plugin", + "plugincore-config": {} + }, + { + "name": "NWPEmulatorPlugin", + "lib": "nwp_emulator_test_plugin", + "plugincore-config": {} + } + ] +} \ No newline at end of file diff --git a/tests/nwp_emulator/data/valid_config.yml b/tests/nwp_emulator/data/valid_config.yml new file mode 100644 index 0000000..9cc7550 --- /dev/null +++ b/tests/nwp_emulator/data/valid_config.yml @@ -0,0 +1,39 @@ +emulator: + n_steps: 2 + grid_identifier: "N80" + vertical_levels: 5 + fields: + 100u: + levtype: "sfc" + apply: + vortex_rollup: + area: [71.5, -25, 34.5, 45] + time_variation: 1.1 + u: + apply: + levels: + "2": + random: + distribution: "uniform" + min: 1.0 + max: 2.0 + step: + area: [71.5, -25, 34.5, 45] + value: 10.0 + variation: 1.0 + translation: [1.0, 1.0] + "1,3": + sinc: + modes: 3 + min: -1.0 + max: 10.0 + spread: 10.0 + sink: false + "4:": + gaussian: + modes: 2 + min: 1.0 + max: 2.0 + max_stddev: 3.0 + sink: true + v: "u" \ No newline at end of file diff --git a/tests/nwp_emulator/nwp_emulator_plugin.cc b/tests/nwp_emulator/nwp_emulator_plugin.cc new file mode 100644 index 0000000..0153f7b --- /dev/null +++ b/tests/nwp_emulator/nwp_emulator_plugin.cc @@ -0,0 +1,46 @@ +/* + * (C) Copyright 2025- ECMWF. + * + * This software is licensed under the terms of the Apache Licence Version 2.0 + * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. + * + * In applying this licence, ECMWF does not waive the privileges and immunities + * granted to it by virtue of its status as an intergovernmental organisation nor + * does it submit to any jurisdiction. + */ +#include "nwp_emulator_plugin.h" +#include + + +namespace nwp_emulator_test_plugin { + +REGISTER_LIBRARY(NWPEmulatorPlugin) + +NWPEmulatorPlugin::NWPEmulatorPlugin() : Plugin("NWPEmulatorPlugin"){}; + +NWPEmulatorPlugin::~NWPEmulatorPlugin(){}; + +const NWPEmulatorPlugin& NWPEmulatorPlugin::instance() { + static NWPEmulatorPlugin instance; + return instance; +} +//-------------------------------------------------------------- + + +// NWPEmulatorPluginCore +static plume::PluginCoreBuilder runnable_plugincore_FooBuilder_; + +NWPEmulatorPluginCore::NWPEmulatorPluginCore(const eckit::Configuration& conf) : PluginCore(conf) {} + +NWPEmulatorPluginCore::~NWPEmulatorPluginCore() {} + +void NWPEmulatorPluginCore::run() { + eckit::Log::info() << "Consuming parameters " << modelData().getAtlasFieldShared("100u").name() << ", " + << modelData().getAtlasFieldShared("u").name() << ", " + << modelData().getAtlasFieldShared("v").name() << std::endl; +} + +//-------------------------------------------------------------- + + +} // namespace nwp_emulator_test_plugin diff --git a/tests/nwp_emulator/nwp_emulator_plugin.h b/tests/nwp_emulator/nwp_emulator_plugin.h new file mode 100644 index 0000000..79e41c7 --- /dev/null +++ b/tests/nwp_emulator/nwp_emulator_plugin.h @@ -0,0 +1,55 @@ +/* + * (C) Copyright 2025- ECMWF. + * + * This software is licensed under the terms of the Apache Licence Version 2.0 + * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. + * + * In applying this licence, ECMWF does not waive the privileges and immunities + * granted to it by virtue of its status as an intergovernmental organisation nor + * does it submit to any jurisdiction. + */ +#include +#include "plume/Plugin.h" +#include "plume/PluginCore.h" + +namespace nwp_emulator_test_plugin { + + +// ------ Foo runnable plugincore that self-registers! ------- +class NWPEmulatorPluginCore : public plume::PluginCore { +public: + NWPEmulatorPluginCore(const eckit::Configuration& conf); + ~NWPEmulatorPluginCore(); + void run() override; + constexpr static const char* type() { return "nwpemulator-plugincore"; } +}; +// ------------------------------------------------------ + +// ------------------------------------------------------ +class NWPEmulatorPlugin : public plume::Plugin { + +public: + NWPEmulatorPlugin(); + ~NWPEmulatorPlugin(); + + plume::Protocol negotiate() override { + plume::Protocol protocol; + protocol.requireAtlasField("100u"); + protocol.requireAtlasField("u"); + protocol.requireAtlasField("v"); + + return protocol; + } + + // Return the static instance + static const NWPEmulatorPlugin& instance(); + + std::string version() const override { return "0.0.1-NWPEmulator"; } + + std::string gitsha1(unsigned int count) const override { return "undefined"; } + + virtual std::string plugincoreName() const override { return NWPEmulatorPluginCore::type(); } +}; +// ------------------------------------------------------ + +} // namespace nwp_emulator_test_plugin diff --git a/tests/nwp_emulator/test_config_reader.cc b/tests/nwp_emulator/test_config_reader.cc new file mode 100644 index 0000000..7ed33c3 --- /dev/null +++ b/tests/nwp_emulator/test_config_reader.cc @@ -0,0 +1,118 @@ +/* + * (C) Copyright 2025- ECMWF. + * + * This software is licensed under the terms of the Apache Licence Version 2.0 + * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. + * + * In applying this licence, ECMWF does not waive the privileges and immunities + * granted to it by virtue of its status as an intergovernmental organisation nor + * does it submit to any jurisdiction. + */ +#include +#include + +#include "atlas/array/ArrayShape.h" +#include "atlas/util/function/VortexRollup.h" +#include "eckit/filesystem/PathName.h" +#include "eckit/mpi/Comm.h" +#include "eckit/testing/Test.h" + +#include "nwp_emulator/nwp_data_provider.h" +#include "nwp_emulator/nwp_definitions.h" + +using namespace eckit::testing; +using namespace nwp_emulator; + +namespace eckit { +namespace test { +CASE("test_focus_area") { + atlas::PointLonLat p1{10.12, 65.59}, p2{187.10, 50.73}; + + // area crossing the longitude 0/360 + FocusArea areaA{326.70, 47.91, 48.35, 72.98}; + EXPECT(areaA.contains(p1)); + EXPECT_NOT(areaA.contains(p2)); + + // area not crossing the longitude 0/360 + FocusArea areaB{155.0, 225.0, 34.5, 71.5}; + EXPECT(areaB.contains(p2)); + EXPECT_NOT(areaB.contains(p1)); +} +CASE("test_reader_setup") { + std::string configPath(std::getenv("TEST_DATA_DIR")); + ConfigReader* dataReader; + EXPECT_NO_THROW( + dataReader = new ConfigReader(eckit::PathName{configPath + "valid_config.yml"}, eckit::mpi::comm().rank(), 0);); + // The parameters should be provided in the order they are in the configuration + std::vector expectedParams{"100u,sfc,0", "u,ml,1", "u,ml,2", "u,ml,3", "u,ml,4", "u,ml,5", + "v,ml,1", "v,ml,2", "v,ml,3", "v,ml,4", "v,ml,5"}; + EXPECT_EQUAL(dataReader->getGridName(), "N80"); + EXPECT_EQUAL(dataReader->getParams(), expectedParams); + delete dataReader; +} +CASE("test_invalid_config") { + std::string configPath(std::getenv("TEST_DATA_DIR")); + EXPECT_THROWS( + ConfigReader dataReader(eckit::PathName{configPath + "invalid_config.yml"}, eckit::mpi::comm().rank(), 0)); +} +CASE("test_data_generation") { + std::string configPath(std::getenv("TEST_DATA_DIR")); + NWPDataProvider dataProvider(DataSourceType::CONFIG, eckit::PathName{configPath + "valid_config.yml"}, + eckit::mpi::comm().rank(), 0, eckit::mpi::comm().size()); + + std::vector expectedFields{"100u", "u", "v"}; + EXPECT_EQUAL(dataProvider.getModelFieldSet().field_names(), expectedFields); + + // With N80 there is 35718 grid points, so 11906 per partition + EXPECT_EQUAL(dataProvider.getModelFieldSet().field("u").shape(), atlas::array::make_shape(11906, 5)); + EXPECT_EQUAL(dataProvider.getModelFieldSet().field("v").shape(), atlas::array::make_shape(11906, 5)); + EXPECT_EQUAL(dataProvider.getModelFieldSet().field("100u").shape(), atlas::array::make_shape(11906, 1)); + + auto lonlat = + atlas::array::make_view(dataProvider.getModelFieldSet().field("u").functionspace().lonlat()); + FocusArea europe{155.0, 225.0, 34.5, 71.5}; + + int iterCount = 0; // avoid infinite while loop + while (dataProvider.getStepData()) { + iterCount++; + EXPECT_MSG(iterCount < 3, [=]() { + std::cerr << "The data provider has iterated through all steps, it should return false now..." << std::endl; + };); + + // 1. Vortex rollup & area mask + auto field100u = atlas::array::make_view(dataProvider.getModelFieldSet().field("100u")); + // Point outside of the focus area should remain zeros + EXPECT_NOT(europe.contains(atlas::PointLonLat{lonlat(0, 0), lonlat(0, 1)})); + EXPECT(std::abs(field100u(0, 0)) < 1e-4); + + if (eckit::mpi::comm().rank() == 0) { + // Europe is entirely owned by partition 0 + EXPECT(europe.contains(atlas::PointLonLat{lonlat(1132, 0), lonlat(1132, 1)})); + EXPECT(std::abs(field100u(1132, 0) - atlas::util::function::vortex_rollup(lonlat(1132, 0), lonlat(1132, 1), + (iterCount - 1) * 1.1)) < 1e-4); + } + auto fieldu = atlas::array::make_view(dataProvider.getModelFieldSet().field("u")); + // 2. Random + EXPECT(fieldu(0, 1) - 1 > 1e-6); + EXPECT(2 - fieldu(0, 1) > 1e-6); + // 3. Step + if (eckit::mpi::comm().rank() == 0) { + EXPECT(std::abs(fieldu(1132, 1) - 10 - (iterCount - 1)) < 1e-6); + } + // 4. Sinc + EXPECT(fieldu(0, 0) + 1 > 1e-6); + EXPECT(10 - fieldu(0, 0) > 1e-6); + // 5. Gaussian + EXPECT(fieldu(0, 4) > 1); + EXPECT(fieldu(0, 4) < 2 + 1e-6); + + eckit::mpi::comm().barrier(); + } + EXPECT_EQUAL(dataProvider.getStep(), 2); +} +} // namespace test +} // namespace eckit + +int main(int argc, char** argv) { + return run_tests(argc, argv); +} \ No newline at end of file diff --git a/tests/nwp_emulator/test_grib_reader.cc b/tests/nwp_emulator/test_grib_reader.cc new file mode 100644 index 0000000..e47b141 --- /dev/null +++ b/tests/nwp_emulator/test_grib_reader.cc @@ -0,0 +1,75 @@ +/* + * (C) Copyright 2025- ECMWF. + * + * This software is licensed under the terms of the Apache Licence Version 2.0 + * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. + * + * In applying this licence, ECMWF does not waive the privileges and immunities + * granted to it by virtue of its status as an intergovernmental organisation nor + * does it submit to any jurisdiction. + */ +#include +#include + +#include "atlas/array/ArrayShape.h" +#include "eckit/filesystem/PathName.h" +#include "eckit/mpi/Comm.h" +#include "eckit/testing/Test.h" + +#include "nwp_emulator/nwp_data_provider.h" +#include "nwp_emulator/nwp_definitions.h" + +using namespace eckit::testing; +using namespace nwp_emulator; + +namespace eckit { +namespace test { +CASE("test_reader_setup") { + GRIBFileReader* dataReader; + EXPECT_NO_THROW( + dataReader = new GRIBFileReader(eckit::PathName{std::getenv("TEST_DATA_DIR")}, eckit::mpi::comm().rank(), 0)); + // The parameters should be given to the provider in the order in which they are encoded in the grib source file + std::vector expectedParams{"u,ml,1", "u,ml,28", "u,ml,55", "u,ml,82", + "u,ml,109", "u,ml,136", "100u,sfc,0"}; + // Ensure that the main and secondary readers all share the same setup + EXPECT_EQUAL(dataReader->getGridName(), "N80"); + EXPECT_EQUAL(dataReader->getParams(), expectedParams); + delete dataReader; +} +CASE("test_data_reading") { + NWPDataProvider dataProvider(DataSourceType::GRIB, eckit::PathName{std::getenv("TEST_DATA_DIR")}, + eckit::mpi::comm().rank(), 0, eckit::mpi::comm().size()); + + std::vector expectedFields{"100u", "u"}; + EXPECT_EQUAL(dataProvider.getModelFieldSet().field_names(), expectedFields); + + // With N80 there is 35718 grid points, so 11906 per partition + EXPECT_EQUAL(dataProvider.getModelFieldSet().field("u").shape(), atlas::array::make_shape(11906, 6)); + EXPECT_EQUAL(dataProvider.getModelFieldSet().field("100u").shape(), atlas::array::make_shape(11906, 1)); + + std::vector expectedValuesU{14.5716, 8.74349, 6.9388, -5.17533, 6.08444, 11.8501}; + std::vector expectedValues100U{1.48735, -3.94234, -1.19038, 1.22401, -7.02989, 1.72304}; + + int iterCount = 0; // avoid infinite while loop + while (dataProvider.getStepData()) { + iterCount++; + EXPECT_MSG(iterCount < 3, [=]() { + std::cerr << "The data provider has iterated through all steps, it should return false now..." << std::endl; + };); + + auto field100u = atlas::array::make_view(dataProvider.getModelFieldSet().field("100u")); + auto fieldu = atlas::array::make_view(dataProvider.getModelFieldSet().field("u")); + EXPECT(std::abs(fieldu(0, 0) - expectedValuesU[(dataProvider.getStep() - 1) * eckit::mpi::comm().size() + + eckit::mpi::comm().rank()]) < 1e-4); + EXPECT(std::abs(field100u(0, 0) - expectedValues100U[(dataProvider.getStep() - 1) * eckit::mpi::comm().size() + + eckit::mpi::comm().rank()]) < 1e-4); + eckit::mpi::comm().barrier(); + } + EXPECT_EQUAL(dataProvider.getStep(), 2); +} +} // namespace test +} // namespace eckit + +int main(int argc, char** argv) { + return run_tests(argc, argv); +} \ No newline at end of file