Adding the Radklim reader with the stream YAML file #386


Draft
wants to merge 78 commits into base: develop

Conversation

wael-mika

@wael-mika wael-mika commented Jun 24, 2025

Description

This PR introduces a new data reader for the RADKLIM precipitation dataset, implemented using the base class interface. It also adds the corresponding YAML stream configuration under config/streams/streams_radklim/.

Type of Change

  • Bug fix (non-breaking change which fixes an issue)
  • [ X ] New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • Documentation update

Issue Number

Closes #216

Code Compatibility

  • [ X ] I have performed a self-review of my code

Code Performance and Testing

  • I ran uv run train and (if necessary) uv run evaluate on at least one GPU node and it works
  • If the new feature introduces modifications at the config level, I have made sure to have notified the other software developers through Mattermost and updated the paths in the $WEATHER_GENERATOR_PRIVATE directory

Dependencies

  • I have ensured that the code is still pip-installable after the changes and runs
  • I have tested that new dependencies themselves are pip-installable.
  • [ X ] I have not introduced new dependencies in the inference portion of the pipeline

Documentation

  • [ X ] My code follows the style guidelines of this project
  • I have updated the documentation and docstrings to reflect the changes
  • [ X ] I have added comments to my code, particularly in hard-to-understand areas

Additional Notes

The stream configuration assumes the existence of RADKLIM-compatible reference data and normalization files.

The reader has been structured for easy extension to similar NetCDF-based datasets.

Further testing with training on multiple-stream configurations and longer time windows is still necessary.

Collaborator

@clessig clessig left a comment
Will try it out, but here are already some observations from looking at the code.

"""

# Channels used for source and target data
source_channels: list[str] = ["RR"]
Collaborator

This should be specified in the stream config and not hard coded, see select_channels() in data_reader_anemoi.

Author

Sure, I will adjust it, but as I mentioned, I need some guidance at the beginning.

geoinfo_channels: list[str] = []

# Channel indices
source_idx: list[int] = [0]
Collaborator

This should be derived from the source_channels and not hard coded.
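The suggested derivation can be sketched as a small helper; this is a hypothetical illustration, not the reader's actual API — the function name and the idea of a separate dataset-wide channel list are assumptions:

```python
# Hypothetical sketch: derive channel indices from configured channel names
# instead of hardcoding them. The names derive_channel_indices,
# dataset_channels, and selected_channels are illustrative only.

def derive_channel_indices(
    dataset_channels: list[str], selected_channels: list[str]
) -> list[int]:
    """Map selected channel names to their positions in the dataset's channel list."""
    missing = [c for c in selected_channels if c not in dataset_channels]
    if missing:
        raise ValueError(f"Unknown channels requested: {missing}")
    return [dataset_channels.index(c) for c in selected_channels]

# With a dataset exposing only the RR precipitation variable,
# derive_channel_indices(["RR"], ["RR"]) gives [0], matching the
# hardcoded value above.
```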

nt, ny, nx, nvars = arr4.shape

# Validate channel indices
if not channels_idx:
Collaborator

This should be tested at the beginning of the function

Author

It makes sense; I will move it to the correct location.
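The early-validation pattern the reviewer asks for might look like the following minimal sketch; the function name `get_window` and its exact signature are assumptions for illustration:

```python
# Minimal sketch of validating inputs at the start of the function, per the
# review comment. get_window and its signature are illustrative assumptions.

import numpy as np


def get_window(arr4: np.ndarray, channels_idx: list[int]) -> np.ndarray:
    # Validate up front so the rest of the function can assume valid inputs.
    if not channels_idx:
        raise ValueError("channels_idx must not be empty")
    nt, ny, nx, nvars = arr4.shape
    if max(channels_idx) >= nvars:
        raise IndexError(f"channel index out of range for {nvars} variables")
    # Select the requested channels along the last (variable) axis.
    return arr4[..., channels_idx]
```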

times = np.repeat(time_vals, self.points_per_slice)

# Apply nan filtering
valid = ~np.any(np.isnan(flat_vars[:, channels_idx]), axis=1)
Collaborator

No NaNs should be filtered here. This happens later.

Author

Ok then, I will remove it.

ds_win = self.ds.isel(time=slice(start, stop))

# Stack into (time, y, x, var) format
arr4 = (
Collaborator

Could you explain what arr4 is and how it comes out of ds_win?

Author

This is the dataset for the selected time window.

ds_win summary:

<xarray.Dataset> Size: 119MB
Dimensions:  (time: 6, y: 1100, x: 900)
Coordinates:
    lat      (time, y, x) float64 48MB ...
    lon      (time, y, x) float64 48MB ...
  * time     (time) datetime64[ns] 48B 2020-01-03T12:50:00 ... 2020-01-03T17:...
  * x        (x) float64 7kB -4.43e+05 -4.42e+05 -4.41e+05 ... 4.55e+05 4.56e+05
  * y        (y) float64 9kB -4.758e+06 -4.757e+06 ... -3.66e+06 -3.659e+06
Data variables:
    RR       (time, y, x) float32 24MB ...

Requesting the variable yields an array with dims ('var', 'time', 'y', 'x'):

arr4 (before transpose):
dims : ('var', 'time', 'y', 'x')
shape: (1, 6, 1100, 900)

It is then transposed to (time, y, x, var); this was the shape requested by the older version, and I can adapt to any shape you might need.

arr4 (after transpose and .values):
type : <class 'numpy.ndarray'>
shape: (6, 1100, 900, 1)
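The reshaping described above can be reproduced in isolation with NumPy; the sizes mirror the ds_win summary, and the random data here is a placeholder for the actual RR values:

```python
# Sketch of the reshaping described above: stacking the variables yields
# ('var', 'time', 'y', 'x'), which is transposed to (time, y, x, var).
# Random placeholder data stands in for the RR precipitation field.

import numpy as np

nvars, nt, ny, nx = 1, 6, 1100, 900
arr = np.random.rand(nvars, nt, ny, nx).astype(np.float32)  # ('var', 'time', 'y', 'x')

arr4 = np.transpose(arr, (1, 2, 3, 0))  # -> ('time', 'y', 'x', 'var')
assert arr4.shape == (6, 1100, 900, 1)
```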


RADKLIM :
type : netcdf
referece_path : "/p/scratch/weatherai/data/npp-atms-unpacked/temp_radklim/radklim_output_kerchunk/radklim_full_dataset.json"
Collaborator

referece_path: typo.
Also, this will need to change later to make it agnostic to the HPC.

Author

Yes, we will certainly do that later. The idea is to first try out the data reader; if it works, we can then settle on the final design.

return rdata


def _clip_lat(lats: NDArray[np.floating]) -> NDArray[np.float32]:
Collaborator

Let's reuse the existing functions instead of copying them around.

Author

Sure, we can do that as well

Collaborator

Careful here: this function will not correctly convert from an arbitrary convention for spherical coords to the one we expect internally. For this reason, I would keep it local.

Wael, did you check that it is required at all?

with contextlib.suppress(Exception):
zarr.consolidate_metadata(mapper)

ds_full = xr.open_dataset(mapper, engine="zarr", consolidated=True)
Collaborator

That's it?? It works just like that? I thought we would have to depend on virtualizarr for the reading too, but it seems I was wrong.

Collaborator

That's the kerchunk version; these are the two magic lines:

fs = fsspec.filesystem("reference", fo=kerchunk_ref)
mapper = fs.get_mapper("")

Author

Yes, it is this simple after creating the reference file and then using the little tricks of fsspec.
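Put together, the opening path discussed in this thread can be sketched as a single helper. This is a hedged sketch, not the reader's actual code: the function name is invented, it requires fsspec, zarr, and xarray to be installed, and the kerchunk reference JSON must already exist:

```python
# Hedged sketch of the kerchunk-based opening path discussed above.
# open_kerchunk_dataset is an illustrative name; fsspec, zarr, and xarray
# are required, and kerchunk_ref must point at an existing reference JSON.

def open_kerchunk_dataset(kerchunk_ref: str):
    import contextlib

    import fsspec
    import xarray as xr
    import zarr

    # Expose the kerchunk reference JSON as a read-only key/value mapper.
    fs = fsspec.filesystem("reference", fo=kerchunk_ref)
    mapper = fs.get_mapper("")

    # Consolidating metadata is a best-effort optimization; ignore failures.
    with contextlib.suppress(Exception):
        zarr.consolidate_metadata(mapper)

    # xarray then reads the virtual dataset lazily through the zarr engine.
    return xr.open_dataset(mapper, engine="zarr", consolidated=True)
```

The lazy imports keep the helper importable even where fsspec/zarr are absent; the actual I/O only happens on first access to the data variables.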

clessig and others added 16 commits June 26, 2025 21:09
* Exclude channels from src / target

* Simplified code and added comment that pattern matching is used

* Adding new stream config

* Fixing bug that led to error when accessing self.ds when dataset is empty

* Wokign on exlcude_source

* work in progress

* Fixing incorrect formating for logger (ecmwf#388)

* Ruffed

* Refactored and cleaned up channel selection. Also added check that channels are not empty

* Cleaned channel parsing and selection

* Adjustments

* Removing asserts incompatible with empty dataset

---------

Co-authored-by: Christian Lessig <[email protected]>
* chanegs

* mistake

* mistake

* mistake

* changes

* doc
* creating masking class and adapting tokenizer_masking to use this class

* minor changes to masking.py and tokenizer_masking

* removed old tokenizer_masking

* include masking_strategy in default_config

* change ValueError to assert

* linting formatting changes files

* further linting of docstrings

* create mask_source and mask_target in Masker, and update tokenizer_masking to use these, then style improvements

* linted masking, tokenizer_masking

* modify masker, rng and perm_sel now part of class, remove extra masking_rate, update comments, remove archived class

* remove check if all masked, not masked

* remove self.masking_rate from MultiStreamDS class, and masking args from batchify_source

* update tokenizer utils with description of idx_ord_lens in comment

* remove masking args from batchify_, perm_sel removed now internal to Masker class, remove handling special cases of masking (all masked)

* adding masking_strategy: to config

* remove unused mentions of masking_combination

* removed comment about streams

* changed assert to check self perm_sel is not None

* ruff masking, tokenizer_masking

* Ruffed

* Added warning to capture corner case, likely due to incorrect user settings.

* Fixed incorrect call twice

* Fixed missing conditional for logger statement

* Required changes for better handling of rngs

* Improved handling of rngs

* Improved handling of rng

---------

Co-authored-by: Christian Lessig <[email protected]>
* Fix bug with seed being divided by 0 for worker ID=0

* Fix bug causing crash when secrets aren't in private config

* Implement logging losses per channel

* Fix issue with empty targets

* Rework loss logging

* ruff

* Remove computing max_channels

* Change variables names

* ruffed

* Remove redundant enumerations

* Use stages for logging

* Add type hints

* Apply the review

* ruff

* fix

* Fix type hints

* ruff

---------

Co-authored-by: Tim Hunter <[email protected]>
* changes

* changes
shuffle=False for validation at the moment. Should be True to have an unbiased MC estimator over the full val set.
* changes

* changes

* change
* - Avoid time encoding is 0
- eps in layer norms to 10^-3
- bf16

* Fixed incorrect cast

* Make the attention dtype and norm eps configurable

* Fix gitignore and add config files

* Shuffle config files into sensible folders

* Try fp16

* Fix some missing hardcoded

* recover num_ranks from previous run to calculate epoch_base (ecmwf#317)

* recover num_ranks from previous run to calculate epoch_base

* set email settings for commits

* addressing Tim's comment

* make ruff happy

* improve style

* changes (ecmwf#385)

Linter rule so np.ndarray is not used as type

* changed the script name from evaluate to inference as it simply gener… (ecmwf#376)

* changed the script name from evaluate to inference as it simply generate infer samples

* changed evaluate to inference in the main scripts and corresponding calls in the config

* update the main function for the inference script

* changed evaluate to inference also in docstring, unit test scripts, and integration test scripts

---------

Co-authored-by: Patnala,Ankit <[email protected]>

* Introduce tuples instead for strings to avoid TypeError (ecmwf#392)

* Exclude channels from src / target (ecmwf#363)


* add embed_dropout_rate to config v1 (ecmwf#358)

* [402] adds checks to the pull request (ecmwf#403)


* Introduce masking class and incorporate in TokenizerMasking (ecmwf#383)


* Make the attention dtype and norm eps configurable

* Final default config

* Implement per-channel logging (ecmwf#283)


* [346] Passing options through the slurm script (ecmwf#400)

* changes

* fixes

* - Avoid time encoding is 0
- eps in layer norms to 10^-3
- bf16

* Make the attention dtype and norm eps configurable

* Fix gitignore and add config files

* Clean up configs for PR

* Clean up the forgotten HEAD

* Apply ruff formatting

* Organize imports

* Add mlp norm eps to embed targets and pred adapter

* Add comment

---------

Co-authored-by: Christian Lessig <[email protected]>
Co-authored-by: Julian Kuehnert <[email protected]>
Co-authored-by: Timothy Hunter <[email protected]>
Co-authored-by: ankitpatnala <[email protected]>
Co-authored-by: Patnala,Ankit <[email protected]>
Co-authored-by: Savvas Melidonis <[email protected]>
Co-authored-by: Christian Lessig <[email protected]>
Co-authored-by: Till Hauer <[email protected]>
Co-authored-by: Seb Hickman <[email protected]>
Co-authored-by: Kacper Nowak <[email protected]>
@tjhunter tjhunter mentioned this pull request Jul 7, 2025
kacpnowak and others added 5 commits July 7, 2025 10:50
* Fix indexing in DataReaderFesom

* Enforce using only int64 in data loading

* ruff

* ruff2

* Review

* Change int64 back to int32
analysis_streams_output is missing, which leads to error with val_initial=True and log_validation > 0.
…nt (ecmwf#444)

* Re-enabled option to runplot_training as script and removed relative path as default from mutually-exclusive argument -rf.

* Ruffed code.

* Ruff check fix.

* Rename flags for parsing configuration and fixed default handling for standard config YAML-file.
tjhunter and others added 29 commits July 14, 2025 13:49
Commiting changes before rebase
* Added naming convention checks to lint

* Implemented python naming conventions and corrected code accordingly

* Corrected renaming of rotation matrices from R to rot instead of to r

---------

Co-authored-by: Matthias Karlbauer <[email protected]>
* extend format string and timedelta to days

* replace with pd.to_timedelta

* import pandas

* ruff

* enforce "HH:MM:SS" format

* ruff
* Add score-class to evaluate-package.

* Add score-class to evaluate-package.

* Lintered and ruffed code.

* Add fix to io.py and update dependencies in common.

* Several small fixes to score-class and fast evaluation.

* Add utils for evaluate.

* Moved to_list to utils and improved doc-strings.

* Improve several doc-strings, avoid formatting of logger and other changes from PR review.

* Add xhistogram and xskillscore to dependencies of evaluate.

* Ruffed code.

* Lintered code.

* Fix incorrect retrieval of validation batch size in validation IO.

* Final minor changes to argument-names
* Updated to camel case.

* Fixed formatting.
Continue rebase before merging

Successfully merging this pull request may close these issues.

Data loader for RADKLIM data