Conversation
Pull request overview
This PR refactors pygama.flow away from the legacy FileDB/DataLoader-based workflow toward a new “structured query” API that builds LH5Iterator instances and exposes higher-level helpers for querying tables and producing histograms.
Changes:
- Removed the legacy flow stack (`DataLoader`, `FileDB`, and related utils).
- Added new query-oriented APIs: `build_iterator`, `query_data`, `query_hist`, and `query_evt`.
- Updated `pygama.flow` public exports and module documentation to reflect the new query interface.
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 17 comments.
| File | Description |
|---|---|
| src/pygama/flow/__init__.py | Replaces the public surface/docs from DataLoader/FileDB to query functions. |
| src/pygama/flow/build_iterator.py | New helper to construct an LH5Iterator across tiers based on metadata queries. |
| src/pygama/flow/query_data.py | New table query helper built on build_iterator + LH5Iterator.query. |
| src/pygama/flow/query_hist.py | New histogram query helper built on build_iterator + LH5Iterator.hist. |
| src/pygama/flow/query_evt.py | New evt-tier-only query helper using query_runs + direct iterator construction. |
| src/pygama/flow/utils.py | Removed legacy utility functions used by the old flow stack. |
| src/pygama/flow/file_db.py | Removed legacy FileDB implementation. |
| src/pygama/flow/data_loader.py | Removed legacy DataLoader implementation. |
```python
runs: str | ak.Array | Mapping[np.ndarray] | pd.DataFrame,
channels: str | ak.Array | Mapping[np.ndarray] | pd.DataFrame,
```

The type annotation `Mapping[np.ndarray]` is invalid on Python 3.10+ and will raise at import time. Use `Mapping[str, np.ndarray]` (or similar) or remove the subscripting.

Suggested change:

```diff
-runs: str | ak.Array | Mapping[np.ndarray] | pd.DataFrame,
-channels: str | ak.Array | Mapping[np.ndarray] | pd.DataFrame,
+runs: str | ak.Array | Mapping[str, np.ndarray] | pd.DataFrame,
+channels: str | ak.Array | Mapping[str, np.ndarray] | pd.DataFrame,
```
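To illustrate why this fails at import time (not PR code): `typing.Mapping` takes exactly two type parameters, so a one-parameter subscript is rejected as soon as the annotation is evaluated, which happens when the `def` statement runs. `int` stands in for `np.ndarray` here to keep the sketch dependency-free.

```python
# Illustration only: typing.Mapping is parameterized by key AND value
# types, so subscripting it with a single type raises TypeError.
from typing import Mapping

try:
    Mapping[int]  # wrong arity, like Mapping[np.ndarray] in the PR
except TypeError as exc:
    print(f"rejected: {exc}")

valid = Mapping[str, int]  # two parameters: accepted
print(valid)
```

(Note that `collections.abc.Mapping` does not arity-check its subscripts, so the failure mode depends on which `Mapping` is imported; the fix of writing both parameters is correct either way.)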
```python
import pandas as pd
from dbetto import Props
from legendmeta.query import _format_vars, parse_query_paths, query_runs
from lh5 import LH5Iterator
```

This imports `LH5Iterator` from a top-level `lh5` module, but the project dependencies (pyproject.toml) don't currently include a package that provides `lh5` (the rest of the codebase uses `lgdo.lh5`). This is likely to fail at runtime unless the dependency change is included here.

Suggested change:

```diff
-from lh5 import LH5Iterator
+from lgdo.lh5 import LH5Iterator
```
```python
import pandas as pd
from dbetto import Props
from legendmeta.query import _format_vars, parse_query_paths, query_meta
from lh5 import LH5Iterator
```

This imports `LH5Iterator` from a top-level `lh5` module, but pyproject.toml does not declare a dependency that provides `lh5` (the codebase elsewhere uses `lgdo.lh5`). As-is, this is likely to fail at runtime in CI/user installs. Either switch to the existing import path (`lgdo.lh5`) or add/lock the new dependency providing `lh5` as part of this PR.

Suggested change:

```diff
-from lh5 import LH5Iterator
+from lgdo.lh5 import LH5Iterator
```
```python
if executor is None and isinstance(processes, int):
    executor = ProcessPoolExecutor(processes)

lh5_it, alias_map = build_iterator(
    {f for f, _, _ in field_info + entries_fields},
    runs,
    channels,
    dataflow_config=dataflow_config,
    return_alias_map=True,
    processes=processes,
    executor=executor,
    **kwargs,
)

fields = {}
for _, alias, path in field_info:
    if path in alias_map:
        fields[alias_map[path]] = None
    else:
        fields[path] = alias

ret = lh5_it.query(
    entries,
    fields=fields if not return_query_vals else None,
    processes=processes,
    executor=executor,
    library=library,
)

if return_alias_map:
    return ret, alias_map
return ret
```

When `executor` is not provided, this function creates a `ProcessPoolExecutor` but never shuts it down. That can leave worker processes running and hang interpreters/tests. If you create the executor internally, ensure it's shut down (e.g., use a context manager or call `shutdown()` in a `finally` block after `lh5_it.query(...)`).

Suggested change:

```diff
-if executor is None and isinstance(processes, int):
-    executor = ProcessPoolExecutor(processes)
-
-lh5_it, alias_map = build_iterator(
-    {f for f, _, _ in field_info + entries_fields},
-    runs,
-    channels,
-    dataflow_config=dataflow_config,
-    return_alias_map=True,
-    processes=processes,
-    executor=executor,
-    **kwargs,
-)
-
-fields = {}
-for _, alias, path in field_info:
-    if path in alias_map:
-        fields[alias_map[path]] = None
-    else:
-        fields[path] = alias
-
-ret = lh5_it.query(
-    entries,
-    fields=fields if not return_query_vals else None,
-    processes=processes,
-    executor=executor,
-    library=library,
-)
-
-if return_alias_map:
-    return ret, alias_map
-return ret
+created_executor = False
+if executor is None and isinstance(processes, int):
+    executor = ProcessPoolExecutor(processes)
+    created_executor = True
+try:
+    lh5_it, alias_map = build_iterator(
+        {f for f, _, _ in field_info + entries_fields},
+        runs,
+        channels,
+        dataflow_config=dataflow_config,
+        return_alias_map=True,
+        processes=processes,
+        executor=executor,
+        **kwargs,
+    )
+    fields = {}
+    for _, alias, path in field_info:
+        if path in alias_map:
+            fields[alias_map[path]] = None
+        else:
+            fields[path] = alias
+    ret = lh5_it.query(
+        entries,
+        fields=fields if not return_query_vals else None,
+        processes=processes,
+        executor=executor,
+        library=library,
+    )
+    if return_alias_map:
+        return ret, alias_map
+    return ret
+finally:
+    if created_executor and executor is not None:
+        executor.shutdown(wait=True)
```
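The ownership pattern behind this suggestion can be sketched generically: an executor created inside the function must be shut down there, while a caller-supplied one is left alone. `ThreadPoolExecutor` is used below so the sketch runs anywhere; the lifecycle is identical for `ProcessPoolExecutor`. The function name and workload are illustrative, not pygama APIs.

```python
# Generic sketch of "create-if-absent, shut down in finally":
# only an executor we created ourselves gets shut down.
from concurrent.futures import ThreadPoolExecutor


def run_query(values, workers=None, executor=None):
    created = False
    if executor is None and isinstance(workers, int):
        executor = ThreadPoolExecutor(workers)
        created = True
    try:
        if executor is None:
            return [v * v for v in values]  # serial fallback
        return list(executor.map(lambda v: v * v, values))
    finally:
        if created:
            executor.shutdown(wait=True)  # no leaked workers


print(run_query([1, 2, 3], workers=2))  # [1, 4, 9]
```

Returning from inside the `try` still runs the `finally` block, so the shutdown is guaranteed even on the early-return paths in the PR's function.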
```python
{
    "cuspEmax_ctc_cal": axis.Regular(300, 0, 3000, label="Energy (keV)"),
    "@det.name": axis.StrCategory(label="Detector", growth=True)"
}
```

The docstring example has a stray trailing quote, and several examples in this file end with mismatched backticks (e.g., ``...````). These will render incorrectly in the generated docs and can produce Sphinx warnings. Please fix the quoting/backtick markup in the examples.
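For reference, a corrected version of the example might read as follows (a sketch assuming the `hist` library's axis API; note that `StrCategory` also expects an initial categories iterable, supplied here as `[]`, which the original example omitted):

```python
{
    "cuspEmax_ctc_cal": axis.Regular(300, 0, 3000, label="Energy (keV)"),
    "@det.name": axis.StrCategory([], label="Detector", growth=True),
}
```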
```python
    lh5_it = new_it
else:
    lh5_it.add_friend(new_it)
```

`lh5_it` can remain `None` if no suitable tiers are found/kept (e.g., `tiers` filters everything, or all tier table formats include channel variables and are skipped). The unconditional `lh5_it.reset_field_mask(...)` will then raise `AttributeError`. Add a check after the loop and raise a clear error when no iterator could be built.

Suggested change:

```diff
+if lh5_it is None:
+    msg = (
+        "no suitable LH5 tiers were found for the requested fields; "
+        "check 'tiers', 'tables', and requested field paths"
+    )
+    raise ValueError(msg)
```
```diff
 from legendmeta.query import query_meta, query_runs

 from .build_iterator import build_iterator
 from .query_data import query_data
 from .query_evt import query_evt
 from .query_hist import query_hist

-__all__ = ["DataLoader", "FileDB"]
+__all__ = [
+    "query_runs",
+    "query_meta",
+    "query_data",
+    "query_hist",
+    "query_evt",
+    "build_iterator",
+]
```

This module no longer exports `DataLoader` and `FileDB`, and the corresponding implementation files were removed. The repository's test suite still imports these symbols (tests/flow/test_data_loader.py, tests/flow/test_filedb.py), so CI will fail unless the tests (and any downstream API expectations) are updated or a deprecation/compatibility layer is provided.
```python
runs: str | ak.Array | Mapping[np.ndarray] | pd.DataFrame,
channels: str | ak.Array | Mapping[np.ndarray] | pd.DataFrame,
```

The type annotation `Mapping[np.ndarray]` is invalid on Python 3.10+ (`Mapping` expects two type parameters). This will raise at import time and break the module. Use `Mapping[str, np.ndarray]` (or an appropriate key/value pairing) or drop the subscripting if the key type isn't important.

Suggested change:

```diff
-runs: str | ak.Array | Mapping[np.ndarray] | pd.DataFrame,
-channels: str | ak.Array | Mapping[np.ndarray] | pd.DataFrame,
+runs: str | ak.Array | Mapping[str, np.ndarray] | pd.DataFrame,
+channels: str | ak.Array | Mapping[str, np.ndarray] | pd.DataFrame,
```
```python
*,
dataflow_config: Path | str | Mapping = "$REFPROD/dataflow-config.yaml",
tiers: Collection[str] = None,
tables: Collection[str] = None,
```

`tables` is annotated as `Collection[str]` but is used as a mapping (`tables.items()` and `tables[tier]`). This mismatch can hide real runtime errors. Update the type to something like `Mapping[str, str]`, and consider validating that required tier keys exist before indexing, to avoid a `KeyError` when `paths` contains tiers not present in `tables`.

Suggested change:

```diff
-tables: Collection[str] = None,
+tables: Mapping[str, str] | None = None,
```
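The suggested validation could look something like the following sketch (a hypothetical guard, not PR code; the helper name and tier/table strings are invented for illustration): check that every tier referenced by the requested paths has an entry in the `tables` mapping before any `tables[tier]` lookup, turning a late `KeyError` deep in the iterator build into one clear message up front.

```python
# Hypothetical guard: fail early, with all missing tiers listed,
# instead of raising KeyError on the first tables[tier] access.
from collections.abc import Mapping


def check_tier_tables(tiers_needed, tables):
    if not isinstance(tables, Mapping):
        raise TypeError("tables must map tier name -> table template")
    missing = sorted(set(tiers_needed) - set(tables))
    if missing:
        raise KeyError(f"no table template for tiers: {missing}")


# Illustrative tier/table names, not real pygama configuration.
tables = {"hit": "ch{ch}/hit", "dsp": "ch{ch}/dsp"}
check_tier_tables(["hit", "dsp"], tables)  # passes silently
try:
    check_tier_tables(["hit", "evt"], tables)
except KeyError as exc:
    print(exc)
```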
```python
runs: str | ak.Array | Mapping[np.ndarray] | pd.DataFrame,
channels: str | ak.Array | Mapping[np.ndarray] | pd.DataFrame,
```

The type annotation `Mapping[np.ndarray]` is invalid on Python 3.10+ and will raise at import time. Use `Mapping[str, np.ndarray]` (or similar) or remove the subscripting.

Suggested change:

```diff
-runs: str | ak.Array | Mapping[np.ndarray] | pd.DataFrame,
-channels: str | ak.Array | Mapping[np.ndarray] | pd.DataFrame,
+runs: str | ak.Array | Mapping | pd.DataFrame,
+channels: str | ak.Array | Mapping | pd.DataFrame,
```
Replaced the existing flow module with one that provides a set of structured queries for accessing data. This depends on legend-exp/pylegendmeta#115 and legend-exp/legend-lh5io#9.

Note that no tests exist for these functions at this time.