-
Notifications
You must be signed in to change notification settings - Fork 1
Use Async functions for OpenAi functions #24
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use Async functions for OpenAi functions #24
Conversation
|
|
||
| public FunctionExecutor(FunctionContext context, String functionName) { | ||
| this.metricTracker = new FunctionMetricTracker(context, functionName); | ||
| this.executorService = Executors.newFixedThreadPool(10); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unfortunately, I couldn’t find a way to share the executor. The most optimal approach would be to borrow one from the context, but as far as I know, such a feature doesn’t exist. Would it make sense to make the pool size configurable via an environment variable, specific to the function? For instance, something like FUNCTION_NAME.toUpperCase() + "_POOL_SIZE".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's fine, we can have one threadpool per function and use a single environment variable ASYNC_FUNCTION_THREAD_POOL_SIZE to control it across all functions. We can later decide if we need to make it function specific.
mbroecheler
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice! Two smallish changes, please
|
|
||
| public FunctionExecutor(FunctionContext context, String functionName) { | ||
| this.metricTracker = new FunctionMetricTracker(context, functionName); | ||
| this.executorService = Executors.newFixedThreadPool(10); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's fine, we can have one threadpool per function and use a single environment variable ASYNC_FUNCTION_THREAD_POOL_SIZE to control it across all functions. We can later decide if we need to make it function specific.
| metricTracker.increaseCallCount(); | ||
| final long start = System.nanoTime(); | ||
|
|
||
| T result = executeWithRetry( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Flink async functions have retry build-in now:
https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/config/#table-exec-async-scalar-retry-strategy
Please remove the retry logic and just keep the metrics piece.
… Adding ASYNC_FUNCTION_THREAD_POOL_SIZE environment variable to control function threadpool size.
No description provided.