Skip to content

Commit

Permalink
Merge pull request #158 from WFP-VAM/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
valpesendorfer authored Feb 17, 2021
2 parents 35eaf62 + 308df13 commit 27daad3
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 78 deletions.
3 changes: 3 additions & 0 deletions docs/modis_executables.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ In the case of 16 day MODIS NDVI products, both satellites can be interleaved to

It might be required to perform a check on temporal continuity, which can be done using `--last-collected`. Here a MODIS julian date can be specified that needs to be the last timestep collected before the new collection starts. If that's not the case, the process will fail on exception.

With the `--tiles-required` option, a comma separated list of tiles (e.g. `h20v08,h20v09,...`) can be specified which are the required set of tiles to be collected. `modis_collect` will check if all tiles are present in the input files, and if they contain the same timesteps. If more or fewer tiles are present, or if the timesteps differ between tiles, the script will raise an exception.

!!! Danger
It's the user's responsibility to ensure temporal continuity when creating and updating files. New timesteps are simply appended, only checking if they don't precede existing ones. There are no further checks if a timestep might be missing etc. This follows the "garbage in - garbage out" principle.

Expand All @@ -109,6 +111,7 @@ Options:
--last-collected [%Y%j] Last collected date in julian format (YYYYDDD -
%Y%j)
--tiles-required TEXT Required tiles - supplied as csv list
--help Show this message and exit.
```
Expand Down
2 changes: 1 addition & 1 deletion modape/modis/collect.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ def __init__(self,

satset = {x[:3] for x in products}

if not self.vam_product_code == "VIM":
if interleave and not self.vam_product_code == "VIM":
log.debug("Interleaving only possible for M{O|Y}D13 (VIM) products! Proceeding without interleave.")
interleave = False

Expand Down
16 changes: 8 additions & 8 deletions modape/modis/smooth.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,9 +267,6 @@ def smooth(self,
else:
write_offset = 0

# Create weights array
wts = arr_raw.copy()

if self.tinterpolate:
log.debug("Temporal interpolation triggered!")
arr_smt = np.full((smt_chunks[0], len(dix)), fill_value=nodata, dtype="double")
Expand Down Expand Up @@ -307,7 +304,9 @@ def smooth(self,
for arr_raw_chunk in chunk_generator:
log.debug("Chunk %s", chunk_counter)

wts[...] = (arr_raw_chunk != nodata)*1
# create weights
wts = (arr_raw_chunk != nodata)
wts = wts.astype("uint8")

ndix = np.sum(wts, 1) >= (arr_raw.shape[1] * 0.2) # 20%+ data
map_index = np.where(ndix)[0]
Expand All @@ -317,6 +316,7 @@ def smooth(self,

for ix in map_index:

w = wts[ix, :].astype("double")
if soptimize:
if srange is None:
lag_correlation = lag1corr(arr_raw[ix, :-1], arr_raw[ix, 1:], nodata)
Expand All @@ -332,12 +332,12 @@ def smooth(self,

if p is not None:
arr_raw[ix, :], arr_sgrid[ix] = ws2doptvp(y=arr_raw[ix, :],
w=wts[ix, :],
w=w,
llas=array("d", sr),
p=p)
else:
arr_raw[ix, :], arr_sgrid[ix] = ws2doptv(y=arr_raw[ix, :],
w=wts[ix, :],
w=w,
llas=array("d", sr))

else:
Expand All @@ -347,9 +347,9 @@ def smooth(self,
s = 10 ** svalue

if p is None:
arr_raw[ix, :] = ws2d(y=arr_raw[ix, :], lmda=s, w=wts[ix, :])
arr_raw[ix, :] = ws2d(y=arr_raw[ix, :], lmda=s, w=w)
else:
arr_raw[ix, :] = ws2dp(y=arr_raw[ix, :], lmda=s, w=wts[ix, :], p=p)
arr_raw[ix, :] = ws2dp(y=arr_raw[ix, :], lmda=s, w=w, p=p)


if self.tinterpolate:
Expand Down
117 changes: 66 additions & 51 deletions modape/scripts/modis_collect.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
@click.option('--cleanup', is_flag=True, help='Remove collected HDF files')
@click.option('--force', is_flag=True, help='Force collect process not failing on corrupt inputs')
@click.option('--last-collected', type=click.DateTime(formats=['%Y%j']), help='Last collected date in julian format (YYYYDDD - %Y%j)')
@click.option("--tiles-required", type=click.STRING, help="Required tiles - supplied as csv list")
def cli(src_dir: str,
targetdir: str,
compression: str,
Expand All @@ -33,6 +34,7 @@ def cli(src_dir: str,
cleanup: bool,
force: bool,
last_collected: datetime,
tiles_required: str,
) -> None:
"""Collect raw MODIS hdf files into a raw MODIS HDF5 file.
Expand All @@ -55,6 +57,7 @@ def cli(src_dir: str,
cleanup (bool): Remove collected HDF files.
force (bool): When corrupt HDF input is encountered, use nodata instead of raising exception.
last_collected (datetime.datetime): Julian day of last collected raw timestep.
tiles_required (str): MODIS tiles required to be in input (as csv string of tile IDs).
"""

log = logging.getLogger(__name__)
Expand All @@ -74,6 +77,17 @@ def cli(src_dir: str,
if last_collected is not None:
last_collected = last_collected.strftime("%Y%j")

# parse tiles required and check if OK
if tiles_required is not None:
tile_regxp = re.compile(r'^h\d{2}v\d{2}$')
_tiles = []

for tile_req in tiles_required.split(','):
assert re.match(tile_regxp, tile_req.lower())
_tiles.append(tile_req.lower())

tiles_required = frozenset(_tiles)

click.echo("Starting MODIS COLLECT!")

hdf_files = list(input_dir.glob('*hdf'))
Expand Down Expand Up @@ -105,12 +119,32 @@ def cli(src_dir: str,
tile = ['']
tiles.append(*tile)

if tiles_required is not None:
log.debug("Checking for required tiles ...")

for product in set(products):
log.debug("Product: %s", product)
datetiles = [".".join(x.name.split(".")[1:3]) for x in hdf_files if re.match("^" + product, x.name)]
product_tiles = {x.split(".")[1] for x in datetiles}

timestamps = []
if product_tiles != tiles_required:
raise ValueError("Tiles for product %s don't match tiles-required!" % product)
for reqtile in tiles_required:
timestamps.append({x.split(".")[0] for x in datetiles if reqtile in x})

iterator = iter(timestamps)
reference = next(iterator)
if not all(x == reference for x in iterator):
raise ValueError("Not all tiles have same timesteps for product %s!" % product)
log.debug("Timestamps OK for %s", product)

groups = [".*".join(x) for x in zip(products, tiles, versions)]
groups = list({re.sub('(M.{1})(D.+)', 'M.'+'\\2', x) if REGEX_PATTERNS["VIM"].match(x) else x for x in groups}) # Join MOD13/MYD13
groups.sort()
log.debug("Parsed groups: %s", groups)

processing_dict = {}
to_process = {}

collected = []

Expand All @@ -119,18 +153,31 @@ def cli(src_dir: str,
group_files = [str(x) for x in hdf_files if group_pattern.match(x.name)]
group_files.sort()

parameters = dict(
targetdir=targetdir,
files=group_files,
interleave=interleave,
group_id=group.replace('M.', 'MX'),
compression=compression,
vam_product_code=vam_code,
last_collected=last_collected,
force=force,
)
_raw_h5 = ModisRawH5(files=group_files,
targetdir=targetdir,
vam_product_code=vam_code,
interleave=interleave)

if last_collected is not None:

if not _raw_h5.exists:
raise ValueError("Output H5 %s does not exist! Can't check last-collected!" % _raw_h5.filename)

processing_dict.update({group: parameters})
last_collected_infile = _raw_h5.last_collected

if not last_collected_infile:
raise ValueError(f"No last_collected recorded in {_raw_h5.filename}")

if not last_collected == last_collected_infile:
raise ValueError(f"Last collected date in file is {last_collected_infile} not {last_collected}")

to_process.update({
group: {
"raw_h5": _raw_h5,
"compression": compression,
"force": force,
}
})

log.debug("Start processing!")

Expand All @@ -145,21 +192,12 @@ def cli(src_dir: str,

with ProcessPoolExecutor(parallel_tiles) as executor:

for group, parameters in processing_dict.items():
for group, parameters in to_process.items():

log.debug("Submitting %s", group)

futures.append(
executor.submit(
_worker,
parameters["files"],
parameters["targetdir"],
parameters["vam_product_code"],
parameters["interleave"],
parameters["compression"],
parameters["last_collected"],
parameters["force"],
)
executor.submit(_worker, **parameters)
)

_ = wait(futures)
Expand All @@ -174,19 +212,13 @@ def cli(src_dir: str,

log.debug("Processing groups sequentially")

for group, parameters in processing_dict.items():
for group, parameters in to_process.items():

log.debug("Processing %s", group)

collected.extend(_worker(
parameters["files"],
parameters["targetdir"],
parameters["vam_product_code"],
parameters["interleave"],
parameters["compression"],
parameters["last_collected"],
parameters["force"],
))
collected.extend(
_worker(**parameters)
)

if cleanup and collected:
tracefile = targetdir.joinpath(".collected")
Expand All @@ -200,28 +232,11 @@ def cli(src_dir: str,

click.echo("MODIS COLLECT completed!")

def _worker(files, targetdir, vam_code, interleave, compression, last_collected, force):

raw_h5 = ModisRawH5(
files=files,
targetdir=targetdir,
vam_product_code=vam_code,
interleave=interleave,
)
def _worker(raw_h5, compression, force):

if not raw_h5.exists:
raw_h5.create(compression=compression)

if last_collected is not None:

last_collected_infile = raw_h5.last_collected

if not last_collected_infile:
raise ValueError(f"No last_collected recorded in {raw_h5.filename}")

assert last_collected == last_collected_infile, \
f"Last collected date in file is {last_collected_infile} not {last_collected}"

collected = raw_h5.update(force=force)

return collected
Expand Down
Loading

0 comments on commit 27daad3

Please sign in to comment.