From be0e085b651be5b256dd90c4ac0979e7ae53340e Mon Sep 17 00:00:00 2001 From: Kai Kratz Date: Tue, 18 Feb 2025 11:24:24 +0100 Subject: [PATCH 1/5] Updated Contributors/Readme --- CONTRIBUTORS.md | 10 +++ Notes.md | 13 --- contributors.md | 13 --- acknowledgements.md => server/__init__.py | 0 server/client.py | 26 ++++++ server/server.py | 100 ++++++++++++++++++++++ test.json | 25 ++++++ 7 files changed, 161 insertions(+), 26 deletions(-) create mode 100644 CONTRIBUTORS.md delete mode 100644 Notes.md delete mode 100644 contributors.md rename acknowledgements.md => server/__init__.py (100%) create mode 100644 server/client.py create mode 100644 server/server.py create mode 100644 test.json diff --git a/CONTRIBUTORS.md b/CONTRIBUTORS.md new file mode 100644 index 0000000..59035fb --- /dev/null +++ b/CONTRIBUTORS.md @@ -0,0 +1,10 @@ +# Contributors + +## How to Contribute + +We are currently not open for contributions on this project. + +## Contributing Organisations + +Significant contributions have been made by the following organisations: +- [ECMWF](https://www.ecmwf.int/) diff --git a/Notes.md b/Notes.md deleted file mode 100644 index 204a980..0000000 --- a/Notes.md +++ /dev/null @@ -1,13 +0,0 @@ -This is a Document listing all issues with zarr -=============================================== - -# TODOS: -- How is the indexing working in general? - - Different types: - - Orthogonal indexing - - Fancy indexing - - Slicing -- Zarr chunking is responsible for which subsections of the data is loaded: - - How to map the zarr chunks to gribjump calls, vice versa? - - - diff --git a/contributors.md b/contributors.md deleted file mode 100644 index 0c42a7b..0000000 --- a/contributors.md +++ /dev/null @@ -1,13 +0,0 @@ -## How to Contribute - -Please see the [read the docs](https://zfdb.readthedocs.io/en/latest/dev/contributing.html). - - -## Contributors - -Thank you to all the wonderful people who have contributed to ZarrFDB. Contributions can come in many forms, including code, documentation, bug reports, feature suggestions, design, and more. A list of code-based contributors can be found [here](https://github.com/ecmwf/zfdb/graphs/contributors). - - -## Contributing Organisations - -Significant contributions have been made by the following organisations: [ECMWF](https://www.ecmwf.int/) \ No newline at end of file diff --git a/acknowledgements.md b/server/__init__.py similarity index 100% rename from acknowledgements.md rename to server/__init__.py diff --git a/server/client.py b/server/client.py new file mode 100644 index 0000000..482a8af --- /dev/null +++ b/server/client.py @@ -0,0 +1,26 @@ +import json +import pathlib +import requests +import zarr +import fsspec + +if __name__ == "__main__": + json_request = None + with open(pathlib.Path("./test.json"), "r") as request_file: + json_request = json.loads(request_file.read()) + + url = "http://localhost:5000/create" + headers = {"Content-Type": "application/json"} + response = requests.post(url, headers=headers, data=json.dumps(json_request)) + + print(response.content) + hash = response.json()["hash"] + store = fsspec.get_mapper(f'http://localhost:5000/get/zarr/{hash}') + z_grp = zarr.open(store, mode="r") + + print(z_grp.attrs) + + for x in z_grp.attrs.items(): + print(x) + + print(z_grp["data"][0,0,0,0:30]) diff --git a/server/server.py b/server/server.py new file mode 100644 index 0000000..d46b406 --- /dev/null +++ b/server/server.py @@ -0,0 +1,100 @@ +from flask import Flask, Response, request, jsonify +import json +import zfdb + +import pyfdb +import pygribjump + +import numpy as np + + +app = Flask(__name__) + +view_hashes = {} + +fdb = pyfdb.FDB() +gribjump = pygribjump.GribJump() + + +def map_requests_from_json(json) -> list[zfdb.Request]: + for r in json["requests"]: + print(np.datetime64(r["date_time"])) + r["date_time"] = np.datetime64(r["date_time"]) + + print(f"{json}") + + return [zfdb.Request(**r) for r in json["requests"]] + + +def to_response(hased_request: int) -> Response: + return_dict = {"hash": str(hased_request)} + return Response( + response=json.dumps(return_dict), + status=200, + content_type="application-type/json", + ) + + +@app.route("/") +def hello_world(): + return "Hello, World!" + + +@app.route("/create", methods=["POST"]) +def process_json(): + print(f"Request: {request}") + data = request.get_json() + if not data: + return jsonify({"error": "Invalid JSON"}), 400 + + # In case we have this view for the requests already + hashed_request = hash(json.dumps(data)) + + print(f"Computed Hash: {hashed_request}") + if hashed_request in view_hashes: + return to_response(hashed_request) + + requests = map_requests_from_json(data) + + mapping = zfdb.make_forecast_data_view( + request=requests, + fdb=fdb, + gribjump=gribjump, + ) + + view_hashes[hashed_request] = mapping + + print("Retrieved the following requests:") + for r in requests: + print(r) + + return to_response(hashed_request) + +@app.route("/get/zarr/", methods=["GET"]) +@app.route("/get/zarr//", methods=["GET"]) +@app.route("/get/zarr///", methods=["GET"]) +@app.route( "/get/zarr////", methods=["GET"]) +def retrieve_zarr(hash, root_grp=None, group_lvl_one=None, group_lvl_two=None): + + print("Routes", hash, root_grp, group_lvl_one, group_lvl_two) + print(view_hashes) + + zarr_path = "/".join([frag for frag in [root_grp, group_lvl_one, group_lvl_two] if frag]) + + try: + mapping = view_hashes[int(hash)] + except KeyError as ke: + return Response(response=f"Couldn't find hash in {hash}", status=500) + + print(f"Zarr path: {zarr_path}") + + try: + content = mapping[zarr_path] + except KeyError as ke: + return Response(response="", status=404) + + return Response(response=content, status=200) + + +if __name__ == "__main__": + app.run(debug=True) diff --git a/test.json b/test.json new file mode 100644 index 0000000..b90554c --- /dev/null +++ b/test.json @@ -0,0 +1,25 @@ +{ + "requests": [ + { + "cls": "od", + "stream": "oper", + "expver": 1, + "typ": "fc", + "levtype": "sfc", + "date_time": "2025-01-01T00:00:00", + "steps": [0, 1, 2, 3, 4], + "params": ["165", "166"] + }, + { + "cls": "od", + "stream": "oper", + "expver": 1, + "typ": "fc", + "levtype": "pl", + "level": ["50", "100"], + "date_time": "2025-01-01T00:00:00", + "steps": [0, 1, 2, 3, 4], + "params": ["133", "130"] + } + ] +} From 589b0670e10ff742f6c0d451d622b2a7c24b2308 Mon Sep 17 00:00:00 2001 From: Kai Kratz Date: Fri, 21 Feb 2025 09:45:24 +0100 Subject: [PATCH 2/5] Check if listing succeds on fdb during init --- src/zfdb/datasources.py | 30 +++++++++++++++++++----------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/src/zfdb/datasources.py b/src/zfdb/datasources.py index 4e048a6..f31a3a7 100644 --- a/src/zfdb/datasources.py +++ b/src/zfdb/datasources.py @@ -156,12 +156,23 @@ def __init__( self._gribjump = gribjump self._requests = requests self._gj = pygribjump.GribJump() - steps_count = requests[0].steps_count - fields_count = sum([r.field_count for r in requests]) - - first_field = next( - self._fdb.list(requests[0].as_mars_request_for_step_index(0), keys=True) + steps_count = self._requests[0].steps_count + fields_count = sum([r.field_count for r in self._requests]) + values_count = self._query_number_of_values_in_field() + self._shape = (int(steps_count), fields_count, int(1), int(values_count)) + self._chunks = (1, fields_count, 1, values_count) + self._chunks_per_dimension = tuple( + [math.ceil(a / b) for (a, b) in zip(self._shape, self._chunks)] ) + + def _query_number_of_values_in_field(self) -> int: + try: + res_iter = self._fdb.list(self._requests[0].as_mars_request_for_step_index(0), keys=True) + first_field = next(res_iter) + except StopIteration: + raise ZfdbError("No data found for first request / first step to establish size of fields.") + + # TODO(kkratz): What errors can be emited from retrieve? msg = self._fdb.retrieve(first_field["keys"]) tmp_path = pathlib.Path("tmp.grib") tmp_path.write_bytes(msg.read()) @@ -170,13 +181,10 @@ def __init__( gid = eccodes.codes_new_from_file(f, eccodes.CODES_PRODUCT_GRIB) values_count = eccodes.codes_get(gid, "numberOfValues") eccodes.codes_release(gid) + if not isinstance(values_count, int): + raise ZfdbError("Grib message does not contain 'numberOfValues', cannot establish size of fields") tmp_path.unlink() - - self._shape = (int(steps_count), fields_count, int(1), values_count) - self._chunks = (1, fields_count, 1, values_count) - self._chunks_per_dimension = tuple( - [math.ceil(a / b) for (a, b) in zip(self._shape, self._chunks)] - ) + return values_count def create_dot_zarr_array(self) -> DotZarrArray: return DotZarrArray( From db005e8d96891d834e88ef2d7156dbac6444e23b Mon Sep 17 00:00:00 2001 From: Kai Kratz Date: Fri, 21 Feb 2025 10:18:15 +0100 Subject: [PATCH 3/5] Ruffed code --- server/client.py | 6 +++--- server/server.py | 10 +++++++--- src/zfdb/datasources.py | 12 +++++++++--- 3 files changed, 19 insertions(+), 9 deletions(-) diff --git a/server/client.py b/server/client.py index 482a8af..e89f71a 100644 --- a/server/client.py +++ b/server/client.py @@ -14,8 +14,8 @@ response = requests.post(url, headers=headers, data=json.dumps(json_request)) print(response.content) - hash = response.json()["hash"] - store = fsspec.get_mapper(f'http://localhost:5000/get/zarr/{hash}') + hash = response.json()["hash"] + store = fsspec.get_mapper(f"http://localhost:5000/get/zarr/{hash}") z_grp = zarr.open(store, mode="r") print(z_grp.attrs) @@ -23,4 +23,4 @@ for x in z_grp.attrs.items(): print(x) - print(z_grp["data"][0,0,0,0:30]) + print(z_grp["data"][0, 0, 0, 0:30]) diff --git a/server/server.py b/server/server.py index d46b406..1ed4e1d 100644 --- a/server/server.py +++ b/server/server.py @@ -70,16 +70,20 @@ def process_json(): return to_response(hashed_request) + @app.route("/get/zarr/", methods=["GET"]) @app.route("/get/zarr//", methods=["GET"]) @app.route("/get/zarr///", methods=["GET"]) -@app.route( "/get/zarr////", methods=["GET"]) +@app.route( + "/get/zarr////", methods=["GET"] +) def retrieve_zarr(hash, root_grp=None, group_lvl_one=None, group_lvl_two=None): - print("Routes", hash, root_grp, group_lvl_one, group_lvl_two) print(view_hashes) - zarr_path = "/".join([frag for frag in [root_grp, group_lvl_one, group_lvl_two] if frag]) + zarr_path = "/".join( + [frag for frag in [root_grp, group_lvl_one, group_lvl_two] if frag] + ) try: mapping = view_hashes[int(hash)] diff --git a/src/zfdb/datasources.py b/src/zfdb/datasources.py index f31a3a7..ca25bc3 100644 --- a/src/zfdb/datasources.py +++ b/src/zfdb/datasources.py @@ -167,10 +167,14 @@ def __init__( def _query_number_of_values_in_field(self) -> int: try: - res_iter = self._fdb.list(self._requests[0].as_mars_request_for_step_index(0), keys=True) + res_iter = self._fdb.list( + self._requests[0].as_mars_request_for_step_index(0), keys=True + ) first_field = next(res_iter) except StopIteration: - raise ZfdbError("No data found for first request / first step to establish size of fields.") + raise ZfdbError( + "No data found for first request / first step to establish size of fields." + ) # TODO(kkratz): What errors can be emited from retrieve? msg = self._fdb.retrieve(first_field["keys"]) @@ -182,7 +186,9 @@ def _query_number_of_values_in_field(self) -> int: values_count = eccodes.codes_get(gid, "numberOfValues") eccodes.codes_release(gid) if not isinstance(values_count, int): - raise ZfdbError("Grib message does not contain 'numberOfValues', cannot establish size of fields") + raise ZfdbError( + "Grib message does not contain 'numberOfValues', cannot establish size of fields" + ) tmp_path.unlink() return values_count From a543305d0196e4ed501537f529550f27a03309b6 Mon Sep 17 00:00:00 2001 From: Kai Kratz Date: Fri, 21 Feb 2025 13:35:00 +0100 Subject: [PATCH 4/5] Extend error handling / add cli args --- server/client.py | 38 ++++++-- server/requirements.txt | 3 + server/server.py | 181 ++++++++++++++++++++++++-------------- src/zfdb/mapping.py | 4 + test.json | 25 ------ tests/zfdb/test_recipe.py | 1 - 6 files changed, 155 insertions(+), 97 deletions(-) mode change 100644 => 100755 server/client.py create mode 100644 server/requirements.txt mode change 100644 => 100755 server/server.py delete mode 100644 test.json diff --git a/server/client.py b/server/client.py old mode 100644 new mode 100755 index e89f71a..4799ac3 --- a/server/client.py +++ b/server/client.py @@ -1,17 +1,41 @@ +#!/usr/bin/env python + import json -import pathlib + +import fsspec import requests import zarr -import fsspec -if __name__ == "__main__": - json_request = None - with open(pathlib.Path("./test.json"), "r") as request_file: - json_request = json.loads(request_file.read()) +view = { + "requests": [ + { + "cls": "od", + "stream": "oper", + "expver": 1, + "typ": "fc", + "levtype": "sfc", + "date_time": "2025-01-01T00:00:00", + "steps": [0, 1, 2, 3, 4], + "params": ["165", "166"], + }, + { + "cls": "od", + "stream": "oper", + "expver": 1, + "typ": "fc", + "levtype": "pl", + "level": ["50", "100"], + "date_time": "2025-01-01T00:00:00", + "steps": [0, 1, 2, 3, 4], + "params": ["133", "130"], + }, + ] +} +if __name__ == "__main__": url = "http://localhost:5000/create" headers = {"Content-Type": "application/json"} - response = requests.post(url, headers=headers, data=json.dumps(json_request)) + response = requests.post(url, headers=headers, data=json.dumps(view)) print(response.content) hash = response.json()["hash"] diff --git a/server/requirements.txt b/server/requirements.txt new file mode 100644 index 0000000..92d4f31 --- /dev/null +++ b/server/requirements.txt @@ -0,0 +1,3 @@ +flask +aiohttp +requests diff --git a/server/server.py b/server/server.py old mode 100644 new mode 100755 index 1ed4e1d..c582ab4 --- a/server/server.py +++ b/server/server.py @@ -1,104 +1,157 @@ -from flask import Flask, Response, request, jsonify +#!/usr/bin/env python + +import argparse import json -import zfdb +import logging +import os +import pathlib +import sys +import numpy as np import pyfdb import pygribjump +from flask import Flask, Response, jsonify, request -import numpy as np - +import zfdb app = Flask(__name__) view_hashes = {} -fdb = pyfdb.FDB() -gribjump = pygribjump.GribJump() - def map_requests_from_json(json) -> list[zfdb.Request]: for r in json["requests"]: - print(np.datetime64(r["date_time"])) r["date_time"] = np.datetime64(r["date_time"]) - - print(f"{json}") - return [zfdb.Request(**r) for r in json["requests"]] -def to_response(hased_request: int) -> Response: - return_dict = {"hash": str(hased_request)} - return Response( - response=json.dumps(return_dict), - status=200, - content_type="application-type/json", - ) - - -@app.route("/") -def hello_world(): - return "Hello, World!" - - @app.route("/create", methods=["POST"]) def process_json(): - print(f"Request: {request}") data = request.get_json() if not data: return jsonify({"error": "Invalid JSON"}), 400 - - # In case we have this view for the requests already hashed_request = hash(json.dumps(data)) - print(f"Computed Hash: {hashed_request}") - if hashed_request in view_hashes: - return to_response(hashed_request) + if hashed_request not in view_hashes: + try: + requests = map_requests_from_json(data) + mapping = zfdb.make_forecast_data_view( + request=requests, + fdb=fdb, + gribjump=gribjump, + ) + except Exception as e: + logger.info(f"Create view failed with exception: {e}") + return jsonify({"error": f"Invalid Request - {e}"}), 400 + + view_hashes[hashed_request] = mapping + logger.debug( + f"Created new zfdb view {hashed_request}, {len(view_hashes)} views are now opened" + ) + else: + logger.debug("Using create request") - requests = map_requests_from_json(data) - - mapping = zfdb.make_forecast_data_view( - request=requests, - fdb=fdb, - gribjump=gribjump, + return Response( + response=json.dumps({"hash": hashed_request}), + status=200, + content_type="application-type/json", ) - view_hashes[hashed_request] = mapping - - print("Retrieved the following requests:") - for r in requests: - print(r) - - return to_response(hashed_request) - - -@app.route("/get/zarr/", methods=["GET"]) -@app.route("/get/zarr//", methods=["GET"]) -@app.route("/get/zarr///", methods=["GET"]) -@app.route( - "/get/zarr////", methods=["GET"] -) -def retrieve_zarr(hash, root_grp=None, group_lvl_one=None, group_lvl_two=None): - print("Routes", hash, root_grp, group_lvl_one, group_lvl_two) - print(view_hashes) - - zarr_path = "/".join( - [frag for frag in [root_grp, group_lvl_one, group_lvl_two] if frag] - ) +@app.route("/get/zarr//", methods=["GET"]) +def retrieve_zarr(hash, zarr_path): try: mapping = view_hashes[int(hash)] - except KeyError as ke: + except KeyError: return Response(response=f"Couldn't find hash in {hash}", status=500) - print(f"Zarr path: {zarr_path}") - try: content = mapping[zarr_path] - except KeyError as ke: + except KeyError: return Response(response="", status=404) return Response(response=content, status=200) +def log_environment(): + variables = [ + "FDB_HOME", + "FDB5_CONFIG_FILE", + "FDB_ENABLE_GRIBJUMP", + "GRIBJUMP_HOME", + "GRIBJUMP_CONFIG_FILE", + "GRIBJUMP_IGNORE_GRID", + ] + for var in variables: + try: + val = os.environ[var] + except KeyError: + val = "" + logger.info(f"{var}={val}") + + +def connect_to_fdb(args): + if args.fdb_config: + abs_path = args.fdb_config.expanduser().resolve() + if not abs_path.is_file(): + raise Exception(f"Cannot find fdb config file {abs_path}") + os.environ["FDB5_CONFIG_FILE"] = f"{abs_path}" + os.environ["FDB_ENABLE_GRIBJUMP"] = "1" + if args.gribjump_config: + abs_path = args.gribjump_config.expanduser().resolve() + if not abs_path.is_file(): + raise Exception(f"Cannot find gribjump config file {abs_path}") + os.environ["GRIBJUMP_CONFIG_FILE"] = f"{abs_path}" + os.environ["GRIBJUMP_IGNORE_GRID"] = "1" + + log_environment() + + global fdb + fdb = pyfdb.FDB() + global gribjump + gribjump = pygribjump.GribJump() + + +def parse_args(): + parser = argparse.ArgumentParser() + parser.add_argument( + "-v", + "--verbose", + help="Enables verbose output, use -vv or -vvv for even more verbose output", + action="count", + default=0, + ) + parser.add_argument("--debug", help="Enables flask debug", action="store_true") + parser.add_argument( + "--fdb-config", + help="path to fdb config file, if not specified fdb searchs as usual", + type=pathlib.Path, + default=None, + ) + parser.add_argument( + "--gribjump-config", + help="path to gribjump config file, if not specified gribjump searchs as usual", + type=pathlib.Path, + default=None, + ) + + return parser.parse_args() + + if __name__ == "__main__": - app.run(debug=True) + args = parse_args() + if args.verbose == 0: + log_level = logging.WARNING + elif args.verbose == 1: + log_level = logging.INFO + else: + log_level = logging.DEBUG + + logging.basicConfig( + format="%(asctime)s %(message)s", stream=sys.stdout, level=log_level + ) + global logger + logger = logging.getLogger(__name__) + logger.info("Statring ZFDB Server") + connect_to_fdb(args) + app.run(debug=args.debug) diff --git a/src/zfdb/mapping.py b/src/zfdb/mapping.py index 1cd65c6..0d90cb3 100644 --- a/src/zfdb/mapping.py +++ b/src/zfdb/mapping.py @@ -106,6 +106,10 @@ def make_forecast_data_view( request: Request | list[Request], ) -> FdbZarrMapping: requests = request if isinstance(request, list) else [request] + if len(requests) > 1 and not all( + map(lambda x: x[0].matches_on_time_axis(x[1]), zip(requests[:-1], requests[1:])) + ): + raise ZfdbError("Requests are not matching on time axis") if not fdb: fdb = pyfdb.FDB() diff --git a/test.json b/test.json deleted file mode 100644 index b90554c..0000000 --- a/test.json +++ /dev/null @@ -1,25 +0,0 @@ -{ - "requests": [ - { - "cls": "od", - "stream": "oper", - "expver": 1, - "typ": "fc", - "levtype": "sfc", - "date_time": "2025-01-01T00:00:00", - "steps": [0, 1, 2, 3, 4], - "params": ["165", "166"] - }, - { - "cls": "od", - "stream": "oper", - "expver": 1, - "typ": "fc", - "levtype": "pl", - "level": ["50", "100"], - "date_time": "2025-01-01T00:00:00", - "steps": [0, 1, 2, 3, 4], - "params": ["133", "130"] - } - ] -} diff --git a/tests/zfdb/test_recipe.py b/tests/zfdb/test_recipe.py index dd23e3c..4ea238c 100644 --- a/tests/zfdb/test_recipe.py +++ b/tests/zfdb/test_recipe.py @@ -6,7 +6,6 @@ # granted to it by virtue of its status as an intergovernmental organisation # nor does it submit to any jurisdiction. -import pytest import yaml from zfdb.mapping import extract_mars_requests_from_recipe From 2305d8f3f7a69873c7eeddcd4ba4d61370b885ac Mon Sep 17 00:00:00 2001 From: Tobias Kremer Date: Fri, 21 Feb 2025 15:18:29 +0100 Subject: [PATCH 5/5] Minor: Added error message for 404 In case the mapping isn't aware of a given path, fill the returned 404 error with a message. --- server/server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/server.py b/server/server.py index c582ab4..8b7ad6b 100755 --- a/server/server.py +++ b/server/server.py @@ -68,7 +68,7 @@ def retrieve_zarr(hash, zarr_path): try: content = mapping[zarr_path] except KeyError: - return Response(response="", status=404) + return Response(response=f"Didn't find {zarr_path} for mapping of hash {int(hash)}", status=404) return Response(response=content, status=200)