BuildPy is a workflow engine to manage data analysis pipelines. It has the following features:

- Integration for the following (see the sketch after this list):
    - BigQuery (`"bq://project.dataset.table"`)
    - Google Cloud Storage (`"gs://bucket/blob"`)
    - S3 (`"s3://bucket/path/to/object"`)
- Parallel processing (similar to `--jobs` of GNU Make)
- Checksum-based update scheme (similar to SCons)
- Dynamic job declaration
- Support for prioritized job declaration
- Job scheduling based on load average (similar to `--load-average` of GNU Make)
- DOT-format output of a dependency graph (similar to `--prereqs` of Rake)
- Deferred error (similar to `--keep-going` of GNU Make)
- Dry-run (similar to `--dry-run` of GNU Make)
- Declaration of multiple targets for a single job
- Versioned API (`buildpy.v1`, `buildpy.v2`, ...)
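The cloud integrations let such URIs appear directly as targets or dependencies of a job. The following is a minimal sketch of the idea, not taken from BuildPy's documentation: the bucket and object names are hypothetical, and the download itself is done by an external `gsutil` command. A fuller BigQuery/Cloud Storage example appears in the changelog below.

```python
import sys

import buildpy.vx

dsl = buildpy.vx.DSL(sys.argv)

# Make a local CSV from a (hypothetical) Cloud Storage object.
# BuildPy tracks the gs:// dependency; the copy is performed by gsutil.
@dsl.file(["data.csv"], ["gs://mybucket/raw/data.csv"])
def _(j):
    dsl.sh(f"gsutil cp {j.ds[0]} {j.ts[0]}")

dsl.phony("all", ["data.csv"])

if __name__ == "__main__":
    dsl.run()
```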
BuildPy requires Python version ≥ 3.6 and is available from PyPI and conda-forge:

```bash
# Pip
pip install buildpy

# Conda
conda install -c defaults -c conda-forge buildpy
```

The typical form of `build.py` is as follows:
```bash
python build.py --help
python build.py all --jobs="$(nproc)" --keep-going
```

```python
import sys

import buildpy.vx

dsl = buildpy.vx.DSL(sys.argv)
file = dsl.file
phony = dsl.phony
sh = dsl.sh

all_jobs = []
test_jobs = []

all_jobs.append("test")
test_jobs.extend(["main.exe.log1", "main.exe.log2"])

@file(["main.exe.log1", "main.exe.log2"], ["main.exe"])
def _(j):
    # j.ts: list of targets
    # j.ds: list of dependencies
    sh(f"./{j.ds[0]} 1> {j.ts[0]} 2> {j.ts[1]}")

all_jobs.append("build")
test_jobs.append("main.exe")

@file("main.exe", ["main.c"])
def _(j):
    sh(f"gcc -o {j.ts[0]} {j.ds[0]}")

phony("build", ["main.exe"])  # "build" is listed in all_jobs, so declare it as a phony target
phony("all", all_jobs)
phony("test", test_jobs)

if __name__ == '__main__':
    dsl.run()
```

Please see ./build.py and buildpy/v*/tests/*.sh for more examples.
After importing the `buildpy` module, please create a DSL instance with `dsl = buildpy.vx.DSL(sys.argv)`.
The instance, `dsl`, provides methods to construct a dependency graph and to execute the declared jobs.
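A minimal `build.py` skeleton, restating the structure of the example above, looks like this:

```python
import sys

import buildpy.vx

# The DSL receives sys.argv, so command-line options such as --jobs and
# --keep-going are handled by BuildPy.
dsl = buildpy.vx.DSL(sys.argv)

# Declare jobs here with dsl.file(...) and dsl.phony(...); run shell
# commands inside them with dsl.sh(...).

if __name__ == "__main__":
    dsl.run()  # build the requested targets (the phony target `all` by default)
```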
`dsl.file` declares the dependencies and the command used to make target files.
It is used as follows:

```python
# Make `target` from `dep1` and `dep2` by `cat dep1 dep2 >| target`.
# You are able to pass a description of the job via the `desc` keyword argument.
@dsl.file("target", ["dep1", "dep2"], desc="Optional description argument")
def _(job):
    dsl.sh(f"cat {' '.join(job.ds)} >| {job.ts[0]}")

# You are able to declare a job that makes multiple outputs via a single command invocation.
# In the following example, `target1` and `target2` are made by `diff dep1 dep2 1>| target1 2>| target2`.
@dsl.file(["target1", "target2"], ["dep1", "dep2"])
def _(job):
    dsl.sh(f"diff {' '.join(job.ds)} 1>| {job.ts[0]} 2>| {job.ts[1]}")
```

Like the `task` method of Rake or the `.PHONY` rule of GNU Make, you are able to declare a job that does not produce target files by using `dsl.phony`.
`dsl.phony` is used as follows:

```python
# Make a phony target named `targetA`, which depends on `dep1` and `dep2`.
# An invocation of `targetA` executes the decorated method, `_`, and prints `targetA invoked.`.
@dsl.phony("targetA", ["dep1", "dep2"], desc="Optional description argument")
def _(job):
    print(job.ts[0] + " invoked.")

# Make a phony target named `targetB`, which depends on `dep3` and `dep4`.
# An invocation of `targetB` executes no command.
dsl.phony("targetB", ["dep3", "dep4"])
```

The phony target named `all` is invoked if no target is specified on the command line.
If you want to make `libfinalproduct.so` by default, please add the following line to your `build.py`:

```python
dsl.phony("all", ["libfinalproduct.so"])
```

To execute the declared jobs, please add the following line to your `build.py`:

```python
dsl.run()
```

## Changelog

- `hash/` -> `hash/ts`.
- `buildpy_dir` no longer contains `os.getcwd()`.
- `with_symlink` now uses the relative path instead of the absolute path.
- Add `DSL.lazy_call` and `DSL.lazy_val`. `with_symlink` now modifies the DAG.
- Add `DSL.with_symlink`.
- Use SHA256 instead of SHA1.
- Support `argparse.Namespace` for `j.ts`, `j.ds`, and `j.data`.
- Support automatic target naming via `@file(auto=True)`.
- Support specifying targets and dependencies as `dict`s.
- Support machine-readable logging (`--execution_log_dir <dir>`).
- Add `DSL.check_existence_only`.
- Make the termination of subprocesses optional.
- Set `use_hash=True` by default.
- Remove the complicated dynamic graph update capability, which has been rarely used.
- Support the `cut` argument for `DSL.phony` and `DSL.file`.
- Dynamic DAG (accessible through `j.ty` and `j.dy`).
- Kill all subprocesses on a failure (if not `--keep-going`) or on SIGINT.
- Add `_Job.data`.

  ```python
  @file(ts, ds, data=dict(params=dict(a=1, b=2)))
  def this(j):
      print(j.data["params"])
  ```

- Improve error messages.
- Print more informative error messages.
- Cache clients.
- Support BigQuery (`"bq://project.dataset.table"`).
- Support Google Cloud Storage (`"gs://bucket/blob"`).

  ```python
  import sys

  import buildpy.v4

  dsl = buildpy.v4.DSL()
  phony = dsl.phony
  file = dsl.file
  sh = dsl.sh
  uriparse = dsl.uriparse

  @file(["bq://myproject.mydataset.mytable"], ["gs://mybucket/myblob.csv"])
  def _(j):
      # Split "myproject.mydataset.mytable" into project, dataset, and table.
      project, dataset, table = uriparse(j.ts[0]).netloc.split(".", 2)
      sh(f"""
      bq load --autodetect {project}:{dataset}.{table} {j.ds[0]}
      """)

  phony("all", ["bq://myproject.mydataset.mytable"])

  if __name__ == "__main__":
      dsl.main(sys.argv)
  ```
- Add `DSL.cd`.
- Add `DSL.serialize`, a canonical serializer.
- Tweak the cache directory naming convention.
- Respect `job.priority` a bit more.
- Support `{file,phony}(priority=)` (smaller is higher; see the sketch after this list).
- Support `DSL.rm("dir")`.
- Use the JSON format to store cache files.
- `buildpy.vx.logger` no longer has handlers.
- Support parallel execution of serial jobs.
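As an illustration of the `priority=` keyword mentioned above, here is a hypothetical sketch: the targets, commands, and the particular values 1 and 10 are made up, and the only documented behavior is that a smaller value means a higher priority.

```python
# Hypothetical example: when both jobs are ready at the same time,
# the job with the smaller priority value is preferred by the scheduler.
@dsl.file("critical.txt", ["input.txt"], priority=1)
def _(j):
    dsl.sh(f"cp {j.ds[0]} {j.ts[0]}")

@dsl.file("optional.txt", ["input.txt"], priority=10)
def _(j):
    dsl.sh(f"cp {j.ds[0]} {j.ts[0]}")
```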
## Development

Add "buildpy.v9" to setup.py.

```bash
cd buildpy
git mv v9 v10
cp -a vx v9
cd v9
grep -l buildpy.vx -R . | xargs -n1 sed -i'' -e 's/buildpy.vx/buildpy.v9/g'
find . -type f | grep -v done | xargs git add
cd ../..
# edit setup.py
python build.py sdist
```

```bash
python3 -m venv venv
venv/bin/python3 -m pip install -e .[dev]
venv/bin/python3 build.py -h
```
- v6: fully support the `--cut` argument.