Skip to content

Commit

Permalink
Backscatter partitioner (#1023)
Browse files Browse the repository at this point in the history
* sar_backscatter: try to preserve partitioner
#1018

* sar_backscatter: print partitioner in test, may need an assert
#1018

* sar_backscatter: print partitioner in test, may need an assert
#1018

* sar_backscatter: should now get partitioner from scala
#1018

* sar_backscatter: should now get partitioner from scala
#1018

* sar_backscatter: add assert on partitioner
#1018
  • Loading branch information
jdries authored Jan 28, 2025
1 parent 7e15bec commit 9fabf5c
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 10 deletions.
27 changes: 17 additions & 10 deletions openeogeotrellis/collections/s1backscatter_orfeo.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,19 +115,19 @@ def __init__(self, jvm: JVMView = None):
def _load_feature_rdd(
    self, file_rdd_factory: JavaObject, projected_polygons, from_date: str, to_date: str, zoom: int,
    tile_size: int, datacubeParams=None
) -> Tuple[pyspark.RDD, JavaObject, JavaObject]:
    """Load a spatial-feature RDD, its layer metadata and its partitioner via the Scala ``FileRDDFactory``.

    :param file_rdd_factory: JVM ``FileRDDFactory`` instance that builds the feature RDD on the Scala side.
    :param projected_polygons: JVM projected-polygons object delimiting the spatial extent.
    :param from_date: start of the temporal range (date string, passed through to Scala).
    :param to_date: end of the temporal range (date string, passed through to Scala).
    :param zoom: pyramid zoom level to load at.
    :param tile_size: tile size in pixels.
    :param datacubeParams: optional JVM data-cube parameters object (may be ``None``).
    :return: tuple of (Python RDD of decoded feature dicts, Scala layer metadata,
        Scala partitioner — preserved so callers can keep the Scala-side partitioning).
    """
    logger.info("Loading feature JSON RDD from {f}".format(f=file_rdd_factory))
    # Scala returns a Tuple3: (JavaRDD of JSON strings, layer metadata, partitioner).
    json_rdd_partitioner = file_rdd_factory.loadSpatialFeatureJsonRDD(
        projected_polygons, from_date, to_date, zoom, tile_size, datacubeParams
    )
    jrdd = json_rdd_partitioner._1()
    layer_metadata_sc = json_rdd_partitioner._2()

    # Decode/unwrap the JavaRDD of JSON blobs we built in Scala,
    # additionally pickle-serialized by the PySpark adaptation layer.
    j2p_rdd = self.jvm.SerDe.javaToPython(jrdd)
    serializer = pyspark.serializers.PickleSerializer()
    pyrdd = geopyspark.create_python_rdd(j2p_rdd, serializer=serializer)
    pyrdd = pyrdd.map(json.loads)
    # Tuple3._3() is the Scala partitioner; hand it back alongside the data and metadata.
    return pyrdd, layer_metadata_sc, json_rdd_partitioner._3()

def _build_feature_rdd(
self,
Expand Down Expand Up @@ -158,14 +158,14 @@ def _build_feature_rdd(
file_rdd_factory = self.jvm.org.openeo.geotrellis.file.FileRDDFactory(
opensearch_client, collection_id, attributeValues, correlation_id,self.jvm.geotrellis.raster.CellSize(resolution[0], resolution[1])
)
feature_pyrdd, layer_metadata_sc = self._load_feature_rdd(
feature_pyrdd, layer_metadata_sc,partitioner = self._load_feature_rdd(
file_rdd_factory, projected_polygons=projected_polygons, from_date=from_date, to_date=to_date,
zoom=zoom, tile_size=tile_size, datacubeParams=datacubeParams
)
layer_metadata_py = convert_scala_metadata(
layer_metadata_sc, epoch_ms_to_datetime=_instant_ms_to_day, logger=logger
)
return feature_pyrdd, layer_metadata_py
return feature_pyrdd, layer_metadata_py,partitioner

# Mapping of `sar_backscatter` coefficient value to `SARCalibration` Lookup table value
_coefficient_mapping = {
Expand Down Expand Up @@ -542,7 +542,7 @@ def creodias(

debug_mode = smart_bool(sar_backscatter_arguments.options.get("debug"))

feature_pyrdd, layer_metadata_py = self._build_feature_rdd(
feature_pyrdd, layer_metadata_py,partitioner = self._build_feature_rdd(
collection_id=collection_id, projected_polygons=projected_polygons,
from_date=from_date, to_date=to_date, extra_properties=extra_properties,
tile_size=tile_size, zoom=zoom, correlation_id=
Expand Down Expand Up @@ -819,7 +819,7 @@ def creodias(

# an RDD of Python objects (basically SpaceTimeKey + feature) with gps.Metadata
target_resolution = sar_backscatter_arguments.options.get("resolution", (10.0, 10.0))
feature_pyrdd, layer_metadata_py = self._build_feature_rdd(
feature_pyrdd, layer_metadata_py,partitioner = self._build_feature_rdd(
collection_id=collection_id, projected_polygons=projected_polygons,
from_date=from_date, to_date=to_date, extra_properties=extra_properties,
tile_size=tile_size, zoom=zoom, correlation_id=
Expand Down Expand Up @@ -1032,8 +1032,15 @@ def partitionByPath(tuple):
numpy_rdd=tile_rdd,
metadata=layer_metadata_py
)

the_rdd = tile_layer.srdd

logger.info(f"sar-backscatter: partitioning with {str(partitioner)}")
if(partitioner is not None):
the_rdd = the_rdd.partitionByPartitioner(partitioner)

# Merge any keys that have more than one tile.
contextRDD = self.jvm.org.openeo.geotrellis.OpenEOProcesses().mergeTiles(tile_layer.srdd.rdd())
contextRDD = self.jvm.org.openeo.geotrellis.OpenEOProcesses().mergeTiles(the_rdd.rdd())
temporal_tiled_raster_layer = self.jvm.geopyspark.geotrellis.TemporalTiledRasterLayer
srdd = temporal_tiled_raster_layer.apply(self.jvm.scala.Option.apply(zoom), contextRDD)
merged_tile_layer = geopyspark.TiledRasterLayer(geopyspark.LayerType.SPACETIME, srdd)
Expand Down
2 changes: 2 additions & 0 deletions tests/data_collections/test_s1backscatter_orfeo.py
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,8 @@ def test_creodias_s1_backscatter(
)
datacube = catalog.load_collection("Creodias-S1-Backscatter", load_params=load_params, env=EvalEnv())

assert "SpacePartitioner" in str(datacube.get_max_level().srdd.rdd().partitioner())

# Compare actual with the expected result.
filename = tmp_path / "s1backscatter.tiff"
datacube.save_result(filename, format="GTiff")
Expand Down

0 comments on commit 9fabf5c

Please sign in to comment.