From a8ac010eb67601ce21ae0c10606971686faa81ff Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Fri, 27 Oct 2023 11:56:54 -0500 Subject: [PATCH 1/5] remove filesystem --- tests/tpch/conftest.py | 4 ++-- tests/tpch/test_dask.py | 46 ++++++++++++++++++++--------------------- 2 files changed, 25 insertions(+), 25 deletions(-) diff --git a/tests/tpch/conftest.py b/tests/tpch/conftest.py index bb70c2d0c4..c5bc671a67 100644 --- a/tests/tpch/conftest.py +++ b/tests/tpch/conftest.py @@ -172,8 +172,8 @@ def cluster_spec(scale): } elif scale == 100: return { - "worker_vm_types": ["m6i.large"], - "n_workers": 16, + "worker_vm_types": ["m6i.xlarge"], + "n_workers": 8, **everywhere, } elif scale == 1000: diff --git a/tests/tpch/test_dask.py b/tests/tpch/test_dask.py index d848c6ff2c..13fa526735 100644 --- a/tests/tpch/test_dask.py +++ b/tests/tpch/test_dask.py @@ -5,7 +5,7 @@ def test_query_1(client, dataset_path, fs): VAR1 = datetime(1998, 9, 2) - lineitem_ds = dd.read_parquet(dataset_path + "lineitem", filesystem=fs) + lineitem_ds = dd.read_parquet(dataset_path + "lineitem") lineitem_filtered = lineitem_ds[lineitem_ds.l_shipdate <= VAR1] lineitem_filtered["sum_qty"] = lineitem_filtered.l_quantity @@ -45,11 +45,11 @@ def test_query_2(client, dataset_path, fs): var2 = "BRASS" var3 = "EUROPE" - region_ds = dd.read_parquet(dataset_path + "region", filesystem=fs) - nation_filtered = dd.read_parquet(dataset_path + "nation", filesystem=fs) - supplier_filtered = dd.read_parquet(dataset_path + "supplier", filesystem=fs) - part_filtered = dd.read_parquet(dataset_path + "part", filesystem=fs) - partsupp_filtered = dd.read_parquet(dataset_path + "partsupp", filesystem=fs) + region_ds = dd.read_parquet(dataset_path + "region") + nation_filtered = dd.read_parquet(dataset_path + "nation") + supplier_filtered = dd.read_parquet(dataset_path + "supplier") + part_filtered = dd.read_parquet(dataset_path + "part") + partsupp_filtered = dd.read_parquet(dataset_path + "partsupp") region_filtered = region_ds[(region_ds["r_name"] == var3)] r_n_merged = nation_filtered.merge( @@ -112,9 +112,9 @@ def test_query_3(client, dataset_path, fs): var1 = datetime.strptime("1995-03-15", "%Y-%m-%d") var2 = "BUILDING" - lineitem_ds = dd.read_parquet(dataset_path + "lineitem", filesystem=fs) - orders_ds = dd.read_parquet(dataset_path + "orders", filesystem=fs) - cutomer_ds = dd.read_parquet(dataset_path + "customer", filesystem=fs) + lineitem_ds = dd.read_parquet(dataset_path + "lineitem") + orders_ds = dd.read_parquet(dataset_path + "orders") + cutomer_ds = dd.read_parquet(dataset_path + "customer") lsel = lineitem_ds.l_shipdate > var1 osel = orders_ds.o_orderdate < var1 @@ -137,8 +137,8 @@ def test_query_4(client, dataset_path, fs): date1 = datetime.strptime("1993-10-01", "%Y-%m-%d") date2 = datetime.strptime("1993-07-01", "%Y-%m-%d") - line_item_ds = dd.read_parquet(dataset_path + "lineitem", filesystem=fs) - orders_ds = dd.read_parquet(dataset_path + "orders", filesystem=fs) + line_item_ds = dd.read_parquet(dataset_path + "lineitem") + orders_ds = dd.read_parquet(dataset_path + "orders") lsel = line_item_ds.l_commitdate < line_item_ds.l_receiptdate osel = (orders_ds.o_orderdate < date1) & (orders_ds.o_orderdate >= date2) @@ -160,12 +160,12 @@ def test_query_5(client, dataset_path, fs): date1 = datetime.strptime("1994-01-01", "%Y-%m-%d") date2 = datetime.strptime("1995-01-01", "%Y-%m-%d") - region_ds = dd.read_parquet(dataset_path + "region", filesystem=fs) - nation_ds = dd.read_parquet(dataset_path + "nation", filesystem=fs) - customer_ds = dd.read_parquet(dataset_path + "customer", filesystem=fs) - line_item_ds = dd.read_parquet(dataset_path + "lineitem", filesystem=fs) - orders_ds = dd.read_parquet(dataset_path + "orders", filesystem=fs) - supplier_ds = dd.read_parquet(dataset_path + "supplier", filesystem=fs) + region_ds = dd.read_parquet(dataset_path + "region") + nation_ds = dd.read_parquet(dataset_path + "nation") + customer_ds = dd.read_parquet(dataset_path + "customer") + line_item_ds = dd.read_parquet(dataset_path + "lineitem") + orders_ds = dd.read_parquet(dataset_path + "orders") + supplier_ds = dd.read_parquet(dataset_path + "supplier") rsel = region_ds.r_name == "ASIA" osel = (orders_ds.o_orderdate >= date1) & (orders_ds.o_orderdate < date2) @@ -190,7 +190,7 @@ def test_query_6(client, dataset_path, fs): date2 = datetime.strptime("1995-01-01", "%Y-%m-%d") var3 = 24 - line_item_ds = dd.read_parquet(dataset_path + "lineitem", filesystem=fs) + line_item_ds = dd.read_parquet(dataset_path + "lineitem") sel = ( (line_item_ds.l_shipdate >= date1) @@ -208,11 +208,11 @@ def test_query_7(client, dataset_path, fs): var1 = datetime.strptime("1995-01-01", "%Y-%m-%d") var2 = datetime.strptime("1997-01-01", "%Y-%m-%d") - nation_ds = dd.read_parquet(dataset_path + "nation", filesystem=fs) - customer_ds = dd.read_parquet(dataset_path + "customer", filesystem=fs) - line_item_ds = dd.read_parquet(dataset_path + "lineitem", filesystem=fs) - orders_ds = dd.read_parquet(dataset_path + "orders", filesystem=fs) - supplier_ds = dd.read_parquet(dataset_path + "supplier", filesystem=fs) + nation_ds = dd.read_parquet(dataset_path + "nation") + customer_ds = dd.read_parquet(dataset_path + "customer") + line_item_ds = dd.read_parquet(dataset_path + "lineitem") + orders_ds = dd.read_parquet(dataset_path + "orders") + supplier_ds = dd.read_parquet(dataset_path + "supplier") lineitem_filtered = line_item_ds[ (line_item_ds["l_shipdate"] >= var1) & (line_item_ds["l_shipdate"] < var2) From b5ac48ad0a84093e144aa0521cd1c38d6e598dd1 Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Fri, 27 Oct 2023 20:27:19 -0500 Subject: [PATCH 2/5] Minor updates to TPC-H --- tests/tpch/conftest.py | 10 +++++----- tests/tpch/test_polars.py | 5 ++--- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/tests/tpch/conftest.py b/tests/tpch/conftest.py index c5bc671a67..41b35600ce 100644 --- a/tests/tpch/conftest.py +++ b/tests/tpch/conftest.py @@ -172,13 +172,13 @@ def cluster_spec(scale): } elif scale == 100: return { - "worker_vm_types": ["m6i.xlarge"], - "n_workers": 8, + "worker_vm_types": ["m6i.large"], + "n_workers": 16, **everywhere, } elif scale == 1000: return { - "worker_vm_types": ["m6i.large"], + "worker_vm_types": ["m6i.xlarge"], "n_workers": 32, **everywhere, } @@ -325,7 +325,7 @@ def machine_spec(scale): } elif scale == 1000: return { - "vm_type": "m6i.16xlarge", + "vm_type": "m6i.32xlarge", } elif scale == 10000: return { @@ -399,7 +399,7 @@ def make_chart(request, name, tmp_path_factory, local, scale): with lock: generate( - outfile=os.path.join("charts", f"{local}-{scale}-query-{name}.json"), + outfile=os.path.join("charts", f"{local}-{scale}-{name}.json"), name=name, scale=scale, ) diff --git a/tests/tpch/test_polars.py b/tests/tpch/test_polars.py index 6925513381..18e313f85e 100644 --- a/tests/tpch/test_polars.py +++ b/tests/tpch/test_polars.py @@ -7,10 +7,9 @@ def read_data(filename): - pyarrow_dataset = dataset(filename, format="parquet") - return pl.scan_pyarrow_dataset(pyarrow_dataset) - if filename.startswith("s3://"): + pyarrow_dataset = dataset(filename, format="parquet") + return pl.scan_pyarrow_dataset(pyarrow_dataset) import boto3 session = boto3.session.Session() From 4ca0483b16a5fde1f13797a5890378c27b4e1e4a Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Sat, 28 Oct 2023 10:16:26 -0500 Subject: [PATCH 3/5] more random changes --- tests/tpch/conftest.py | 14 ++++++++------ tests/tpch/generate-data.py | 19 +++++++++++++------ tests/tpch/generate_plot.py | 6 ++++-- 3 files changed, 25 insertions(+), 14 deletions(-) diff --git a/tests/tpch/conftest.py b/tests/tpch/conftest.py index 41b35600ce..80d4f625dd 100644 --- a/tests/tpch/conftest.py +++ b/tests/tpch/conftest.py @@ -63,8 +63,8 @@ def dataset_path(local, scale): } local_paths = { 1: "./tpch-data/scale-1/", - 10: "./tpch-data/scale10/", - 100: "./tpch-data/scale100/", + 10: "./tpch-data/scale-10/", + 100: "./tpch-data/scale-100/", } if local: @@ -167,7 +167,7 @@ def cluster_spec(scale): if scale == 10: return { "worker_vm_types": ["m6i.large"], - "n_workers": 16, + "n_workers": 8, **everywhere, } elif scale == 100: @@ -203,8 +203,9 @@ def cluster( make_chart, ): if local: - with LocalCluster() as cluster: - yield cluster + with dask.config.set({"distributed.scheduler.worker-saturation": 4}): + with LocalCluster() as cluster: + yield cluster else: kwargs = dict( name=f"tpch-{module}-{scale}-{name}", @@ -317,7 +318,7 @@ def fs(local): def machine_spec(scale): if scale == 10: return { - "vm_type": "m6i.8xlarge", + "vm_type": "m6i.4xlarge", } elif scale == 100: return { @@ -402,4 +403,5 @@ def make_chart(request, name, tmp_path_factory, local, scale): outfile=os.path.join("charts", f"{local}-{scale}-{name}.json"), name=name, scale=scale, + local=local, ) diff --git a/tests/tpch/generate-data.py b/tests/tpch/generate-data.py index c5476cd71a..e2b94d8770 100644 --- a/tests/tpch/generate-data.py +++ b/tests/tpch/generate-data.py @@ -1,4 +1,5 @@ import functools +import multiprocessing import pathlib import tempfile import warnings @@ -19,7 +20,7 @@ def generate( scale: int = 10, partition_size: str = "128 MiB", path: str = "./tpch-data", - relaxed_schema: bool = False, + relaxed_schema: bool = True, ): if str(path).startswith("s3"): path += "/" if not path.endswith("/") else "" @@ -56,7 +57,14 @@ def generate( jobs = client.map(_tpch_data_gen, range(0, scale), **kwargs) client.gather(jobs) else: - _tpch_data_gen(step=None, **kwargs) + with dask.distributed.LocalCluster( + threads_per_worker=1, + memory_limit=dask.distributed.system.MEMORY_LIMIT, + n_workers=multiprocessing.cpu_count() // 2, + ) as cluster: + with cluster.get_client() as client: + jobs = client.map(_tpch_data_gen, range(0, scale), **kwargs) + client.gather(jobs) def retry(f): @@ -117,7 +125,6 @@ def _tpch_data_gen( f""" SET memory_limit='{psutil.virtual_memory().available // 2**30 }G'; SET preserve_insertion_order=false; - SET threads TO 1; SET enable_progress_bar=false; """ ) @@ -166,8 +173,8 @@ def _tpch_data_gen( (format parquet, per_thread_output true, filename_pattern "{table}_{{uuid}}", overwrite_or_ignore) """ ) - print(f"Finished exporting table {table}!") - print("Finished exporting all data!") + print(f"Finished exporting table {table}") + print("Finished exporting all data") def rows_approx_mb(con, table_name, partition_size: str): @@ -246,7 +253,7 @@ def get_bucket_region(path: str): ) @click.option( "--relaxed-schema", - default=False, + default=True, flag_value=True, help="Set flag to convert official TPC-H types decimal -> float and date -> timestamp_s", ) diff --git a/tests/tpch/generate_plot.py b/tests/tpch/generate_plot.py index 198ddee0f1..c3063d65da 100644 --- a/tests/tpch/generate_plot.py +++ b/tests/tpch/generate_plot.py @@ -3,7 +3,7 @@ import pandas as pd -def generate(outfile="chart.json", name=None, scale=None): +def generate(outfile="chart.json", name=None, scale=None, local=None): df = pd.read_sql_table(table_name="test_run", con="sqlite:///benchmark.db") df = df[ @@ -47,7 +47,9 @@ def recent(df): ), tooltip=["library", "duration"], ) - .properties(title=f"TPC-H -- scale:{df.scale.iloc[0]} name:{df.name.iloc[0]}") + .properties( + title=f"TPC-H: {local} scale {df.scale.iloc[0]} -- {df.name.iloc[0]}" + ) .configure_title( fontSize=20, ) From ed65234f0fa167fe732e6ede67c8a2eff0b9f941 Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Mon, 30 Oct 2023 06:25:54 -0500 Subject: [PATCH 4/5] minor changes --- tests/tpch/generate-data.py | 5 +++-- tests/tpch/test_polars.py | 3 ++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/tests/tpch/generate-data.py b/tests/tpch/generate-data.py index e2b94d8770..46d4af1411 100644 --- a/tests/tpch/generate-data.py +++ b/tests/tpch/generate-data.py @@ -20,7 +20,7 @@ def generate( scale: int = 10, partition_size: str = "128 MiB", path: str = "./tpch-data", - relaxed_schema: bool = True, + relaxed_schema: bool = False, ): if str(path).startswith("s3"): path += "/" if not path.endswith("/") else "" @@ -124,6 +124,7 @@ def _tpch_data_gen( con.sql( f""" SET memory_limit='{psutil.virtual_memory().available // 2**30 }G'; + SET threads TO 1; SET preserve_insertion_order=false; SET enable_progress_bar=false; """ @@ -253,7 +254,7 @@ def get_bucket_region(path: str): ) @click.option( "--relaxed-schema", - default=True, + default=False, flag_value=True, help="Set flag to convert official TPC-H types decimal -> float and date -> timestamp_s", ) diff --git a/tests/tpch/test_polars.py b/tests/tpch/test_polars.py index 18e313f85e..99ad4a9734 100644 --- a/tests/tpch/test_polars.py +++ b/tests/tpch/test_polars.py @@ -15,11 +15,12 @@ def read_data(filename): session = boto3.session.Session() credentials = session.get_credentials() return pl.scan_parquet( - filename, + filename + "/*", storage_options={ "aws_access_key_id": credentials.access_key, "aws_secret_access_key": credentials.secret_key, "region": "us-east-2", + "session_token": credentials.token, }, ) else: From 0d61d2827c387366e8a4f0850dfe772eef676588 Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Mon, 30 Oct 2023 09:46:16 -0500 Subject: [PATCH 5/5] [skip ci] use spot for data generation --- tests/tpch/generate-data.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/tpch/generate-data.py b/tests/tpch/generate-data.py index 46d4af1411..512f1069ab 100644 --- a/tests/tpch/generate-data.py +++ b/tests/tpch/generate-data.py @@ -50,9 +50,10 @@ def generate( # workload is best with 1vCPU and ~3-4GiB memory worker_vm_types=["m7a.medium", "m3.medium"], worker_options={"nthreads": 1}, + spot_policy="spot_with_fallback", region=REGION, ) as cluster: - cluster.adapt(minimum=1, maximum=350) + cluster.adapt(minimum=1, maximum=500) with cluster.get_client() as client: jobs = client.map(_tpch_data_gen, range(0, scale), **kwargs) client.gather(jobs)