Skip to content

[Scripted] properly handle last_call_by_time_frame #851

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -97,30 +97,40 @@ def get_script_from_module(module):

async def reload_scripts(self):
for is_live in (False, True):
module = self.__class__.TRADING_SCRIPT_MODULE if is_live else self.__class__.BACKTESTING_SCRIPT_MODULE
importlib.reload(module)
self.register_script_module(module, live=is_live)
# reload config
await self.reload_config(self.exchange_manager.bot_id)
if is_live:
# todo cancel and restart live tasks
await self.start_over_database()

if ((is_live and self.__class__.TRADING_SCRIPT_MODULE)
or (not is_live and self.__class__.BACKTESTING_SCRIPT_MODULE)):
module = (self.__class__.TRADING_SCRIPT_MODULE
if is_live
else self.__class__.BACKTESTING_SCRIPT_MODULE)
importlib.reload(module)
self.register_script_module(module, live=is_live)
# reload config
await self.reload_config(self.exchange_manager.bot_id)
if is_live:
# todo cancel and restart live tasks
await self.start_over_database()

async def start_over_database(self):
await modes_util.clear_plotting_cache(self)
symbol_db = databases.RunDatabasesProvider.instance().get_symbol_db(self.bot_id,
self.exchange_manager.exchange_name,
self.symbol)
symbol_db.set_initialized_flags(False)
for producer in self.producers:
for time_frame, call_args in producer.last_call_by_timeframe.items():
run_db = databases.RunDatabasesProvider.instance().get_run_db(self.bot_id)
await producer.init_user_inputs(False)
run_db.set_initialized_flags(False, (time_frame, ))
await databases.CacheManager().close_cache(commons_constants.UNPROVIDED_CACHE_IDENTIFIER,
reset_cache_db_ids=True)
await producer.call_script(*call_args)
await run_db.flush()
for time_frame, call_args_by_symbols in producer.last_calls_by_time_frame_and_symbol.items():
if self.symbol in call_args_by_symbols:
run_db = databases.RunDatabasesProvider.instance().get_run_db(self.bot_id)
await producer.init_user_inputs(False)
run_db.set_initialized_flags(False, (time_frame, ))
await databases.CacheManager().close_cache(commons_constants.UNPROVIDED_CACHE_IDENTIFIER,
reset_cache_db_ids=True)
await producer.call_script(*call_args_by_symbols[self.symbol])
await run_db.flush()
else:
raise RuntimeError(
f"Failed to reload trading mode as {self.symbol}"
"is not initialized"
)

