diff --git a/docs/modis_executables.md b/docs/modis_executables.md index 80510e47..0e895d47 100644 --- a/docs/modis_executables.md +++ b/docs/modis_executables.md @@ -87,6 +87,8 @@ In the case of 16 day MODIS NDVI products, both satellites can be interleaved to It might be required to perform a check on temporal continuity, which can be done using `--last-collected`. Here a MODIS julian date can be specified that needs to be the last timestep collected before the new collection starts. If that's not the case, the process will fail on exception. +With the `--tiles-required` option, a comma separated list of tiles (e.g `h20v08,h20v09,...`) can be specified which are the required set of tiles to be collected. `modis_collect` will check if all tiles are present in the input files, and if they contain the same timeps. If either more or less tiles are present, or if the timesteps between the tiles differes, the script will raise an exception. + !!! Danger It's the user's responsibility to ensure temporal continuity when creating and updating files. New timesteps are simply appended, only checking if they don't precede existing ones. There are no further checks if a timestep might be missing etc. This follows the "garbage in - garbage out" principle. @@ -109,6 +111,7 @@ Options: --last-collected [%Y%j] Last collected date in julian format (YYYYDDD - %Y%j) + --tiles-required TEXT Required tiles - supplied as csv list --help Show this message and exit. ``` diff --git a/modape/modis/collect.py b/modape/modis/collect.py index aafb453e..d5220670 100644 --- a/modape/modis/collect.py +++ b/modape/modis/collect.py @@ -148,7 +148,7 @@ def __init__(self, satset = {x[:3] for x in products} - if not self.vam_product_code == "VIM": + if interleave and not self.vam_product_code == "VIM": log.debug("Interleaving only possible for M{O|Y}D13 (VIM) products! Proceeding without interleave.") interleave = False diff --git a/modape/modis/smooth.py b/modape/modis/smooth.py index 2fe5cf7a..a60f4a74 100644 --- a/modape/modis/smooth.py +++ b/modape/modis/smooth.py @@ -267,9 +267,6 @@ def smooth(self, else: write_offset = 0 - # Create weights array - wts = arr_raw.copy() - if self.tinterpolate: log.debug("Temporal interpolation triggered!") arr_smt = np.full((smt_chunks[0], len(dix)), fill_value=nodata, dtype="double") @@ -307,7 +304,9 @@ def smooth(self, for arr_raw_chunk in chunk_generator: log.debug("Chunk %s", chunk_counter) - wts[...] = (arr_raw_chunk != nodata)*1 + # create weights + wts = (arr_raw_chunk != nodata) + wts = wts.astype("uint8") ndix = np.sum(wts, 1) >= (arr_raw.shape[1] * 0.2) # 20%+ data map_index = np.where(ndix)[0] @@ -317,6 +316,7 @@ def smooth(self, for ix in map_index: + w = wts[ix, :].astype("double") if soptimize: if srange is None: lag_correlation = lag1corr(arr_raw[ix, :-1], arr_raw[ix, 1:], nodata) @@ -332,12 +332,12 @@ def smooth(self, if p is not None: arr_raw[ix, :], arr_sgrid[ix] = ws2doptvp(y=arr_raw[ix, :], - w=wts[ix, :], + w=w, llas=array("d", sr), p=p) else: arr_raw[ix, :], arr_sgrid[ix] = ws2doptv(y=arr_raw[ix, :], - w=wts[ix, :], + w=w, llas=array("d", sr)) else: @@ -347,9 +347,9 @@ def smooth(self, s = 10 ** svalue if p is None: - arr_raw[ix, :] = ws2d(y=arr_raw[ix, :], lmda=s, w=wts[ix, :]) + arr_raw[ix, :] = ws2d(y=arr_raw[ix, :], lmda=s, w=w) else: - arr_raw[ix, :] = ws2dp(y=arr_raw[ix, :], lmda=s, w=wts[ix, :], p=p) + arr_raw[ix, :] = ws2dp(y=arr_raw[ix, :], lmda=s, w=w, p=p) if self.tinterpolate: diff --git a/modape/scripts/modis_collect.py b/modape/scripts/modis_collect.py index 66b96380..bdcf8b8a 100644 --- a/modape/scripts/modis_collect.py +++ b/modape/scripts/modis_collect.py @@ -24,6 +24,7 @@ @click.option('--cleanup', is_flag=True, help='Remove collected HDF files') @click.option('--force', is_flag=True, help='Force collect process not failing on corrupt inputs') @click.option('--last-collected', type=click.DateTime(formats=['%Y%j']), help='Last collected date in julian format (YYYYDDD - %Y%j)') +@click.option("--tiles-required", type=click.STRING, help="Required tiles - supplied as csv list") def cli(src_dir: str, targetdir: str, compression: str, @@ -33,6 +34,7 @@ def cli(src_dir: str, cleanup: bool, force: bool, last_collected: datetime, + tiles_required: str, ) -> None: """Collect raw MODIS hdf files into a raw MODIS HDF5 file. @@ -55,6 +57,7 @@ def cli(src_dir: str, cleanup (bool): Remove collected HDF files. force (bool): When corrupt HDF input is encountered, use nodata instead of raising exception. last_collected (datetime.datetime): Julian day of last collected raw timestep. + tiles_required (str): MODIS tiles required to be in input (as csv string of tile IDs). """ log = logging.getLogger(__name__) @@ -74,6 +77,17 @@ def cli(src_dir: str, if last_collected is not None: last_collected = last_collected.strftime("%Y%j") + # parse tiles required and check if OK + if tiles_required is not None: + tile_regxp = re.compile(r'^h\d{2}v\d{2}$') + _tiles = [] + + for tile_req in tiles_required.split(','): + assert re.match(tile_regxp, tile_req.lower()) + _tiles.append(tile_req.lower()) + + tiles_required = frozenset(_tiles) + click.echo("Starting MODIS COLLECT!") hdf_files = list(input_dir.glob('*hdf')) @@ -105,12 +119,32 @@ def cli(src_dir: str, tile = [''] tiles.append(*tile) + if tiles_required is not None: + log.debug("Checking for required tiles ...") + + for product in set(products): + log.debug("Product: %s", product) + datetiles = [".".join(x.name.split(".")[1:3]) for x in hdf_files if re.match("^" + product, x.name)] + product_tiles = {x.split(".")[1] for x in datetiles} + + timestamps = [] + if product_tiles != tiles_required: + raise ValueError("Tiles for product %s don't match tiles-required!" % product) + for reqtile in tiles_required: + timestamps.append({x.split(".")[0] for x in datetiles if reqtile in x}) + + iterator = iter(timestamps) + reference = next(iterator) + if not all(x == reference for x in iterator): + raise ValueError("Not all tiles have same timesteps for product %s!" % product) + log.debug("Timestamps OK for %s", product) + groups = [".*".join(x) for x in zip(products, tiles, versions)] groups = list({re.sub('(M.{1})(D.+)', 'M.'+'\\2', x) if REGEX_PATTERNS["VIM"].match(x) else x for x in groups}) # Join MOD13/MYD13 groups.sort() log.debug("Parsed groups: %s", groups) - processing_dict = {} + to_process = {} collected = [] @@ -119,18 +153,31 @@ def cli(src_dir: str, group_files = [str(x) for x in hdf_files if group_pattern.match(x.name)] group_files.sort() - parameters = dict( - targetdir=targetdir, - files=group_files, - interleave=interleave, - group_id=group.replace('M.', 'MX'), - compression=compression, - vam_product_code=vam_code, - last_collected=last_collected, - force=force, - ) + _raw_h5 = ModisRawH5(files=group_files, + targetdir=targetdir, + vam_product_code=vam_code, + interleave=interleave) + + if last_collected is not None: + + if not _raw_h5.exists: + raise ValueError("Output H5 %s does not exist! Can't check last-collected!" % _raw_h5.filename) - processing_dict.update({group: parameters}) + last_collected_infile = _raw_h5.last_collected + + if not last_collected_infile: + raise ValueError(f"No last_collected recorded in {_raw_h5.filename}") + + if not last_collected == last_collected_infile: + raise ValueError(f"Last collected date in file is {last_collected_infile} not {last_collected}") + + to_process.update({ + group: { + "raw_h5": _raw_h5, + "compression": compression, + "force": force, + } + }) log.debug("Start processing!") @@ -145,21 +192,12 @@ def cli(src_dir: str, with ProcessPoolExecutor(parallel_tiles) as executor: - for group, parameters in processing_dict.items(): + for group, parameters in to_process.items(): log.debug("Submitting %s", group) futures.append( - executor.submit( - _worker, - parameters["files"], - parameters["targetdir"], - parameters["vam_product_code"], - parameters["interleave"], - parameters["compression"], - parameters["last_collected"], - parameters["force"], - ) + executor.submit(_worker, **parameters) ) _ = wait(futures) @@ -174,19 +212,13 @@ def cli(src_dir: str, log.debug("Processing groups sequentially") - for group, parameters in processing_dict.items(): + for group, parameters in to_process.items(): log.debug("Processing %s", group) - collected.extend(_worker( - parameters["files"], - parameters["targetdir"], - parameters["vam_product_code"], - parameters["interleave"], - parameters["compression"], - parameters["last_collected"], - parameters["force"], - )) + collected.extend( + _worker(**parameters) + ) if cleanup and collected: tracefile = targetdir.joinpath(".collected") @@ -200,28 +232,11 @@ def cli(src_dir: str, click.echo("MODIS COLLECT completed!") -def _worker(files, targetdir, vam_code, interleave, compression, last_collected, force): - - raw_h5 = ModisRawH5( - files=files, - targetdir=targetdir, - vam_product_code=vam_code, - interleave=interleave, - ) +def _worker(raw_h5, compression, force): if not raw_h5.exists: raw_h5.create(compression=compression) - if last_collected is not None: - - last_collected_infile = raw_h5.last_collected - - if not last_collected_infile: - raise ValueError(f"No last_collected recorded in {raw_h5.filename}") - - assert last_collected == last_collected_infile, \ - f"Last collected date in file is {last_collected_infile} not {last_collected}" - collected = raw_h5.update(force=force) return collected diff --git a/tests/test_cli.py b/tests/test_cli.py index fd421e25..883fcf0a 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -6,9 +6,10 @@ from pathlib import Path import shutil import unittest -from unittest.mock import patch, Mock, MagicMock, call +from unittest.mock import patch, Mock, MagicMock from click.testing import CliRunner +from modape.modis.collect import ModisRawH5 from modape.scripts.modis_download import cli as modis_download_cli from modape.scripts.modis_collect import cli as modis_collect_cli from modape.scripts.modis_smooth import cli as modis_smooth_cli @@ -165,20 +166,23 @@ def test_modis_collect(self): file_path = data_dir.joinpath(file) file_path.touch() - calls = [] for product in ["MOD11", "MYD11"]: product_files = [str(x) for x in data_dir.glob("*hdf") if product in x.name] product_files.sort() - calls.append( - call(product_files, data_dir, None, False, "gzip", None, False), - ) with patch("modape.scripts.modis_collect._worker") as mocked_worker: mocked_worker.return_value = self.lst_files result = self.runner.invoke(modis_collect_cli, ["/tmp/data"]) mocked_worker.assert_called() self.assertEqual(mocked_worker.call_count, 2) - mocked_worker.assert_has_calls(calls, any_order=False) + for call in mocked_worker.call_args_list: + args, kwargs = call + self.assertEqual(len(args), 0) + self.assertEqual(len(kwargs), 3) + self.assertEqual(type(kwargs["raw_h5"]), ModisRawH5) + self.assertEqual(kwargs["compression"], 'gzip') + self.assertEqual(kwargs["force"], False) + self.assertEqual(result.exit_code, 0) _ = [x.unlink() for x in data_dir.glob("*hdf")] @@ -193,7 +197,6 @@ def test_modis_collect(self): result = self.runner.invoke(modis_collect_cli, ["/tmp/data", "--interleave", "--cleanup"]) mocked_worker.assert_called_once() product_files.sort() - mocked_worker.assert_called_with(product_files, data_dir, None, True, "gzip", None, False) self.assertEqual(result.exit_code, 0) tracefile = data_dir.joinpath(".collected") @@ -205,25 +208,64 @@ def test_modis_collect(self): for file in self.vim_files: file_path = data_dir.joinpath(file) file_path.touch() + file_path = data_dir.joinpath(file.replace("h18v06", "h19v06")) + file_path.touch() + with patch("modape.scripts.modis_collect._worker") as mocked_worker: - mocked_worker.return_value = True product_files = [str(x) for x in data_dir.glob("*hdf")] product_files.sort() + mocked_worker.return_value = product_files + + # wrong last-collected input result = self.runner.invoke(modis_collect_cli, ["/tmp/data", "--interleave", "--cleanup", "--last-collected", "2002-01-01"]) mocked_worker.assert_not_called() self.assertEqual(result.exit_code, 2) - with patch("modape.scripts.modis_collect.ModisRawH5") as mocked_rawfile: - mocked_rawfile.return_value = MagicMock(exists=True, last_collected="2020001") - result = self.runner.invoke(modis_collect_cli, ["/tmp/data", "--interleave", "--cleanup", "--last-collected", "2020365"]) - mocked_rawfile.assert_called_once() - self.assertEqual(result.exit_code, 1) + # test last-collected checks + with patch("modape.scripts.modis_collect.ModisRawH5") as mocked_rawfile: + mocked_rawfile.return_value = MagicMock(exists=True, last_collected="2020001") + result = self.runner.invoke(modis_collect_cli, ["/tmp/data", "--interleave", "--last-collected", "2020365"]) + mocked_rawfile.assert_called_once() + mocked_worker.assert_not_called() + self.assertEqual(result.exit_code, 1) + self.assertEqual(result.exc_info[0], ValueError) + + mocked_rawfile.reset_mock() + + result = self.runner.invoke(modis_collect_cli, ["/tmp/data", "--interleave", "--last-collected", "2020001"]) + self.assertEqual(mocked_rawfile.call_count, 2) + self.assertEqual(mocked_worker.call_count, 2) + self.assertEqual(result.exit_code, 0) + + mocked_worker.reset_mock() + + result = self.runner.invoke(modis_collect_cli, ["/tmp/data", "--interleave"]) - mocked_rawfile.reset_mock() - result = self.runner.invoke(modis_collect_cli, ["/tmp/data", "--interleave", "--cleanup", "--last-collected", "2020001"]) - mocked_rawfile.assert_called_once() self.assertEqual(result.exit_code, 0) + self.assertEqual(mocked_worker.call_count, 2) + + mocked_worker.reset_mock() + + # check if tile-required works + result = self.runner.invoke(modis_collect_cli, ["/tmp/data", "--interleave", "--tiles-required", "h18v06,h19v06"]) + self.assertEqual(result.exit_code, 0) + self.assertEqual(mocked_worker.call_count, 2) + + mocked_worker.reset_mock() + + result = self.runner.invoke(modis_collect_cli, ["/tmp/data", "--interleave", "--tiles-required", "h18v06,h19v06,h20v06"]) + self.assertEqual(mocked_worker.call_count, 0) + self.assertEqual(result.exit_code, 1) + self.assertEqual(result.exc_info[0], ValueError) + + mocked_worker.reset_mock() + + Path(product_files[-1]).unlink() + result = self.runner.invoke(modis_collect_cli, ["/tmp/data", "--interleave", "--tiles-required", "h18v06,h19v06"]) + mocked_worker.assert_not_called() + self.assertEqual(result.exit_code, 1) + self.assertEqual(result.exc_info[0], ValueError) def test_modis_smooth(self): """Test modis_smooth.py""" diff --git a/tests/test_modis.py b/tests/test_modis.py index 8349daac..586cb941 100644 --- a/tests/test_modis.py +++ b/tests/test_modis.py @@ -90,7 +90,7 @@ def create_h5temp(rows: int, dset = h5f.create_dataset("data", shape=(rows*cols, 4), - dtype="Int16", + dtype="int16", maxshape=(rows*cols, None), chunks=((rows*cols)//25, 10), compression="gzip", @@ -137,7 +137,7 @@ def create_h5temp_global() -> Path: dset = h5f.create_dataset("data", shape=(rows*cols, 4), - dtype="Int16", + dtype="int16", maxshape=(rows*cols, None), chunks=((rows*cols)//25, 10), compression="gzip",