Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial draft of a potential Http Server for streaming Zarr #15

Merged
merged 5 commits into from
Feb 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions CONTRIBUTORS.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Contributors

## How to Contribute

We are currently not open for contributions on this project.

## Contributing Organisations

Significant contributions have been made by the following organisations:
- [ECMWF](https://www.ecmwf.int/)
13 changes: 0 additions & 13 deletions Notes.md

This file was deleted.

13 changes: 0 additions & 13 deletions contributors.md

This file was deleted.

File renamed without changes.
50 changes: 50 additions & 0 deletions server/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#!/usr/bin/env python

import json

import fsspec
import requests
import zarr

# Example payload that this script POSTs to the server's /create endpoint.
# On the server side every entry of "requests" is expanded into keyword
# arguments of zfdb.Request, after "date_time" has been converted with
# np.datetime64.
# NOTE(review): the key names look like MARS-style request keys (class, stream,
# type, ...) — confirm their exact meaning against zfdb.Request.
view = {
"requests": [
# First request: "sfc" levtype, no "level" key.
{
"cls": "od",
"stream": "oper",
"expver": 1,
"typ": "fc",
"levtype": "sfc",
"date_time": "2025-01-01T00:00:00",
"steps": [0, 1, 2, 3, 4],
"params": ["165", "166"],
},
# Second request: "pl" levtype with two explicit levels; same date_time and
# steps as the first request.
{
"cls": "od",
"stream": "oper",
"expver": 1,
"typ": "fc",
"levtype": "pl",
"level": ["50", "100"],
"date_time": "2025-01-01T00:00:00",
"steps": [0, 1, 2, 3, 4],
"params": ["133", "130"],
},
]
}

if __name__ == "__main__":
    # Single place for the server address; both endpoints below hang off it.
    # NOTE(review): assumes the server runs locally on port 5000 — adjust if
    # the server is started with a different host/port.
    base_url = "http://localhost:5000"
    headers = {"Content-Type": "application/json"}

    # Register the view with the server; the answer carries the view's hash.
    response = requests.post(
        f"{base_url}/create", headers=headers, data=json.dumps(view)
    )

    print(response.content)
    # Stop with a clear HTTP error (the body was printed above) instead of
    # failing later with a KeyError on the missing "hash" entry.
    response.raise_for_status()

    # Named view_hash so the builtin hash() is not shadowed.
    view_hash = response.json()["hash"]

    # Open the view as a read-only Zarr store served over HTTP.
    store = fsspec.get_mapper(f"{base_url}/get/zarr/{view_hash}")
    z_grp = zarr.open(store, mode="r")

    print(z_grp.attrs)

    for x in z_grp.attrs.items():
        print(x)

    # Print the first 30 values at index (0, 0, 0) of the "data" array.
    print(z_grp["data"][0, 0, 0, 0:30])
3 changes: 3 additions & 0 deletions server/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
flask
aiohttp
requests
157 changes: 157 additions & 0 deletions server/server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
#!/usr/bin/env python

import argparse
import json
import logging
import os
import pathlib
import sys

import numpy as np
import pyfdb
import pygribjump
from flask import Flask, Response, jsonify, request

import zfdb

app = Flask(__name__)

view_hashes = {}


def map_requests_from_json(json) -> list[zfdb.Request]:
    """Build zfdb.Request objects from a decoded /create payload.

    Args:
        json: Decoded JSON object holding a "requests" list. Every entry is a
            dict of zfdb.Request keyword arguments and must contain a
            "date_time" value that np.datetime64 accepts.

    Returns:
        One zfdb.Request per entry, in payload order.

    Raises:
        KeyError: If "requests" or an entry's "date_time" is missing.
    """
    # Build fresh keyword dicts instead of overwriting "date_time" in the
    # caller's payload, so the input stays untouched (and JSON serialisable).
    return [
        zfdb.Request(**{**entry, "date_time": np.datetime64(entry["date_time"])})
        for entry in json["requests"]
    ]


@app.route("/create", methods=["POST"])
def process_json():
    """Create (or reuse) a zfdb view for the posted request description.

    The JSON body is hashed; if no view is registered under that hash yet, the
    requests are mapped to zfdb.Request objects and a new view is created and
    stored in ``view_hashes``.

    Returns:
        A JSON response ``{"hash": <int>}`` with status 200 on success, or
        ``{"error": ...}`` with status 400 when the body is empty/invalid or
        the view could not be created.
    """
    data = request.get_json()
    if not data:
        return jsonify({"error": "Invalid JSON"}), 400

    # NOTE: hash() of a str is salted per interpreter process, so the returned
    # id is only valid for the lifetime of this server process.
    hashed_request = hash(json.dumps(data))

    if hashed_request in view_hashes:
        logger.debug("Using create request")
    else:
        try:
            mapping = zfdb.make_forecast_data_view(
                request=map_requests_from_json(data),
                fdb=fdb,
                gribjump=gribjump,
            )
        except Exception as e:
            # Boundary handler: any failure while building the view is reported
            # to the client as a bad request.
            logger.info(f"Create view failed with exception: {e}")
            return jsonify({"error": f"Invalid Request - {e}"}), 400

        view_hashes[hashed_request] = mapping
        logger.debug(
            f"Created new zfdb view {hashed_request}, {len(view_hashes)} views are now opened"
        )

    # jsonify sets the correct "application/json" content type (the previous
    # hand-built response advertised the invalid "application-type/json").
    return jsonify({"hash": hashed_request})


@app.route("/get/zarr/<hash>/<path:zarr_path>", methods=["GET"])
def retrieve_zarr(hash, zarr_path):
    """Serve one key of a previously created zfdb view.

    Args:
        hash: View id as returned by the /create endpoint (integer as text).
        zarr_path: Key inside the view's mapping.

    Returns:
        The stored content with status 200, or a 404 response when either the
        view or the key inside the view does not exist.
    """
    try:
        view_id = int(hash)
        mapping = view_hashes[view_id]
    except (KeyError, ValueError):
        # Unknown view or a hash that is not an integer at all: both mean the
        # requested resource does not exist, which is a client-side 404 rather
        # than a server error.
        return Response(response=f"Couldn't find hash in {hash}", status=404)

    try:
        content = mapping[zarr_path]
    except KeyError:
        return Response(
            response=f"Didn't find {zarr_path} for mapping of hash {view_id}",
            status=404,
        )

    return Response(response=content, status=200)


def log_environment():
    """Log the FDB and GribJump environment variables at INFO level.

    Variables that are not present in the environment are reported with the
    placeholder ``<NOT-SET>``.
    """
    watched_names = (
        "FDB_HOME",
        "FDB5_CONFIG_FILE",
        "FDB_ENABLE_GRIBJUMP",
        "GRIBJUMP_HOME",
        "GRIBJUMP_CONFIG_FILE",
        "GRIBJUMP_IGNORE_GRID",
    )
    for name in watched_names:
        value = os.environ.get(name, "<NOT-SET>")
        logger.info(f"{name}={value}")


# Prepare the process environment for FDB / GribJump, log the resulting
# settings and create the module-global `fdb` and `gribjump` handles.
#
# `args` must provide `fdb_config` and `gribjump_config`; parse_args() delivers
# them as pathlib.Path or None.
# Raises Exception when a given config path does not point to an existing file.
#
# NOTE(review): indentation was lost in this view, so it cannot be told whether
# FDB_ENABLE_GRIBJUMP / GRIBJUMP_IGNORE_GRID are exported unconditionally or
# only inside the preceding `if` bodies — confirm against the real file.
def connect_to_fdb(args):
# Optional FDB config override: normalise to an absolute path and export it.
if args.fdb_config:
abs_path = args.fdb_config.expanduser().resolve()
if not abs_path.is_file():
raise Exception(f"Cannot find fdb config file {abs_path}")
os.environ["FDB5_CONFIG_FILE"] = f"{abs_path}"
os.environ["FDB_ENABLE_GRIBJUMP"] = "1"
# Optional GribJump config override, handled the same way as the FDB one.
if args.gribjump_config:
abs_path = args.gribjump_config.expanduser().resolve()
if not abs_path.is_file():
raise Exception(f"Cannot find gribjump config file {abs_path}")
os.environ["GRIBJUMP_CONFIG_FILE"] = f"{abs_path}"
os.environ["GRIBJUMP_IGNORE_GRID"] = "1"

# Log the effective environment before the clients are constructed.
log_environment()

# The handles are published as module globals; process_json reads `fdb` and
# `gribjump` from there.
global fdb
fdb = pyfdb.FDB()
global gribjump
gribjump = pygribjump.GribJump()


def parse_args(argv=None):
    """Parse the server's command line options.

    Args:
        argv: Argument strings to parse. With the default ``None`` argparse
            falls back to ``sys.argv[1:]``, which keeps existing callers
            working while allowing programmatic use.

    Returns:
        argparse.Namespace with ``verbose`` (int, number of -v flags),
        ``debug`` (bool), ``fdb_config`` and ``gribjump_config``
        (pathlib.Path or None).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "-v",
        "--verbose",
        help="Enables verbose output, use -vv or -vvv for even more verbose output",
        action="count",
        default=0,
    )
    parser.add_argument("--debug", help="Enables flask debug", action="store_true")
    parser.add_argument(
        "--fdb-config",
        help="path to fdb config file, if not specified fdb searchs as usual",
        type=pathlib.Path,
        default=None,
    )
    parser.add_argument(
        "--gribjump-config",
        help="path to gribjump config file, if not specified gribjump searchs as usual",
        type=pathlib.Path,
        default=None,
    )

    return parser.parse_args(argv)


if __name__ == "__main__":
    args = parse_args()

    # Translate the number of -v flags into a log level: none -> WARNING,
    # one -> INFO, two or more -> DEBUG.
    levels_by_verbosity = {0: logging.WARNING, 1: logging.INFO}
    log_level = levels_by_verbosity.get(args.verbose, logging.DEBUG)

    logging.basicConfig(
        format="%(asctime)s %(message)s",
        stream=sys.stdout,
        level=log_level,
    )

    # Module-level logger used by the request handlers and helpers above.
    logger = logging.getLogger(__name__)
    logger.info("Statring ZFDB Server")

    # Set up FDB / GribJump first, then hand control to Flask.
    connect_to_fdb(args)
    app.run(debug=args.debug)
36 changes: 25 additions & 11 deletions src/zfdb/datasources.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,12 +156,27 @@ def __init__(
self._gribjump = gribjump
self._requests = requests
self._gj = pygribjump.GribJump()
steps_count = requests[0].steps_count
fields_count = sum([r.field_count for r in requests])

first_field = next(
self._fdb.list(requests[0].as_mars_request_for_step_index(0), keys=True)
steps_count = self._requests[0].steps_count
fields_count = sum([r.field_count for r in self._requests])
values_count = self._query_number_of_values_in_field()
self._shape = (int(steps_count), fields_count, int(1), int(values_count))
self._chunks = (1, fields_count, 1, values_count)
self._chunks_per_dimension = tuple(
[math.ceil(a / b) for (a, b) in zip(self._shape, self._chunks)]
)

def _query_number_of_values_in_field(self) -> int:
try:
res_iter = self._fdb.list(
self._requests[0].as_mars_request_for_step_index(0), keys=True
)
first_field = next(res_iter)
except StopIteration:
raise ZfdbError(
"No data found for first request / first step to establish size of fields."
)

# TODO(kkratz): What errors can be emited from retrieve?
msg = self._fdb.retrieve(first_field["keys"])
tmp_path = pathlib.Path("tmp.grib")
tmp_path.write_bytes(msg.read())
Expand All @@ -170,13 +185,12 @@ def __init__(
gid = eccodes.codes_new_from_file(f, eccodes.CODES_PRODUCT_GRIB)
values_count = eccodes.codes_get(gid, "numberOfValues")
eccodes.codes_release(gid)
if not isinstance(values_count, int):
raise ZfdbError(
"Grib message does not contain 'numberOfValues', cannot establish size of fields"
)
tmp_path.unlink()

self._shape = (int(steps_count), fields_count, int(1), values_count)
self._chunks = (1, fields_count, 1, values_count)
self._chunks_per_dimension = tuple(
[math.ceil(a / b) for (a, b) in zip(self._shape, self._chunks)]
)
return values_count

def create_dot_zarr_array(self) -> DotZarrArray:
return DotZarrArray(
Expand Down
4 changes: 4 additions & 0 deletions src/zfdb/mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ def make_forecast_data_view(
request: Request | list[Request],
) -> FdbZarrMapping:
requests = request if isinstance(request, list) else [request]
if len(requests) > 1 and not all(
map(lambda x: x[0].matches_on_time_axis(x[1]), zip(requests[:-1], requests[1:]))
):
raise ZfdbError("Requests are not matching on time axis")

if not fdb:
fdb = pyfdb.FDB()
Expand Down
1 change: 0 additions & 1 deletion tests/zfdb/test_recipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
# granted to it by virtue of its status as an intergovernmental organisation
# nor does it submit to any jurisdiction.

import pytest
import yaml

from zfdb.mapping import extract_mars_requests_from_recipe
Expand Down