Skip to content

Commit 7381747

Browse files
authored Sep 2, 2022
Reorganize send_recv benchmarks backends and unify them (#873)
A substantial part of the send_recv and send_recv_core benchmarks consists of common code. By reorganizing the implementation into separate backends, it is possible to unify them, thus reducing code duplication and opening up the possibility of introducing additional backends in a more structured way.
1 parent da2ff7f commit 7381747

File tree

6 files changed

+582
-666
lines changed

6 files changed

+582
-666
lines changed
 

‎ci/gpu/build.sh

+2-2
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,8 @@ function run_tests() {
101101
py.test --cache-clear -vs ucp/_libs/tests
102102

103103
gpuci_logger "Run local benchmark"
104-
python -m ucp.benchmarks.send_recv -o cupy --server-dev 0 --client-dev 0 --reuse-alloc
105-
python -m ucp.benchmarks.send_recv_core -o cupy --server-dev 0 --client-dev 0 --reuse-alloc
104+
python -m ucp.benchmarks.send_recv -o cupy --server-dev 0 --client-dev 0 --reuse-alloc --backend ucp-async
105+
python -m ucp.benchmarks.send_recv -o cupy --server-dev 0 --client-dev 0 --reuse-alloc --backend ucp-core
106106
python -m ucp.benchmarks.cudf_merge --chunks-per-dev 4 --chunk-size 10000 --rmm-init-pool-size 2097152
107107
}
108108

‎ucp/benchmarks/backends/base.py

+84
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
from abc import ABC, abstractmethod
2+
from argparse import Namespace
3+
from queue import Queue
4+
from typing import Any
5+
6+
7+
class BaseServer(ABC):
    """Abstract interface every benchmark server backend must implement."""

    @abstractmethod
    def __init__(self, args: Namespace, xp: Any, queue: Queue):
        """
        Benchmark server.

        Parameters
        ----------
        args: argparse.Namespace
            Parsed command-line arguments used as parameters by the `run`
            method.
        xp: module
            Module implementing the NumPy API, used for data generation.
        queue: Queue
            Queue where the server puts the port it is listening at.
        """
        pass

    @abstractmethod
    def run(self):
        """
        Run the benchmark server.

        Execution proceeds as follows:
        1. Start the listener and put the port it is listening at into the
           queue registered in the constructor;
        2. Set up any additional context (Active Message registration,
           memory buffers to reuse, etc.);
        3. Transfer data back-and-forth with the client;
        4. Shut the server down.
        """
        pass
39+
40+
41+
class BaseClient(ABC):
    """Abstract interface every benchmark client backend must implement."""

    @abstractmethod
    def __init__(
        self, args: Namespace, xp: Any, queue: Queue, server_address: str, port: int
    ):
        """
        Benchmark client.

        Parameters
        ----------
        args
            Parsed command-line arguments used as parameters by the `run`
            method.
        xp
            Module implementing the NumPy API, used for data generation.
        queue
            Queue where timing results are put.
        server_address
            Hostname or IP address the server is listening at.
        port
            Port the server is listening at.
        """
        pass

    @abstractmethod
    def run(self):
        """
        Run the benchmark client.

        Execution proceeds as follows:
        1. Connect to the listener;
        2. Set up any additional context (Active Message registration,
           memory buffers to reuse, etc.);
        3. Transfer data back-and-forth with the server;
        4. Shut the client down;
        5. Put timing results into the queue registered in the constructor.
        """
        pass

    def print_backend_specific_config(self):
        """
        Pretty print configuration specific to the backend implementation.

        The default implementation prints nothing; backends override this
        when they have extra configuration worth reporting.
        """
        pass

‎ucp/benchmarks/backends/ucp_async.py

+146
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
import asyncio
2+
from argparse import Namespace
3+
from queue import Queue
4+
from time import monotonic
5+
from typing import Any
6+
7+
import ucp
8+
from ucp._libs.arr import Array
9+
from ucp._libs.utils import print_key_value
10+
from ucp.benchmarks.backends.base import BaseClient, BaseServer
11+
12+
13+
def register_am_allocators(args: Namespace):
    """
    Register Active Message allocators on the worker for the correct memory
    type when the benchmark is set to use the Active Message API.

    Parameters
    ----------
    args
        Parsed command-line arguments used to determine whether the caller
        is using the Active Message API and which memory type.
    """
    # Nothing to do unless the Active Message API was requested.
    if not args.enable_am:
        return

    import numpy as np

    ucp.register_am_allocator(lambda n: np.empty(n, dtype=np.uint8), "host")

    # Pick the device-memory allocator matching the requested object type.
    if args.object_type == "cupy":
        import cupy as cp

        device_allocator = lambda n: cp.empty(n, dtype=cp.uint8)
    elif args.object_type == "rmm":
        import rmm

        device_allocator = lambda n: rmm.DeviceBuffer(size=n)
    else:
        # numpy: host allocator alone suffices.
        return

    ucp.register_am_allocator(device_allocator, "cuda")
40+
41+
42+
class UCXPyAsyncServer(BaseServer):
    """Benchmark server using the UCX-Py high-level (async) API."""

    def __init__(self, args: Namespace, xp: Any, queue: Queue):
        self.args = args
        self.xp = xp
        self.queue = queue

    async def run(self):
        ucp.init()

        register_am_allocators(self.args)

        total_iterations = self.args.n_iter + self.args.n_warmup_iter

        async def server_handler(ep):
            # TAG transfers receive into pre-created buffers; AM transfers
            # allocate on receipt via the registered allocators.
            if not self.args.enable_am:
                if self.args.reuse_alloc:
                    # One shared buffer reused across all iterations.
                    shared = Array(self.xp.zeros(self.args.n_bytes, dtype="u1"))
                    msg_recv_list = [shared] * total_iterations
                else:
                    msg_recv_list = [
                        self.xp.zeros(self.args.n_bytes, dtype="u1")
                        for _ in range(total_iterations)
                    ]

                assert msg_recv_list[0].nbytes == self.args.n_bytes

            for i in range(total_iterations):
                if self.args.enable_am is True:
                    recv = await ep.am_recv()
                    await ep.am_send(recv)
                else:
                    await ep.recv(msg_recv_list[i])
                    await ep.send(msg_recv_list[i])
            await ep.close()
            # Closing the listener ends the wait loop below.
            listener.close()

        listener = ucp.create_listener(server_handler, port=self.args.port)
        self.queue.put(listener.port)

        while not listener.closed():
            await asyncio.sleep(0.5)
83+
84+
85+
class UCXPyAsyncClient(BaseClient):
    """Benchmark client using the UCX-Py high-level (async) API."""

    def __init__(
        self, args: Namespace, xp: Any, queue: Queue, server_address: str, port: int
    ):
        self.args = args
        self.xp = xp
        self.queue = queue
        self.server_address = server_address
        self.port = port

    async def run(self):
        ucp.init()

        register_am_allocators(self.args)

        ep = await ucp.create_endpoint(self.server_address, self.port)

        total_iterations = self.args.n_iter + self.args.n_warmup_iter

        if self.args.enable_am:
            msg = self.xp.arange(self.args.n_bytes, dtype="u1")
        else:
            if self.args.reuse_alloc:
                # Single send/recv buffer pair shared by every iteration.
                send_buf = Array(self.xp.arange(self.args.n_bytes, dtype="u1"))
                recv_buf = Array(self.xp.zeros(self.args.n_bytes, dtype="u1"))
                msg_send_list = [send_buf] * total_iterations
                msg_recv_list = [recv_buf] * total_iterations
            else:
                msg_send_list = [
                    self.xp.arange(self.args.n_bytes, dtype="u1")
                    for _ in range(total_iterations)
                ]
                msg_recv_list = [
                    self.xp.zeros(self.args.n_bytes, dtype="u1")
                    for _ in range(total_iterations)
                ]

            assert msg_send_list[0].nbytes == self.args.n_bytes
            assert msg_recv_list[0].nbytes == self.args.n_bytes

        if self.args.cuda_profile:
            self.xp.cuda.profiler.start()
        times = []
        for i in range(total_iterations):
            start = monotonic()
            if self.args.enable_am:
                await ep.am_send(msg)
                await ep.am_recv()
            else:
                await ep.send(msg_send_list[i])
                await ep.recv(msg_recv_list[i])
            stop = monotonic()
            # Warmup iterations are excluded from the reported timings.
            if i >= self.args.n_warmup_iter:
                times.append(stop - start)
        if self.args.cuda_profile:
            self.xp.cuda.profiler.stop()
        self.queue.put(times)

    def print_backend_specific_config(self):
        print_key_value(
            key="Transfer API", value=f"{'AM' if self.args.enable_am else 'TAG'}"
        )
        config = ucp.get_config()
        print_key_value(key="UCX_TLS", value=f"{config['TLS']}")
        print_key_value(key="UCX_NET_DEVICES", value=f"{config['NET_DEVICES']}")

‎ucp/benchmarks/backends/ucp_core.py

+291
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,291 @@
1+
from argparse import Namespace
2+
from queue import Queue
3+
from threading import Lock
4+
from time import monotonic
5+
from typing import Any
6+
7+
import ucp
8+
from ucp._libs import ucx_api
9+
from ucp._libs.arr import Array
10+
from ucp._libs.utils import print_key_value
11+
from ucp._libs.utils_test import (
12+
blocking_am_recv,
13+
blocking_am_send,
14+
blocking_recv,
15+
blocking_send,
16+
non_blocking_recv,
17+
non_blocking_send,
18+
)
19+
from ucp.benchmarks.backends.base import BaseClient, BaseServer
20+
21+
# Payload exchanged once at startup so both peers know the wire is up
# before timed transfers begin.
WireupMessage = bytearray(b"wireup")


def register_am_allocators(args: Namespace, worker: ucx_api.UCXWorker):
    """
    Register Active Message allocators on the worker for the correct memory
    type when the benchmark is set to use the Active Message API.

    Parameters
    ----------
    args
        Parsed command-line arguments used to determine whether the caller
        is using the Active Message API and which memory type.
    worker
        UCX-Py core Worker object on which to register the allocator.
    """
    # Nothing to do unless the Active Message API was requested.
    if not args.enable_am:
        return

    import numpy as np

    worker.register_am_allocator(
        lambda n: np.empty(n, dtype=np.uint8), ucx_api.AllocatorType.HOST
    )

    # Pick the device-memory allocator matching the requested object type.
    if args.object_type == "cupy":
        import cupy as cp

        device_allocator = lambda n: cp.empty(n, dtype=cp.uint8)
    elif args.object_type == "rmm":
        import rmm

        device_allocator = lambda n: rmm.DeviceBuffer(size=n)
    else:
        # numpy: host allocator alone suffices.
        return

    worker.register_am_allocator(device_allocator, ucx_api.AllocatorType.CUDA)
59+
60+
61+
class UCXPyCoreServer(BaseServer):
    """Benchmark server built on the low-level `ucp._libs.ucx_api` interface."""

    def __init__(self, args: Namespace, xp: Any, queue: Queue):
        self.args = args
        self.xp = xp
        self.queue = queue

    def run(self):
        """
        Run the benchmark server: create a listener, echo every message the
        client sends (wireup plus all benchmark iterations), then progress
        the worker until all operations complete.
        """
        self.ep = None

        # Only the feature actually exercised (AM or TAG) is enabled.
        ctx = ucx_api.UCXContext(
            feature_flags=(
                ucx_api.Feature.AM if self.args.enable_am else ucx_api.Feature.TAG,
            )
        )
        worker = ucx_api.UCXWorker(ctx)

        register_am_allocators(self.args, worker)

        # Counters mutated from completion callbacks; kept in single-element
        # lists so the nested functions can rebind their contents.
        op_lock = Lock()
        finished = [0]
        outstanding = [0]

        def op_started():
            with op_lock:
                outstanding[0] += 1

        def op_completed():
            with op_lock:
                outstanding[0] -= 1
                finished[0] += 1

        def _send_handle(request, exception, msg):
            # Notice, we pass `msg` to the handler in order to make sure
            # it doesn't go out of scope prematurely.
            assert exception is None
            op_completed()

        def _tag_recv_handle(request, exception, ep, msg):
            # Echo the received message back on the same tag.
            assert exception is None
            req = ucx_api.tag_send_nb(
                ep, msg, msg.nbytes, tag=0, cb_func=_send_handle, cb_args=(msg,)
            )
            # NOTE(review): a None request presumably means the send completed
            # immediately and the callback will not fire — confirm against the
            # ucx_api docs.
            if req is None:
                op_completed()

        def _am_recv_handle(recv_obj, exception, ep):
            # Echo the received Active Message back to the sender.
            assert exception is None
            msg = Array(recv_obj)
            ucx_api.am_send_nbx(
                ep, msg, msg.nbytes, cb_func=_send_handle, cb_args=(msg,)
            )

        def _listener_handler(conn_request, msg):
            self.ep = ucx_api.UCXEndpoint.create_from_conn_request(
                worker,
                conn_request,
                endpoint_error_handling=True,
            )

            # Wireup before starting to transfer data
            if self.args.enable_am is True:
                ucx_api.am_recv_nb(self.ep, cb_func=_am_recv_handle, cb_args=(self.ep,))
            else:
                wireup = Array(bytearray(len(WireupMessage)))
                op_started()
                ucx_api.tag_recv_nb(
                    worker,
                    wireup,
                    wireup.nbytes,
                    tag=0,
                    cb_func=_tag_recv_handle,
                    cb_args=(self.ep, wireup),
                )

            # Post one receive per benchmark iteration (warmup included).
            for i in range(self.args.n_iter + self.args.n_warmup_iter):
                if self.args.enable_am is True:
                    ucx_api.am_recv_nb(
                        self.ep, cb_func=_am_recv_handle, cb_args=(self.ep,)
                    )
                else:
                    # Without --reuse-alloc, each iteration receives into a
                    # fresh buffer; otherwise `msg` (passed via cb_args below)
                    # is shared across iterations.
                    if not self.args.reuse_alloc:
                        msg = Array(self.xp.zeros(self.args.n_bytes, dtype="u1"))

                    op_started()
                    ucx_api.tag_recv_nb(
                        worker,
                        msg,
                        msg.nbytes,
                        tag=0,
                        cb_func=_tag_recv_handle,
                        cb_args=(self.ep, msg),
                    )

        # The shared receive buffer only applies to TAG with --reuse-alloc.
        if not self.args.enable_am and self.args.reuse_alloc:
            msg = Array(self.xp.zeros(self.args.n_bytes, dtype="u1"))
        else:
            msg = None

        listener = ucx_api.UCXListener(
            worker=worker,
            port=self.args.port or 0,
            cb_func=_listener_handler,
            cb_args=(msg,),
        )
        # Publish the (possibly OS-assigned) port so the client can connect.
        self.queue.put(listener.port)

        # Progress until the first operation is posted/completed.
        while outstanding[0] == 0:
            worker.progress()

        # +1 to account for wireup message
        if self.args.delay_progress:
            while finished[0] < self.args.n_iter + self.args.n_warmup_iter + 1 and (
                outstanding[0] >= self.args.max_outstanding
                or finished[0] + self.args.max_outstanding
                >= self.args.n_iter + self.args.n_warmup_iter + 1
            ):
                worker.progress()
        else:
            while finished[0] != self.args.n_iter + self.args.n_warmup_iter + 1:
                worker.progress()

        del self.ep
183+
184+
185+
class UCXPyCoreClient(BaseClient):
    """Benchmark client built on the low-level `ucp._libs.ucx_api` interface."""

    def __init__(
        self, args: Namespace, xp: Any, queue: Queue, server_address: str, port: int
    ):
        self.args = args
        self.xp = xp
        self.queue = queue
        self.server_address = server_address
        self.port = port

    def run(self):
        """
        Run the benchmark client: connect, exchange the wireup message, then
        time `n_iter` send/recv round trips (after `n_warmup_iter` warmup
        iterations) and put the per-iteration timings into the queue.
        """
        # Only the feature actually exercised (AM or TAG) is enabled.
        ctx = ucx_api.UCXContext(
            feature_flags=(
                ucx_api.Feature.AM
                if self.args.enable_am is True
                else ucx_api.Feature.TAG,
            )
        )
        worker = ucx_api.UCXWorker(ctx)
        register_am_allocators(self.args, worker)
        ep = ucx_api.UCXEndpoint.create(
            worker,
            self.server_address,
            self.port,
            endpoint_error_handling=True,
        )

        send_msg = self.xp.arange(self.args.n_bytes, dtype="u1")
        # With --reuse-alloc a single receive buffer is shared by all
        # iterations; otherwise one is allocated per iteration below.
        if self.args.reuse_alloc:
            recv_msg = self.xp.zeros(self.args.n_bytes, dtype="u1")

        # Wireup round trip before the timed loop starts.
        if self.args.enable_am:
            blocking_am_send(worker, ep, send_msg)
            blocking_am_recv(worker, ep)
        else:
            wireup_recv = bytearray(len(WireupMessage))
            blocking_send(worker, ep, WireupMessage)
            blocking_recv(worker, ep, wireup_recv)

        # Counters mutated from completion callbacks; kept in single-element
        # lists so the nested functions can rebind their contents.
        op_lock = Lock()
        finished = [0]
        outstanding = [0]

        def maybe_progress():
            # Only progress the worker once the configured number of
            # outstanding operations has accumulated (--delay-progress mode).
            while outstanding[0] >= self.args.max_outstanding:
                worker.progress()

        def op_started():
            with op_lock:
                outstanding[0] += 1

        def op_completed():
            with op_lock:
                outstanding[0] -= 1
                finished[0] += 1

        if self.args.cuda_profile:
            self.xp.cuda.profiler.start()

        times = []
        last_iter = self.args.n_iter + self.args.n_warmup_iter - 1
        for i in range(self.args.n_iter + self.args.n_warmup_iter):
            start = monotonic()

            if self.args.enable_am:
                blocking_am_send(worker, ep, send_msg)
                blocking_am_recv(worker, ep)
            else:
                if not self.args.reuse_alloc:
                    recv_msg = self.xp.zeros(self.args.n_bytes, dtype="u1")

                if self.args.delay_progress:
                    non_blocking_recv(worker, ep, recv_msg, op_started, op_completed)
                    non_blocking_send(worker, ep, send_msg, op_started, op_completed)
                    maybe_progress()
                else:
                    blocking_send(worker, ep, send_msg)
                    blocking_recv(worker, ep, recv_msg)

            # In delay-progress mode, drain everything still in flight on the
            # final iteration: 2 ops (send + recv) per iteration in total.
            if i == last_iter and self.args.delay_progress:
                while finished[0] != 2 * (self.args.n_iter + self.args.n_warmup_iter):
                    worker.progress()

            stop = monotonic()
            # Warmup iterations are excluded from the reported timings.
            if i >= self.args.n_warmup_iter:
                times.append(stop - start)

        if self.args.cuda_profile:
            self.xp.cuda.profiler.stop()

        self.queue.put(times)

    def print_backend_specific_config(self):
        """Pretty print configuration specific to the ucp-core backend."""
        delay_progress_str = (
            f"True ({self.args.max_outstanding})"
            if self.args.delay_progress is True
            else "False"
        )

        print_key_value(
            key="Transfer API", value=f"{'AM' if self.args.enable_am else 'TAG'}"
        )
        print_key_value(key="Delay progress", value=f"{delay_progress_str}")
        print_key_value(key="UCX_TLS", value=f"{ucp.get_config()['TLS']}")
        print_key_value(
            key="UCX_NET_DEVICES", value=f"{ucp.get_config()['NET_DEVICES']}"
        )

‎ucp/benchmarks/send_recv.py

+59-103
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import asyncio
2222
import multiprocessing as mp
2323
import os
24-
from time import perf_counter as clock
2524

2625
import ucp
2726
from ucp._libs.utils import (
@@ -30,29 +29,16 @@
3029
print_key_value,
3130
print_separator,
3231
)
32+
from ucp.benchmarks.backends.ucp_async import (
33+
UCXPyAsyncClient,
34+
UCXPyAsyncServer,
35+
)
36+
from ucp.benchmarks.backends.ucp_core import UCXPyCoreClient, UCXPyCoreServer
3337
from ucp.utils import get_event_loop
3438

3539
mp = mp.get_context("spawn")
3640

3741

38-
def register_am_allocators(args):
39-
if not args.enable_am:
40-
return
41-
42-
import numpy as np
43-
44-
ucp.register_am_allocator(lambda n: np.empty(n, dtype=np.uint8), "host")
45-
46-
if args.object_type == "cupy":
47-
import cupy as cp
48-
49-
ucp.register_am_allocator(lambda n: cp.empty(n, dtype=cp.uint8), "cuda")
50-
elif args.object_type == "rmm":
51-
import rmm
52-
53-
ucp.register_am_allocator(lambda n: rmm.DeviceBuffer(size=n), "cuda")
54-
55-
5642
def server(queue, args):
5743
if args.server_cpu_affinity >= 0:
5844
os.sched_setaffinity(0, [args.server_cpu_affinity])
@@ -77,43 +63,16 @@ def server(queue, args):
7763
xp.cuda.runtime.setDevice(args.server_dev)
7864
xp.cuda.set_allocator(rmm.rmm_cupy_allocator)
7965

80-
ucp.init()
81-
82-
register_am_allocators(args)
83-
84-
async def run():
85-
async def server_handler(ep):
86-
87-
if not args.enable_am:
88-
msg_recv_list = []
89-
if not args.reuse_alloc:
90-
for _ in range(args.n_iter + args.n_warmup_iter):
91-
msg_recv_list.append(xp.zeros(args.n_bytes, dtype="u1"))
92-
else:
93-
t = xp.zeros(args.n_bytes, dtype="u1")
94-
for _ in range(args.n_iter + args.n_warmup_iter):
95-
msg_recv_list.append(t)
96-
97-
assert msg_recv_list[0].nbytes == args.n_bytes
66+
if args.backend == "ucp-async":
67+
server = UCXPyAsyncServer(args, xp, queue)
68+
elif args.backend == "ucp-core":
69+
server = UCXPyCoreServer(args, xp, queue)
9870

99-
for i in range(args.n_iter + args.n_warmup_iter):
100-
if args.enable_am is True:
101-
recv = await ep.am_recv()
102-
await ep.am_send(recv)
103-
else:
104-
await ep.recv(msg_recv_list[i])
105-
await ep.send(msg_recv_list[i])
106-
await ep.close()
107-
lf.close()
108-
109-
lf = ucp.create_listener(server_handler, port=args.port)
110-
queue.put(lf.port)
111-
112-
while not lf.closed():
113-
await asyncio.sleep(0.5)
114-
115-
loop = get_event_loop()
116-
loop.run_until_complete(run())
71+
if asyncio.iscoroutinefunction(server.run):
72+
loop = get_event_loop()
73+
loop.run_until_complete(server.run())
74+
else:
75+
server.run()
11776

11877

11978
def client(queue, port, server_address, args):
@@ -142,53 +101,19 @@ def client(queue, port, server_address, args):
142101
xp.cuda.runtime.setDevice(args.client_dev)
143102
xp.cuda.set_allocator(rmm.rmm_cupy_allocator)
144103

145-
ucp.init()
146-
147-
register_am_allocators(args)
148-
149-
async def run():
150-
ep = await ucp.create_endpoint(server_address, port)
151-
152-
if args.enable_am:
153-
msg = xp.arange(args.n_bytes, dtype="u1")
154-
else:
155-
msg_send_list = []
156-
msg_recv_list = []
157-
if not args.reuse_alloc:
158-
for i in range(args.n_iter + args.n_warmup_iter):
159-
msg_send_list.append(xp.arange(args.n_bytes, dtype="u1"))
160-
msg_recv_list.append(xp.zeros(args.n_bytes, dtype="u1"))
161-
else:
162-
t1 = xp.arange(args.n_bytes, dtype="u1")
163-
t2 = xp.zeros(args.n_bytes, dtype="u1")
164-
for i in range(args.n_iter + args.n_warmup_iter):
165-
msg_send_list.append(t1)
166-
msg_recv_list.append(t2)
167-
assert msg_send_list[0].nbytes == args.n_bytes
168-
assert msg_recv_list[0].nbytes == args.n_bytes
169-
170-
if args.cuda_profile:
171-
xp.cuda.profiler.start()
172-
times = []
173-
for i in range(args.n_iter + args.n_warmup_iter):
174-
start = clock()
175-
if args.enable_am:
176-
await ep.am_send(msg)
177-
await ep.am_recv()
178-
else:
179-
await ep.send(msg_send_list[i])
180-
await ep.recv(msg_recv_list[i])
181-
stop = clock()
182-
if i >= args.n_warmup_iter:
183-
times.append(stop - start)
184-
if args.cuda_profile:
185-
xp.cuda.profiler.stop()
186-
queue.put(times)
187-
188-
loop = get_event_loop()
189-
loop.run_until_complete(run())
104+
if args.backend == "ucp-async":
105+
client = UCXPyAsyncClient(args, xp, queue, server_address, port)
106+
elif args.backend == "ucp-core":
107+
client = UCXPyCoreClient(args, xp, queue, server_address, port)
108+
109+
if asyncio.iscoroutinefunction(client.run):
110+
loop = get_event_loop()
111+
loop.run_until_complete(client.run())
112+
else:
113+
client.run()
190114

191115
times = queue.get()
116+
192117
assert len(times) == args.n_iter
193118
bw_avg = format_bytes(2 * args.n_iter * args.n_bytes / sum(times))
194119
bw_med = format_bytes(2 * args.n_bytes / np.median(times))
@@ -201,9 +126,7 @@ async def run():
201126
print_key_value(key="Bytes", value=f"{format_bytes(args.n_bytes)}")
202127
print_key_value(key="Object type", value=f"{args.object_type}")
203128
print_key_value(key="Reuse allocation", value=f"{args.reuse_alloc}")
204-
print_key_value(key="Transfer API", value=f"{'AM' if args.enable_am else 'TAG'}")
205-
print_key_value(key="UCX_TLS", value=f"{ucp.get_config()['TLS']}")
206-
print_key_value(key="UCX_NET_DEVICES", value=f"{ucp.get_config()['NET_DEVICES']}")
129+
client.print_backend_specific_config()
207130
print_separator(separator="=")
208131
if args.object_type == "numpy":
209132
print_key_value(key="Device(s)", value="CPU-only")
@@ -375,12 +298,45 @@ def parse_args():
375298
action="store_true",
376299
help="Disable detailed report per iteration.",
377300
)
301+
parser.add_argument(
302+
"-l",
303+
"--backend",
304+
default="ucp-async",
305+
type=str,
306+
help="Backend Library (-l) to use, options are: 'ucp-async' (default) and "
307+
"'ucp-core'.",
308+
)
309+
parser.add_argument(
310+
"--delay-progress",
311+
default=False,
312+
action="store_true",
313+
help="Only applies to 'ucp-core' backend: delay ucp_worker_progress calls "
314+
"until a minimum number of outstanding operations is reached, implies "
315+
"non-blocking send/recv. The --max-outstanding argument may be used to "
316+
"control number of maximum outstanding operations. (Default: disabled)",
317+
)
318+
parser.add_argument(
319+
"--max-outstanding",
320+
metavar="N",
321+
default=32,
322+
type=int,
323+
help="Only applies to 'ucp-core' backend: number of maximum outstanding "
324+
"operations, see --delay-progress. (Default: 32)",
325+
)
378326

379327
args = parser.parse_args()
328+
380329
if args.cuda_profile and args.object_type == "numpy":
381330
raise RuntimeError(
382331
"`--cuda-profile` requires `--object_type=cupy` or `--object_type=rmm`"
383332
)
333+
334+
if not any([args.backend == b for b in ["ucp-async", "ucp-core"]]):
335+
raise RuntimeError(f"Unknown backend {args.backend}")
336+
337+
if args.backend != "ucp-core" and args.delay_progress:
338+
raise RuntimeError("`--delay-progress` requires `--backend=ucp-core`")
339+
384340
return args
385341

386342

‎ucp/benchmarks/send_recv_core.py

-561
This file was deleted.

0 commit comments

Comments
 (0)
Please sign in to comment.