LLMBlock concurrency #157
Conversation
I'd like to see info like below (from aakankshaduggal#8) included in one of the commit messages
Thanks, Gabe!
@@ -290,6 +311,8 @@ def generate_data(
    model_family,
    model_name,
    num_instructions_to_generate,
    num_workers=num_cpus,
    batch_size=batch_size,
Since no concurrency is used unless batch size is set ... I feel like we need a sensible default here. We certainly shouldn't require users to set it to get sensible behavior
Hm, good point. This could get tricky to estimate. I think the primary elements of a heuristic would be expected memory overhead per batch element (likely low for IO-bound work), and expected concurrency limits on the client-side for IO-bound work. I'm not sure we would have any reasonable way of knowing these though. I have much less familiarity with the "real" workload, though. Is there some kind of fixed number that we think would make sense?
The other concern I have about enabling concurrency by default is the possibility for the sharding of the dataset to actually change the results. I think most blocks will treat rows of the Dataset as independent entities and process them as such, but I could certainly imagine a case where the logic of a block does not treat the rows as independent. In this case, enabling concurrency might degrade the overall results if the individual block executions have less of the full context to work with.
Our users certainly aren't going to be able to do a better job of judging these questions (whether sharding is safe, what batch size to use).
If we can't come up with a sane default, it should be in the pipeline config somehow
@shivchander @xukai92 @aakankshaduggal @npalaska we need your help making a call on an appropriate default batch size or whether it needs to be configured per-pipeline or per-block
From @shivchander
we did some tests with the vllm endpoint and found that low batch size and increasing the number of workers worked best from a throughput perspective. So I’d recommend a batch size of 8 and setting num workers to num cpus.
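For illustration, here is a rough sketch of what that recommendation could look like in code. This is not the code from this PR; the helper name and the use of datasets.Dataset.shard / concatenate_datasets are assumptions made for the example.

# Rough sketch of the recommended scheme (hypothetical helper, not the PR's code):
# split the dataset into ~batch_size-row shards and process them on a thread pool.
import os
from concurrent.futures import ThreadPoolExecutor

from datasets import Dataset, concatenate_datasets


def run_block_batched(block_fn, dataset: Dataset, batch_size=8, num_workers=None):
    num_workers = num_workers or os.cpu_count()
    num_shards = max(1, (len(dataset) + batch_size - 1) // batch_size)
    shards = [
        dataset.shard(num_shards=num_shards, index=i, contiguous=True)
        for i in range(num_shards)
    ]
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        # pool.map preserves shard order, so the output rows stay aligned
        results = list(pool.map(block_fn, shards))
    return concatenate_datasets(results)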
Further from @shivchander - it can be tricky to debug with this concurrency enabled, so it would be nice to have an easy way to disable this for debugging (as a runtime parameter, not a pipeline config). It might make sense for --num-cpus=1 to disable this?
Ok, so it sounds like we want:
- batch_size=8
- num_cpus=None (<- the Python default with num_workers=None applies here)
- If the user sets num_cpus=1, we fully disable using ThreadPoolExecutor
  - This one can get annoying since there is no out-of-the-box SynchronousExecutor that offers the same API. I wrote one in a different project that we can probably borrow (here); a rough sketch of the idea follows below. The alternative is to maintain separate code paths for the synchronous version, which can be pretty error prone.
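For reference, a minimal sketch of what such a drop-in SynchronousExecutor could look like — illustrative only, not the implementation from the linked project:

# Minimal synchronous stand-in for ThreadPoolExecutor (illustrative sketch).
from concurrent.futures import Executor, Future


class SynchronousExecutor(Executor):
    """Run submitted callables immediately in the calling thread and return
    already-resolved Futures, so callers keep using the same submit()/result()
    API with no separate synchronous code path."""

    def submit(self, fn, /, *args, **kwargs):
        future = Future()
        try:
            future.set_result(fn(*args, **kwargs))
        except BaseException as err:
            # Surface errors through the Future, just like a real pool would
            future.set_exception(err)
        return future

Swapping something like this in when num_cpus=1 would keep the concurrent code path intact while executing everything serially.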
Yep - another reason would be that for smaller datasets and small knowledge sources (say a 1-page document), batching is overkill and would probably end up taking more time due to I/O and regrouping. So I would incline towards --num-cpus=1 as the default behavior and triggering the batching when needed.
My sense is that this will inevitably lead to users experiencing sub-par performance, complaining (or just walking away), and us telling them "oh, you need this go-faster flag"
I don't think we should be taking that route out of a concern that it might be a little slower with small datasets
Unless I'm misunderstanding something, I think we should enable the batching by default
That's a fair point - I'm with you on making the distributed setting the default
batch_size = 8
num_cpus = mp.cpu_count()
Also, just a note: the combination of batch size and concurrency also depends on the underlying accelerator being used and how the accelerators can handle the batches. (The total number of batches the server can handle depends on the compute profile and the memory available on the accelerator.)
The general consensus is to have smaller batches per thread and maximize the number of threads to a point where we are not adding a lot of requests to the backend vLLM queue. As the requests start to queue up on the backend vLLM server, the time to first token will shoot up and some requests will result in timeout errors. However, having a small batch means only a few requests get penalized in case of an error.
For example, using a batch of 8 and num_workers of 32 (256 total concurrent requests) on 4x H100 yielded the best performance for me.
Thanks for the input! This makes sense and absolutely lines up with my assumptions about why a heuristic for this would be tough to nail down. Since we do have some strong ideas about the default environment where this would run, I think batch_size = 8 and num_cpus = None (thereby using the Python default of min(32, os.cpu_count() + 4)) is probably a safe place to start.
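For a quick sanity check of what that default resolves to on a given machine (note: _max_workers is a private attribute, inspected here purely for illustration):

# Show the worker count ThreadPoolExecutor picks when max_workers is None
# (CPython 3.8-3.12 documents it as min(32, os.cpu_count() + 4)).
import os
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor() as pool:              # max_workers left as None
    print(pool._max_workers)                    # resolved default (private attribute)
    print(min(32, (os.cpu_count() or 1) + 4))   # the same formula computed by hand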
src/instructlab/sdg/sdg.py
Outdated
    input_splits = self._split_dataset(dataset)
    output_splits = [None] * len(input_splits)

    with ThreadPoolExecutor(max_workers=self.num_workers) as executor:
This has me wondering:
All threads enqueued to ThreadPoolExecutor will be joined before the interpreter can exit. Note that the exit handler which does this is executed before any exit handlers added using atexit. This means exceptions in the main thread must be caught and handled in order to signal threads to exit gracefully. For this reason, it is recommended that ThreadPoolExecutor not be used for long-running tasks.
Could you experiment with killing the parent while the pipeline is running, to check the behavior is graceful?
On the other hand, if we used ProcessPoolExecutor, I suspect we might run into pickling issues like I fixed in commit 7cfbaa9
Yeah, ProcessPoolExecutor would very likely cause pickling problems. I thought about making it configurable, but decided against it to keep it simple. Probably worth a simple experiment to see if a Dataset can easily pass between processes (my money is on "no").
Ok, looks like Dataset itself is actually pretty happy to pass between processes:
tmp.py
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from datasets import Dataset
import os

ds = Dataset.from_list([{"foo": 1}, {"foo": 2}])
pool_type = ThreadPoolExecutor if int(os.getenv("THREADS", "0")) else ProcessPoolExecutor

def doit(dataset):
    return dataset.map(lambda r: {"foo": r["foo"] * 2})

if __name__ == "__main__":
    print(f"Pool Type: {pool_type}")
    with pool_type() as pool:
        fut = pool.submit(doit, ds)
        print(fut.result().to_list())
?> python tmp.py
Pool Type: <class 'concurrent.futures.process.ProcessPoolExecutor'>
Map: 100%|███████████████████████████████████████████████████████████████████████████████████████| 2/2 [00:00<00:00, 2649.59 examples/s]
[{'foo': 2}, {'foo': 4}]
?> THREADS=1 python tmp.py
Pool Type: <class 'concurrent.futures.thread.ThreadPoolExecutor'>
Map: 100%|███████████████████████████████████████████████████████████████████████████████████████| 2/2 [00:00<00:00, 1261.26 examples/s]
[{'foo': 2}, {'foo': 4}]
I do suspect that if we put any non-primitives into the Dataset (not sure if this is even allowed based on using pyarrow behind the scenes), we'd hit pickling problems like you did with SSLContext.
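For illustration, the kind of failure being described can be reproduced directly by handing a process pool something unpicklable — a standalone sketch, not code from this PR:

# Sketch: submitting an unpicklable object (an SSLContext) to a process pool
# fails when the arguments are serialized for the worker process.
import ssl
from concurrent.futures import ProcessPoolExecutor


def use_ctx(ctx):
    return ctx.protocol


if __name__ == "__main__":
    ctx = ssl.create_default_context()
    with ProcessPoolExecutor() as pool:
        try:
            pool.submit(use_ctx, ctx).result()
        except Exception as err:  # e.g. "cannot pickle 'SSLContext' object"
            print(f"Could not ship the context to the worker: {err}")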
Could you experiment with killing the parent while the pipeline is running, to check the behavior is graceful?
I was thinking about this a little. I'll see what I can do to drum up a full test of this. I suspect the behavior would be to block termination (short of SIGKILL) until all non-error threads have fully terminated.
In caikit, we built a wrapper (DestroyableThread) to handle this so that a given piece of work running in a thread could be cancelled without destroying the thread itself from the pool. This is probably overkill since caikit is designed to be a long-running server and a single dead task should not cause harm to the central thread pool. In the case of SDG, it could still be useful to signal that other future shards should stop early if one of the shards has failed, though I suspect that a simple SIGKILL to the parent process would do the trick since this will all be driven by the CLI.
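For SDG's case, here is a hedged sketch of a lighter-weight option using only stdlib concurrent.futures: stop handing out new shards once one fails. Already-running shards still finish; this is not the caikit DestroyableThread approach, just an illustration.

# Sketch: fail fast across shards without a custom thread wrapper.
from concurrent.futures import FIRST_EXCEPTION, ThreadPoolExecutor, wait


def run_shards(process_shard, shards, num_workers=None):
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        futures = [pool.submit(process_shard, shard) for shard in shards]
        done, not_done = wait(futures, return_when=FIRST_EXCEPTION)
        errors = [f.exception() for f in done if f.exception()]
        if errors:
            # Cancel work that hasn't started yet; running shards can't be interrupted
            for fut in not_done:
                fut.cancel()
            raise errors[0]
        return [f.result() for f in futures]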
Ok, as suspected, it looks like SIGTERM will wait for a "graceful" exit which means finishing all running futures, while SIGKILL will unceremoniously kill everything:
tmp.py
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
from datasets import Dataset
import os
import time
import atexit
from functools import partial

pool_type = ThreadPoolExecutor if int(os.getenv("THREADS", "0")) else ProcessPoolExecutor

def doit(dataset, sleep_time=0, should_raise=False):
    if sleep_time:
        time.sleep(sleep_time)
    if should_raise:
        raise RuntimeError("Yikes")
    return dataset.map(lambda r: {"foo": r["foo"] * 2})

def end_report(futures):
    print("Done: " + str([f"{i}: {fut.done()}" for i, fut in enumerate(futures)]))
    print("Cancelled: " + str([f"{i}: {fut.cancelled()}" for i, fut in enumerate(futures)]))
    print("Exception: " + str([f"{i}: {fut.exception(timeout=0.01)}" for i, fut in enumerate(futures)]))

if __name__ == "__main__":
    print(f"Pool Type: {pool_type}")
    ds1 = Dataset.from_list([{"foo": 1}, {"foo": 2}])
    ds2 = Dataset.from_list([{"foo": 3}, {"foo": 4}])
    ds3 = Dataset.from_list([{"foo": 5}, {"foo": 6}])
    with pool_type() as pool:
        futures = [
            pool.submit(doit, ds, sleep_time=i * 3, should_raise=not i)
            for i, ds in enumerate([ds1, ds2, ds3])
        ]
        atexit.register(partial(end_report, futures))
        for future in as_completed(futures):
            print(future.result().to_list())
With SIGTERM
?> time THREADS=1 python tmp.py & proc=$! && kill $proc
[1] 24619
Pool Type: <class 'concurrent.futures.thread.ThreadPoolExecutor'>
Map: 100%|████████████████████████████████████████████████████████████████████████████████████████| 2/2 [00:00<00:00, 857.29 examples/s]
Map: 100%|████████████████████████████████████████████████████████████████████████████████████████| 2/2 [00:00<00:00, 818.96 examples/s]
Traceback (most recent call last):
File "/Users/ghart/Projects/github/instructlab/sdg/tmp.py", line 34, in <module>
print(future.result().to_list())
^^^^^^^^^^^^^^^
File "/Users/ghart/mambaforge/envs/instructlab-sdg/lib/python3.11/concurrent/futures/_base.py", line 449, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "/Users/ghart/mambaforge/envs/instructlab-sdg/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
raise self._exception
File "/Users/ghart/mambaforge/envs/instructlab-sdg/lib/python3.11/concurrent/futures/thread.py", line 58, in run
result = self.fn(*self.args, **self.kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/ghart/Projects/github/instructlab/sdg/tmp.py", line 14, in doit
raise RuntimeError("Yikes")
RuntimeError: Yikes
Done: ['0: True', '1: True', '2: True']
Cancelled: ['0: False', '1: False', '2: False']
Exception: ['0: Yikes', '1: None', '2: None']
real 0m6.612s
user 0m0.741s
sys 0m3.233s
[1]+ Exit 1 time THREADS=1 python tmp.py
With SIGKILL
(NOTE: This also kills time, but it all happens "instantly")
?> time THREADS=1 python tmp.py & proc=$! && kill -9 $proc
[1] 24510
[1]+ Killed: 9 time THREADS=1 python tmp.py
Also, letting it all run, including the exception raised in the first future, waits for the others to complete (as it should):
?> time THREADS=1 python tmp.py
Pool Type: <class 'concurrent.futures.thread.ThreadPoolExecutor'>
Map: 100%|███████████████████████████████████████████████████████████████████████████████████████| 2/2 [00:00<00:00, 1006.07 examples/s]
Map: 100%|████████████████████████████████████████████████████████████████████████████████████████| 2/2 [00:00<00:00, 842.74 examples/s]
Traceback (most recent call last):
File "/Users/ghart/Projects/github/instructlab/sdg/tmp.py", line 34, in <module>
print(future.result().to_list())
^^^^^^^^^^^^^^^
File "/Users/ghart/mambaforge/envs/instructlab-sdg/lib/python3.11/concurrent/futures/_base.py", line 449, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "/Users/ghart/mambaforge/envs/instructlab-sdg/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
raise self._exception
File "/Users/ghart/mambaforge/envs/instructlab-sdg/lib/python3.11/concurrent/futures/thread.py", line 58, in run
result = self.fn(*self.args, **self.kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/ghart/Projects/github/instructlab/sdg/tmp.py", line 14, in doit
raise RuntimeError("Yikes")
RuntimeError: Yikes
Done: ['0: True', '1: True', '2: True']
Cancelled: ['0: False', '1: False', '2: False']
Exception: ['0: Yikes', '1: None', '2: None']
real 0m6.584s
user 0m0.672s
sys 0m3.332s
Ok, as suspected, it looks like SIGTERM will wait for a "graceful" exit which means finishing all running futures
Ok, thanks. I guess that works as a reasonable UX for the CLI
@gabe-l-hart I am seeing inconsistent behaviour in the test above. Can be graceful both for term and kill. It can also be ungraceful for both.
Also, one commit should be fine - please squash, and add:
Ah, very good call on the
Force-pushed bd3cf54 to 7243cb5 (Compare)
@gabe-l-hart is it expected that I would be able to run this with a locally running ilab server? When I attempt to, I get "openai.InternalServerError: Service Unavailable". I had set batch size to 2 in the simple pipeline with 3 samples in the taxonomy.
@markmc @russellb @shivchander Question for you: I see that there is already a num_procs in PipelineContext (here). It looks like this is used in two places, both as an argument to
My inclination is to go with (1) for now, but I'm curious about your opinions.
Force-pushed 7243cb5 to d6eefca (Compare)
I would also incline towards 1 - seems like the path of least resistance
Force-pushed 782c284 to dab7222 (Compare)
@derekhiggins I don't think the matching should have any impact on this, but it's always possible there's a bug! Can you try the updated code and set the
Community standards developing here: instructlab/dev-docs#110
e.g. for the error handling changes, I'm not a fan of keeping a long-term history of the 4 different iterations of this ... especially since the commit messages don't capture any of the "why" for the different approaches. But I do hugely value splitting logically separate changes into their own commits.
That said, it's not such a big deal here ... it's not like this is a string of
Overall looks good, but I'd like to see what Derek finds with his testing before we merge
src/instructlab/sdg/pipeline.py
Outdated
    model_family: The family identifier for the model being updated
    num_instructions_to_generate: The total number of instructions the user
        wants to generate during this run
    batch_size: The size of the dataset batches for parallel generation
There's definitely going to be confusion here about this batching versus the batching in LLMBlock - see server_supports_batching, batch_kwargs, etc.
(I don't have an obvious and quick solution, so I'm fine with it for now)
    batch_num_workers: Optional[int] = None

    @property
    def batching_enabled(self) -> bool:
Another example of that confusion ... I want to move server_supports_batched from the OpenAI client to PipelineContext ... and that sounds an awful lot like "batching enabled"
Yes, this is definitely confusing and requires realizing the client/server relationship between this SDG code and the server.
Back several months ago, the local server (started with "ilab serve") used to crash if multiple clients sent requests concurrently. This restriction is still in place (and I believe it's the reason I'm seeing "Service Unavailable" errors), i.e. the local server
Ok, and that is specific to the llama-cpp backend (get_uvicorn_config() is only used in
@gabe-l-hart I think we need to reconcile this with
needs to be disabled for llama-cpp. Great catch @derekhiggins!
Very good catch. I'll look into how to make the defaults conditional on the server.
@markmc Thanks for the pointer on the standards. I really like the articulation of commits as revertable atomic units (though my personal preference is "legible trail of what happened within reason"). I'll refactor the history to reflect this.
Also, as a side note, I think you're spot on that this really has to do with whether the repo is
Force-pushed dab7222 to 7cdfade (Compare)
This pull request has merge conflicts that must be resolved before it can be merged.
Problem statement from [email protected]:

Overview
The current implementation of LLMBlock sends the entire dataset as a single large batch of requests to the OpenAI server. This may lead to some requests waiting too long for the response, resulting in timeout errors and potentially overloading the backend server with extremely large batches.

Proposed Changes
Use concurrent processing via Python's concurrent.futures package in LLMBlock. The key changes are:
* Utilizes concurrent.futures for managing parallel tasks with threading for launching parallel tasks.
* Allows users to specify the number of requests to send in each batch.
* Allows users to specify the number of concurrent worker threads to handle batches.

Example Usage
If the user sets the concurrency to 8 and the batch size to 32, the system will run 8 concurrent threads, each sending a batch of 32 prompts, resulting in a total of 256 requests processed simultaneously by the backend server.

Ref: aakankshaduggal#6
instructlab#135

Signed-off-by: Gabe Goodhart <[email protected]>
Co-authored-by: Nikhil Palaskar <[email protected]>
Co-authored-by: shiv <[email protected]>
Co-authored-by: Kai Xu <[email protected]>
Co-authored-by: Aakanksha Duggal <[email protected]>
instructlab#135 Signed-off-by: Gabe Goodhart <[email protected]>
Force-pushed 7cdfade to afe12a6 (Compare)
@derekhiggins @markmc It looks like the decision of which backend to use is made here in
Draft PR for changes in
Force-pushed 65005ff to 432a842 (Compare)
I've filed #174 to capture some of the stuff we've discussed here; I'm fine with capturing it as a TODO for future
Tested with both llama_cpp and vllm. Do we want to address disabling concurrency with llama_cpp here, or address it later?
Derek also pointed out that CI isn't passing, my bad for not seeing that!
Something like, in generate_data():
could be a good way to get all this staged, ready to enable safely when the necessary ilab change merges, by deleting those lines
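(The suggested lines themselves aren't visible above; as a hedged guess at their shape, assuming the batch_size / batch_num_workers names used elsewhere in this PR, it would be something like the following.)

# Hypothetical sketch only -- not the actual suggestion from the review.
# Keep batching disabled until the CLI can tell llama-cpp and vLLM apart;
# delete these lines to turn concurrency on once that ilab change merges.
batch_size = None
batch_num_workers = None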
Ya, the e2e job is reporting success but there is a traceback in the logs
https://github.com/instructlab/sdg/actions/runs/9999879504/job/27641546924?pr=157#step:9:120
Ok, there is nothing wrong with the CI job - the generate command spits out the traceback about the log string formatting and then continues and succeeds.
🤦 bad logging. Will fix now
instructlab#135 Signed-off-by: Gabe Goodhart <[email protected]>
instructlab#135 Signed-off-by: Gabe Goodhart <[email protected]>
…text This allows None to be used as a default in generate_data and from the CLI instructlab#135 Signed-off-by: Gabe Goodhart <[email protected]>
instructlab#135 Signed-off-by: Gabe Goodhart <[email protected]>
Force-pushed 432a842 to 68303f2 (Compare)
That makes sense because of the sequence of the updates. Will add that now.
This is a mitigation to allow the `instructlab-sdg` library to merge and release before `instructlab` has updated the CLI invocation of generate_data to properly distinguish between backend types. It should be reverted once that change is made in the CLI. instructlab#135 Signed-off-by: Gabe Goodhart <[email protected]>
lgtm when CI passes
E2E (NVIDIA A10G x4 - full pipeline) workflow launched on this PR: View run
e2e workflow succeeded on this PR: View run, congrats!
lgtm, thanks
Thanks for the thorough reviews @derekhiggins @markmc!
Description
Supports #135
This PR ports the parallel generation PR from the research fork and adds unit tests plus arg plumbing up to generate_data.