Skip to content

Commit 9b4e0ff

Browse files
committed
New version of setting & add on_done hook function
1 parent 2e484bc commit 9b4e0ff

File tree

2 files changed

+70
-40
lines changed

2 files changed

+70
-40
lines changed

README.md

Lines changed: 34 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ def each_task(id: int, config, task, log):
8888
responses = worker_dispatcher.start({
8989
'task': {
9090
'list': ['ORD_AH001', 'ORD_KL502', '...' , 'ORD_GR393'],
91-
'callback': each_task,
91+
'function': each_task,
9292
'config': {
9393
'my_endpoint': 'https://your.name/order-handler/'
9494
},
@@ -112,7 +112,7 @@ if __name__ == '__main__':
112112
results = worker_dispatcher.start({
113113
'task': {
114114
'list': 10,
115-
'callback': each_task,
115+
'function': each_task,
116116
},
117117
'worker': {
118118
'use_processing': True
@@ -155,10 +155,13 @@ import worker_dispatcher
155155
results = worker_dispatcher.start({
156156
'debug': False,
157157
'task': {
158-
'list': [], # Support list and integer. Integer represent the number of tasks to be generated.
159-
'callback': callback_sample,
158+
'list': [], # Support list and integer. Integer represent the number of tasks to be generated.
159+
'function': task_function_sample, # The main function to execute per task
160160
'config': {},
161-
'result_callback': False
161+
'callback': {
162+
'on_done': False, # Called with each task's result after each task completes; the return value will overwrite and define the task result
163+
'on_all_done': False, # Called with each task's result after all tasks complete; the return value will overwrite and define the task result
164+
}
162165
},
163166
'worker': {
164167
'number': 8,
@@ -185,9 +188,10 @@ results = worker_dispatcher.start({
185188
|:-- |:-- |:-- |:-- |
186189
|debug |bool |False |Debug mode |
187190
|task.list |multitype|list |The tasks for dispatching to each worker. *<br>- List: Each value will be passed as a parameter to your callback function. <br>- Integer: The number of tasks to be generated.|
188-
|[task.callback](#taskcallback) |callable |(sample) |The callback function called by each worker runs|
191+
|[task.function](#taskfunction) |callable |(sample) |The main function to execute per task|
189192
|task.config |multitype|list |The custom variable to be passed to the callback function|
190-
|[task.result_callback](#taskresult_callback) |callable |Null |The callback function called when each task processes the result|
193+
|[task.callback.on_done](#taskcallbackon_done) |callable |Null |The callback function is called with each task's result after each task completes; the return value will overwrite and define the task result|
194+
|[task.callback.on_all_done](#taskcallbackon_all_done) |callable |Null |The callback function is called with each task's result after all tasks complete; the return value will overwrite and define the task result|
191195
|worker.number |int |(auto) |The number of workers to fork. <br>(The default value is the number of local CPU cores)|
192196
|worker.frequency_mode.enabled |bool |False |Changing from assigning tasks to a fixed number of workers once, to assigning tasks and workers frequently.|
193197
|worker.frequency_mode.interval |float |1 |The second(s) of interval.|
@@ -200,12 +204,12 @@ results = worker_dispatcher.start({
200204
|verbose |bool |True |Enables or disables verbose mode for detailed output.|
201205

202206

203-
#### task.callback
207+
#### task.function
204208

205-
The callback function called by each worker runs
209+
The main function to execute per task
206210

207211
```python
208-
callback_function (id: int, config, task, log: dict) -> Any
212+
task_function (id: int, config, task, log: dict) -> Any
209213
```
210214

211215
|Argument |Type |Deafult |Description|
@@ -218,12 +222,28 @@ callback_function (id: int, config, task, log: dict) -> Any
218222
> The return value can be `False` to indicate task failure in TPS logs.
219223
> Alternatively, it can be a `requests.Response`, indicating failure if the status code is not 200.
220224
221-
#### task.result_callback
225+
#### task.callback.on_done
226+
227+
The callback function is called with each task's result after each task completes; the return value will overwrite and define the task result
228+
229+
```python
230+
callback_on_done_function (id: int, config, result, log: dict) -> Any
231+
```
232+
233+
|Argument |Type |Deafult |Description|
234+
|:-- |:-- |:-- |:-- |
235+
|id |int |(auto) |The sequence number generated by each task starting from 1|
236+
|config |multitype|{} |The custom variable to be passed to the callback function|
237+
|result |multitype|(custom) |Each value returned back from `task.callback`|
238+
|log |dict |(auto) |Reference: [get_logs()](#get_logs)|
239+
240+
241+
#### task.callback.on_all_done
222242

223-
The callback function called when each task processes the result
243+
The callback function is called with each task's result after all tasks complete; the return value will overwrite and define the task result
224244

225245
```python
226-
result_callback_function (id: int, config, result, log: dict) -> Any
246+
callback_on_all_done_function (id: int, config, result, log: dict) -> Any
227247
```
228248

229249
|Argument |Type |Deafult |Description|
@@ -305,7 +325,7 @@ def each_task(id, config, task, log):
305325
responses = worker_dispatcher.start({
306326
'task': {
307327
'list': 600,
308-
'callback': each_task,
328+
'function': each_task,
309329
'config': {
310330
'my_endpoint': 'https://your.name/api'
311331
},

src/worker_dispatcher/worker_dispatcher.py

Lines changed: 36 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22
import time, datetime, copy, requests, math, os, platform
33
import queue as Queue
44

5-
# Sample callback function
6-
def callback_sample(id: int, config=None, task=None):
5+
# Sample task function
6+
def task_function_sample(id: int, config=None, task=None, log: dict=None):
77
if id == 1:
88
print("Runing sample task function, please customize yours according to the actual usage.")
99
result = {
@@ -12,7 +12,7 @@ def callback_sample(id: int, config=None, task=None):
1212
return result
1313

1414
# Sample result callback function
15-
def result_callback_sample(id: int, config=None, result=None, log: dict=None):
15+
def callback_sample(id: int, config=None, result=None, log: dict=None):
1616
if id == 1:
1717
print("Runing sample result function, please customize yours according to the actual usage.")
1818
return result
@@ -21,10 +21,13 @@ def result_callback_sample(id: int, config=None, result=None, log: dict=None):
2121
default_config = {
2222
'debug': False,
2323
'task': {
24-
'list': [], # Support list and integer. Integer represent the number of tasks to be generated.
25-
'callback': callback_sample,
24+
'list': [], # Support list and integer. Integer represent the number of tasks to be generated.
25+
'function': task_function_sample, # The main function to execute per task
2626
'config': {},
27-
'result_callback': False
27+
'callback': {
28+
'on_done': False, # Called with each task's result after each task completes; the return value will overwrite and define the task result
29+
'on_all_done': False, # Called with each task's result after all tasks complete; the return value will overwrite and define the task result
30+
}
2831
},
2932
'worker': {
3033
'number': multiprocessing.cpu_count(),
@@ -84,9 +87,9 @@ def start(user_config: dict) -> list:
8487
print("Configuration Dictionary:")
8588
print(config)
8689

87-
# Callback check
88-
if not callable(config['task']['callback']):
89-
exit("Callback function is invalied")
90+
# Task function check
91+
if not callable(config['task']['function']):
92+
exit("Task function is invalied")
9093

9194
# Task list to queue
9295
task_list = []
@@ -342,15 +345,15 @@ def start(user_config: dict) -> list:
342345
chunk_log = chunk_result['log_list']
343346
except Exception as e:
344347
break
345-
exit(f"Fatal error occurred in task.callback function: {e}")
348+
exit(f"Fatal error occurred in task.function: {e}")
346349
# print(chunk_log);exit
347350
for log in chunk_log:
348351
result = log['result']
349-
if callable(config['task']['result_callback']):
352+
if callable(config['task']['callback']['on_all_done']):
350353
try:
351-
result = config['task']['result_callback'](config=config['task']['config'], id=log['task_id'], result=result, log=log)
354+
result = config['task']['callback']['on_all_done'](config=config['task']['config'], id=log['task_id'], result=result, log=log)
352355
except Exception as e:
353-
exit(f"Fatal error occurred in task.result_callback function: {e}")
356+
exit(f"Fatal error occurred in task.callback.on_all_done function: {e}")
354357
logs.append(log)
355358
results.append(result)
356359

@@ -371,13 +374,13 @@ def start(user_config: dict) -> list:
371374
try:
372375
log = future.result()
373376
except Exception as e:
374-
exit(f"Fatal error occurred in task.callback function: {e}")
377+
exit(f"Fatal error occurred in task.function: {e}")
375378
result = log['result']
376-
if callable(config['task']['result_callback']):
379+
if callable(config['task']['callback']['on_all_done']):
377380
try:
378-
result = config['task']['result_callback'](config=config['task']['config'], id=log['task_id'], result=result, log=log)
381+
result = config['task']['callback']['on_all_done'](config=config['task']['config'], id=log['task_id'], result=result, log=log)
379382
except Exception as e:
380-
exit(f"Fatal error occurred in task.result_callback function: {e}")
383+
exit(f"Fatal error occurred in task.callback.on_all_done function: {e}")
381384
logs.append(log)
382385
results.append(result)
383386
# results = [result.result() for result in concurrent.futures.as_completed(pool_results)]
@@ -454,12 +457,12 @@ def _consume_tasks(task_list, config,
454457
try:
455458
log = future.result()
456459
except Exception as e:
457-
exit(f"Fatal error occurred in task.callback function: {e}")
458-
if callable(config['task']['result_callback']):
460+
exit(f"Fatal error occurred in task.function: {e}")
461+
if callable(config['task']['callback']['on_all_done']):
459462
try:
460-
log['result'] = config['task']['result_callback'](config=config['task']['config'], id=log['task_id'], result=log['result'], log=log)
463+
log['result'] = config['task']['callback']['on_all_done'](config=config['task']['config'], id=log['task_id'], result=log['result'], log=log)
461464
except Exception as e:
462-
exit(f"Fatal error occurred in task.result_callback function: {e}")
465+
exit(f"Fatal error occurred in task.callback.on_all_done function: {e}")
463466
logs.append(log)
464467

465468
undispatched_tasks_count = undispatched_tasks_count if undispatched_tasks_count else len(not_done)
@@ -493,14 +496,14 @@ def _parallel_consume_tasks(queue, config, runtime, worker_num, frequency_interv
493496
try:
494497
log_batch = future.result()
495498
except Exception as e:
496-
exit(f"Fatal error occurred in task.callback function in thread pool: {e}")
499+
exit(f"Fatal error occurred in task.function in thread pool: {e}")
497500
# print(log)
498501
for log in log_batch:
499-
if callable(config['task']['result_callback']):
502+
if callable(config['task']['callback']['on_all_done']):
500503
try:
501-
log['result'] = config['task']['result_callback'](config=config['task']['config'], id=log['task_id'], result=log['result'], log=log)
504+
log['result'] = config['task']['callback']['on_all_done'](config=config['task']['config'], id=log['task_id'], result=log['result'], log=log)
502505
except Exception as e:
503-
exit(f"Fatal error occurred in task.result_callback function: {e}")
506+
exit(f"Fatal error occurred in task.callback.on_all_done function: {e}")
504507
logs.append(log)
505508

506509
undispatched_tasks_count = len(not_done)
@@ -534,7 +537,7 @@ def _consume_queue(queue, config, timeout_unixtime, frequency_interval_seconds)
534537
def _consume_task(data, config) -> dict:
535538
started_at = time.time()
536539
task_rewrite_data = {}
537-
return_value = config['task']['callback'](config=config['task']['config'], id=data['id'], task=data['task'], log=task_rewrite_data)
540+
return_value = config['task']['function'](config=config['task']['config'], id=data['id'], task=data['task'], log=task_rewrite_data)
538541
ended_at = time.time()
539542
duration = ended_at - started_at
540543
log = {
@@ -545,6 +548,13 @@ def _consume_task(data, config) -> dict:
545548
'result': return_value,
546549
'log': task_rewrite_data,
547550
}
551+
# On_done hook (will update the result in the log)
552+
if callable(config['task']['callback']['on_done']):
553+
try:
554+
log['result'] = config['task']['callback']['on_done'](config=config['task']['config'], id=log['task_id'], result=log['result'], log=log)
555+
except Exception as e:
556+
exit(f"Fatal error occurred in task.callback.on_done function: {e}")
557+
548558
return log
549559

550560
def _merge_dicts_recursive(default_dict, user_dict):

0 commit comments

Comments
 (0)