@@ -12,54 +12,48 @@ def handle_scale_up(name_resolve: name_resolve, actor, rollout, weight_update_me
1212 Handle scale-up logic when scale_up_request is detected.
1313 Requires: name_resolve, actor, rollout.
1414 """
15- new_scale = 0
16- req_raw = None
1715 try :
1816 req_raw = name_resolve .get ("scale_up_request" )
1917 new_scale = ast .literal_eval (req_raw )["scaled_k" ]
2018 except NameEntryNotFoundError :
21- logger .info ("scale_up_request not found" )
22- pass # no request → don't wait
19+ return
2320
24- logger .info (f"scale_up_request { req_raw } " )
21+ logger .info (f"Handling scale_up_request: { req_raw } " )
2522
26- if req_raw :
27- # Now wait until scale_up_done is posted from scaler process
28- start = time .time ()
23+ # Now wait until scale_up_done is posted from scaling_controller
24+ start = time .time ()
25+ try :
26+ name_resolve .delete ("scale_up_request" )
27+ except NameEntryNotFoundError :
28+ pass
29+
30+ while True :
2931 try :
30- name_resolve .delete ( "scale_up_request " )
32+ done_raw = name_resolve .get ( "scale_up_done " )
3133 except NameEntryNotFoundError :
32- pass
34+ done_raw = None
35+
36+ if done_raw :
37+ logger .info (f"[areal] Scale-up finished: { done_raw } " )
38+ name_resolve .add (
39+ "scale_up_time" ,
40+ {"time" : time .time () - start },
41+ replace = True ,
42+ )
3343
34- while True :
3544 try :
36- done_raw = name_resolve .get ("scale_up_done" )
45+ name_resolve .delete ("scale_up_done" )
3746 except NameEntryNotFoundError :
38- done_raw = None
47+ pass
3948
40- if done_raw :
41- logger . info ( f"[areal] Scale-up finished: { done_raw } " )
42- name_resolve . add (
43- "scale_up_time" ,
44- { "time" : time . time () - start },
45- replace = True ,
46- )
49+ # Increase the number of scale in rollout engine and actor. To get correct world size
50+ actor . scaling_count = actor . scaling_count + new_scale
51+ rollout . _engine . backend . scaling_count = (
52+ rollout . _engine . backend . scaling_count + new_scale
53+ )
54+ rollout . _engine . distributed_weight_update_initialized = False
55+ actor . _re_init_weight_update_from_distributed ( weight_update_meta )
4756
48- try :
49- name_resolve .delete ("scale_up_request" )
50- except NameEntryNotFoundError :
51- pass
52- try :
53- name_resolve .delete ("scale_up_done" )
54- except NameEntryNotFoundError :
55- pass
56- # Increase teh number of scale in rollout engine and actor. To get correct world size
57- actor .scaling_count = actor .scaling_count + new_scale
58- rollout ._engine .backend .scaling_count = (
59- rollout ._engine .backend .scaling_count + new_scale
60- )
61- rollout ._engine .distributed_weight_update_initialized = False
62- actor ._re_init_weight_update_from_distributed (weight_update_meta )
57+ break
6358
64- break
65- time .sleep (0.5 )
59+ time .sleep (0.5 )
0 commit comments