diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml
index 7f133a61..567f070e 100644
--- a/.github/workflows/main.yml
+++ b/.github/workflows/main.yml
@@ -243,10 +243,19 @@ jobs:
os: [ 'ubuntu-latest' ]
python-version: [ "3.10", "3.11", "3.12" ]
julia: [ false ]
+        tensorflow: [ false ]
+ pytest-args: [ "--ignore='docs/notebooks/extreme_value_analysis.ipynb' --ignore='docs/notebooks/lstm.ipynb'" ]
include:
- os: 'ubuntu-latest'
python-version: "3.10"
julia: true
+            tensorflow: false
+ pytest-args: "--ignore='docs/notebooks/lstm.ipynb'"
+ - os: 'ubuntu-latest'
+ python-version: "3.10"
+ julia: false
+            tensorflow: true
+ pytest-args: "--ignore='docs/notebooks/extreme_value_analysis.ipynb'"
defaults:
run:
shell: bash -l {0}
@@ -295,6 +304,10 @@ jobs:
if: ${{ matrix.julia == true }}
run: |
micromamba install -c conda-forge "pyjuliacall>=0.9.20"
+      - name: Install tensorflow
+        if: ${{ matrix.tensorflow == true }}
+        run: |
+          micromamba install -c conda-forge "tensorflow>=2.10"
- name: Install xHydro
run: |
python -m pip install --no-deps .
@@ -303,14 +316,9 @@ jobs:
micromamba list
echo ESMF_VERSION=$(cat $ESMFMKFILE | grep "ESMF_VERSION_STRING=" | awk -F= '{print $2}' | tr -d "'")
python -m pip check || true
- - name: Test Notebooks (no Julia)
- if: ${{ matrix.julia == false }}
- run: |
- make test-notebooks-lax-noextremes
- - name: Test Notebooks (Julia)
- if: ${{ matrix.julia == true }}
+ - name: Test Notebooks
run: |
- make test-notebooks-lax
+ make test-notebooks-lax ARGS="${{ matrix.pytest-args }}"
# test-ESMF-source:
# name: Tests using ESMF from sources (Python${{ matrix.python-version }})
diff --git a/Makefile b/Makefile
index 440fb2cf..e42c79d7 100644
--- a/Makefile
+++ b/Makefile
@@ -75,13 +75,10 @@ test-distributed: ## run tests quickly with the default Python and distributed w
python -m pytest --num-processes=logical
test-notebooks: ## run tests on notebooks and compare outputs
- pytest --no-cov --nbval --rootdir=tests/ docs/notebooks
+ pytest --no-cov --nbval --rootdir=tests/ docs/notebooks $(ARGS)
test-notebooks-lax: ## run tests on notebooks but don't be so strict about outputs
- pytest --no-cov --nbval-lax --rootdir=tests/ docs/notebooks
-
-test-notebooks-lax-noextremes: ## run tests on notebooks but don't be so strict about outputs
- pytest --no-cov --nbval-lax --rootdir=tests/ docs/notebooks --ignore='docs/notebooks/extreme_value_analysis.ipynb'
+ pytest --no-cov --nbval-lax --rootdir=tests/ docs/notebooks $(ARGS)
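+# Example, mirroring the CI usage of the ARGS passthrough above:
+#   make test-notebooks-lax ARGS="--ignore='docs/notebooks/lstm.ipynb'"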
test-all: ## run tests on every Python version with tox
python -m tox
diff --git a/docs/notebooks/index.rst b/docs/notebooks/index.rst
index f0f3105b..4c4eed6c 100644
--- a/docs/notebooks/index.rst
+++ b/docs/notebooks/index.rst
@@ -7,6 +7,7 @@ Usage
gis
hydrological_modelling
+ local_frequency_analysis
+ lstm
optimal_interpolation
-   local_frequency_analysis
regional_frequency_analysis
diff --git a/docs/notebooks/lstm.ipynb b/docs/notebooks/lstm.ipynb
new file mode 100644
index 00000000..dcba402c
--- /dev/null
+++ b/docs/notebooks/lstm.ipynb
@@ -0,0 +1,1643 @@
+{
+ "cells": [
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "# LSTM networks module"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+    "LSTM networks are complex deep learning models that are extremely powerful at detecting patterns in time series, and especially good at finding relationships between multiple timeseries. This makes them a great fit for hydrological modelling, as they can learn patterns linking meteorological variables (such as precipitation, temperature and wind speed timeseries) to streamflow. Furthermore, when we also add catchment descriptors, LSTM models can learn hydrograph dynamics according to catchment attributes such as area, slope, land use and other such descriptors.\n",
+ "\n",
+    "This page demonstrates how to use `xhydro` to perform simple LSTM modelling on local (one) and regional (multiple) catchments. However, LSTM models are infinitely flexible, and it would be a monumental task to expose all possible hyperparameters and modelling decisions through an interface. This package should therefore be seen as a starting point for developing custom models on custom data. Eventually, users will want to add code, models and methods to the package: official models from research projects could be added, and any model used to generate operational datasets could be implemented as well. For the time being, we use extremely simple models to show how the code works, and we can then let users explore and add functionalities as needed."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 1,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stderr",
+ "output_type": "stream",
+ "text": [
+ "2024-03-24 19:16:03.180165: I tensorflow/core/util/port.cc:113] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.\n",
+ "2024-03-24 19:16:03.198144: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered\n",
+ "2024-03-24 19:16:03.198160: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered\n",
+ "2024-03-24 19:16:03.198733: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered\n",
+ "2024-03-24 19:16:03.202352: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.\n",
+ "To enable the following instructions: AVX2 AVX512F AVX512_VNNI AVX512_BF16 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.\n",
+ "2024-03-24 19:16:03.630814: W tensorflow/compiler/tf2tensorrt/utils/py_utils.cc:38] TF-TRT Warning: Could not find TensorRT\n",
+ "/home/richard/miniconda3/envs/xhydro/lib/python3.11/site-packages/clisops/core/regrid.py:42: UserWarning: xarray version >= 2023.03.0 is not supported for regridding operations with cf-time indexed arrays. Please use xarray version < 2023.03.0. For more information, see: https://github.com/pydata/xarray/issues/7794.\n",
+ " warnings.warn(\n"
+ ]
+ }
+ ],
+ "source": [
+ "import os\n",
+ "\n",
+ "import pooch\n",
+ "import tensorflow.keras.backend as K\n",
+ "import xarray as xr\n",
+ "\n",
+ "from xhydro.lstm_tools.lstm_controller import (\n",
+ " control_local_lstm_training,\n",
+ " control_regional_lstm_training,\n",
+ ")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## Example for application on a single catchment\n",
+ "\n",
+    "For this example, we will explore some existing data for a single catchment and see how we can use it for LSTM modelling. We will first get the data from the `xhydro-testdata` repository, and we will then process it while explaining what is going on in the backend. This package is optimized for netCDF (.nc) files, and more precisely for data following the xarray.Dataset format."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 2,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+      "text/plain": [
+       "<xarray.Dataset>\n",
+ "Dimensions: (time: 12419, watershed: 1)\n",
+ "Coordinates:\n",
+ " * time (time) datetime64[ns] 1979-01-01 1979-01-02 ... 2012-12-31\n",
+ " * watershed (watershed) object '01BG005'\n",
+ "Data variables: (12/19)\n",
+ " t2m (time, watershed) float64 ...\n",
+ " tp (time, watershed) float64 ...\n",
+ " sd (time, watershed) float64 ...\n",
+ " sp (time, watershed) float64 ...\n",
+ " d2m (time, watershed) float64 ...\n",
+ " tcc (time, watershed) float64 ...\n",
+ " ... ...\n",
+ " latitude (watershed) float64 ...\n",
+ " forest (watershed) float64 ...\n",
+ " crops (watershed) float64 ...\n",
+ " shrubs (watershed) float64 ...\n",
+ " elevation (watershed) float64 ...\n",
+ " slope (watershed) float64 ..."
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ }
+ ],
+ "source": [
+    "# Get data from the xhydro-testdata repository\n",
+ "input_data_filename_local = pooch.retrieve(\n",
+ " url=\"https://github.com/hydrologie/xhydro-testdata/raw/main/data/LSTM_data/single_watershed.nc\",\n",
+ " known_hash=\"md5:b1dfe4641321f63fb9198e9538fd679b\",\n",
+ ")\n",
+ "\n",
+ "ds_local = xr.open_dataset(input_data_filename_local)\n",
+ "display(ds_local)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+    "We can see that there are two types of variables: those that have a \"time\" dimension (i.e. timeseries), and those that do not:\n",
+ "\n",
+ "1. Those that have a \"time\" dimension are considered \"dynamic features\" as they evolve in time (such as weather data).\n",
+ "2. The others that do not have a \"time\" dimension are deemed \"static features\" as they describe the catchment in a fixed manner and do not evolve in time (such as catchment area and soil properties).\n",
+ "\n",
+    "The dynamic features are instrumental in establishing relationships between observed weather and streamflow. However, static features are only used when building and training regional models, so that the LSTM model can modulate flows and hydrographs as a function of catchment attributes. For a single catchment, these attributes are constant and the relationship between meteorology and hydrology does not need to be modulated according to catchment properties.\n",
+ "\n",
+    "<div class=\"alert alert-info\">\n",
+    "<b>IMPORTANT</b>\n",
+    "- This file MUST contain the observed streamflow (target variable) under the name `qobs`.\n",
+    "- This file MUST contain the drainage area of the catchment under the name `drainage_area`.\n",
+    "</div>\n",
+ "\n",
+ "\n",
+ "For completeness, let's take a look at the regional (i.e. multi-basin) dataset that we will use later:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 3,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+      "text/plain": [
+       "<xarray.Dataset>\n",
+ "Dimensions: (time: 12419, watershed: 3)\n",
+ "Coordinates:\n",
+ " * time (time) datetime64[ns] 1979-01-01 1979-01-02 ... 2012-12-31\n",
+ " * watershed (watershed) object '01BG005' '01BG009' '02QC009'\n",
+ "Data variables: (12/19)\n",
+ " t2m (time, watershed) float64 ...\n",
+ " tp (time, watershed) float64 ...\n",
+ " sd (time, watershed) float64 ...\n",
+ " sp (time, watershed) float64 ...\n",
+ " d2m (time, watershed) float64 ...\n",
+ " tcc (time, watershed) float64 ...\n",
+ " ... ...\n",
+ " latitude (watershed) float64 ...\n",
+ " forest (watershed) float64 ...\n",
+ " crops (watershed) float64 ...\n",
+ " shrubs (watershed) float64 ...\n",
+ " elevation (watershed) float64 ...\n",
+ " slope (watershed) float64 ..."
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ }
+ ],
+ "source": [
+ "# Get data from the xhydro-testdata repository\n",
+ "input_data_filename_regional = pooch.retrieve(\n",
+ " url=\"https://github.com/hydrologie/xhydro-testdata/raw/main/data/LSTM_data/multiple_watersheds.nc\",\n",
+ " known_hash=\"md5:31e1ae3ffcfd14d6a92faefd3d8bd57a\",\n",
+ ")\n",
+ "\n",
+ "ds_regional = xr.open_dataset(input_data_filename_regional)\n",
+ "display(ds_regional)"
+ ]
+ },
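+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Illustrative check (not required by the package): split the dataset variables into\n",
+    "# dynamic and static features according to whether they carry the \"time\" dimension.\n",
+    "dynamic_vars = [v for v in ds_regional.data_vars if \"time\" in ds_regional[v].dims]\n",
+    "static_vars = [v for v in ds_regional.data_vars if \"time\" not in ds_regional[v].dims]\n",
+    "print(f\"Dynamic features: {dynamic_vars}\")\n",
+    "print(f\"Static features: {static_vars}\")"
+   ]
+  },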
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+    "We can see that in this case, we still have some variables with (and without) the \"time\" dimension. However, the \"watershed\" dimension now contains multiple entries, identifying each watershed within the regional dataset.\n",
+ "\n",
+ "The LSTM network package provided in `xhydro` uses a controller function to manage data inputs and formats. These will be described below, but it is important to note that users should consider developing their own functions to process data in specific ways according to their needs. Let's explore the LSTM controller function. Note that there are two functions:\n",
+ "\n",
+ "1. One for the local catchment modelling (control_local_lstm_training)\n",
+ "2. One for the regional catchment modelling (control_regional_lstm_training)\n",
+ "\n",
+ "We first start with the local model. There are a series of inputs to provide (training data, some hyperparameters, general controls), some of which are optional. Here is a short summary of each input:\n",
+ "\n",
+ "Parameters / inputs\n",
+ "-------------------\n",
+ "* **input_data_filename** *[str]* \n",
+    "Path to the netcdf file containing the required input and target data for the LSTM. The ncfile must contain variables named \"qobs\" and \"drainage_area\" for the code to work, as these are required as target and scaling for training, respectively.\n",
+ "\n",
+ "* **dynamic_var_tags** *[list of str]* \n",
+ "List of dataset variables to use in the LSTM model training. Must be part of the input_data_filename dataset.\n",
+ "\n",
+ "* **qsim_pos** *[list of bool]* \n",
+ "List of same length as dynamic_var_tags. Should be set to all False EXCEPT where the dynamic_var_tags refer to flow simulations (ex: simulations from a hydrological model). Those should be set to True, because we will scale them according to the catchment size.\n",
+ "\n",
+ "* **batch_size** *[int, Optional]* \n",
+    "Number of data points to use in training. Datasets are often far too large to train in a single batch on a single GPU or CPU, so the dataset must be divided into smaller batches. This has an impact on the training performance and final model skill, and should be handled accordingly. Defaults to `32` if unspecified by the user.\n",
+ "\n",
+ "* **epochs** *[int, Optional]* \n",
+    "Number of training evaluations. A larger number of epochs means more model iterations and deeper training. At some point, training will stop early once the validation skill stops improving. Defaults to `200` if unspecified by the user.\n",
+ "\n",
+ "* **window_size** *[int, Optional]* \n",
+ "Number of days of look-back for training and model simulation. LSTM requires a large backwards-looking window to allow the model to learn from long-term weather patterns and history to predict the next day's streamflow. Usually set to 365 days to get one year of previous data. This makes the model heavier and longer to train but can improve results. Defaults to `365` if unspecified by the user.\n",
+ "\n",
+ "* **train_pct** *[int, Optional]* \n",
+ "Percentage of days from the dataset to use as training. The higher, the better for model training skill, but it is important to keep a decent amount for validation and testing. Defaults to `60` if unspecified by the user.\n",
+ "\n",
+ "* **valid_pct** *[int, Optional]* \n",
+    "Percentage of days from the dataset to use as validation. The sum of train_pct and valid_pct needs to be less than 100, such that the remainder can be used for testing. A good starting value is 20%. Validation is used as the stopping criterion during training: when the validation score stops improving, the model is starting to overfit and training is stopped. Defaults to `20` if unspecified by the user.\n",
+ "\n",
+ "* **use_cpu** *[bool, Optional]* \n",
+ "Flag to force the training and simulations to be performed on the CPU rather than on the GPU(s). Must be performed on a CPU that has AVX and AVX2 instruction sets, or tensorflow will fail. CPU training is very slow and should only be used as a last resort (such as for CI testing and debugging). Defaults to `True` if unspecified by the user, to maximize compatibility.\n",
+ "\n",
+ "* **use_parallel** *[bool, Optional]* \n",
+ "Flag to make use of multiple GPUs to accelerate training further. Models trained on multiple GPUs can have larger batch_size values as different batches can be run on different GPUs in parallel. Speedup is not linear as there is overhead related to the management of datasets, batches, the gradient merging and other steps. Still very useful and should be used when possible. Defaults to `False` if unspecified by the user.\n",
+ "\n",
+ "* **do_train** *[bool, Optional]* \n",
+ "Indicate that the code should perform the training step. This is not required as a pre-trained model could be used to perform a simulation by passing an existing model in \"name_of_saved_model\". Defaults to `True` if unspecified by the user.\n",
+ "\n",
+ "* **model_structure** *[str, Optional]* \n",
+    "This is where we can define which LSTM model structure we want, from within the LSTM_static.py module, which contains all the available model structures. Users who want to contribute new model structures should do so there. By default, we use dummy models, but we can recommend some simple starter models such as `[\"simple_local_lstm\", \"simple_regional_lstm\"]`, depending on whether the model is local or regional.\n",
+ "\n",
+ "* **do_simulation** *[bool, Optional]* \n",
+    "Indicate that simulations should be performed to obtain simulated streamflow and KGE metrics on the watersheds of interest, using the \"name_of_saved_model\" pre-trained model. If set to True and 'do_train' is also True, the newly trained model will be used instead. Defaults to `True` if unspecified by the user.\n",
+ "\n",
+ "* **training_func** *[str, Optional]* \n",
+    "Name of the objective function used for training. For a regional model, it is highly recommended to use the scaled NSE loss (`nse_scaled`), which uses the standard deviation of streamflow as input. For a local model, the `kge` function is preferred. Defaults to `kge` if unspecified by the user. Can be one of `[\"kge\", \"nse_scaled\"]`.\n",
+ "\n",
+ "* **filename_base** *[str]* \n",
+    "Base name of the trained model file that will be created if it does not already exist. Do not add the \".h5\" extension; it will be added automatically. Defaults to `LSTM_results` if unspecified by the user.\n",
+ "\n",
+ "* **simulation_phases** *[list of str]* \n",
+ "List of periods to generate the simulations. Can contain `['train','valid','test','full']`, corresponding to the training, validation, testing and complete periods, respectively.\n",
+ "\n",
+ "* **name_of_saved_model** *[str]* \n",
+ "Path to the model that has been pre-trained if required for simulations.\n",
+ "\n",
+ "Returns\n",
+ "-------\n",
+ "* **kge_results** *[array-like]* \n",
+ "Kling-Gupta Efficiency metric values for each of the watersheds in the input_data_filename ncfile after running in simulation mode (thus after training). Contains n_watersheds items, each containing 4 values representing the KGE values in training, validation, testing and full period, respectively. If one or more simulation phases are not requested, the items will be set to None.\n",
+ "\n",
+ "* **flow_results** *[array-like]* \n",
+ "Streamflow simulation values for each of the watersheds in the input_data_filename ncfile after running in simulation mode (thus after training). Contains n_watersheds items, each containing 4x 2D-arrays representing the observed and simulation series in training, validation, testing and full period, respectively. If one or more simulation phases are not requested, the items will be set to None.\n",
+ "\n",
+ "* **name_of_saved_model** *[str]* \n",
+ "Path to the model that has been trained, or to the pre-trained model if it already existed.\n",
+ "\n",
+ " \n",
+ "We will now define these inputs for use in the local LSTM model."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 4,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# Define some of the basic required inputs.\n",
+ "\n",
+ "# The path to the input data file containing flow, weather data (dynamic features) and catchment descriptors\n",
+ "# (static features).\n",
+ "input_data_filename = input_data_filename_local\n",
+ "\n",
+    "# Names of the dynamic features used to train the model, from the input_data_filename dataset. We are only\n",
+ "# using two here to keep things simple and to match the LSTM model complexity.\n",
+ "dynamic_var_tags = [\"t2m\", \"tp\"]\n",
+ "\n",
+ "# Scale variable according to area. Used for simulated flow inputs. Ensure the same order as dynamic_var_tags.\n",
+ "qsim_pos = [False, False]\n",
+ "\n",
+ "# Perform the training of the model. We will select \"True\" for this first step.\n",
+ "do_train = True\n",
+ "\n",
+ "# Define the model structure. We do not want to use the dummy model setup here, so let's use a more appropriate model\n",
+ "model_structure = \"simple_local_lstm\"\n",
+ "\n",
+ "# Also perform a simulation after the model is trained. We will select \"True\" to simulate flows on the periods\n",
+ "# identified in the \"simulation_phases\" parameter.\n",
+ "do_simulation = True\n",
+ "\n",
+ "# The phases on which we want to perform the simulations. Can be a list containing any combination of\n",
+ "# ['train','valid','test','full'].\n",
+ "simulation_phases = [\"test\"]\n",
+ "\n",
+    "# Optional, and only needed if we intend to do a simulation using a pre-trained LSTM model. We can then provide\n",
+ "# the name of the trained model to use for simulation.\n",
+ "name_of_saved_model = None\n",
+ "\n",
+    "# Batch size used in the training - a multiple of 32 is ideal for efficiency but not critical.\n",
+ "batch_size = 256\n",
+ "\n",
+    "# Number of epochs to train the LSTM model. Keeping this very short for this example, so the model will not converge.\n",
+ "epochs = 5\n",
+ "\n",
+ "# Number of time steps (days) to use in the LSTM model\n",
+ "window_size = 180\n",
+ "\n",
+    "# Percentage of days from the dataset to use for training the LSTM model.\n",
+ "train_pct = 70\n",
+ "\n",
+    "# Percentage of days from the dataset to use for validating the LSTM model.\n",
+ "valid_pct = 15\n",
+ "\n",
+ "# Use CPU as GPU is not guaranteed to be installed with CUDA/CuDNN etc. Much slower, but more compatible.\n",
+ "use_cpu = True\n",
+ "\n",
+ "# Use multiple GPUs in parallel if available. Highly suggested for larger models, but useless if using CPU.\n",
+ "use_parallel = False\n",
+ "\n",
+ "# The objective function to use for training. Since we are using a local model, use the KGE directly\n",
+ "training_func = \"kge\"\n",
+ "\n",
+ "# The name of the output files to be generated after simulations\n",
+ "filename_base = \"./LSTM_results_local_\""
+ ]
+ },
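+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# A minimal, illustrative sketch of the Kling-Gupta Efficiency (KGE) metric that the\n",
+    "# \"kge\" objective function is based on (Gupta et al., 2009). This is not necessarily\n",
+    "# the package's internal implementation, just a NumPy version of what is optimized.\n",
+    "import numpy as np\n",
+    "\n",
+    "\n",
+    "def kge_metric(qobs: np.ndarray, qsim: np.ndarray) -> float:\n",
+    "    r = np.corrcoef(qobs, qsim)[0, 1]  # linear correlation\n",
+    "    alpha = np.std(qsim) / np.std(qobs)  # variability ratio\n",
+    "    beta = np.mean(qsim) / np.mean(qobs)  # bias ratio\n",
+    "    return 1 - np.sqrt((r - 1) ** 2 + (alpha - 1) ** 2 + (beta - 1) ** 2)"
+   ]
+  },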
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+    "We can now run the model and get results:\n",
+    "\n",
+    "<div class=\"alert alert-warning\">\n",
+    "<b>WARNING:</b>\n",
+    "There is a high likelihood that the model will not converge and will return NaN values for the loss and val_loss metrics. If this happens, the model will restart, and will do so until it converges (or runs indefinitely). Even when it does converge, there is a good chance that the results will be poor: this test example uses too little data to converge properly, in order to keep the example small and easy to run.\n",
+    "</div>\n"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 5,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stderr",
+ "output_type": "stream",
+ "text": [
+ "2024-03-24 19:14:23.502074: E external/local_xla/xla/stream_executor/cuda/cuda_driver.cc:274] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected\n"
+ ]
+ },
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "USING CPU\n",
+ "Epoch 1/5\n",
+ "33/33 [==============================] - ETA: 0s - loss: 2.1811\n",
+ "Epoch 1: val_loss improved from inf to 1.16132, saving model to /tmp/tmpsr1uyp54/LSTM_results_local_.h5\n",
+ "33/33 [==============================] - 5s 120ms/step - loss: 2.1811 - val_loss: 1.1613 - lr: 5.0000e-04\n",
+ "Epoch 2/5\n",
+ "33/33 [==============================] - ETA: 0s - loss: 1.0752\n",
+ "Epoch 2: val_loss improved from 1.16132 to 1.05163, saving model to /tmp/tmpsr1uyp54/LSTM_results_local_.h5\n",
+ "33/33 [==============================] - 4s 107ms/step - loss: 1.0752 - val_loss: 1.0516 - lr: 5.0000e-04\n",
+ "Epoch 3/5\n",
+ "33/33 [==============================] - ETA: 0s - loss: 1.0359\n",
+ "Epoch 3: val_loss did not improve from 1.05163\n",
+ "33/33 [==============================] - 4s 106ms/step - loss: 1.0359 - val_loss: 1.1292 - lr: 5.0000e-04\n",
+ "Epoch 4/5\n",
+ "33/33 [==============================] - ETA: 0s - loss: 1.0389\n",
+ "Epoch 4: val_loss improved from 1.05163 to 1.03159, saving model to /tmp/tmpsr1uyp54/LSTM_results_local_.h5\n",
+ "33/33 [==============================] - 3s 106ms/step - loss: 1.0389 - val_loss: 1.0316 - lr: 5.0000e-04\n",
+ "Epoch 5/5\n",
+ "33/33 [==============================] - ETA: 0s - loss: 1.0211\n",
+ "Epoch 5: val_loss improved from 1.03159 to 1.02549, saving model to /tmp/tmpsr1uyp54/LSTM_results_local_.h5\n",
+ "33/33 [==============================] - 3s 105ms/step - loss: 1.0211 - val_loss: 1.0255 - lr: 5.0000e-04\n",
+ "7/7 [==============================] - 1s 37ms/step\n"
+ ]
+ }
+ ],
+ "source": [
+ "K.clear_session()\n",
+ "\n",
+ "KGE_results, flow_results, name_of_saved_model = control_local_lstm_training(\n",
+ " input_data_filename=input_data_filename,\n",
+ " dynamic_var_tags=dynamic_var_tags,\n",
+ " qsim_pos=qsim_pos,\n",
+ " batch_size=batch_size,\n",
+ " epochs=epochs,\n",
+ " window_size=window_size,\n",
+ " train_pct=train_pct,\n",
+ " valid_pct=valid_pct,\n",
+ " use_cpu=use_cpu,\n",
+ " use_parallel=use_parallel,\n",
+ " do_train=do_train,\n",
+ " model_structure=model_structure,\n",
+ " do_simulation=do_simulation,\n",
+ " training_func=training_func,\n",
+ " filename_base=filename_base,\n",
+ " simulation_phases=simulation_phases,\n",
+ " name_of_saved_model=name_of_saved_model,\n",
+ ")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+    "We can now explore the results. A quick reminder: KGE_results and flow_results are lists of size 4, reflecting results for the training, validation, testing and full periods, respectively. Since we only asked for the \"test\" simulation to be performed, we will only receive results in the 3rd element of each list:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 6,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "The KGE results are:\n"
+ ]
+ },
+ {
+ "data": {
+ "text/plain": [
+ "[None, None, -0.4235879677783505, None]"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ },
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "\n",
+ "The simulated flows are:\n"
+ ]
+ },
+ {
+ "data": {
+ "text/plain": [
+ "[None,\n",
+ " None,\n",
+ " array([[218. , 191. , 180. , ..., 12.1 ,\n",
+ " 12.6 , 12.7 ],\n",
+ " [ 0.22918376, 0.22747886, 0.22446105, ..., 0. ,\n",
+ " 0. , 0. ]], dtype=float32),\n",
+ " None]"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ },
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "\n",
+ "The trained model has been saved at the following location:\n"
+ ]
+ },
+ {
+ "data": {
+ "text/plain": [
+ "'/tmp/tmpsr1uyp54/LSTM_results_local_.h5'"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ }
+ ],
+ "source": [
+ "# Show the KGE results\n",
+ "print(\"The KGE results are:\")\n",
+ "display(KGE_results)\n",
+ "\n",
+ "print(\"\\nThe simulated flows are:\")\n",
+ "display(flow_results)\n",
+ "\n",
+ "print(\"\\nThe trained model has been saved at the following location:\")\n",
+ "display(name_of_saved_model)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+    "Now that the model has been trained, we can use the controller to perform a simulation by changing just a few of the inputs:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 5,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stderr",
+ "output_type": "stream",
+ "text": [
+ "2024-03-24 19:15:30.041494: E external/local_xla/xla/stream_executor/cuda/cuda_driver.cc:274] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected\n"
+ ]
+ },
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "33/33 [==============================] - 2s 37ms/step\n",
+ "8/8 [==============================] - 1s 35ms/step\n",
+ "7/7 [==============================] - 1s 38ms/step\n",
+ "48/48 [==============================] - 2s 37ms/step\n",
+ "The KGE results are:\n"
+ ]
+ },
+ {
+ "data": {
+ "text/plain": [
+ "[-0.4208242018323325,\n",
+ " -0.4286247953083231,\n",
+ " -0.4235879677783505,\n",
+ " -0.4227186052082743]"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ },
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "\n",
+ "The simulated flows are:\n"
+ ]
+ },
+ {
+ "data": {
+ "text/plain": [
+ "[array([[2.7400000e+01, 2.6600000e+01, 2.5500000e+01, ..., 1.0600000e+02,\n",
+ " 1.2000000e+02, 1.0600000e+02],\n",
+ " [3.7390493e-02, 4.1873619e-02, 4.4257998e-02, ..., 2.1161069e-01,\n",
+ " 1.9300836e-01, 1.7358573e-01]], dtype=float32),\n",
+ " array([[12.6 , 12.3 , 12. , ..., 57.3 ,\n",
+ " 51.7 , 47.3 ],\n",
+ " [ 0. , 0. , 0. , ..., 0.11138578,\n",
+ " 0.0875258 , 0.05921394]], dtype=float32),\n",
+ " array([[218. , 191. , 180. , ..., 12.1 ,\n",
+ " 12.6 , 12.7 ],\n",
+ " [ 0.22918376, 0.22747886, 0.22446105, ..., 0. ,\n",
+ " 0. , 0. ]], dtype=float32),\n",
+ " array([[27.4 , 26.6 , 25.5 , ..., 12.6 ,\n",
+ " 12.7 , 12.3 ],\n",
+ " [ 0.03739049, 0.04187362, 0.044258 , ..., 0. ,\n",
+ " 0. , 0. ]], dtype=float32)]"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ }
+ ],
+ "source": [
+ "# Define the variables that need to be changed:\n",
+ "\n",
+ "# Perform the training of the model. We will select \"False\" now that we have a saved trained model\n",
+ "do_train = False\n",
+ "\n",
+ "# The phases on which we want to perform the simulations. Can be a list containing any combination of\n",
+ "# ['train','valid','test','full'].\n",
+ "simulation_phases = [\"train\", \"valid\", \"test\", \"full\"]\n",
+ "\n",
+ "# The name of the saved model from the previous training.\n",
+ "name_of_saved_model = name_of_saved_model\n",
+ "\n",
+ "# And run the model\n",
+ "KGE_results_sim, flow_results_sim, _ = control_local_lstm_training(\n",
+ " input_data_filename=input_data_filename,\n",
+ " dynamic_var_tags=dynamic_var_tags,\n",
+ " qsim_pos=qsim_pos,\n",
+ " batch_size=batch_size,\n",
+ " epochs=epochs,\n",
+ " window_size=window_size,\n",
+ " train_pct=train_pct,\n",
+ " valid_pct=valid_pct,\n",
+ " use_cpu=use_cpu,\n",
+ " use_parallel=use_parallel,\n",
+ " do_train=do_train,\n",
+ " model_structure=model_structure,\n",
+ " do_simulation=do_simulation,\n",
+ " training_func=training_func,\n",
+ " filename_base=filename_base,\n",
+ " simulation_phases=simulation_phases,\n",
+ " name_of_saved_model=name_of_saved_model,\n",
+ ")\n",
+ "\n",
+ "print(\"The KGE results are:\")\n",
+ "display(KGE_results_sim)\n",
+ "\n",
+ "print(\"\\nThe simulated flows are:\")\n",
+ "display(flow_results_sim)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## Regional model\n",
+ "\n",
+    "We can now apply the exact same concept to the regional dataset that contains multiple watersheds. We will first define the input parameters, just like for the local model:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 4,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# Define some of the basic required inputs.\n",
+ "\n",
+ "# The path to the input data file containing flow, weather data (dynamic features) and catchment descriptors\n",
+ "# (static features).\n",
+ "input_data_filename = input_data_filename_regional\n",
+ "\n",
+    "# Names of the dynamic features used to train the model, from the input_data_filename dataset. We are only\n",
+ "# using two here to keep things simple and to match the LSTM model complexity.\n",
+ "dynamic_var_tags = [\"t2m\", \"tp\"]\n",
+ "\n",
+ "# Scale variable according to area. Used for simulated flow inputs. Ensure the same order as dynamic_var_tags.\n",
+ "qsim_pos = [False, False]\n",
+ "\n",
+ "# Names of the static features to be used from the input dataset. These are used to regionalize flows between catchments\n",
+    "# based on their attributes. Here we only use 3 to keep things simple. They will not be very useful here, as our\n",
+    "# dataset only contains 3 watersheds, so there is not enough data to learn the relationships correctly.\n",
+ "static_var_tags = [\"drainage_area\", \"forest\", \"slope\"]\n",
+ "\n",
+ "# Perform the training of the model. We will select \"True\" for this first step.\n",
+ "do_train = True\n",
+ "\n",
+ "# Define the model structure. We do not want to use the dummy model setup here, so let's use a more appropriate model\n",
+ "model_structure = \"simple_regional_lstm\"\n",
+ "\n",
+ "# Also perform a simulation after the model is trained. We will select \"True\" to simulate flows on the periods\n",
+ "# identified in the \"simulation_phases\" parameter.\n",
+ "do_simulation = True\n",
+ "\n",
+ "# The phases on which we want to perform the simulations. Can be a list containing any combination of\n",
+ "# ['train','valid','test','full'].\n",
+ "simulation_phases = [\"test\"]\n",
+ "\n",
+    "# Optional, and only needed if we intend to do a simulation using a pre-trained LSTM model. We can then provide\n",
+ "# the name of the trained model to use for simulation.\n",
+ "name_of_saved_model = None\n",
+ "\n",
+    "# Batch size used in the training - a multiple of 32 is ideal for efficiency but not critical.\n",
+ "batch_size = 256\n",
+ "\n",
+    "# Number of epochs to train the LSTM model. Keeping this very short for this example, so the model will not converge.\n",
+ "epochs = 5\n",
+ "\n",
+ "# Number of time steps (days) to use in the LSTM model\n",
+ "window_size = 180\n",
+ "\n",
+    "# Percentage of days from the dataset to use for training the LSTM model.\n",
+ "train_pct = 70\n",
+ "\n",
+    "# Percentage of days from the dataset to use for validating the LSTM model.\n",
+ "valid_pct = 15\n",
+ "\n",
+ "# Use CPU as GPU is not guaranteed to be installed with CUDA/CuDNN etc. Much slower, but more compatible.\n",
+ "use_cpu = True\n",
+ "\n",
+ "# Use multiple GPUs in parallel if available. Highly suggested for larger models, but useless if using CPU.\n",
+ "use_parallel = False\n",
+ "\n",
+    "# The objective function to use for training. Since we are using a regional model, use the scaled NSE loss.\n",
+ "training_func = \"nse_scaled\"\n",
+ "\n",
+ "# The name of the output files to be generated after simulations\n",
+ "filename_base = \"./LSTM_results_regional_\""
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "And now we run the model for training:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 5,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "Currently working on watershed no: 0\n",
+ "Currently working on watershed no: 0\n"
+ ]
+ },
+ {
+ "name": "stderr",
+ "output_type": "stream",
+ "text": [
+ "2024-03-24 19:16:17.447702: E external/local_xla/xla/stream_executor/cuda/cuda_driver.cc:274] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected\n"
+ ]
+ },
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "USING SINGLE GPU\n",
+ "Epoch 1/5\n",
+ "33/33 [==============================] - ETA: 0s - loss: 1.0928\n",
+ "Epoch 1: val_loss improved from inf to 0.80151, saving model to /tmp/tmpnel6rkaw/LSTM_results_regional_.h5\n",
+ "33/33 [==============================] - 16s 426ms/step - loss: 1.0928 - val_loss: 0.8015 - lr: 5.0000e-04\n",
+ "Epoch 2/5\n",
+ "33/33 [==============================] - ETA: 0s - loss: 0.6787\n",
+ "Epoch 2: val_loss improved from 0.80151 to 0.52227, saving model to /tmp/tmpnel6rkaw/LSTM_results_regional_.h5\n",
+ "33/33 [==============================] - 14s 413ms/step - loss: 0.6787 - val_loss: 0.5223 - lr: 5.0000e-04\n",
+ "Epoch 3/5\n",
+ "33/33 [==============================] - ETA: 0s - loss: 0.3533\n",
+ "Epoch 3: val_loss improved from 0.52227 to 0.39809, saving model to /tmp/tmpnel6rkaw/LSTM_results_regional_.h5\n",
+ "33/33 [==============================] - 14s 419ms/step - loss: 0.3533 - val_loss: 0.3981 - lr: 5.0000e-04\n",
+ "Epoch 4/5\n",
+ "33/33 [==============================] - ETA: 0s - loss: 0.2680\n",
+ "Epoch 4: val_loss improved from 0.39809 to 0.34954, saving model to /tmp/tmpnel6rkaw/LSTM_results_regional_.h5\n",
+ "33/33 [==============================] - 14s 415ms/step - loss: 0.2680 - val_loss: 0.3495 - lr: 5.0000e-04\n",
+ "Epoch 5/5\n",
+ "33/33 [==============================] - ETA: 0s - loss: 0.2507\n",
+ "Epoch 5: val_loss improved from 0.34954 to 0.33105, saving model to /tmp/tmpnel6rkaw/LSTM_results_regional_.h5\n",
+ "33/33 [==============================] - 14s 434ms/step - loss: 0.2507 - val_loss: 0.3310 - lr: 5.0000e-04\n",
+ "Currently working on watershed no: 0\n",
+ "7/7 [==============================] - 2s 175ms/step\n"
+ ]
+ }
+ ],
+ "source": [
+ "K.clear_session()\n",
+ "\n",
+ "KGE_results, flow_results, name_of_saved_model = control_regional_lstm_training(\n",
+ " input_data_filename=input_data_filename,\n",
+ " dynamic_var_tags=dynamic_var_tags,\n",
+ " qsim_pos=qsim_pos,\n",
+ " static_var_tags=static_var_tags,\n",
+ " batch_size=batch_size,\n",
+ " epochs=epochs,\n",
+ " window_size=window_size,\n",
+ " train_pct=train_pct,\n",
+ " valid_pct=valid_pct,\n",
+ " use_cpu=use_cpu,\n",
+ " use_parallel=use_parallel,\n",
+ " do_train=do_train,\n",
+ " model_structure=model_structure,\n",
+ " do_simulation=do_simulation,\n",
+ " training_func=training_func,\n",
+ " filename_base=filename_base,\n",
+ " simulation_phases=simulation_phases,\n",
+ " name_of_saved_model=name_of_saved_model,\n",
+ ")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+    "As with the previous model, we can now get the overall results:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 6,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "The KGE results are:\n"
+ ]
+ },
+ {
+ "data": {
+ "text/plain": [
+ "[[None, None, 0.733690030240023, None]]"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ },
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "\n",
+ "The simulated flows are:\n"
+ ]
+ },
+ {
+ "data": {
+ "text/plain": [
+ "[[None,\n",
+ " None,\n",
+ " array([[217.99998446, 190.99999739, 179.9999865 , ..., 12.09999971,\n",
+ " 12.6000001 , 12.6999995 ],\n",
+ " [166.93844604, 166.9499054 , 165.28643799, ..., 14.92283154,\n",
+ " 15.75016975, 14.90410519]]),\n",
+ " None]]"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ },
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "\n",
+ "The trained model has been saved at the following location:\n"
+ ]
+ },
+ {
+ "data": {
+ "text/plain": [
+ "'/tmp/tmpnel6rkaw/LSTM_results_regional_.h5'"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ }
+ ],
+ "source": [
+ "# Show the KGE results\n",
+ "print(\"The KGE results are:\")\n",
+ "display(KGE_results)\n",
+ "\n",
+ "print(\"\\nThe simulated flows are:\")\n",
+ "display(flow_results)\n",
+ "\n",
+ "print(\"\\nThe trained model has been saved at the following location:\")\n",
+ "display(name_of_saved_model)"
+ ]
+ }
+ ],
+ "metadata": {
+ "language_info": {
+ "codemirror_mode": {
+ "name": "ipython",
+ "version": 3
+ },
+ "file_extension": ".py",
+ "mimetype": "text/x-python",
+ "name": "python",
+ "nbconvert_exporter": "python",
+ "pygments_lexer": "ipython3",
+ "version": "3.11.7"
+ },
+ "vscode": {
+ "interpreter": {
+ "hash": "e28391989cdb8b31df72dd917935faad186df3329a743c813090fc6af977a1ca"
+ }
+ }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 4
+}
diff --git a/environment-dev.yml b/environment-dev.yml
index 38433d36..e521491b 100644
--- a/environment-dev.yml
+++ b/environment-dev.yml
@@ -18,6 +18,7 @@ dependencies:
- pandas >=2.2
- planetary-computer
- pooch >=1.8.0
+ - pydantic >=2.0,<2.5.3 # FIXME: Remove pin once our dependencies (xclim, xscen) support pydantic 2.5.3
- pystac <1.12.0 # Temporary pin until support can be added for pystac 1.13.0
- pystac-client
- pyyaml >=6.0.2
@@ -37,6 +38,8 @@ dependencies:
# Julia
# - julia >=1.6.0 # Works locally, breaks remotely
# - pyjuliacall >=0.9.20
+ # LSTM
+ # - tensorflow >=2.10
# Dev tools and testing
- pip >=25.0
- black ==25.1.0
diff --git a/environment.yml b/environment.yml
index 7d0bb620..84d15a64 100644
--- a/environment.yml
+++ b/environment.yml
@@ -37,3 +37,5 @@ dependencies:
- xvec
# Julia
# - pyjuliacall >=0.9.20
+ # LSTM
+ # - tensorflow >=2.10
diff --git a/pyproject.toml b/pyproject.toml
index 67c5719b..f9ae480c 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -108,7 +108,10 @@ docs = [
julia = [
"juliacall >=0.9.20"
]
-all = ["xhydro[dev]", "xhydro[docs]", "xhydro[julia]"]
+lstm = [
+ "tensorflow>=2.10"
+]
+all = ["xhydro[dev]", "xhydro[docs]", "xhydro[julia]", "xhydro[lstm]"]
[project.urls]
"Homepage" = "https://xhydro.readthedocs.io/"
@@ -165,7 +168,7 @@ values = [
]
[tool.codespell]
-ignore-words-list = "ans,astroid,nd,parametre,projet,socio-economic"
+ignore-words-list = "ans,astroid,nd,parametre,projet,socio-economic,statics"
skip = "*.po"
[tool.coverage.paths]
diff --git a/src/xhydro/lstm_tools/__init__.py b/src/xhydro/lstm_tools/__init__.py
new file mode 100644
index 00000000..ca012db5
--- /dev/null
+++ b/src/xhydro/lstm_tools/__init__.py
@@ -0,0 +1,4 @@
+"""LSTM module imports."""
+
+from .lstm_controller import *
+from .lstm_functions import *
diff --git a/src/xhydro/lstm_tools/create_datasets.py b/src/xhydro/lstm_tools/create_datasets.py
new file mode 100644
index 00000000..1eb248d8
--- /dev/null
+++ b/src/xhydro/lstm_tools/create_datasets.py
@@ -0,0 +1,486 @@
+"""Tools to create the datasets to be used in LSTM model training and simulation."""
+
+import os
+from pathlib import Path
+
+import numpy as np
+import xarray as xr
+
+__all__ = [
+ "create_dataset_flexible",
+ "create_dataset_flexible_local",
+ "create_lstm_dataset",
+ "create_lstm_dataset_local",
+ "remove_nans_func",
+ "remove_nans_func_local",
+]
+
+
+def create_dataset_flexible(
+ filename: str | os.PathLike,
+ dynamic_var_tags: list,
+ qsim_pos: list,
+ static_var_tags: list,
+):
+ """Prepare the arrays of dynamic, static and observed flow variables.
+
+ A few things are absolutely required:
+ 1. a "watershed" coordinate that contains the ID of watersheds, such that we can preallocate the size of
+ the matrices.
+ 2. a "qobs" variable that contains observed flows for the catchments.
+ 3. The size of the catchments in the "drainage_area" variable. This is used to compute scaled streamflow values
+ for regionalization applications.
+
+ Parameters
+ ----------
+    filename : str or os.PathLike
+        Path to the netcdf file containing the required input and target data for the LSTM. The ncfile must contain
+        variables named "qobs" and "drainage_area" for the code to work, as these are required as target and scaling
+        for training, respectively.
+ dynamic_var_tags : list of str
+ List of dataset variables to use in the LSTM model training. Must be part of the input_data_filename ncfile.
+ qsim_pos : list of bool
+ List of same length as dynamic_var_tags. Should be set to all False EXCEPT where the dynamic_var_tags refer to
+ flow simulations (ex: simulations from a hydrological model such as HYDROTEL). Those should be set to True.
+ static_var_tags : list of str
+ List of the catchment descriptor names in the input_data_filename ncfile. They need to be present in the ncfile
+ and will be used as inputs to the regional model, to help the flow regionalization process.
+
+ Returns
+ -------
+ arr_dynamic : np.ndarray
+ Tensor of size [watersheds x timestep x (n_dynamic_variables+1)] that contains the dynamic (i.e. time-series)
+ variables that will be used during training. The first element in axis=2 is the observed flow.
+ arr_static : np.ndarray
+ Tensor of size [watersheds x n_static_variables] that contains the static (i.e. catchment descriptors) variables
+ that will be used during training.
+    arr_qobs : np.ndarray
+        Array of size [watersheds x timesteps] containing the observed streamflow scaled by drainage area (i.e.
+        specific discharge in mm/day), used as the training target.
+ """
+ # Open the dataset file
+ ds = xr.open_dataset(Path(filename))
+
+ # Number of watersheds in the dataset
+ n_watersheds = len(ds.watershed)
+
+ # Number of days for each dynamic variable.
+ n_days = len(ds.time)
+
+ # Perform the analysis for qobs first.
+ arr_qobs = ds.qobs.values.astype(np.float32)
+ if ds.qobs.dims[0] == "time":
+ arr_qobs = arr_qobs.T
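+    # Convert flows (m3/s) to specific discharge (mm/day): 86.4 = 86400 s/day / 1e6 m2/km2 * 1000 mm/m.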
+ arr_qobs = arr_qobs / ds.drainage_area.values[:, np.newaxis] * 86.4
+
+ # Prepare the dynamic data array and set qobs as the first value
+ arr_dynamic = np.empty(
+ shape=[n_watersheds, n_days, len(dynamic_var_tags) + 1], dtype=np.float32
+ )
+ arr_dynamic[:] = np.nan
+ arr_dynamic[:, :, 0] = arr_qobs
+
+ for i in range(len(dynamic_var_tags)):
+ # Read the data and put in a tmp var
+ tmp = ds[dynamic_var_tags[i]].values.astype(np.float32)
+ if ds.qobs.dims[0] == "time":
+ tmp = tmp.T
+ # If the variable must be scaled, do it
+ if qsim_pos[i]:
+ tmp = tmp / ds.drainage_area.values[:, np.newaxis] * 86.4
+
+ # Set the data in the main dataset
+ arr_dynamic[:, :, i + 1] = tmp
+
+ # Prepare the static dataset
+ arr_static = np.empty([n_watersheds, len(static_var_tags)], dtype=np.float32)
+
+    # Loop over the static variables and fill the static data array
+ for i in range(len(static_var_tags)):
+ arr_static[:, i] = ds[static_var_tags[i]].values
+
+ return arr_dynamic, arr_static, arr_qobs
+
+
+def create_dataset_flexible_local(
+ filename: str | os.PathLike,
+ dynamic_var_tags: list,
+ qsim_pos: list,
+):
+ """Prepare the arrays of dynamic and observed flow variables.
+
+ A few things are absolutely required:
+ 1. a "watershed" variable that contains the ID of watersheds, such that
+ we can preallocate the size of the matrices.
+ 2. a "qobs" variable that contains observed flows for the catchments.
+
+ Parameters
+ ----------
+ filename : str or os.PathLike
+        Path to the netcdf file containing the required input and target data for the LSTM. The ncfile must contain
+        variables named "qobs" and "drainage_area" for the code to work, as these are required as target and scaling
+        for training, respectively.
+ dynamic_var_tags : list of str
+ List of dataset variables to use in the LSTM model training. Must be part of the input_data_filename ncfile.
+ qsim_pos : list of bool
+ List of same length as dynamic_var_tags. Should be set to all False EXCEPT where the dynamic_var_tags refer to
+ flow simulations (ex: simulations from a hydrological model such as HYDROTEL). Those should be set to True.
+
+ Returns
+ -------
+ arr_dynamic : np.ndarray
+        Tensor of size [timestep x (n_dynamic_variables+1)] that contains the dynamic (i.e. time-series)
+        variables that will be used during training. The first element in axis=1 is the observed flow.
+ arr_qobs : np.ndarray
+ Array containing the observed flow vector.
+ """
+ # Open the dataset file
+ ds = xr.open_dataset(Path(filename))
+
+    # Number of days for each dynamic variable.
+ n_days = len(ds.time)
+
+ # Perform the analysis for qobs first.
+ arr_qobs = ds.qobs.values.astype(np.float32)
+
+ # Prepare the dynamic data array and set qobs as the first value
+ arr_dynamic = np.empty(shape=[n_days, len(dynamic_var_tags) + 1], dtype=np.float32)
+ arr_dynamic[:] = np.nan
+ arr_dynamic[:, 0] = np.squeeze(arr_qobs)
+
+ for i in range(len(dynamic_var_tags)):
+ # Read the data and put in a tmp var
+ tmp = ds[dynamic_var_tags[i]].values.astype(np.float32)
+
+ # If the variable must be scaled, do it
+ if qsim_pos[i]:
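+            # Scale simulated flows by drainage area: converts m3/s to mm/day (factor 86.4).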
+ tmp = tmp / ds.drainage_area.values * 86.4
+
+ # Set the data in the main dataset
+ arr_dynamic[:, i + 1] = np.squeeze(tmp)
+
+ return arr_dynamic, np.squeeze(arr_qobs)
+
+
+def create_lstm_dataset(
+ arr_dynamic: np.ndarray,
+ arr_static: np.ndarray,
+ q_stds: np.ndarray,
+ window_size: int,
+ watershed_list: list,
+ idx: np.ndarray,
+ remove_nans: bool = True,
+):
+ """Create the LSTM dataset and shape the data using look-back windows and preparing all data for training.
+
+ Parameters
+ ----------
+ arr_dynamic : np.ndarray
+ Tensor of size [watersheds x timestep x (n_dynamic_variables+1)] that contains the dynamic (i.e. time-series)
+ variables that will be used during training. The first element in axis=2 is the observed flow.
+ arr_static : np.ndarray
+ Tensor of size [watersheds x n_static_variables] that contains the static (i.e. catchment descriptors) variables
+ that will be used during training.
+ q_stds : np.ndarray
+ Tensor of size [watersheds] that contains the standard deviation of scaled streamflow values for the catchment
+ associated to the data in arr_dynamic and arr_static.
+ window_size : int
+ Number of days of look-back for training and model simulation. LSTM requires a large backwards-looking window to
+ allow the model to learn from long-term weather patterns and history to predict the next day's streamflow.
+ Usually set to 365 days to get one year of previous data. This makes the model heavier and longer to train but
+ can improve results.
+ watershed_list : list
+        List of the watersheds to use for training and simulation. Corresponds to the watershed indices in the input
+        file, i.e. along axis 0 of the arr_dynamic array.
+ idx : np.ndarray
+ 2-element array of indices of the beginning and end of the desired period for which the LSTM model should be
+ simulated.
+ remove_nans : bool
+        Flag indicating that the NaN values associated with the observed streamflow should be removed. Required for
+        training, but can be set to False for simulation to ensure simulation over the entire period.
+
+ Returns
+ -------
+ x : np.ndarray
+ Tensor of size [(timesteps * watersheds) x window_size x n_dynamic_variables] that contains the dynamic (i.e.
+ timeseries) variables that will be used during training.
+ x_static : np.ndarray
+ Tensor of size [(timesteps * watersheds) x n_static_variables] that contains the static (i.e. catchment
+ descriptors) variables that will be used during training.
+ x_q_stds : np.ndarray
+ Tensor of size [(timesteps * watersheds)] that contains the standard deviation of scaled streamflow values for
+ the catchment associated to the data in x and x_static. Each data point could come from any catchment and this
+ q_std variable helps scale the objective function.
+ y : np.ndarray
+ Tensor of size [(timesteps * watersheds)] containing the target variable for the same time point as in x,
+ x_static and x_q_stds. Usually the observed streamflow for the day associated to each of the training points.
+ """
+ ndata = arr_dynamic.shape[2] - 1
+ x = np.empty((0, window_size, ndata))
+ x_static = np.empty((0, arr_static.shape[1]))
+ x_q_stds = np.empty(0)
+ y = np.empty(0)
+
+ for w in watershed_list:
+ idx_w = idx[w]
+ print("Currently working on watershed no: " + str(w))
+ x_w, x_w_static, x_w_q_std, y_w = _extract_watershed_block(
+ arr_w_dynamic=arr_dynamic[w, idx_w[0] : idx_w[1], :],
+ arr_w_static=arr_static[w, :],
+ q_std_w=q_stds[w],
+ window_size=window_size,
+ )
+
+ # remove nans
+ if remove_nans:
+ y_w, x_w, x_w_q_std, x_w_static = remove_nans_func(
+ y_w, x_w, x_w_q_std, x_w_static
+ )
+
+ x = np.vstack([x, x_w])
+ x_static = np.vstack([x_static, x_w_static])
+ x_q_stds = np.hstack([x_q_stds, x_w_q_std])
+ y = np.hstack([y, y_w])
+
+ return x, x_static, x_q_stds, y
+
+
+def create_lstm_dataset_local(
+ arr_dynamic: np.ndarray,
+ window_size: int,
+ idx: np.ndarray,
+ remove_nans: bool = True,
+):
+ """Create the local LSTM dataset and shape the data using look-back windows and preparing all data for training.
+
+ Parameters
+ ----------
+ arr_dynamic : np.ndarray
+ Tensor of size [watersheds x timestep x (n_dynamic_variables+1)] that contains the dynamic (i.e. time-series)
+ variables that will be used during training. The first element in axis=2 is the observed flow.
+ window_size : int
+ Number of days of look-back for training and model simulation. LSTM requires a large backwards-looking window to
+ allow the model to learn from long-term weather patterns and history to predict the next day's streamflow.
+ Usually set to 365 days to get one year of previous data. This makes the model heavier and longer to train but
+ can improve results.
+ idx : np.ndarray
+ 2-element array of indices of the beginning and end of the desired period for which the LSTM model should be
+ simulated.
+ remove_nans : bool
+        Flag indicating that the NaN values associated with the observed streamflow should be removed. Required for
+        training, but can be set to False for simulation to ensure simulation over the entire period.
+
+ Returns
+ -------
+ x : np.ndarray
+ Tensor of size [timesteps x window_size x n_dynamic_variables] that contains the dynamic (i.e. timeseries)
+ variables that will be used during training.
+ y : np.ndarray
+        Tensor of size [timesteps] containing the target variable for the same time point as in x. Usually the
+        observed streamflow for the day associated to each of the training points.
+ """
+ x, y = _extract_watershed_block_local(
+ arr_dynamic=arr_dynamic[idx[0] : idx[1], :],
+ window_size=window_size,
+ )
+
+ # Remove nans
+ if remove_nans:
+ y, x = remove_nans_func_local(y, x)
+
+ return x, y
+
+
+def _extract_windows_vectorized(
+ array: np.ndarray,
+ sub_window_size: int,
+):
+ """Create the array where each day contains data from a previous period (look-back period).
+
+ Parameters
+ ----------
+ array : np.ndarray
+ The array of dynamic variables for a single catchment.
+ sub_window_size : int
+ Size of the look-back window.
+
+ Returns
+ -------
+    data_array : np.ndarray
+ Array of dynamic data processed into a 3D tensor for LSTM model training.
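+
+    Examples
+    --------
+    A series of length 5 with a window of 3 yields (max_time - sub_window_size) = 2 windows:
+
+    >>> import numpy as np
+    >>> _extract_windows_vectorized(np.arange(5), 3)
+    array([[0, 1, 2],
+           [1, 2, 3]])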
+ """
+ max_time = array.shape[0]
+
+ # expand_dims are used to convert a 1D array to 2D array.
+ sub_windows = (
+ np.expand_dims(np.arange(sub_window_size), 0)
+ + np.expand_dims(np.arange(max_time - sub_window_size), 0).T
+ )
+ data_array = array[sub_windows]
+
+ return data_array
+
+
+def _extract_watershed_block(
+ arr_w_dynamic: np.ndarray,
+ arr_w_static: np.ndarray,
+ q_std_w: np.ndarray,
+ window_size: int,
+):
+ """Extract all series of the desired window length over all features for a given watershed.
+
+ Create the LSTM tensor format of data from the regular input arrays. Both dynamic and static variables are
+ extracted.
+
+ Parameters
+ ----------
+ arr_w_dynamic : np.ndarray
+ Tensor of size [timestep x (n_dynamic_variables+1)] that contains the dynamic (i.e. time-series)
+ variables that will be used during training for the current catchment. The first element in axis=1 is the
+ observed flow.
+ arr_w_static : np.ndarray
+ Tensor of size [n_static_variables] that contains the static (i.e. catchment descriptors) variables
+ that will be used during training for the current catchment.
+ q_std_w : np.ndarray
+ Tensor of size [1] that contains the standard deviation of scaled streamflow values for the catchment
+ associated to the current catchment.
+ window_size : int
+ Number of days of look-back for training and model simulation. LSTM requires a large backwards-looking window to
+ allow the model to learn from long-term weather patterns and history to predict the next day's streamflow.
+ Usually set to 365 days to get one year of previous data. This makes the model heavier and longer to train but
+ can improve results.
+
+ Returns
+ -------
+ x_w : np.ndarray
+ Tensor of size [timesteps x window_size x n_dynamic_variables] that contains the dynamic (i.e. time-series)
+ variables that will be used during training for a single processed catchment.
+ x_w_static : np.ndarray
+ Tensor of size [timesteps x n_static_variables] that contains the static (i.e. catchment descriptors) variables
+ that will be used during training for a single processed catchment.
+    x_w_q_std : np.ndarray
+ Tensor of size [timesteps] that contains the standard deviation of scaled streamflow values for the catchment
+ associated to the data in x and x_static for a single processed catchment.
+ y_w : np.ndarray
+ Tensor of size [timesteps] containing the target variable for the same time point as in x_w, x_w_static and
+ x_w_q_stds. Usually the observed streamflow for the day associated to each of the training points for the
+ currently processed catchment.
+ """
+ # Extract all series of the desired window length for all features
+ x_w = _extract_windows_vectorized(array=arr_w_dynamic, sub_window_size=window_size)
+
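+    # Tile the static descriptors and the q_std so that each window gets a copy of its catchment's values.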
+ x_w_static = np.repeat(arr_w_static.reshape(-1, 1), x_w.shape[0], axis=1).T
+
+ x_w_q_std = np.squeeze(np.repeat(q_std_w.reshape(-1, 1), x_w.shape[0], axis=1).T)
+
+ # Get the last value of qobs from each series for the prediction
+ y_w = x_w[:, -1, 0]
+
+ # Remove qobs from the features
+ x_w = np.delete(x_w, 0, axis=2)
+
+ return x_w, x_w_static, x_w_q_std, y_w
+
+
+def _extract_watershed_block_local(arr_dynamic: np.ndarray, window_size: int):
+ """Extract all series of the desired window length over all features for a given watershed.
+
+    Create the LSTM tensor format of data from the regular input arrays. Only dynamic variables are extracted, as
+    local models do not use static catchment descriptors.
+
+ Parameters
+ ----------
+ arr_dynamic : np.ndarray
+ Tensor of size [timestep x (n_dynamic_variables+1)] that contains the dynamic (i.e. time-series)
+ variables that will be used during training for the current catchment. The first element in axis=1 is the
+ observed flow.
+ window_size : int
+ Number of days of look-back for training and model simulation. LSTM requires a large backwards-looking window to
+ allow the model to learn from long-term weather patterns and history to predict the next day's streamflow.
+ Usually set to 365 days to get one year of previous data. This makes the model heavier and longer to train but
+ can improve results.
+
+ Returns
+ -------
+ x : np.ndarray
+        Tensor of size [timesteps x window_size x n_dynamic_variables] that contains the dynamic (i.e. time-series)
+        variables that will be used during training for the catchment.
+ y : np.ndarray
+ Tensor of size [timesteps] containing the target variable for the same time point as in 'x'. Usually the
+ observed streamflow for the day associated to each of the training points.
+ """
+ # Extract all series of the desired window length for all features
+ x = _extract_windows_vectorized(array=arr_dynamic, sub_window_size=window_size)
+
+ # Get the last value of qobs from each series for the prediction
+ y = x[:, -1, 0]
+
+ # Remove qobs from the features
+ x = np.delete(x, 0, axis=2)
+
+ return x, y
+
+
+def remove_nans_func(
+ y: np.ndarray, x: np.ndarray, x_q_std: np.ndarray, x_static: np.ndarray
+):
+ """Check for nans in the variable "y" and remove all lines containing those nans in all datasets.
+
+ Parameters
+ ----------
+ y : np.ndarray
+ Array of target variables for training, that might contain NaNs.
+ x : np.ndarray
+ Array of dynamic variables for LSTM model training and simulation.
+ x_q_std : np.ndarray
+ Array of observed streamflow standard deviations for catchments in regional LSTM models.
+ x_static : np.ndarray
+ Array of static variables for LSTM model training and simulation, specifically for regional LSTM models.
+
+ Returns
+ -------
+ y : np.ndarray
+ Array of target variables for training, with all NaNs removed.
+ x : np.ndarray
+ Array of dynamic variables for LSTM model training and simulation, with values associated to NaN "y" values
+ removed.
+ x_q_std : np.ndarray
+ Array of observed streamflow standard deviations for catchments in regional LSTM models, with values associated
+ to NaN "y" values removed.
+ x_static : np.ndarray
+ Array of static variables for LSTM model training and simulation, specifically for regional LSTM models, with
+ values associated to NaN "y" values removed.
+ """
+ ind_nan = np.isnan(y)
+ y = y[~ind_nan]
+ x = x[~ind_nan, :, :]
+    x_q_std = x_q_std[~ind_nan]
+    x_static = x_static[~ind_nan, :]
+
+    return y, x, x_q_std, x_static
+
+
+def remove_nans_func_local(y: np.ndarray, x: np.ndarray):
+ """Check for nans in the variable "y" and remove all lines containing those nans in all datasets.
+
+ Parameters
+ ----------
+ y : np.ndarray
+ Array of target variables for training, that might contain NaNs.
+ x : np.ndarray
+ Array of dynamic variables for LSTM model training and simulation.
+
+ Returns
+ -------
+ y : np.ndarray
+ Array of target variables for training, with all NaNs removed.
+ x : np.ndarray
+ Array of dynamic variables for LSTM model training and simulation, with values associated to NaN "y" values
+ removed.
+ """
+ ind_nan = np.isnan(y)
+ y = y[~ind_nan]
+ x = x[~ind_nan, :, :]
+
+ return y, x
diff --git a/src/xhydro/lstm_tools/lstm_controller.py b/src/xhydro/lstm_tools/lstm_controller.py
new file mode 100644
index 00000000..56abc762
--- /dev/null
+++ b/src/xhydro/lstm_tools/lstm_controller.py
@@ -0,0 +1,398 @@
+"""Control the LSTM training and simulation tools to make clean workflows."""
+
+import os
+import tempfile
+from pathlib import Path
+
+from xhydro.lstm_tools.lstm_functions import (
+ perform_initial_train,
+ perform_initial_train_local,
+ run_model_after_training,
+ run_model_after_training_local,
+ scale_dataset,
+ scale_dataset_local,
+ split_dataset,
+ split_dataset_local,
+)
+
+
+def control_regional_lstm_training(
+ input_data_filename: str,
+ dynamic_var_tags: list,
+ qsim_pos: list,
+ static_var_tags: list,
+ batch_size: int = 32,
+ epochs: int = 200,
+ window_size: int = 365,
+ train_pct: int = 60,
+ valid_pct: int = 20,
+ use_cpu: bool = True,
+ use_parallel: bool = False,
+ do_train: bool = True,
+ model_structure: str = "dummy_regional_lstm",
+ do_simulation: bool = True,
+ training_func: str = "nse_scaled",
+ filename_base: str = "LSTM_results",
+ simulation_phases: list[str] | None = None,
+ name_of_saved_model: str | None = None,
+) -> tuple[list | None, list | None, str]:
+ """Control the regional LSTM model training and simulation.
+
+ Parameters
+ ----------
+ input_data_filename : str
+        Path to the netcdf file containing the required input and target data for the LSTM. The ncfile must contain
+        variables named "qobs" and "drainage_area" for the code to work, as these are required as the target and for
+        scaling during training, respectively.
+ dynamic_var_tags : list of str
+ List of dataset variables to use in the LSTM model training. Must be part of the input_data_filename ncfile.
+ qsim_pos : list of bool
+ List of same length as dynamic_var_tags. Should be set to all False EXCEPT where the dynamic_var_tags refer to
+ flow simulations (ex: simulations from a hydrological model such as HYDROTEL). Those should be set to True.
+ static_var_tags : list of str
+ List of the catchment descriptor names in the input_data_filename ncfile. They need to be present in the ncfile
+ and will be used as inputs to the regional model, to help the flow regionalization process.
+ batch_size : int
+        Number of data points per batch. Datasets are often far too large to process in a single batch on a single
+ GPU or CPU, meaning that the dataset must be divided into smaller batches. This has an impact on the training
+ performance and final model skill, and should be handled accordingly.
+ epochs : int
+        Number of training epochs, i.e. passes over the training data. A larger number of epochs means more model
+        iterations and deeper training. Training stops early once the validation skill ceases to improve.
+ window_size : int
+ Number of days of look-back for training and model simulation. LSTM requires a large backwards-looking window to
+ allow the model to learn from long-term weather patterns and history to predict the next day's streamflow.
+ Usually set to 365 days to get one year of previous data. This makes the model heavier and longer to train but
+ can improve results.
+ train_pct : int
+ Percentage of days from the dataset to use as training. The higher, the better for model training skill, but it
+ is important to keep a decent amount for validation and testing.
+ valid_pct : int
+ Percentage of days from the dataset to use as validation. The sum of train_pct and valid_pct needs to be less
+ than 100, such that the remainder can be used for testing. A good starting value is 20%. Validation is used as
+        the stopping criterion during training: when the validation skill stops improving, the model is considered to
+        be overfitting and training is stopped.
+ use_cpu : bool
+ Flag to force the training and simulations to be performed on the CPU rather than on the GPU(s). Must be
+ performed on a CPU that has AVX and AVX2 instruction sets, or tensorflow will fail. CPU training is very slow
+ and should only be used as a last resort (such as for CI testing and debugging).
+ use_parallel : bool
+ Flag to make use of multiple GPUs to accelerate training further. Models trained on multiple GPUs can have
+ larger batch_size values as different batches can be run on different GPUs in parallel. Speedup is not linear as
+ there is overhead related to the management of datasets, batches, the gradient merging and other steps. Still
+ very useful and should be used when possible.
+ do_train : bool
+ Indicate that the code should perform the training step. This is not required as a pre-trained model could be
+ used to perform a simulation by passing an existing model in "name_of_saved_model".
+ model_structure : str
+        The version of the LSTM model to apply to the data. Must be the name of a function that
+ exists in lstm_static.py.
+ do_simulation : bool
+ Indicate that simulations should be performed to obtain simulated streamflow and KGE metrics on the watersheds
+ of interest, using the "name_of_saved_model" pre-trained model. If set to True and 'do_train' is True, then the
+        newly trained model will be used instead.
+ training_func : str
+        For a regional model, it is highly recommended to use the "nse_scaled" loss function, which uses the standard
+        deviation of streamflow as input. For a local model, the "kge" function is preferred. Defaults to "nse_scaled"
+ if unspecified by the user. Can be one of ["kge", "nse_scaled"].
+ filename_base : str
+        Base name for the file in which the trained model will be saved. Do not add the ".h5" extension, as it
+        will be added automatically.
+ simulation_phases : list of str, optional
+ List of periods to generate the simulations. Can contain ['train', 'valid', 'test', 'full'], corresponding to
+ the training, validation, testing and complete periods, respectively.
+ name_of_saved_model : str, optional
+ Path to the model that has been pre-trained if required for simulations.
+
+ Returns
+ -------
+ kge_results : array-like
+ Kling-Gupta Efficiency metric values for each of the watersheds in the input_data_filename ncfile after running
+ in simulation mode (thus after training). Contains n_watersheds items, each containing 4 values representing the
+ KGE values in training, validation, testing and full period, respectively. If one or more simulation phases are
+ not requested, the items will be set to None.
+ flow_results : array-like
+ Streamflow simulation values for each of the watersheds in the input_data_filename ncfile after running
+ in simulation mode (thus after training). Contains n_watersheds items, each containing 4x 2D-arrays representing
+ the observed and simulation series in training, validation, testing and full period, respectively. If one or
+ more simulation phases are not requested, the items will be set to None.
+ name_of_saved_model : str
+ Path to the model that has been trained, or to the pre-trained model if it already existed.
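+
+    Examples
+    --------
+    A minimal sketch, assuming a hypothetical ncfile and variable tags (the file must contain "qobs" and
+    "drainage_area" alongside the listed variables):
+
+    >>> kge, flows, model_path = control_regional_lstm_training(  # doctest: +SKIP
+    ...     input_data_filename="lstm_input_data.nc",
+    ...     dynamic_var_tags=["tasmax", "tasmin", "pr"],
+    ...     qsim_pos=[False, False, False],
+    ...     static_var_tags=["drainage_area", "elevation"],
+    ...     epochs=5,
+    ...     use_cpu=True,
+    ... )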
+ """
+ if simulation_phases is None:
+ simulation_phases = ["train", "valid", "test", "full"]
+
+ # If we want to use CPU only, deactivate GPU for memory allocation. The order of operations MUST be preserved.
+ if use_cpu:
+ os.environ["CUDA_VISIBLE_DEVICES"] = ""
+ import tensorflow as tf
+
+ tf.get_logger().setLevel("INFO")
+
+ if name_of_saved_model is None:
+ if not do_train:
+ raise ValueError(
+ "Model training is set to False and no existing model is provided. Please provide a "
+ 'trained model or set "do_train" to True.'
+ )
+ tmpdir = tempfile.mkdtemp()
+        # This needs to be a string for the ModelCheckpoint callback to work; a PosixPath fails.
+ name_of_saved_model = str(Path(tmpdir) / f"{filename_base}.h5")
+
+ # Import and scale dataset
+ (
+ watershed_areas,
+ watersheds_ind,
+ arr_dynamic,
+ arr_static,
+ q_stds,
+ train_idx,
+ valid_idx,
+ test_idx,
+ all_idx,
+ ) = scale_dataset(
+ input_data_filename,
+ dynamic_var_tags,
+ qsim_pos,
+ static_var_tags,
+ train_pct,
+ valid_pct,
+ )
+
+ if do_train:
+ # Split into train and valid
+ (
+ x_train,
+ x_train_static,
+ x_train_q_stds,
+ y_train,
+ x_valid,
+ x_valid_static,
+ x_valid_q_stds,
+ y_valid,
+ ) = split_dataset(
+ arr_dynamic,
+ arr_static,
+ q_stds,
+ watersheds_ind,
+ train_idx,
+ window_size,
+ valid_idx,
+ )
+
+ # Do the main large-scale training
+ perform_initial_train(
+ model_structure,
+ use_parallel,
+ window_size,
+ batch_size,
+ epochs,
+ x_train,
+ x_train_static,
+ x_train_q_stds,
+ y_train,
+ x_valid,
+ x_valid_static,
+ x_valid_q_stds,
+ y_valid,
+ name_of_saved_model,
+ training_func=training_func,
+ use_cpu=use_cpu,
+ )
+
+ if do_simulation:
+ # Do the model simulation on all watersheds after training
+ kge_results = [None] * len(watersheds_ind)
+ flow_results = [None] * len(watersheds_ind)
+
+ for w in watersheds_ind:
+ kge_results[w], flow_results[w] = run_model_after_training(
+ w,
+ arr_dynamic,
+ arr_static,
+ q_stds,
+ window_size,
+ train_idx,
+ batch_size,
+ watershed_areas,
+ name_of_saved_model,
+ valid_idx,
+ test_idx,
+ all_idx,
+ simulation_phases,
+ )
+
+ return kge_results, flow_results, name_of_saved_model
+
+ else:
+ return None, None, name_of_saved_model
+
+
+def control_local_lstm_training(
+ input_data_filename: str,
+ dynamic_var_tags: list,
+ qsim_pos: list,
+ batch_size: int = 32,
+ epochs: int = 200,
+ window_size: int = 365,
+ train_pct: int = 60,
+ valid_pct: int = 20,
+ use_cpu: bool = True,
+ use_parallel: bool = False,
+ do_train: bool = True,
+ model_structure: str = "dummy_local_lstm",
+ do_simulation: bool = True,
+ training_func: str = "kge",
+ filename_base: str = "LSTM_results",
+ simulation_phases: list[str] | None = None,
+ name_of_saved_model: str | None = None,
+) -> tuple[list | None, list | None, str]:
+ """Control the regional LSTM model training and simulation.
+
+ Parameters
+ ----------
+ input_data_filename : str
+        Path to the netcdf file containing the required input and target data for the LSTM. The ncfile must contain
+        variables named "qobs" and "drainage_area" for the code to work, as these are required as the target and for
+        scaling during training, respectively.
+ dynamic_var_tags : list of str
+ List of dataset variables to use in the LSTM model training. Must be part of the input_data_filename ncfile.
+ qsim_pos : list of bool
+ List of same length as dynamic_var_tags. Should be set to all False EXCEPT where the dynamic_var_tags refer to
+ flow simulations (ex: simulations from a hydrological model such as HYDROTEL). Those should be set to True.
+ batch_size : int
+        Number of data points per batch. Datasets are often far too large to process in a single batch on a single
+ GPU or CPU, meaning that the dataset must be divided into smaller batches. This has an impact on the training
+ performance and final model skill, and should be handled accordingly.
+ epochs : int
+        Number of training epochs, i.e. passes over the training data. A larger number of epochs means more model
+        iterations and deeper training. Training stops early once the validation skill ceases to improve.
+ window_size : int
+ Number of days of look-back for training and model simulation. LSTM requires a large backwards-looking window to
+ allow the model to learn from long-term weather patterns and history to predict the next day's streamflow.
+ Usually set to 365 days to get one year of previous data. This makes the model heavier and longer to train but
+ can improve results.
+ train_pct : int
+ Percentage of days from the dataset to use as training. The higher, the better for model training skill, but it
+ is important to keep a decent amount for validation and testing.
+ valid_pct : int
+ Percentage of days from the dataset to use as validation. The sum of train_pct and valid_pct needs to be less
+ than 100, such that the remainder can be used for testing. A good starting value is 20%. Validation is used as
+        the stopping criterion during training: when the validation skill stops improving, the model is considered to
+        be overfitting and training is stopped.
+ use_cpu : bool
+ Flag to force the training and simulations to be performed on the CPU rather than on the GPU(s). Must be
+ performed on a CPU that has AVX and AVX2 instruction sets, or tensorflow will fail. CPU training is very slow
+ and should only be used as a last resort (such as for CI testing and debugging).
+ use_parallel : bool
+ Flag to make use of multiple GPUs to accelerate training further. Models trained on multiple GPUs can have
+ larger batch_size values as different batches can be run on different GPUs in parallel. Speedup is not linear as
+ there is overhead related to the management of datasets, batches, the gradient merging and other steps. Still
+ very useful and should be used when possible.
+ do_train : bool
+ Indicate that the code should perform the training step. This is not required as a pre-trained model could be
+ used to perform a simulation by passing an existing model in "name_of_saved_model".
+ model_structure : str
+        The version of the LSTM model to apply to the data. Must be the name of a function that
+ exists in lstm_static.py.
+ do_simulation : bool
+ Indicate that simulations should be performed to obtain simulated streamflow and KGE metrics on the watersheds
+ of interest, using the "name_of_saved_model" pre-trained model. If set to True and 'do_train' is True, then the
+        newly trained model will be used instead.
+ training_func : str
+        For a regional model, it is highly recommended to use the "nse_scaled" loss function, which uses the standard
+        deviation of streamflow as input. For a local model, the "kge" function is preferred. Defaults to "kge" if
+ unspecified by the user. Can be one of ["kge", "nse_scaled"].
+ filename_base : str
+        Base name for the file in which the trained model will be saved. Do not add the ".h5" extension, as it
+        will be added automatically.
+ simulation_phases : list of str, optional
+ List of periods to generate the simulations. Can contain ['train', 'valid', 'test', 'full'], corresponding to
+ the training, validation, testing and complete periods, respectively.
+ name_of_saved_model : str, optional
+ Path to the model that has been pre-trained if required for simulations.
+
+ Returns
+ -------
+ kge_results : array-like
+ Kling-Gupta Efficiency metric values for each of the watersheds in the input_data_filename ncfile after running
+ in simulation mode (thus after training). Contains n_watersheds items, each containing 4 values representing the
+ KGE values in training, validation, testing and full period, respectively. If one or more simulation phases are
+ not requested, the items will be set to None.
+ flow_results : array-like
+ Streamflow simulation values for each of the watersheds in the input_data_filename ncfile after running
+ in simulation mode (thus after training). Contains n_watersheds items, each containing 4x 2D-arrays representing
+ the observed and simulation series in training, validation, testing and full period, respectively. If one or
+ more simulation phases are not requested, the items will be set to None.
+ name_of_saved_model : str
+ Path to the model that has been trained, or to the pre-trained model if it already existed.
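+
+    Examples
+    --------
+    A minimal sketch, assuming a hypothetical ncfile and variable tags (the file must contain "qobs" and
+    "drainage_area" alongside the listed variables):
+
+    >>> kge, flows, model_path = control_local_lstm_training(  # doctest: +SKIP
+    ...     input_data_filename="lstm_input_data.nc",
+    ...     dynamic_var_tags=["tasmax", "tasmin", "pr"],
+    ...     qsim_pos=[False, False, False],
+    ...     epochs=5,
+    ...     use_cpu=True,
+    ... )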
+ """
+ if simulation_phases is None:
+ simulation_phases = ["train", "valid", "test", "full"]
+
+ # If we want to use CPU only, deactivate GPU for memory allocation. The order of operations MUST be preserved.
+ if use_cpu:
+ os.environ["CUDA_VISIBLE_DEVICES"] = ""
+ import tensorflow as tf
+
+ tf.get_logger().setLevel("INFO")
+
+ if name_of_saved_model is None:
+ if not do_train:
+ raise ValueError(
+ "Model training is set to False and no existing model is provided. Please provide a "
+ 'trained model or set "do_train" to True.'
+ )
+ tmpdir = tempfile.mkdtemp()
+        # This needs to be a string for the ModelCheckpoint callback to work; a PosixPath fails.
+ name_of_saved_model = str(Path(tmpdir) / f"{filename_base}.h5")
+
+ # Import and scale dataset
+ arr_dynamic, train_idx, valid_idx, test_idx, all_idx = scale_dataset_local(
+ input_data_filename,
+ dynamic_var_tags,
+ qsim_pos,
+ train_pct,
+ valid_pct,
+ )
+
+ if do_train:
+ # Split into train and valid
+ x_train, y_train, x_valid, y_valid = split_dataset_local(
+ arr_dynamic, train_idx, window_size, valid_idx
+ )
+
+ # Do the main large-scale training
+ perform_initial_train_local(
+ model_structure,
+ use_parallel,
+ window_size,
+ batch_size,
+ epochs,
+ x_train,
+ y_train,
+ x_valid,
+ y_valid,
+ name_of_saved_model,
+ training_func=training_func,
+ use_cpu=use_cpu,
+ )
+
+ if do_simulation:
+ # Do the model simulation on all watersheds after training
+ kge_results, flow_results = run_model_after_training_local(
+ arr_dynamic,
+ window_size,
+ train_idx,
+ batch_size,
+ name_of_saved_model,
+ valid_idx,
+ test_idx,
+ all_idx,
+ simulation_phases,
+ )
+
+ return kge_results, flow_results, name_of_saved_model
+
+ else:
+ return None, None, name_of_saved_model
diff --git a/src/xhydro/lstm_tools/lstm_functions.py b/src/xhydro/lstm_tools/lstm_functions.py
new file mode 100644
index 00000000..370f2360
--- /dev/null
+++ b/src/xhydro/lstm_tools/lstm_functions.py
@@ -0,0 +1,964 @@
+"""Collection of functions required to process LSTM models and their required data."""
+
+import numpy as np
+import tensorflow as tf
+import tensorflow.keras.backend as k
+from sklearn.preprocessing import MinMaxScaler, StandardScaler
+
+# TODO: Cannot use star imports, pre-commit screams.
+from xhydro.lstm_tools.create_datasets import (
+ create_dataset_flexible,
+ create_dataset_flexible_local,
+ create_lstm_dataset,
+ create_lstm_dataset_local,
+ remove_nans_func,
+ remove_nans_func_local,
+)
+from xhydro.lstm_tools.lstm_static import (
+ TrainingGenerator,
+ TrainingGeneratorLocal,
+ get_list_of_LSTM_models,
+ run_trained_model,
+ run_trained_model_local,
+)
+
+__all__ = [
+ "perform_initial_train",
+ "perform_initial_train_local",
+ "run_model_after_training",
+ "run_model_after_training_local",
+ "scale_dataset",
+ "scale_dataset_local",
+ "split_dataset",
+ "split_dataset_local",
+]
+
+
+def scale_dataset(
+ input_data_filename: str,
+ dynamic_var_tags: list,
+ qsim_pos: list,
+ static_var_tags: list,
+ train_pct: int,
+ valid_pct: int,
+):
+ """Scale the datasets using training data to normalize all inputs, ensuring weighting is unbiased.
+
+ Parameters
+ ----------
+ input_data_filename : str
+        Path to the netcdf file containing the required input and target data for the LSTM. The ncfile must contain
+        variables named "qobs" and "drainage_area" for the code to work, as these are required as the target and for
+        scaling during training, respectively.
+ dynamic_var_tags : list of str
+ List of dataset variables to use in the LSTM model training. Must be part of the input_data_filename ncfile.
+ qsim_pos : list of bool
+ List of same length as dynamic_var_tags. Should be set to all False EXCEPT where the dynamic_var_tags refer to
+ flow simulations (ex: simulations from a hydrological model such as HYDROTEL). Those should be set to True.
+ static_var_tags : list of str
+ List of the catchment descriptor names in the input_data_filename ncfile. They need to be present in the ncfile
+ and will be used as inputs to the regional model, to help the flow regionalization process.
+ train_pct : int
+ Percentage of days from the dataset to use as training. The higher, the better for model training skill, but it
+ is important to keep a decent amount for validation and testing.
+ valid_pct : int
+ Percentage of days from the dataset to use as validation. The sum of train_pct and valid_pct needs to be less
+ than 100, such that the remainder can be used for testing. A good starting value is 20%. Validation is used as
+        the stopping criterion during training: when the validation skill stops improving, the model is considered to
+        be overfitting and training is stopped.
+
+ Returns
+ -------
+ watershed_areas : np.ndarray
+ Area of the watershed, in square kilometers, as taken from the training dataset initial input ncfile.
+ watersheds_ind : np.ndarray
+ List of watershed indices to use during training.
+    arr_dynamic : np.ndarray
+        Tensor of size [n_watersheds x time_steps x (n_dynamic_variables+1)] that contains the dynamic (i.e.
+        time-series) variables that will be used during training. The first element in axis=2 is the observed flow.
+    arr_static : np.ndarray
+        Tensor of size [n_watersheds x n_static_variables] that contains the static (i.e. catchment descriptors)
+        variables that will be used during training.
+    q_stds : np.ndarray
+        Tensor of size [n_watersheds] that contains the standard deviation of scaled streamflow values for each
+        catchment, aligned with the data in arr_dynamic and arr_static.
+ train_idx : np.ndarray
+ Indices of the training period from the complete period. Contains 2 values per watershed: start and end indices.
+ valid_idx : np.ndarray
+ Indices of the validation period from the complete period. Contains 2 values per watershed: start and end
+ indices.
+ test_idx : np.ndarray
+ Indices of the testing period from the complete period. Contains 2 values per watershed: start and end indices.
+ all_idx : np.ndarray
+ Indices of the full period. Contains 2 values per watershed: start and end indices.
+ """
+ # Create the dataset
+ arr_dynamic, arr_static, arr_qobs = create_dataset_flexible(
+ input_data_filename, dynamic_var_tags, qsim_pos, static_var_tags
+ )
+
+ # TODO : This should be a user-selected option
+ # Filter catchments with too many NaNs.
+ # Find which catchments have less than 10 years of data
+ # (20% of 10 = 2 years for valid/testing) and delete them
+ for i in reversed(range(0, arr_qobs.shape[0])):
+ if np.count_nonzero(~np.isnan(arr_qobs[i, :])) < 10 * 365:
+ arr_dynamic = np.delete(arr_dynamic, i, 0)
+ arr_static = np.delete(arr_static, i, 0)
+ arr_qobs = np.delete(arr_qobs, i, 0)
+
+ # Get the indexes of the train, test and valid periods of each catchment.
+ train_idx = np.empty([arr_dynamic.shape[0], 2], dtype=int)
+ valid_idx = np.empty([arr_dynamic.shape[0], 2], dtype=int)
+ test_idx = np.empty([arr_dynamic.shape[0], 2], dtype=int)
+ all_idx = np.empty([arr_dynamic.shape[0], 2], dtype=int)
+
+ for i in range(0, arr_dynamic.shape[0]):
+ jj = np.argwhere(~np.isnan(arr_qobs[i, :]))
+ total_number_days = np.shape(jj)[0]
+ number_training_days = int(np.floor(total_number_days * train_pct / 100))
+ number_valid_days = int(np.floor(total_number_days * valid_pct / 100))
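+        # For example (hypothetical numbers): with 1000 non-NaN days, train_pct=60 and valid_pct=20, the first
+        # 600 valid days are used for training, the next 200 for validation, and the remaining 200 for testing.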
+
+ train_idx[i, 0] = int(jj[0]) # From this index...
+ train_idx[i, 1] = int(jj[number_training_days]) # to this index.
+ valid_idx[i, 0] = int(jj[number_training_days]) # From this index...
+ valid_idx[i, 1] = int(
+ jj[number_training_days + number_valid_days]
+ ) # to this index.
+ test_idx[i, 0] = int(
+ jj[number_training_days + number_valid_days]
+ ) # From this index...
+ test_idx[i, 1] = int(jj[total_number_days - 1]) # to this index.
+ all_idx[i, 0] = 0
+ all_idx[i, 1] = int(jj[total_number_days - 1])
+
+ # Get watershed numbers list
+ watersheds_ind = np.arange(0, arr_dynamic.shape[0])
+
+ # Get catchment areas, required for testing later to transform flows back into m3/s
+ watershed_areas = arr_static[:, 0]
+
+ # Get the standard deviation of the streamflow for all watersheds
+ q_stds = np.nanstd(arr_qobs, axis=1)
+
+ # Scale the dynamic feature variables (not the target variable)
+ scaler_dynamic = StandardScaler() # Use standardization by mean and std
+
+    # Prepare data for scaling. Because the data lengths differ per catchment, the training data must be stacked into
+    # a single array rather than sliced with constant indices. See the Matlab code in this folder for how it was
+    # originally performed.
+ n_data = arr_dynamic.shape[2] - 1
+ dynamic_data = np.empty((0, n_data))
+
+ for tmp in range(0, arr_dynamic.shape[0]):
+ dynamic_data = np.vstack(
+ [dynamic_data, arr_dynamic[tmp, train_idx[tmp, 0] : train_idx[tmp, 1], 1:]]
+ )
+
+    # Fit the scaler using only the training periods of each watershed
+    scaler_dynamic.fit(dynamic_data)
+
+ # Scale all watersheds data
+ for w in watersheds_ind:
+ arr_dynamic[w, :, 1:] = scaler_dynamic.transform(arr_dynamic[w, :, 1:])
+
+ # Scale the static feature variables
+ scaler_static = MinMaxScaler() # Use normalization between 0 and 1
+    arr_static = scaler_static.fit_transform(arr_static)
+
+ return (
+ watershed_areas,
+ watersheds_ind,
+ arr_dynamic,
+ arr_static,
+ q_stds,
+ train_idx,
+ valid_idx,
+ test_idx,
+ all_idx,
+ )
+
+
+def scale_dataset_local(
+ input_data_filename: str,
+ dynamic_var_tags: list,
+ qsim_pos: list,
+ train_pct: int,
+ valid_pct: int,
+):
+ """Scale the datasets using training data to normalize all inputs, ensuring weighting is unbiased.
+
+ Parameters
+ ----------
+ input_data_filename : str
+        Path to the netcdf file containing the required input and target data for the LSTM. The ncfile must contain
+        variables named "qobs" and "drainage_area" for the code to work, as these are required as the target and for
+        scaling during training, respectively.
+ dynamic_var_tags : list of str
+ List of dataset variables to use in the LSTM model training. Must be part of the input_data_filename ncfile.
+ qsim_pos : list of bool
+ List of same length as dynamic_var_tags. Should be set to all False EXCEPT where the dynamic_var_tags refer to
+ flow simulations (ex: simulations from a hydrological model such as HYDROTEL). Those should be set to True.
+ train_pct : int
+ Percentage of days from the dataset to use as training. The higher, the better for model training skill, but it
+ is important to keep a decent amount for validation and testing.
+ valid_pct : int
+ Percentage of days from the dataset to use as validation. The sum of train_pct and valid_pct needs to be less
+ than 100, such that the remainder can be used for testing. A good starting value is 20%. Validation is used as
+        the stopping criterion during training: when the validation skill stops improving, the model is considered to
+        be overfitting and training is stopped.
+
+ Returns
+ -------
+    arr_dynamic : np.ndarray
+        Tensor of size [time_steps x (n_dynamic_variables+1)] that contains the dynamic (i.e. time-series)
+        variables that will be used during training. The first element in axis=1 is the observed flow.
+ train_idx : np.ndarray
+ Indices of the training period from the complete period. Contains 2 values per watershed: start and end indices.
+ valid_idx : np.ndarray
+ Indices of the validation period from the complete period. Contains 2 values per watershed: start and end
+ indices.
+ test_idx : np.ndarray
+ Indices of the testing period from the complete period. Contains 2 values per watershed: start and end indices.
+ all_idx : np.ndarray
+ Indices of the full period. Contains 2 values per watershed: start and end indices.
+ """
+ # Create the dataset
+ arr_dynamic, arr_qobs = create_dataset_flexible_local(
+ input_data_filename, dynamic_var_tags, qsim_pos
+ )
+
+ # Get the indexes of the train, test and valid periods of each catchment.
+ train_idx = np.empty([2], dtype=int)
+ valid_idx = np.empty([2], dtype=int)
+ test_idx = np.empty([2], dtype=int)
+ all_idx = np.empty([2], dtype=int)
+
+    # Count how many non-NaN qobs data points exist.
+ jj = np.argwhere(~np.isnan(arr_qobs))
+ total_number_days = np.shape(jj)[0]
+ number_training_days = int(np.floor(total_number_days * train_pct / 100))
+ number_valid_days = int(np.floor(total_number_days * valid_pct / 100))
+
+ train_idx[0] = int(jj[0]) # From this index...
+ train_idx[1] = int(jj[number_training_days]) # to this index.
+ valid_idx[0] = int(jj[number_training_days]) # From this index...
+ valid_idx[1] = int(jj[number_training_days + number_valid_days]) # to this index.
+ test_idx[0] = int(
+ jj[number_training_days + number_valid_days]
+ ) # From this index...
+ test_idx[1] = int(jj[total_number_days - 1]) # to this index.
+ all_idx[0] = 0
+ all_idx[1] = arr_qobs.shape[0]
+
+ dynamic_data = arr_dynamic[train_idx[0] : train_idx[1], 1:]
+
+    # Fit the scaler using only the training period
+    scaler_dynamic = StandardScaler()  # Use standardization by mean and std
+    scaler_dynamic.fit(dynamic_data)
+
+    # Scale the full data period
+ arr_dynamic[:, 1:] = scaler_dynamic.transform(arr_dynamic[:, 1:])
+
+ return (
+ arr_dynamic,
+ train_idx,
+ valid_idx,
+ test_idx,
+ all_idx,
+ )
+
+
+def split_dataset(
+    arr_dynamic: np.ndarray,
+    arr_static: np.ndarray,
+    q_stds: np.ndarray,
+    watersheds_ind: np.ndarray,
+    train_idx: np.ndarray,
+    window_size: int,
+    valid_idx: np.ndarray,
+):
+ """Extract only the required data from the entire dataset according to the desired period.
+
+ Parameters
+ ----------
+ arr_dynamic : np.ndarray
+        Tensor of size [n_watersheds x time_steps x (n_dynamic_variables+1)] that contains the dynamic (i.e.
+        time-series) variables that will be used during training. The first element in axis=2 is the observed flow.
+    arr_static : np.ndarray
+        Tensor of size [n_watersheds x n_static_variables] that contains the static (i.e. catchment descriptors)
+        variables that will be used during training.
+    q_stds : np.ndarray
+        Tensor of size [n_watersheds] that contains the standard deviation of scaled streamflow values for each
+        catchment, aligned with the data in arr_dynamic and arr_static.
+ watersheds_ind : np.ndarray
+ List of watershed indices to use during training.
+ train_idx : np.ndarray
+ Indices of the training period from the complete period. Contains 2 values per watershed: start and end indices.
+ window_size : int
+ Number of days of look-back for training and model simulation. LSTM requires a large backwards-looking window to
+ allow the model to learn from long-term weather patterns and history to predict the next day's streamflow.
+ Usually set to 365 days to get one year of previous data. This makes the model heavier and longer to train but
+ can improve results.
+ valid_idx : np.ndarray
+ Indices of the validation period from the complete period. Contains 2 values per watershed: start and end
+ indices.
+
+ Returns
+ -------
+ x_train : np.ndarray
+ Tensor of size [(timesteps * watersheds) x window_size x n_dynamic_variables] that contains the dynamic (i.e.
+ timeseries) variables that will be used during training.
+ x_train_static : np.ndarray
+ Tensor of size [(timesteps * watersheds) x n_static_variables] that contains the static (i.e. catchment
+ descriptors) variables that will be used during training.
+ x_train_q_stds : np.ndarray
+ Tensor of size [(timesteps * watersheds)] that contains the standard deviation of scaled streamflow values for
+ the catchment associated to the data in x_train and x_train_static. Each data point could come from any
+ catchment and this x_train_q_std variable helps scale the objective function.
+ y_train : np.ndarray
+ Tensor of size [(timesteps * watersheds)] containing the target variable for the same time point as in x_train,
+ x_train_static and x_train_q_stds. Usually the observed streamflow for the day associated to each of the
+ training points.
+ x_valid : np.ndarray
+ Tensor of size [(timesteps * watersheds) x window_size x n_dynamic_variables] that contains the dynamic (i.e.
+ timeseries) variables that will be used during validation.
+ x_valid_static : np.ndarray
+ Tensor of size [(timesteps * watersheds) x n_static_variables] that contains the static (i.e. catchment
+ descriptors) variables that will be used during validation.
+ x_valid_q_stds : np.ndarray
+ Tensor of size [(timesteps * watersheds)] that contains the standard deviation of scaled streamflow values for
+ the catchment associated to the data in x_valid and x_valid_static. Each data point could come from any
+ catchment and this x_valid_q_std variable helps scale the objective function for the validation points.
+ y_valid : np.ndarray
+ Tensor of size [(timesteps * watersheds)] containing the target variable for the same time point as in x_valid,
+ x_valid_static and x_valid_q_stds. Usually the observed streamflow for the day associated to each of the
+ validation points.
+ """
+ # Training dataset
+ x_train, x_train_static, x_train_q_stds, y_train = create_lstm_dataset(
+ arr_dynamic=arr_dynamic,
+ arr_static=arr_static,
+ q_stds=q_stds,
+ window_size=window_size,
+ watershed_list=watersheds_ind,
+ idx=train_idx,
+ )
+
+ # Remove Nans
+ y_train, x_train, x_train_q_stds, x_train_static = remove_nans_func(
+ y_train, x_train, x_train_q_stds, x_train_static
+ )
+
+ # Validation dataset
+ x_valid, x_valid_static, x_valid_q_stds, y_valid = create_lstm_dataset(
+ arr_dynamic=arr_dynamic,
+ arr_static=arr_static,
+ q_stds=q_stds,
+ window_size=window_size,
+ watershed_list=watersheds_ind,
+ idx=valid_idx,
+ )
+
+ # Remove nans
+ y_valid, x_valid, x_valid_q_stds, x_valid_static = remove_nans_func(
+ y_valid, x_valid, x_valid_q_stds, x_valid_static
+ )
+
+ return (
+ x_train,
+ x_train_static,
+ x_train_q_stds,
+ y_train,
+ x_valid,
+ x_valid_static,
+ x_valid_q_stds,
+ y_valid,
+ )
+
+
+def split_dataset_local(
+ arr_dynamic: np.ndarray,
+ train_idx: np.ndarray,
+ window_size: int,
+ valid_idx: np.ndarray,
+):
+ """Extract only the required data from the entire dataset according to the desired period.
+
+ Parameters
+ ----------
+ arr_dynamic : np.ndarray
+        Tensor of size [time_steps x (n_dynamic_variables+1)] that contains the dynamic (i.e. time-series)
+        variables that will be used during training. The first element in axis=1 is the observed flow.
+ train_idx : np.ndarray
+ Indices of the training period from the complete period. Contains 2 values per watershed: start and end indices.
+ window_size : int
+ Number of days of look-back for training and model simulation. LSTM requires a large backwards-looking window to
+ allow the model to learn from long-term weather patterns and history to predict the next day's streamflow.
+ Usually set to 365 days to get one year of previous data. This makes the model heavier and longer to train but
+ can improve results.
+ valid_idx : np.ndarray
+ Indices of the validation period from the complete period. Contains 2 values per watershed: start and end
+ indices.
+
+ Returns
+ -------
+ x_train : np.ndarray
+        Tensor of size [timesteps x window_size x n_dynamic_variables] that contains the dynamic (i.e.
+        timeseries) variables that will be used during training.
+    y_train : np.ndarray
+        Tensor of size [timesteps] containing the target variable for the same time point as in x_train. Usually the
+        observed streamflow for the day associated to each of the training points.
+    x_valid : np.ndarray
+        Tensor of size [timesteps x window_size x n_dynamic_variables] that contains the dynamic (i.e.
+        timeseries) variables that will be used during validation.
+    y_valid : np.ndarray
+        Tensor of size [timesteps] containing the target variable for the same time point as in x_valid. Usually the
+        observed streamflow for the day associated to each of the validation points.
+ """
+ # Training dataset
+ x_train, y_train = create_lstm_dataset_local(
+ arr_dynamic=arr_dynamic, window_size=window_size, idx=train_idx
+ )
+
+ # Remove nans
+ y_train, x_train = remove_nans_func_local(y_train, x_train)
+
+ # Validation dataset
+ x_valid, y_valid = create_lstm_dataset_local(
+ arr_dynamic=arr_dynamic, window_size=window_size, idx=valid_idx
+ )
+
+ # Remove nans
+ y_valid, x_valid = remove_nans_func_local(y_valid, x_valid)
+
+ return x_train, y_train, x_valid, y_valid
+
+
+def perform_initial_train(
+ model_structure: str,
+ use_parallel: bool,
+ window_size: int,
+ batch_size: int,
+ epochs: int,
+ x_train: np.ndarray,
+ x_train_static: np.ndarray,
+ x_train_q_stds: np.ndarray,
+ y_train: np.ndarray,
+ x_valid: np.ndarray,
+ x_valid_static: np.ndarray,
+ x_valid_q_stds: np.ndarray,
+ y_valid: np.ndarray,
+ name_of_saved_model: str,
+ training_func: str,
+ use_cpu: bool = False,
+):
+ """Train the LSTM model using preprocessed data.
+
+ Parameters
+ ----------
+ model_structure : str
+        The version of the LSTM model to apply to the data. Must be the name of a function that
+ exists in lstm_static.py.
+ use_parallel : bool
+ Flag to make use of multiple GPUs to accelerate training further. Models trained on multiple GPUs can have
+ larger batch_size values as different batches can be run on different GPUs in parallel. Speedup is not linear as
+ there is overhead related to the management of datasets, batches, the gradient merging and other steps. Still
+ very useful and should be used when possible.
+ window_size : int
+ Number of days of look-back for training and model simulation. LSTM requires a large backwards-looking window to
+ allow the model to learn from long-term weather patterns and history to predict the next day's streamflow.
+ Usually set to 365 days to get one year of previous data. This makes the model heavier and longer to train but
+ can improve results.
+ batch_size : int
+        Number of data points per batch. Datasets are often far too large to process in a single batch on a single
+ GPU or CPU, meaning that the dataset must be divided into smaller batches. This has an impact on the training
+ performance and final model skill, and should be handled accordingly.
+ epochs : int
+        Number of training epochs, i.e. passes over the training data. A larger number of epochs means more model
+        iterations and deeper training. Training stops early once the validation skill ceases to improve.
+ x_train : np.ndarray
+ Tensor of size [(timesteps * watersheds) x window_size x n_dynamic_variables] that contains the dynamic (i.e.
+ timeseries) variables that will be used during training.
+ x_train_static : np.ndarray
+ Tensor of size [(timesteps * watersheds) x n_static_variables] that contains the static (i.e. catchment
+ descriptors) variables that will be used during training.
+ x_train_q_stds : np.ndarray
+ Tensor of size [(timesteps * watersheds)] that contains the standard deviation of scaled streamflow values for
+ the catchment associated to the data in x_train and x_train_static. Each data point could come from any
+ catchment and this x_train_q_std variable helps scale the objective function.
+ y_train : np.ndarray
+ Tensor of size [(timesteps * watersheds)] containing the target variable for the same time point as in x_train,
+ x_train_static and x_train_q_stds. Usually the observed streamflow for the day associated to each of the
+ training points.
+ x_valid : np.ndarray
+ Tensor of size [(timesteps * watersheds) x window_size x n_dynamic_variables] that contains the dynamic (i.e.
+ timeseries) variables that will be used during validation.
+ x_valid_static : np.ndarray
+ Tensor of size [(timesteps * watersheds) x n_static_variables] that contains the static (i.e. catchment
+ descriptors) variables that will be used during validation.
+ x_valid_q_stds : np.ndarray
+ Tensor of size [(timesteps * watersheds)] that contains the standard deviation of scaled streamflow values for
+ the catchment associated to the data in x_valid and x_valid_static. Each data point could come from any
+ catchment and this x_valid_q_std variable helps scale the objective function for the validation points.
+ y_valid : np.ndarray
+ Tensor of size [(timesteps * watersheds)] containing the target variable for the same time point as in x_valid,
+ x_valid_static and x_valid_q_stds. Usually the observed streamflow for the day associated to each of the
+ validation points.
+ name_of_saved_model : str
+        Path where the trained model will be saved; used as the checkpoint path during training.
+ training_func : str
+ Name of the objective function used for training. For a regional model, it is highly recommended to use the
+        "nse_scaled" loss function, which uses the standard deviation of streamflow as input.
+ use_cpu : bool
+ Flag to force the training and simulations to be performed on the CPU rather than on the GPU(s). Must be
+ performed on a CPU that has AVX and AVX2 instruction sets, or tensorflow will fail. CPU training is very slow
+ and should only be used as a last resort (such as for CI testing and debugging).
+ """
+ success = 0
+ while success == 0:
+ k.clear_session() # Reset the model
+ # Define multi-GPU training
+ if use_parallel and not use_cpu:
+ strategy = tf.distribute.MirroredStrategy()
+ print(f"Number of devices: {strategy.num_replicas_in_sync}")
+ with strategy.scope():
+ model_func = get_list_of_LSTM_models(model_structure)
+ model_lstm, callback = model_func(
+ window_size=window_size,
+ n_dynamic_features=x_train.shape[2],
+ n_static_features=x_train_static.shape[1],
+ training_func=training_func,
+ checkpoint_path=name_of_saved_model,
+ )
+ print("USING MULTIPLE GPU SETUP")
+ else:
+ model_func = get_list_of_LSTM_models(model_structure)
+ model_lstm, callback = model_func(
+ window_size=window_size,
+ n_dynamic_features=x_train.shape[2],
+ n_static_features=x_train_static.shape[1],
+ training_func=training_func,
+ checkpoint_path=name_of_saved_model,
+ )
+ if use_cpu:
+ print("USING CPU")
+ else:
+ print("USING SINGLE GPU")
+
+ if use_cpu:
+ tf.config.set_visible_devices([], "GPU")
+
+ h = model_lstm.fit(
+ TrainingGenerator(
+ x_train,
+ x_train_static,
+ x_train_q_stds,
+ y_train,
+ batch_size=batch_size,
+ ),
+ epochs=epochs,
+ validation_data=TrainingGenerator(
+ x_valid,
+ x_valid_static,
+ x_valid_q_stds,
+ y_valid,
+ batch_size=batch_size,
+ ),
+ callbacks=[callback],
+ verbose=1,
+ )
+
+ if not np.isnan(h.history["loss"][-1]):
+ success = 1
+
+
+def perform_initial_train_local(
+ model_structure: str,
+ use_parallel: bool,
+ window_size: int,
+ batch_size: int,
+ epochs: int,
+ x_train: np.ndarray,
+ y_train: np.ndarray,
+ x_valid: np.ndarray,
+ y_valid: np.ndarray,
+ name_of_saved_model: str,
+ training_func: str,
+ use_cpu: bool = False,
+):
+ """Train the LSTM model using preprocessed data on a local catchment.
+
+ Parameters
+ ----------
+ model_structure : str
+        The version of the LSTM model to apply to the data. Must be the name of a function that
+ exists in lstm_static.py.
+ use_parallel : bool
+ Flag to make use of multiple GPUs to accelerate training further. Models trained on multiple GPUs can have
+ larger batch_size values as different batches can be run on different GPUs in parallel. Speedup is not linear as
+ there is overhead related to the management of datasets, batches, the gradient merging and other steps. Still
+ very useful and should be used when possible.
+ window_size : int
+ Number of days of look-back for training and model simulation. LSTM requires a large backwards-looking window to
+ allow the model to learn from long-term weather patterns and history to predict the next day's streamflow.
+ Usually set to 365 days to get one year of previous data. This makes the model heavier and longer to train but
+ can improve results.
+ batch_size : int
+        Number of data points per batch. Datasets are often far too large to process in a single batch on a single
+ GPU or CPU, meaning that the dataset must be divided into smaller batches. This has an impact on the training
+ performance and final model skill, and should be handled accordingly.
+ epochs : int
+        Number of training epochs, i.e. passes over the training data. A larger number of epochs means more model
+        iterations and deeper training. Training stops early once the validation skill ceases to improve.
+ x_train : np.ndarray
+        Tensor of size [timesteps x window_size x n_dynamic_variables] that contains the dynamic (i.e.
+        timeseries) variables that will be used during training.
+    y_train : np.ndarray
+        Tensor of size [timesteps] containing the target variable for the same time point as in x_train. Usually the
+        observed streamflow for the day associated to each of the training points.
+    x_valid : np.ndarray
+        Tensor of size [timesteps x window_size x n_dynamic_variables] that contains the dynamic (i.e.
+        timeseries) variables that will be used during validation.
+    y_valid : np.ndarray
+        Tensor of size [timesteps] containing the target variable for the same time point as in x_valid. Usually the
+        observed streamflow for the day associated to each of the validation points.
+ name_of_saved_model : str
+        Path where the trained model will be saved; used as the checkpoint path during training.
+ training_func : str
+        Name of the objective function used for training. For a local model, the "kge" loss function is preferred.
+ use_cpu : bool
+ Flag to force the training and simulations to be performed on the CPU rather than on the GPU(s). Must be
+ performed on a CPU that has AVX and AVX2 instruction sets, or tensorflow will fail. CPU training is very slow
+ and should only be used as a last resort (such as for CI testing and debugging).
+
+ Returns
+ -------
+    int
+        Returns 0 once training completes successfully. A return value is provided only to satisfy the linter.
+ """
+ success = 0
+ while success == 0:
+ k.clear_session() # Reset the model
+ # Define multi-GPU training
+ if use_parallel and not use_cpu:
+ strategy = tf.distribute.MirroredStrategy()
+ print(f"Number of devices: {strategy.num_replicas_in_sync}")
+ with strategy.scope():
+ model_func = get_list_of_LSTM_models(model_structure)
+ model_lstm, callback = model_func(
+ window_size=window_size,
+ n_dynamic_features=x_train.shape[2],
+ training_func=training_func,
+ checkpoint_path=name_of_saved_model,
+ )
+ print("USING MULTIPLE GPU SETUP")
+ else:
+ model_func = get_list_of_LSTM_models(model_structure)
+ model_lstm, callback = model_func(
+ window_size=window_size,
+ n_dynamic_features=x_train.shape[2],
+ training_func=training_func,
+ checkpoint_path=name_of_saved_model,
+ )
+ if use_cpu:
+ print("USING CPU")
+ else:
+ print("USING SINGLE GPU")
+
+ if use_cpu:
+ tf.config.set_visible_devices([], "GPU")
+
+ h = model_lstm.fit(
+ TrainingGeneratorLocal(
+ x_train,
+ y_train,
+ batch_size=batch_size,
+ ),
+ epochs=epochs,
+ validation_data=TrainingGeneratorLocal(
+ x_valid,
+ y_valid,
+ batch_size=batch_size,
+ ),
+ callbacks=[callback],
+ verbose=1,
+ )
+
+ if not np.isnan(h.history["loss"][-1]):
+ success = 1
+
+ return 0
+
+
+def run_model_after_training(
+ w: int,
+ arr_dynamic: np.ndarray,
+ arr_static: np.ndarray,
+ q_stds: np.ndarray,
+ window_size: int,
+ train_idx: np.ndarray,
+ batch_size: int,
+ watershed_areas: np.ndarray,
+ name_of_saved_model: str,
+ valid_idx: np.ndarray,
+ test_idx: np.ndarray,
+ all_idx: np.ndarray,
+ simulation_phases: list,
+):
+ """Simulate streamflow on given input data for a user-defined number of periods.
+
+ Parameters
+ ----------
+ w : int
+        Index of the watershed, within the list of catchments, that will be simulated.
+    arr_dynamic : np.ndarray
+        Tensor of size [n_watersheds x time_steps x (n_dynamic_variables+1)] that contains the dynamic (i.e.
+        time-series) variables that will be used during training. The first element in axis=2 is the observed flow.
+    arr_static : np.ndarray
+        Tensor of size [n_watersheds x n_static_variables] that contains the static (i.e. catchment descriptors)
+        variables that will be used during training.
+    q_stds : np.ndarray
+        Tensor of size [n_watersheds] that contains the standard deviation of scaled streamflow values for each
+        catchment, aligned with the data in arr_dynamic and arr_static.
+ window_size : int
+ Number of days of look-back for training and model simulation. LSTM requires a large backwards-looking window to
+ allow the model to learn from long-term weather patterns and history to predict the next day's streamflow.
+ Usually set to 365 days to get one year of previous data. This makes the model heavier and longer to train but
+ can improve results.
+ train_idx : np.ndarray
+ Indices of the training period from the complete period. Contains 2 values per watershed: start and end indices.
+ batch_size : int
+        Number of data points per batch. Datasets are often far too large to process in a single batch on a single
+ GPU or CPU, meaning that the dataset must be divided into smaller batches. This has an impact on the training
+ performance and final model skill, and should be handled accordingly.
+ watershed_areas : np.ndarray
+ Area of the watershed, in square kilometers, as taken from the training dataset initial input ncfile.
+ name_of_saved_model : str
+ Path to the model that has been pre-trained if required for simulations.
+ valid_idx : np.ndarray
+ Indices of the validation period from the complete period. Contains 2 values per watershed: start and end
+ indices.
+ test_idx : np.ndarray
+ Indices of the testing period from the complete period. Contains 2 values per watershed: start and end indices.
+ all_idx : np.ndarray
+ Indices of the full period. Contains 2 values per watershed: start and end indices.
+ simulation_phases : list of str
+ List of periods to generate the simulations. Can contain ['train','valid','test','full'], corresponding to the
+ training, validation, testing and complete periods, respectively.
+
+ Returns
+ -------
+ kge : list
+        A list of size 4, with one float per period in ['train','valid','test','all']. Each KGE value is computed
+ between observed and simulated flows for the watershed of interest and for all specified periods. Unrequested
+ periods return None.
+ flows : list
+ A list of np.ndarray objects of size 4, with one 2D np.ndarray per period in ['train','valid','test','all'].
+ Observed (column 1) and simulated (column 2) streamflows are computed for the watershed of interest and for all
+ specified periods. Unrequested periods return None.
+ """
+ # Run trained model on the training period and save outputs
+ if "train" in simulation_phases:
+ kge_train, flows_train = run_trained_model(
+ arr_dynamic,
+ arr_static,
+ q_stds,
+ window_size,
+ w,
+ train_idx,
+ batch_size,
+ watershed_areas,
+ name_of_saved_model=name_of_saved_model,
+ remove_nans=False,
+ )
+ else:
+ kge_train = None
+ flows_train = None
+
+ if "valid" in simulation_phases:
+ # Run trained model on the validation period and save outputs
+ kge_valid, flows_valid = run_trained_model(
+ arr_dynamic,
+ arr_static,
+ q_stds,
+ window_size,
+ w,
+ valid_idx,
+ batch_size,
+ watershed_areas,
+ name_of_saved_model=name_of_saved_model,
+ remove_nans=False,
+ )
+ else:
+ kge_valid = None
+ flows_valid = None
+
+ if "test" in simulation_phases:
+ # Run the trained model on the testing period and save outputs
+ kge_test, flows_test = run_trained_model(
+ arr_dynamic,
+ arr_static,
+ q_stds,
+ window_size,
+ w,
+ test_idx,
+ batch_size,
+ watershed_areas,
+ name_of_saved_model=name_of_saved_model,
+ remove_nans=False,
+ )
+ else:
+ kge_test = None
+ flows_test = None
+
+ if "full" in simulation_phases:
+ # Run the trained model on the full period and save outputs
+ kge_full, flows_full = run_trained_model(
+ arr_dynamic,
+ arr_static,
+ q_stds,
+ window_size,
+ w,
+ all_idx,
+ batch_size,
+ watershed_areas,
+ name_of_saved_model=name_of_saved_model,
+ remove_nans=False,
+ )
+ else:
+ kge_full = None
+ flows_full = None
+
+ kge = [kge_train, kge_valid, kge_test, kge_full]
+ flows = [flows_train, flows_valid, flows_test, flows_full]
+
+ return kge, flows
+
+
+def run_model_after_training_local(
+ arr_dynamic: np.ndarray,
+ window_size: int,
+ train_idx: np.ndarray,
+ batch_size: int,
+ name_of_saved_model: str,
+ valid_idx: np.ndarray,
+ test_idx: np.ndarray,
+ all_idx: np.ndarray,
+ simulation_phases: list,
+):
+ """Simulate streamflow on given input data for a user-defined number of periods.
+
+ Parameters
+ ----------
+ arr_dynamic : np.ndarray
+        Tensor of size [time_steps x (n_dynamic_variables+1)] that contains the dynamic (i.e. time-series)
+        variables that will be used during training. The first element in axis=1 is the observed flow.
+ window_size : int
+ Number of days of look-back for training and model simulation. LSTM requires a large backwards-looking window to
+ allow the model to learn from long-term weather patterns and history to predict the next day's streamflow.
+ Usually set to 365 days to get one year of previous data. This makes the model heavier and longer to train but
+ can improve results.
+ train_idx : np.ndarray
+ Indices of the training period from the complete period. Contains 2 values per watershed: start and end indices.
+ batch_size : int
+        Number of data points per batch. Datasets are often far too large to process in a single batch on a single
+ GPU or CPU, meaning that the dataset must be divided into smaller batches. This has an impact on the training
+ performance and final model skill, and should be handled accordingly.
+ name_of_saved_model : str
+ Path to the model that has been pre-trained if required for simulations.
+ valid_idx : np.ndarray
+ Indices of the validation period from the complete period. Contains 2 values per watershed: start and end
+ indices.
+ test_idx : np.ndarray
+ Indices of the testing period from the complete period. Contains 2 values per watershed: start and end indices.
+ all_idx : np.ndarray
+ Indices of the full period. Contains 2 values per watershed: start and end indices.
+ simulation_phases : list of str
+ List of periods to generate the simulations. Can contain ['train','valid','test','full'], corresponding to the
+ training, validation, testing and complete periods, respectively.
+
+ Returns
+ -------
+ kge : list
+ A list of floats of size 4, with one float per period in ['train','valid','test','full']. Each KGE value is
+ computed between observed and simulated flows for the watershed of interest and for each requested period.
+ Unrequested periods return None.
+ flows : list
+ A list of np.ndarray objects of size 4, with one 2D np.ndarray per period in ['train','valid','test','full'].
+ Observed (column 1) and simulated (column 2) streamflows are returned for the watershed of interest and for each
+ requested period. Unrequested periods return None.
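+
+ Examples
+ --------
+ A minimal sketch with hypothetical arrays and index pairs, requesting only the test period; the other three
+ slots of the returned lists are None:
+
+ >>> kge, flows = run_model_after_training_local( # doctest: +SKIP
+ ... arr_dynamic,
+ ... window_size=365,
+ ... train_idx=train_idx,
+ ... batch_size=32,
+ ... name_of_saved_model="tmp.h5",
+ ... valid_idx=valid_idx,
+ ... test_idx=test_idx,
+ ... all_idx=all_idx,
+ ... simulation_phases=["test"],
+ ... )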
+ """
+ # Run trained model on the training period and save outputs
+ if "train" in simulation_phases:
+ kge_train, flows_train = run_trained_model_local(
+ arr_dynamic,
+ window_size,
+ train_idx,
+ batch_size,
+ name_of_saved_model=name_of_saved_model,
+ remove_nans=False,
+ )
+ else:
+ kge_train = None
+ flows_train = None
+
+ if "valid" in simulation_phases:
+ # Run trained model on the validation period and save outputs
+ kge_valid, flows_valid = run_trained_model_local(
+ arr_dynamic,
+ window_size,
+ valid_idx,
+ batch_size,
+ name_of_saved_model=name_of_saved_model,
+ remove_nans=False,
+ )
+ else:
+ kge_valid = None
+ flows_valid = None
+
+ if "test" in simulation_phases:
+ # Run the trained model on the testing period and save outputs
+ kge_test, flows_test = run_trained_model_local(
+ arr_dynamic,
+ window_size,
+ test_idx,
+ batch_size,
+ name_of_saved_model=name_of_saved_model,
+ remove_nans=False,
+ )
+ else:
+ kge_test = None
+ flows_test = None
+
+ if "full" in simulation_phases:
+ # Run the trained model on the full period and save outputs
+ kge_full, flows_full = run_trained_model_local(
+ arr_dynamic,
+ window_size,
+ all_idx,
+ batch_size,
+ name_of_saved_model=name_of_saved_model,
+ remove_nans=False,
+ )
+ else:
+ kge_full = None
+ flows_full = None
+
+ kge = [kge_train, kge_valid, kge_test, kge_full]
+ flows = [flows_train, flows_valid, flows_test, flows_full]
+
+ return kge, flows
diff --git a/src/xhydro/lstm_tools/lstm_static.py b/src/xhydro/lstm_tools/lstm_static.py
new file mode 100644
index 00000000..dcc4128f
--- /dev/null
+++ b/src/xhydro/lstm_tools/lstm_static.py
@@ -0,0 +1,769 @@
+"""LSTM model definition and tools for LSTM model training."""
+
+import math
+
+import numpy as np
+import tensorflow as tf
+import tensorflow.keras.backend as k
+from tensorflow.keras.models import load_model
+
+from xhydro.modelling.obj_funcs import get_objective_function
+
+from .create_datasets import create_lstm_dataset, create_lstm_dataset_local
+
+__all__ = [
+ "TestingGenerator",
+ "TestingGeneratorLocal",
+ "TrainingGenerator",
+ "TrainingGeneratorLocal",
+ "get_list_of_LSTM_models",
+ "run_trained_model",
+ "run_trained_model_local",
+]
+
+
+def get_list_of_LSTM_models(model_structure): # noqa: N802
+ """Create a training generator to manage the GPU memory during training.
+
+ Parameters
+ ----------
+ model_structure : str
+ The name of the LSTM model to use for training. Must correspond to one of the models present in lstm_static.py.
+ The "model_structure_dict" must be updated when new models are added.
+
+ Returns
+ -------
+ function :
+ Handle to the LSTM model function.
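+
+ Examples
+ --------
+ A hedged sketch: retrieve a builder and call it with hypothetical feature counts (requires TensorFlow):
+
+ >>> model_fn = get_list_of_LSTM_models("dummy_local_lstm")
+ >>> model, callbacks = model_fn( # doctest: +SKIP
+ ... window_size=365, n_dynamic_features=5, training_func="kge"
+ ... )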
+ """
+ # Dictionary of available model structures:
+ model_structure_dict = {
+ "simple_local_lstm": _simple_local_lstm,
+ "simple_regional_lstm": _simple_regional_lstm,
+ "dummy_local_lstm": _dummy_local_lstm,
+ "dummy_regional_lstm": _dummy_regional_lstm,
+ }
+
+ # An unknown key would otherwise raise a bare KeyError; raise an explicit ValueError instead.
+ if model_structure not in model_structure_dict:
+ raise ValueError(
+ "The LSTM model structure desired is not present in the available model dictionary."
+ )
+
+ model_handle = model_structure_dict[model_structure]
+
+ return model_handle
+
+
+class TrainingGenerator(tf.keras.utils.Sequence):
+ """Create a training generator to manage the GPU memory during training.
+
+ Parameters
+ ----------
+ x_set : np.ndarray
+ Tensor of size [batch_size x window_size x n_dynamic_variables] that contains the batch of dynamic (i.e.
+ timeseries) variables that will be used during training.
+ x_set_static : np.ndarray
+ Tensor of size [batch_size x n_static_variables] that contains the batch of static (i.e. catchment descriptors)
+ variables that will be used during training.
+ x_set_q_stds : np.ndarray
+ Tensor of size [batch_size] that contains the standard deviation of scaled streamflow values for the catchment
+ associated to the data in x_set and x_set_static. Each data point could come from any catchment and this q_std
+ variable helps scale the objective function.
+ y_set : np.ndarray
+ Tensor of size [batch_size] containing the target variable for the same time point as in x_set, x_set_static and
+ x_set_q_stds. Usually the streamflow for the day associated to each of the training points.
+ batch_size : int
+ Number of data points to use in training. Datasets are often way too big to train in a single batch on a single
+ GPU or CPU, meaning that the dataset must be divided into smaller batches. This has an impact on the training
+ performance and final model skill, and should be handled accordingly.
+
+ Returns
+ -------
+ self : A generator object that serves the training data one batch at a time to limit memory use.
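+
+ Examples
+ --------
+ A minimal sketch with placeholder arrays (hypothetical shapes): 1000 samples at a batch size of 64 yield
+ ceil(1000 / 64) = 16 batches.
+
+ >>> import numpy as np
+ >>> gen = TrainingGenerator(
+ ... x_set=np.zeros((1000, 365, 5)),
+ ... x_set_static=np.zeros((1000, 7)),
+ ... x_set_q_stds=np.zeros(1000),
+ ... y_set=np.zeros(1000),
+ ... batch_size=64,
+ ... )
+ >>> len(gen)
+ 16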
+ """
+
+ def __init__(self, x_set, x_set_static, x_set_q_stds, y_set, batch_size):
+ self.x = x_set
+ self.x_static = x_set_static
+ self.x_q_stds = x_set_q_stds
+ self.y = y_set
+ self.batch_size = batch_size
+ self.indices = np.arange(self.x.shape[0])
+
+ def __len__(self):
+ """Get total number of batches to generate"""
+ return (np.ceil(len(self.y) / float(self.batch_size))).astype(int)
+
+ def __getitem__(self, idx):
+ """Get one of the batches by taking the 'batch_size' first elements from the list of remaining indices."""
+ inds = self.indices[idx * self.batch_size : (idx + 1) * self.batch_size]
+ batch_x = self.x[inds]
+ batch_x_static = self.x_static[inds]
+ batch_x_q_stds = self.x_q_stds[inds]
+ batch_y = self.y[inds]
+
+ return [np.array(batch_x), np.array(batch_x_static)], np.vstack(
+ (np.array(batch_y), np.array(batch_x_q_stds))
+ ).T
+
+ def on_epoch_end(self):
+ """Shuffle the dataset before the next batch sampling to ensure randomness, helping convergence."""
+ np.random.shuffle(self.indices)
+
+
+class TrainingGeneratorLocal(tf.keras.utils.Sequence):
+ """Create a training generator to manage the GPU memory during training.
+
+ Parameters
+ ----------
+ x_set : np.ndarray
+ Tensor of size [batch_size x window_size x n_dynamic_variables] that contains the batch of dynamic (i.e.
+ timeseries) variables that will be used during training.
+ y_set : np.ndarray
+ Tensor of size [batch_size] containing the target variable for the same time point as in x_set. Usually the
+ streamflow for the day associated to each of the training points.
+ batch_size : int
+ Number of data points to use in training. Datasets are often way too big to train in a single batch on a single
+ GPU or CPU, meaning that the dataset must be divided into smaller batches. This has an impact on the training
+ performance and final model skill, and should be handled accordingly.
+
+ Returns
+ -------
+ self : A generator object that serves the training data one batch at a time to limit memory use.
+ """
+
+ def __init__(self, x_set, y_set, batch_size):
+ self.x = x_set
+ self.y = y_set
+ self.batch_size = batch_size
+ self.indices = np.arange(self.x.shape[0])
+
+ def __len__(self):
+ """Get total number of batches to generate"""
+ return (np.ceil(len(self.y) / float(self.batch_size))).astype(int)
+
+ def __getitem__(self, idx):
+ """Get one of the batches by taking the 'batch_size' first elements from the list of remaining indices."""
+ inds = self.indices[idx * self.batch_size : (idx + 1) * self.batch_size]
+ batch_x = self.x[inds]
+ batch_y = self.y[inds]
+
+ return [np.array(batch_x)], [np.array(batch_y)]
+
+ def on_epoch_end(self):
+ """Shuffle the dataset before the next batch sampling to ensure randomness, helping convergence."""
+ np.random.shuffle(self.indices)
+
+
+class TestingGenerator(tf.keras.utils.Sequence):
+ """Create a testing generator to manage the GPU memory during training.
+
+ Parameters
+ ----------
+ x_set : np.ndarray
+ Tensor of size [batch_size x window_size x n_dynamic_variables] that contains the batch of dynamic (i.e.
+ timeseries) variables that will be used during training.
+ x_set_static : np.ndarray
+ Tensor of size [batch_size x n_static_variables] that contains the batch of static (i.e. catchment descriptors)
+ variables that will be used during training.
+ batch_size : int
+ Number of data points to use in training. Datasets are often way too big to train in a single batch on a single
+ GPU or CPU, meaning that the dataset must be divided into smaller batches. This has an impact on the training
+ performance and final model skill, and should be handled accordingly.
+
+ Returns
+ -------
+ self : A generator object that serves the testing data one batch at a time to limit memory use.
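+
+ Examples
+ --------
+ A minimal sketch with placeholder arrays (hypothetical shapes): 50 samples at a batch size of 32 yield 2 batches.
+
+ >>> import numpy as np
+ >>> gen = TestingGenerator(np.zeros((50, 365, 5)), np.zeros((50, 7)), batch_size=32)
+ >>> len(gen)
+ 2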
+ """
+
+ def __init__(self, x_set, x_set_static, batch_size):
+ self.x = x_set
+ self.x_static = x_set_static
+ self.batch_size = batch_size
+
+ def __len__(self):
+ """Get total number of batches to generate"""
+ return (np.ceil(self.x.shape[0] / float(self.batch_size))).astype(int)
+
+ def __getitem__(self, idx):
+ """Get one of the batches by taking the 'batch_size' first elements from the list of remaining indices."""
+ batch_x = self.x[idx * self.batch_size : (idx + 1) * self.batch_size]
+ batch_x_static = self.x_static[
+ idx * self.batch_size : (idx + 1) * self.batch_size
+ ]
+ # The second element is a dummy target; Keras ignores it during prediction.
+ return [np.array(batch_x), np.array(batch_x_static)], np.array(batch_x_static)
+
+
+class TestingGeneratorLocal(tf.keras.utils.Sequence):
+ """Create a testing generator to manage the GPU memory during training.
+
+ Parameters
+ ----------
+ x_set : np.ndarray
+ Tensor of size [batch_size x window_size x n_dynamic_variables] that contains the batch of dynamic (i.e.
+ timeseries) variables that will be used during training.
+ batch_size : int
+ Number of data points to use in training. Datasets are often way too big to train in a single batch on a single
+ GPU or CPU, meaning that the dataset must be divided into smaller batches. This has an impact on the training
+ performance and final model skill, and should be handled accordingly.
+
+ Returns
+ -------
+ self : A generator object that serves the testing data one batch at a time to limit memory use.
+ """
+
+ def __init__(self, x_set, batch_size):
+ self.x = x_set
+ self.batch_size = batch_size
+
+ def __len__(self):
+ """Get total number of batches to generate"""
+ return (np.ceil(self.x.shape[0] / float(self.batch_size))).astype(int)
+
+ def __getitem__(self, idx):
+ """Get one of the batches by taking the 'batch_size' first elements from the list of remaining indices."""
+ batch_x = self.x[idx * self.batch_size : (idx + 1) * self.batch_size]
+
+ return [np.array(batch_x)]
+
+
+def _kge_loss(data, y_pred):
+ """Compute the Kling-Gupta Efficiency (KGE) criterion under Keras for Tensorflow training.
+
+ Needs to be separate from the regular objective function calculations because it uses Keras/tensor arithmetic for
+ GPU.
+
+ Parameters
+ ----------
+ data : np.ndarray
+ Tensor whose first column contains the observed streamflow used as target for the LSTM training.
+ y_pred : np.ndarray
+ Simulated streamflow generated by the LSTM during training.
+
+ Returns
+ -------
+ kge
+ The KGE-based loss (1 - KGE), computed using Keras tools and minimized during training.
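+
+ Examples
+ --------
+ A hedged sketch: a perfect simulation gives r = b = g = 1 and therefore a loss of 0 (values are illustrative):
+
+ >>> import tensorflow as tf
+ >>> data = tf.constant([[1.0, 0.0], [2.0, 0.0], [3.0, 0.0]]) # column 0 is the observed flow
+ >>> y_pred = tf.constant([[1.0], [2.0], [3.0]])
+ >>> float(_kge_loss(data, y_pred)) # doctest: +SKIP
+ 0.0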
+ """
+ y_true = data[:, 0]
+ y_pred = y_pred[:, 0]
+
+ # Compute the dimensionless correlation coefficient
+ r = k.sum((y_true - k.mean(y_true)) * (y_pred - k.mean(y_pred))) / (
+ k.sqrt(k.sum((y_true - k.mean(y_true)) ** 2))
+ * k.sqrt(k.sum((y_pred - k.mean(y_pred)) ** 2))
+ )
+
+ # Compute the dimensionless bias ratio b (beta)
+ b = k.mean(y_pred) / k.mean(y_true)
+
+ # Compute the dimensionless variability ratio g (gamma)
+ g = (k.std(y_pred) / k.mean(y_pred)) / (k.std(y_true) / k.mean(y_true))
+
+ # Compute the loss as 1 - KGE, where KGE = 1 - sqrt((r - 1)^2 + (b - 1)^2 + (g - 1)^2)
+ kge = 1 - (1 - k.sqrt((r - 1) ** 2 + (b - 1) ** 2 + (g - 1) ** 2))
+
+ return kge
+
+
+def _nse_scaled_loss(data, y_pred):
+ """Compute the modified NSE loss for regional training.
+
+ Adds a small constant (eps) to the streamflow standard deviations for robustness, and scales the squared errors
+ accordingly so that the NSE can be computed with scaled and normalized data from multiple catchments.
+
+ Parameters
+ ----------
+ data : np.ndarray
+ Tensor of the target variable (column 1) and observed streamflow standard deviations (column 2).
+ y_pred : np.ndarray
+ Tensor of the predicted variable from the LSTM model.
+
+ Returns
+ -------
+ scaled_loss
+ Scaled modified NSE metric for training on regional models.
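+
+ Examples
+ --------
+ For a single sample, the loss reduces to ((y_pred - y_true) ** 2) / (q_std + 0.1) ** 2 (hypothetical values):
+
+ >>> import tensorflow as tf
+ >>> data = tf.constant([[2.0, 0.9]]) # observed flow and its q_std
+ >>> y_pred = tf.constant([[3.0]])
+ >>> float(_nse_scaled_loss(data, y_pred)) # doctest: +SKIP
+ 1.0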
+ """
+ y_true = data[:, 0]
+ q_stds = data[:, 1]
+ y_pred = y_pred[:, 0]
+
+ eps = 0.1
+ squared_error = (y_pred - y_true) ** 2
+ weights = 1 / (q_stds + eps) ** 2
+ scaled_loss = weights * squared_error
+
+ return scaled_loss
+
+
+def _simple_regional_lstm(
+ window_size: int,
+ n_dynamic_features: int,
+ n_static_features: int,
+ training_func: str,
+ checkpoint_path: str = "tmp.h5",
+):
+ """Define the LSTM model structure and hyperparameters to use. Must be updated by users to modify model structures.
+
+ Parameters
+ ----------
+ window_size : int
+ Number of days of look-back for training and model simulation. LSTM requires a large backwards-looking window to
+ allow the model to learn from long-term weather patterns and history to predict the next day's streamflow.
+ Usually set to 365 days to get one year of previous data. This makes the model heavier and longer to train but
+ can improve results.
+ n_dynamic_features : int
+ Number of dynamic (i.e. time-series) variables that are used for training and simulating the model.
+ n_static_features : int
+ Number of static (i.e. catchment descriptor) variables that are used to define the regional LSTM model and allow
+ it to modulate simulations according to catchment properties.
+ training_func : str
+ Name of the objective function used for training. For a regional model, it is highly recommended to use the
+ scaled nse_loss variable that uses the standard deviation of streamflow as inputs.
+ checkpoint_path : str
+ String containing the path of the file where the trained model will be saved.
+
+ Returns
+ -------
+ model_lstm : Tensorflow model
+ The tensorflow model that will be trained, along with all of its default hyperparameters and training options.
+ callback : list
+ List of tensorflow objects that allow performing operations after each training epoch.
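+
+ Examples
+ --------
+ A hedged sketch of building the regional model with hypothetical feature counts (requires TensorFlow):
+
+ >>> model, callbacks = _simple_regional_lstm( # doctest: +SKIP
+ ... window_size=365,
+ ... n_dynamic_features=5,
+ ... n_static_features=7,
+ ... training_func="nse_scaled",
+ ... )
+ >>> model.summary() # doctest: +SKIP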
+ """
+ x_in_365 = tf.keras.layers.Input(shape=(window_size, n_dynamic_features))
+ x_in_static = tf.keras.layers.Input(shape=(n_static_features,))
+
+ # LSTM 365 day
+ x_365 = tf.keras.layers.LSTM(128, return_sequences=True)(x_in_365)
+ x_365 = tf.keras.layers.LSTM(128, return_sequences=False)(x_365)
+ x_365 = tf.keras.layers.Dropout(0.1)(x_365)
+
+ # Dense statics
+ x_static = tf.keras.layers.Dense(24, activation="relu")(x_in_static)
+ x_static = tf.keras.layers.Dropout(0.2)(x_static)
+
+ # Concatenate the model
+ x = tf.keras.layers.Concatenate()([x_365, x_static])
+ x = tf.keras.layers.Dense(12, activation="relu")(x)
+ x = tf.keras.layers.LeakyReLU(alpha=0.1)(x)
+ x_out = tf.keras.layers.Dense(1, activation="relu")(x)
+
+ model_lstm = tf.keras.models.Model([x_in_365, x_in_static], [x_out])
+ if training_func == "nse_scaled":
+ model_lstm.compile(
+ loss=_nse_scaled_loss, optimizer=tf.keras.optimizers.AdamW(clipnorm=0.1)
+ )
+ elif training_func == "kge":
+ model_lstm.compile(
+ loss=_kge_loss, optimizer=tf.keras.optimizers.AdamW(clipnorm=0.1)
+ )
+
+ callback = [
+ tf.keras.callbacks.ModelCheckpoint(
+ checkpoint_path,
+ save_freq="epoch",
+ save_best_only=True,
+ monitor="val_loss",
+ mode="min",
+ verbose=1,
+ ),
+ tf.keras.callbacks.EarlyStopping(
+ monitor="val_loss", mode="min", verbose=1, patience=15
+ ),
+ tf.keras.callbacks.LearningRateScheduler(_step_decay),
+ ]
+
+ return model_lstm, callback
+
+
+def _simple_local_lstm(
+ window_size: int,
+ n_dynamic_features: int,
+ training_func: str,
+ checkpoint_path: str = "tmp.h5",
+):
+ """Define the local LSTM model structure and hyperparameters to use.
+
+ Must be updated by users to modify model structures.
+
+ Parameters
+ ----------
+ window_size : int
+ Number of days of look-back for training and model simulation. LSTM requires a large backwards-looking window to
+ allow the model to learn from long-term weather patterns and history to predict the next day's streamflow.
+ Usually set to 365 days to get one year of previous data. This makes the model heavier and longer to train but
+ can improve results.
+ n_dynamic_features : int
+ Number of dynamic (i.e. time-series) variables that are used for training and simulating the model.
+ training_func : str
+ Name of the objective function used for training. Only "kge" is supported for the local training model; any
+ other value raises a ValueError.
+ checkpoint_path : str
+ String containing the path of the file where the trained model will be saved.
+
+ Returns
+ -------
+ model_lstm : Tensorflow model
+ The tensorflow model that will be trained, along with all of its default hyperparameters and training options.
+ callback : list
+ List of tensorflow objects that allow performing operations after each training epoch.
+ """
+ x_in_365 = tf.keras.layers.Input(shape=(window_size, n_dynamic_features))
+
+ # Single LSTM layer
+ x_365 = tf.keras.layers.LSTM(32, return_sequences=True)(x_in_365)
+ x_365 = tf.keras.layers.LSTM(32, return_sequences=False)(x_365)
+ x_365 = tf.keras.layers.Dropout(0.1)(x_365) # Add dropout layer for robustness
+
+ # Pass to a simple Dense layer with relu activation and LeakyReLU activation
+ x = tf.keras.layers.Dense(12, activation="relu")(x_365)
+ x = tf.keras.layers.LeakyReLU(alpha=0.1)(x)
+
+ # pass to a 1-unit dense layer representing output flow.
+ x_out = tf.keras.layers.Dense(1, activation="relu")(x)
+
+ # Build model
+ model_lstm = tf.keras.models.Model([x_in_365], [x_out])
+
+ if training_func == "kge":
+ model_lstm.compile(
+ loss=_kge_loss, optimizer=tf.keras.optimizers.AdamW(clipnorm=0.1)
+ )
+ else:
+ raise ValueError("training_func can only be kge for the local training model.")
+
+ callback = [
+ tf.keras.callbacks.ModelCheckpoint(
+ checkpoint_path,
+ save_freq="epoch",
+ save_best_only=True,
+ monitor="val_loss",
+ mode="min",
+ verbose=1,
+ ),
+ tf.keras.callbacks.EarlyStopping(
+ monitor="val_loss", mode="min", verbose=1, patience=15
+ ),
+ tf.keras.callbacks.LearningRateScheduler(_step_decay),
+ ]
+
+ return model_lstm, callback
+
+
+def _dummy_regional_lstm(
+ window_size: int,
+ n_dynamic_features: int,
+ n_static_features: int,
+ training_func: str,
+ checkpoint_path: str = "tmp.h5",
+):
+ """Define the LSTM model structure and hyperparameters to use. Must be updated by users to modify model structures.
+
+ Parameters
+ ----------
+ window_size : int
+ Number of days of look-back for training and model simulation. LSTM requires a large backwards-looking window to
+ allow the model to learn from long-term weather patterns and history to predict the next day's streamflow.
+ Usually set to 365 days to get one year of previous data. This makes the model heavier and longer to train but
+ can improve results.
+ n_dynamic_features : int
+ Number of dynamic (i.e. time-series) variables that are used for training and simulating the model.
+ n_static_features : int
+ Number of static (i.e. catchment descriptor) variables that are used to define the regional LSTM model and allow
+ it to modulate simulations according to catchment properties.
+ training_func : str
+ Name of the objective function used for training. For a regional model, it is highly recommended to use the
+ scaled nse_loss variable that uses the standard deviation of streamflow as inputs.
+ checkpoint_path : str
+ String containing the path of the file where the trained model will be saved.
+
+ Returns
+ -------
+ model_lstm : Tensorflow model
+ The tensorflow model that will be trained, along with all of its default hyperparameters and training options.
+ callback : list
+ List of tensorflow objects that allow performing operations after each training epoch.
+ """
+ x_in_365 = tf.keras.layers.Input(shape=(window_size, n_dynamic_features))
+ x_in_static = tf.keras.layers.Input(shape=(n_static_features,))
+
+ # LSTM 365 day
+ x_365 = tf.keras.layers.LSTM(64, return_sequences=False)(x_in_365)
+ x_365 = tf.keras.layers.Dropout(0.2)(x_365)
+
+ # Dense statics
+ x_static = tf.keras.layers.Dense(24, activation="relu")(x_in_static)
+ x_static = tf.keras.layers.Dropout(0.2)(x_static)
+
+ # Concatenate the model
+ x = tf.keras.layers.Concatenate()([x_365, x_static])
+ x = tf.keras.layers.Dense(8, activation="relu")(x)
+ x_out = tf.keras.layers.Dense(1, activation="relu")(x)
+
+ model_lstm = tf.keras.models.Model([x_in_365, x_in_static], [x_out])
+ if training_func == "nse_scaled":
+ model_lstm.compile(loss=_nse_scaled_loss, optimizer=tf.keras.optimizers.AdamW())
+ elif training_func == "kge":
+ model_lstm.compile(loss=_kge_loss, optimizer=tf.keras.optimizers.AdamW())
+
+ callback = [
+ tf.keras.callbacks.ModelCheckpoint(
+ checkpoint_path,
+ save_freq="epoch",
+ save_best_only=True,
+ monitor="val_loss",
+ mode="min",
+ verbose=1,
+ ),
+ tf.keras.callbacks.EarlyStopping(
+ monitor="val_loss", mode="min", verbose=1, patience=15
+ ),
+ tf.keras.callbacks.LearningRateScheduler(_step_decay),
+ ]
+
+ return model_lstm, callback
+
+
+def _dummy_local_lstm(
+ window_size: int,
+ n_dynamic_features: int,
+ training_func: str,
+ checkpoint_path: str = "tmp.h5",
+):
+ """Define the local LSTM model structure and hyperparameters to use.
+
+ Must be updated by users to modify model structures.
+
+ Parameters
+ ----------
+ window_size : int
+ Number of days of look-back for training and model simulation. LSTM requires a large backwards-looking window to
+ allow the model to learn from long-term weather patterns and history to predict the next day's streamflow.
+ Usually set to 365 days to get one year of previous data. This makes the model heavier and longer to train but
+ can improve results.
+ n_dynamic_features : int
+ Number of dynamic (i.e. time-series) variables that are used for training and simulating the model.
+ training_func : str
+ Name of the objective function used for training. Only "kge" is supported for the local training model; any
+ other value raises a ValueError.
+ checkpoint_path : str
+ String containing the path of the file where the trained model will be saved.
+
+ Returns
+ -------
+ model_lstm : Tensorflow model
+ The tensorflow model that will be trained, along with all of its default hyperparameters and training options.
+ callback : list
+ List of tensorflow objects that allow performing operations after each training epoch.
+ """
+ x_in_365 = tf.keras.layers.Input(shape=(window_size, n_dynamic_features))
+
+ # Single LSTM layer over the 365-day window
+ x_365 = tf.keras.layers.LSTM(64, return_sequences=False)(x_in_365)
+ # Add dropout layer for robustness
+ x_365 = tf.keras.layers.Dropout(0.2)(x_365)
+ # Pass to a simple Dense layer with relu activation
+ x = tf.keras.layers.Dense(8, activation="relu")(x_365)
+ # Pass to a 1-unit dense layer representing output flow
+ x_out = tf.keras.layers.Dense(1, activation="relu")(x)
+
+ model_lstm = tf.keras.models.Model([x_in_365], [x_out])
+
+ if training_func == "kge":
+ model_lstm.compile(loss=_kge_loss, optimizer=tf.keras.optimizers.AdamW())
+ else:
+ raise ValueError("training_func can only be kge for the local training model.")
+
+ callback = [
+ tf.keras.callbacks.ModelCheckpoint(
+ checkpoint_path,
+ save_freq="epoch",
+ save_best_only=True,
+ monitor="val_loss",
+ mode="min",
+ verbose=1,
+ ),
+ tf.keras.callbacks.EarlyStopping(
+ monitor="val_loss", mode="min", verbose=1, patience=15
+ ),
+ tf.keras.callbacks.LearningRateScheduler(_step_decay),
+ ]
+
+ return model_lstm, callback
+
+
+def _step_decay(epoch: int):
+ """Callback for learning rate tuning during LSTM model training.
+
+ Parameters
+ ----------
+ epoch : int
+ Current epoch number during training. Used to adapt the learning rate after a certain number of epochs to aid in
+ model training convergence.
+
+ Returns
+ -------
+ lrate
+ The updated learning rate.
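+
+ Examples
+ --------
+ The schedule starts at 5e-4 and halves the rate every 10 epochs:
+
+ >>> _step_decay(0)
+ 0.0005
+ >>> _step_decay(10)
+ 0.00025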
+ """
+ initial_lrate = 0.0005
+ drop = 0.5
+ epochs_drop = 10.0
+ lrate = initial_lrate * math.pow(drop, math.floor((1 + epoch) / epochs_drop))
+
+ return lrate
+
+
+def run_trained_model(
+ arr_dynamic: np.ndarray,
+ arr_static: np.ndarray,
+ q_stds: np.ndarray,
+ window_size: int,
+ w: int,
+ idx_scenario: np.ndarray,
+ batch_size: int,
+ watershed_areas: np.ndarray,
+ name_of_saved_model: str,
+ remove_nans: bool,
+):
+ """Run the trained regional LSTM model on a single catchment from a larger set.
+
+ Parameters
+ ----------
+ arr_dynamic : np.ndarray
+ Tensor of size [time_steps x window_size x (n_dynamic_variables+1)] that contains the dynamic (i.e. time-series)
+ variables that will be used during training. The first element in axis=2 is the observed flow.
+ arr_static : np.ndarray
+ Tensor of size [time_steps x n_static_variables] that contains the static (i.e. catchment descriptors) variables
+ that will be used during training.
+ q_stds : np.ndarray
+ Tensor of size [time_steps] that contains the standard deviation of scaled streamflow values for the catchment
+ associated to the data in arr_dynamic and arr_static.
+ window_size : int
+ Number of days of look-back for training and model simulation. LSTM requires a large backwards-looking window to
+ allow the model to learn from long-term weather patterns and history to predict the next day's streamflow.
+ Usually set to 365 days to get one year of previous data. This makes the model heavier and longer to train but
+ can improve results.
+ w : int
+ Number of the watershed from the list of catchments that will be simulated.
+ idx_scenario : np.ndarray
+ 2-element array of indices of the beginning and end of the desired period for which the LSTM model should be
+ simulated.
+ batch_size : int
+ Number of data points to use in training. Datasets are often way too big to train in a single batch on a single
+ GPU or CPU, meaning that the dataset must be divided into smaller batches. This has an impact on the training
+ performance and final model skill, and should be handled accordingly.
+ watershed_areas : np.ndarray
+ Areas of the watersheds, in square kilometers, as taken from the initial input NetCDF file of the training
+ dataset.
+ name_of_saved_model : str
+ Path to the model that has been pre-trained if required for simulations.
+ remove_nans : bool
+ Remove the periods for which observed streamflow is NaN for both observed and simulated flows.
+
+ Returns
+ -------
+ kge : float
+ KGE value between observed and simulated flows computed for the watershed of interest and for a specified
+ period.
+ flows : np.ndarray
+ Observed and simulated streamflows computed for the watershed of interest and for a specified period.
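+
+ Examples
+ --------
+ A hedged sketch with hypothetical arrays, simulating watershed 0 over a given index pair (requires a saved
+ model on disk):
+
+ >>> import numpy as np
+ >>> kge, flows = run_trained_model( # doctest: +SKIP
+ ... arr_dynamic,
+ ... arr_static,
+ ... q_stds,
+ ... window_size=365,
+ ... w=np.array(0),
+ ... idx_scenario=np.array([0, 1000]),
+ ... batch_size=32,
+ ... watershed_areas=watershed_areas,
+ ... name_of_saved_model="tmp.h5",
+ ... remove_nans=False,
+ ... )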
+ """
+ # Delete and reload the model to free the memory
+ k.clear_session()
+ model_lstm = load_model(
+ name_of_saved_model, compile=False, custom_objects={"loss": _nse_scaled_loss}
+ )
+
+ # Build the dataset for the requested simulation period
+ x, x_static, _, y = create_lstm_dataset(
+ arr_dynamic=arr_dynamic,
+ arr_static=arr_static,
+ q_stds=q_stds,
+ window_size=window_size,
+ watershed_list=w[np.newaxis],
+ idx=idx_scenario,
+ remove_nans=remove_nans,
+ )
+
+ y_pred = model_lstm.predict(TestingGenerator(x, x_static, batch_size=batch_size))
+ y_pred = np.squeeze(y_pred)
+
+ # Rescale observed and simulated streamflow from mm/d to m^3/s (1 mm/d over 1 km^2 = 1/86.4 m^3/s)
+ drainage_area = watershed_areas[w]
+
+ y_pred = y_pred * drainage_area / 86.4
+ y = y * drainage_area / 86.4
+
+ # Compute the Kling-Gupta Efficiency (KGE) for the current watershed
+ kge = get_objective_function(qobs=y, qsim=y_pred, obj_func="kge")
+ flows = np.array([y, y_pred])
+
+ return kge, flows
+
+
+def run_trained_model_local(
+ arr_dynamic: np.ndarray,
+ window_size: int,
+ idx_scenario: np.ndarray,
+ batch_size: int,
+ name_of_saved_model: str,
+ remove_nans: bool,
+):
+ """Run the trained regional LSTM model on a single catchment from a larger set.
+
+ Parameters
+ ----------
+ arr_dynamic : np.ndarray
+ Tensor of size [time_steps x window_size x (n_dynamic_variables+1)] that contains the dynamic (i.e. time-series)
+ variables that will be used during training. The first element in axis=2 is the observed flow.
+ window_size : int
+ Number of days of look-back for training and model simulation. LSTM requires a large backwards-looking window to
+ allow the model to learn from long-term weather patterns and history to predict the next day's streamflow.
+ Usually set to 365 days to get one year of previous data. This makes the model heavier and longer to train but
+ can improve results.
+ idx_scenario : np.ndarray
+ 2-element array of indices of the beginning and end of the desired period for which the LSTM model should be
+ simulated.
+ batch_size : int
+ Number of data points to use in training. Datasets are often way too big to train in a single batch on a single
+ GPU or CPU, meaning that the dataset must be divided into smaller batches. This has an impact on the training
+ performance and final model skill, and should be handled accordingly.
+ name_of_saved_model : str
+ Path to the model that has been pre-trained if required for simulations.
+ remove_nans : bool
+ Remove the periods for which observed streamflow is NaN for both observed and simulated flows.
+
+ Returns
+ -------
+ kge : float
+ KGE value between observed and simulated flows computed for the watershed of interest and for a specified
+ period.
+ flows : np.ndarray
+ Observed and simulated streamflows computed for the watershed of interest and for a specified period.
+ """
+ # Delete and reload the model to free the memory
+ k.clear_session()
+ model_lstm = load_model(
+ name_of_saved_model, compile=False, custom_objects={"loss": _kge_loss}
+ )
+
+ # Build the dataset for the requested simulation period
+ x, y = create_lstm_dataset_local(
+ arr_dynamic=arr_dynamic,
+ window_size=window_size,
+ idx=idx_scenario,
+ remove_nans=remove_nans,
+ )
+
+ y_pred = model_lstm.predict(TestingGeneratorLocal(x, batch_size=batch_size))
+ y_pred = np.squeeze(y_pred)
+
+ # Compute the Kling-Gupta Efficiency (KGE) for the watershed
+ kge = get_objective_function(qobs=y, qsim=y_pred, obj_func="kge")
+ flows = np.array([y, y_pred])
+
+ return kge, flows
diff --git a/tests/test_lstm.py b/tests/test_lstm.py
new file mode 100644
index 00000000..3e58efc0
--- /dev/null
+++ b/tests/test_lstm.py
@@ -0,0 +1,177 @@
+"""Test suite for LSTM model implementations"""
+
+import pathlib
+
+import pooch
+import pytest
+
+try:
+ from xhydro.lstm_tools.lstm_controller import (
+ control_local_lstm_training,
+ control_regional_lstm_training,
+ )
+
+ has_lstm = True
+except ImportError:
+ has_lstm = False
+
+
+@pytest.mark.skipif(not has_lstm, reason="xhydro lstm module not available.")
+class TestLstmModels:
+ """Test suite for the LSTM models."""
+
+ # Set GitHub URL for getting files for tests
+ GITHUB_URL = "https://github.com/hydrologie/xhydro-testdata"
+ BRANCH_OR_COMMIT_HASH = "main"
+
+ # Get data with pooch
+ input_data_filename_regional = pooch.retrieve(
+ url=f"{GITHUB_URL}/raw/{BRANCH_OR_COMMIT_HASH}/data/LSTM_data/LSTM_test_data.nc",
+ known_hash="md5:e7f1ddba0cf3dc3c5c6aa28a0c504fa2",
+ )
+
+ # Get local-model test data with pooch
+ input_data_filename_local = pooch.retrieve(
+ url=f"{GITHUB_URL}/raw/{BRANCH_OR_COMMIT_HASH}/data/LSTM_data/LSTM_test_data_local.nc",
+ known_hash="md5:2abfe4dd0287a43c1ab40372f4fc4de8",
+ )
+
+ batch_size = 64 # batch size used in the training - multiple of 32
+ epochs = 2 # Number of epochs to train the LSTM model
+ window_size = 5 # Number of time steps (days) to use in the LSTM model
+ train_pct = 10 # Percentage of watersheds used for the training
+ valid_pct = 5 # Percentage of watersheds used for the validation
+ # Use CPU, as a GPU with CUDA/cuDNN etc. is not guaranteed to be available.
+ use_cpu = True
+ use_parallel = False
+ do_simulation = True
+ filename_base = "LSTM_results"
+ simulation_phases = ["test"]
+ # Tags for the dynamic variables in the netcdf files.
+ dynamic_var_tags = ["tasmax_MELCC", "rf", "Qsim"]
+
+ # Whether each dynamic variable is a simulated flow to be scaled according to drainage area.
+ qsim_pos = [False, False, False, False, False]
+
+ # static variables used to condition flows on catchment properties
+ static_var_tags = [
+ "drainage_area",
+ "elevation",
+ "coniferous_forest",
+ "silty_clay_loam",
+ "meanPrecip",
+ ]
+
+ def test_lstm_controller_regional(self):
+ """Test the regional LSTM model implementation."""
+ training_func = "nse_scaled"
+ do_train = True
+
+ kge_results, flow_results, name_of_saved_model = control_regional_lstm_training(
+ self.input_data_filename_regional,
+ self.dynamic_var_tags,
+ self.qsim_pos,
+ self.static_var_tags,
+ batch_size=self.batch_size,
+ epochs=self.epochs,
+ window_size=self.window_size,
+ train_pct=self.train_pct,
+ valid_pct=self.valid_pct,
+ use_cpu=self.use_cpu,
+ use_parallel=self.use_parallel,
+ do_train=do_train,
+ model_structure="dummy_regional_lstm",
+ do_simulation=self.do_simulation,
+ training_func=training_func,
+ filename_base=self.filename_base,
+ simulation_phases=self.simulation_phases,
+ )
+
+ assert len(kge_results[0]) == 4
+ assert len(flow_results[0]) == 4
+ assert pathlib.Path(name_of_saved_model).is_file()
+
+ # Do a sim with no training
+ do_train = False
+
+ kge_results, flow_results, name_of_saved_model = control_regional_lstm_training(
+ self.input_data_filename_regional,
+ self.dynamic_var_tags,
+ self.qsim_pos,
+ self.static_var_tags,
+ batch_size=self.batch_size,
+ epochs=self.epochs,
+ window_size=self.window_size,
+ train_pct=self.train_pct,
+ valid_pct=self.valid_pct,
+ use_cpu=self.use_cpu,
+ use_parallel=self.use_parallel,
+ do_train=do_train,
+ model_structure="dummy_regional_lstm",
+ do_simulation=self.do_simulation,
+ training_func=training_func,
+ filename_base=self.filename_base,
+ simulation_phases=self.simulation_phases,
+ name_of_saved_model=name_of_saved_model,
+ )
+
+ assert len(kge_results[0]) == 4
+ assert len(flow_results[0]) == 4
+ assert pathlib.Path(name_of_saved_model).is_file()
+
+ def test_train_single_catchment(self):
+ """Test the regional LSTM model simulation after training."""
+ training_func = "kge"
+ do_train = True
+
+ # Train the local model and simulate the test period
+ kge_results, flow_results, name_of_saved_model = control_local_lstm_training(
+ self.input_data_filename_local,
+ self.dynamic_var_tags,
+ self.qsim_pos,
+ batch_size=self.batch_size,
+ epochs=self.epochs,
+ window_size=self.window_size,
+ train_pct=self.train_pct,
+ valid_pct=self.valid_pct,
+ use_cpu=self.use_cpu,
+ use_parallel=self.use_parallel,
+ do_train=do_train,
+ model_structure="dummy_local_lstm",
+ do_simulation=self.do_simulation,
+ training_func=training_func,
+ filename_base=self.filename_base,
+ simulation_phases=self.simulation_phases,
+ )
+
+ assert len(kge_results) == 4
+ assert len(flow_results) == 4
+ assert pathlib.Path(name_of_saved_model).is_file()
+
+ training_func = "kge"
+ do_train = False
+ simulation_phases = ["train", "valid", "test", "all"]
+
+ # Do a sim with the trained model on all periods
+ kge_results, flow_results, name_of_saved_model = control_local_lstm_training(
+ self.input_data_filename_local,
+ self.dynamic_var_tags,
+ self.qsim_pos,
+ batch_size=self.batch_size,
+ epochs=self.epochs,
+ window_size=self.window_size,
+ train_pct=self.train_pct,
+ valid_pct=self.valid_pct,
+ use_cpu=self.use_cpu,
+ use_parallel=self.use_parallel,
+ do_train=do_train,
+ do_simulation=self.do_simulation,
+ training_func=training_func,
+ filename_base=self.filename_base,
+ simulation_phases=simulation_phases,
+ name_of_saved_model=name_of_saved_model,
+ )
+
+ assert len(kge_results) == 4
+ assert len(flow_results) == 4