def set_initialized_trading_pair_by_bot_id(self, symbol, time_frame, initialized):
# todo migrate to event tree
Expand Down Expand Up @@ -160,7 +170,7 @@ class AbstractScriptedTradingModeProducer(modes_channel.AbstractTradingModeProdu

def __init__(self, channel, config, trading_mode, exchange_manager):
super().__init__(channel, config, trading_mode, exchange_manager)
self.last_call_by_timeframe = {}
self.last_calls_by_time_frame_and_symbol = {}

async def start(self) -> None:
await super().start()
Expand Down Expand Up @@ -218,23 +228,60 @@ async def ohlcv_callback(self, exchange: str, exchange_id: str, cryptocurrency:
trigger_time = candle[commons_enums.PriceIndexes.IND_PRICE_TIME.value] + \
commons_enums.TimeFramesMinutes[commons_enums.TimeFrames(time_frame)] * \
commons_constants.MINUTE_TO_SECONDS
await self.call_script(self.matrix_id, cryptocurrency, symbol, time_frame,
commons_enums.TriggerSource.OHLCV.value,
trigger_time,
candle=candle,
init_call=init_call)
self.log_last_call_by_time_frame_and_symbol(
matrix_id=self.matrix_id,
cryptocurrency=cryptocurrency,
symbol=symbol,
time_frame=time_frame,
trigger_source=commons_enums.TriggerSource.OHLCV.value,
trigger_cache_timestamp=trigger_time,
candle=candle
)
await self.call_script(
matrix_id=self.matrix_id, cryptocurrency=cryptocurrency,
symbol=symbol, time_frame=time_frame,
trigger_source=commons_enums.TriggerSource.OHLCV.value,
trigger_cache_timestamp=trigger_time,
candle=candle, init_call=init_call)

async def kline_callback(self, exchange: str, exchange_id: str, cryptocurrency: str, symbol: str,
time_frame, kline: dict):
async with self.trading_mode_trigger(), self.trading_mode.remote_signal_publisher(symbol):
await self.call_script(self.matrix_id, cryptocurrency, symbol, time_frame,
commons_enums.TriggerSource.KLINE.value,
kline[commons_enums.PriceIndexes.IND_PRICE_TIME.value],
kline=kline)

async def set_final_eval(self, matrix_id: str, cryptocurrency: str, symbol: str, time_frame, trigger_source: str):
await self.call_script(matrix_id, cryptocurrency, symbol, time_frame, trigger_source,
self._get_latest_eval_time(matrix_id, cryptocurrency, symbol, time_frame))
self.log_last_call_by_time_frame_and_symbol(
matrix_id=self.matrix_id,
cryptocurrency=cryptocurrency,
symbol=symbol,
time_frame=time_frame,
trigger_source=commons_enums.TriggerSource.KLINE.value,
trigger_cache_timestamp=kline[commons_enums.PriceIndexes.IND_PRICE_TIME.value],
kline=kline
)
await self.call_script(
matrix_id=self.matrix_id, cryptocurrency=cryptocurrency,
symbol=symbol, time_frame=time_frame,
trigger_source=commons_enums.TriggerSource.KLINE.value,
trigger_cache_timestamp=kline[commons_enums.PriceIndexes.IND_PRICE_TIME.value],
kline=kline)

async def set_final_eval(
self, matrix_id: str, cryptocurrency: str,
symbol: str, time_frame, trigger_source: str):
trigger_cache_timestamp = self._get_latest_eval_time(
matrix_id, cryptocurrency, symbol, time_frame)
self.log_last_call_by_time_frame_and_symbol(
matrix_id=matrix_id,
cryptocurrency=cryptocurrency,
symbol=symbol,
time_frame=time_frame,
trigger_source=trigger_source,
trigger_cache_timestamp=trigger_cache_timestamp,
)
await self.call_script(
matrix_id=matrix_id, cryptocurrency=cryptocurrency,
symbol=symbol, time_frame=time_frame,
trigger_source=trigger_source,
trigger_cache_timestamp=trigger_cache_timestamp,
)

def _get_latest_eval_time(self, matrix_id: str, cryptocurrency: str, symbol: str, time_frame):
try:
Expand All @@ -257,8 +304,6 @@ async def call_script(self, matrix_id: str, cryptocurrency: str, symbol: str, ti
self.trading_mode, matrix_id, cryptocurrency, symbol, time_frame,
trigger_source, trigger_cache_timestamp, candle, kline, init_call=init_call
)
self.last_call_by_timeframe[time_frame] = \
(matrix_id, cryptocurrency, symbol, time_frame, trigger_source, trigger_cache_timestamp, candle, kline, init_call)
context.matrix_id = matrix_id
context.cryptocurrency = cryptocurrency
context.symbol = symbol
Expand All @@ -267,7 +312,8 @@ async def call_script(self, matrix_id: str, cryptocurrency: str, symbol: str, ti
run_data_writer = databases.RunDatabasesProvider.instance().get_run_db(self.exchange_manager.bot_id)
try:
await self._pre_script_call(context)
await self.trading_mode.get_script(live=True)(context)
if hasattr(self.trading_mode, "TRADING_SCRIPT_MODULE") and self.trading_mode.TRADING_SCRIPT_MODULE:
await self.trading_mode.get_script(live=True)(context)
except errors.UnreachableExchange:
raise
except (commons_errors.MissingDataError, commons_errors.ExecutionAborted) as e:
Expand Down Expand Up @@ -302,3 +348,27 @@ async def post_trigger(self):
await database.flush()
except Exception as err:
self.logger.exception(err, True, f"Error when flushing database: {err}")

def log_last_call_by_time_frame_and_symbol(
self,
matrix_id,
cryptocurrency,
symbol,
time_frame,
trigger_source,
trigger_cache_timestamp,
candle=None,
kline=None,
):
if time_frame not in self.last_calls_by_time_frame_and_symbol:
self.last_calls_by_time_frame_and_symbol[time_frame] = {}
self.last_calls_by_time_frame_and_symbol[time_frame][symbol] = (
matrix_id,
cryptocurrency,
symbol,
time_frame,
trigger_source,
trigger_cache_timestamp,
candle,
kline,
)