@@ -117,14 +117,20 @@ async def start_over_database(self):
117
117
self .symbol )
118
118
symbol_db .set_initialized_flags (False )
119
119
for producer in self .producers :
120
- for time_frame , call_args in producer .last_call_by_timeframe .items ():
121
- run_db = databases .RunDatabasesProvider .instance ().get_run_db (self .bot_id )
122
- await producer .init_user_inputs (False )
123
- run_db .set_initialized_flags (False , (time_frame , ))
124
- await databases .CacheManager ().close_cache (commons_constants .UNPROVIDED_CACHE_IDENTIFIER ,
125
- reset_cache_db_ids = True )
126
- await producer .call_script (* call_args )
127
- await run_db .flush ()
120
+ for time_frame , call_args_by_symbols in producer .last_calls_by_time_frame_and_symbol .items ():
121
+ if self .symbol in call_args_by_symbols :
122
+ run_db = databases .RunDatabasesProvider .instance ().get_run_db (self .bot_id )
123
+ await producer .init_user_inputs (False )
124
+ run_db .set_initialized_flags (False , (time_frame , ))
125
+ await databases .CacheManager ().close_cache (commons_constants .UNPROVIDED_CACHE_IDENTIFIER ,
126
+ reset_cache_db_ids = True )
127
+ await producer .call_script (* call_args_by_symbols [self .symbol ])
128
+ await run_db .flush ()
129
+ else :
130
+ raise RuntimeError (
131
+ f"Failed to reload trading mode as { self .symbol } "
132
+ "is not initialized"
133
+ )
128
134
129
135
def set_initialized_trading_pair_by_bot_id (self , symbol , time_frame , initialized ):
130
136
# todo migrate to event tree
@@ -164,7 +170,7 @@ class AbstractScriptedTradingModeProducer(modes_channel.AbstractTradingModeProdu
164
170
165
171
def __init__ (self , channel , config , trading_mode , exchange_manager ):
166
172
super ().__init__ (channel , config , trading_mode , exchange_manager )
167
- self .last_call_by_timeframe = {}
173
+ self .last_calls_by_time_frame_and_symbol = {}
168
174
169
175
async def start (self ) -> None :
170
176
await super ().start ()
@@ -222,23 +228,60 @@ async def ohlcv_callback(self, exchange: str, exchange_id: str, cryptocurrency:
222
228
trigger_time = candle [commons_enums .PriceIndexes .IND_PRICE_TIME .value ] + \
223
229
commons_enums .TimeFramesMinutes [commons_enums .TimeFrames (time_frame )] * \
224
230
commons_constants .MINUTE_TO_SECONDS
225
- await self .call_script (self .matrix_id , cryptocurrency , symbol , time_frame ,
226
- commons_enums .TriggerSource .OHLCV .value ,
227
- trigger_time ,
228
- candle = candle ,
229
- init_call = init_call )
231
+ self .log_last_call_by_time_frame_and_symbol (
232
+ matrix_id = self .matrix_id ,
233
+ cryptocurrency = cryptocurrency ,
234
+ symbol = symbol ,
235
+ time_frame = time_frame ,
236
+ trigger_source = commons_enums .TriggerSource .OHLCV .value ,
237
+ trigger_cache_timestamp = trigger_time ,
238
+ candle = candle
239
+ )
240
+ await self .call_script (
241
+ matrix_id = self .matrix_id , cryptocurrency = cryptocurrency ,
242
+ symbol = symbol , time_frame = time_frame ,
243
+ trigger_source = commons_enums .TriggerSource .OHLCV .value ,
244
+ trigger_cache_timestamp = trigger_time ,
245
+ candle = candle , init_call = init_call )
230
246
231
247
async def kline_callback (self , exchange : str , exchange_id : str , cryptocurrency : str , symbol : str ,
232
248
time_frame , kline : dict ):
233
249
async with self .trading_mode_trigger (), self .trading_mode .remote_signal_publisher (symbol ):
234
- await self .call_script (self .matrix_id , cryptocurrency , symbol , time_frame ,
235
- commons_enums .TriggerSource .KLINE .value ,
236
- kline [commons_enums .PriceIndexes .IND_PRICE_TIME .value ],
237
- kline = kline )
238
-
239
- async def set_final_eval (self , matrix_id : str , cryptocurrency : str , symbol : str , time_frame , trigger_source : str ):
240
- await self .call_script (matrix_id , cryptocurrency , symbol , time_frame , trigger_source ,
241
- self ._get_latest_eval_time (matrix_id , cryptocurrency , symbol , time_frame ))
250
+ self .log_last_call_by_time_frame_and_symbol (
251
+ matrix_id = self .matrix_id ,
252
+ cryptocurrency = cryptocurrency ,
253
+ symbol = symbol ,
254
+ time_frame = time_frame ,
255
+ trigger_source = commons_enums .TriggerSource .KLINE .value ,
256
+ trigger_cache_timestamp = kline [commons_enums .PriceIndexes .IND_PRICE_TIME .value ],
257
+ kline = kline
258
+ )
259
+ await self .call_script (
260
+ matrix_id = self .matrix_id , cryptocurrency = cryptocurrency ,
261
+ symbol = symbol , time_frame = time_frame ,
262
+ trigger_source = commons_enums .TriggerSource .KLINE .value ,
263
+ trigger_cache_timestamp = kline [commons_enums .PriceIndexes .IND_PRICE_TIME .value ],
264
+ kline = kline )
265
+
266
+ async def set_final_eval (
267
+ self , matrix_id : str , cryptocurrency : str ,
268
+ symbol : str , time_frame , trigger_source : str ):
269
+ trigger_cache_timestamp = self ._get_latest_eval_time (
270
+ matrix_id , cryptocurrency , symbol , time_frame )
271
+ self .log_last_call_by_time_frame_and_symbol (
272
+ matrix_id = matrix_id ,
273
+ cryptocurrency = cryptocurrency ,
274
+ symbol = symbol ,
275
+ time_frame = time_frame ,
276
+ trigger_source = trigger_source ,
277
+ trigger_cache_timestamp = trigger_cache_timestamp ,
278
+ )
279
+ await self .call_script (
280
+ matrix_id = matrix_id , cryptocurrency = cryptocurrency ,
281
+ symbol = symbol , time_frame = time_frame ,
282
+ trigger_source = trigger_source ,
283
+ trigger_cache_timestamp = trigger_cache_timestamp ,
284
+ )
242
285
243
286
def _get_latest_eval_time (self , matrix_id : str , cryptocurrency : str , symbol : str , time_frame ):
244
287
try :
@@ -261,8 +304,6 @@ async def call_script(self, matrix_id: str, cryptocurrency: str, symbol: str, ti
261
304
self .trading_mode , matrix_id , cryptocurrency , symbol , time_frame ,
262
305
trigger_source , trigger_cache_timestamp , candle , kline , init_call = init_call
263
306
)
264
- self .last_call_by_timeframe [time_frame ] = \
265
- (matrix_id , cryptocurrency , symbol , time_frame , trigger_source , trigger_cache_timestamp , candle , kline , init_call )
266
307
context .matrix_id = matrix_id
267
308
context .cryptocurrency = cryptocurrency
268
309
context .symbol = symbol
@@ -307,3 +348,27 @@ async def post_trigger(self):
307
348
await database .flush ()
308
349
except Exception as err :
309
350
self .logger .exception (err , True , f"Error when flushing database: { err } " )
351
+
352
+ def log_last_call_by_time_frame_and_symbol (
353
+ self ,
354
+ matrix_id ,
355
+ cryptocurrency ,
356
+ symbol ,
357
+ time_frame ,
358
+ trigger_source ,
359
+ trigger_cache_timestamp ,
360
+ candle = None ,
361
+ kline = None ,
362
+ ):
363
+ if time_frame not in self .last_calls_by_time_frame_and_symbol :
364
+ self .last_calls_by_time_frame_and_symbol [time_frame ] = {}
365
+ self .last_calls_by_time_frame_and_symbol [time_frame ][symbol ] = (
366
+ matrix_id ,
367
+ cryptocurrency ,
368
+ symbol ,
369
+ time_frame ,
370
+ trigger_source ,
371
+ trigger_cache_timestamp ,
372
+ candle ,
373
+ kline ,
374
+ )
0 commit comments