From a0598e2e15d05812205379ace652bc98b3b85c85 Mon Sep 17 00:00:00 2001
From: Renizmy
Date: Sun, 26 Oct 2025 20:14:34 +0100
Subject: [PATCH 1/3] init

---
 pycti/connector/opencti_connector_helper.py | 536 +++++++++++++++++++-
 1 file changed, 519 insertions(+), 17 deletions(-)

diff --git a/pycti/connector/opencti_connector_helper.py b/pycti/connector/opencti_connector_helper.py
index 6d843729f..a49c11248 100644
--- a/pycti/connector/opencti_connector_helper.py
+++ b/pycti/connector/opencti_connector_helper.py
@@ -726,6 +726,351 @@ def stop(self) -> None:
         self.exit_event.set()
 
 
+class ListenStreamBatch(threading.Thread):
+    def __init__(
+        self,
+        helper,
+        callback,
+        url,
+        token,
+        verify_ssl,
+        start_timestamp,
+        live_stream_id,
+        listen_delete,
+        no_dependencies,
+        recover_iso_date,
+        with_inferences,
+        batch_size,
+        batch_timeout,
+        max_batches_per_minute,
+    ) -> None:
+        threading.Thread.__init__(self)
+        self.helper = helper
+        self.callback = callback
+        self.url = url
+        self.token = token
+        self.verify_ssl = verify_ssl
+        self.start_timestamp = start_timestamp
+        self.live_stream_id = live_stream_id
+        self.listen_delete = listen_delete
+        self.no_dependencies = no_dependencies
+        self.recover_iso_date = recover_iso_date
+        self.with_inferences = with_inferences
+        self.batch_size = batch_size
+        self.batch_timeout = batch_timeout
+        self.exit_event = threading.Event()
+        self.batch_start_time = None  # Will be initialized in run()
+
+        # Rate limiting state
+        self.max_batches_per_minute = max_batches_per_minute
+        if max_batches_per_minute is not None:
+            from collections import deque
+            self.batch_timestamps = deque()
+        else:
+            self.batch_timestamps = None
+
+    def _process_batch_and_update_state(self, batch, last_msg_id, trigger_reason="unknown"):
+        """Process a batch of messages and update connector state.
+
+        This method handles:
+        - Logging batch processing with metadata
+        - Invoking user callback with batch
+        - Updating connector state with last processed message ID
+        - Handling state reset scenarios
+        - ALWAYS updating state, even if callback fails
+
+        Args:
+            batch: List of SSE message objects to process
+            last_msg_id: Message ID of the last event in the batch
+            trigger_reason: String describing what triggered batch processing
+                (e.g., "size_limit", "timeout", "final_batch")
+
+        Returns:
+            bool: True if processing succeeded and thread should continue,
+                False if state was reset and thread should exit
+        """
+        # Skip processing empty batches
+        if len(batch) == 0:
+            return True
+        elapsed = time.time() - self.batch_start_time
+
+        self.helper.connector_logger.debug(
+            f"Processing batch due to {trigger_reason}",
+            {
+                "batch_size": len(batch),
+                "elapsed_time": elapsed,
+            },
+        )
+        self.helper.connector_logger.info(
+            "Processing batch",
+            {
+                "batch_size": len(batch),
+                "elapsed_time": elapsed,
+                "trigger": trigger_reason,
+            },
+        )
+
+        # Prepare batch data with metadata
+        batch_data = {
+            "events": batch,
+            "batch_metadata": {
+                "batch_size": len(batch),
+                "trigger_reason": trigger_reason,
+                "elapsed_time": elapsed,
+                "timestamp": time.time(),
+            }
+        }
+
+        # Invoke user's callback function with exception handling
+        callback_failed = False
+        callback_error = None
+        try:
+            self.callback(batch_data)
+        except Exception as ex:
+            callback_failed = True
+            callback_error = ex
+            self.helper.connector_logger.error(
+                "Batch callback failed - continuing with next batch",
+                {
+                    "error": str(ex),
+                    "error_type": type(ex).__name__,
+                    "batch_size": len(batch),
+                    "trigger": trigger_reason,
+                    "last_msg_id": last_msg_id,
+                },
+            )
+            self.helper.metric.inc("error_count")
+
+        # Update state with last processed message EVEN IF CALLBACK FAILED
+        # This ensures the connector progresses through the stream
+        state = self.helper.get_state()
+        if state is None:
+            # State was reset from UI during processing
+            self.helper.connector_logger.warning(
+                "State was reset during batch processing, stopping thread",
+                {
+                    "batch_size": len(batch),
+                    "trigger": trigger_reason,
+                    "callback_failed": callback_failed,
+                }
+            )
+            self.exit_event.set()
+            return False
+
+        # Update and save state
+        state["start_from"] = str(last_msg_id)
+        self.helper.set_state(state)
+
+        if callback_failed:
+            self.helper.connector_logger.warning(
+                "State updated despite callback failure - batch will NOT be retried",
+                {
+                    "batch_size": len(batch),
+                    "last_msg_id": last_msg_id,
+                    "error": str(callback_error),
+                }
+            )
+
+        return True
+
+    def _wait_for_rate_limit(self, q):
+        """Apply rate limiting before processing batch.
+
+        Uses a sliding window algorithm to enforce max_batches_per_minute.
+        Sleeps if necessary to stay within the limit.
+
+        Note: No lock needed - this method is only called by the run() method
+        in a single-threaded context. Instance variables are not shared across threads.
+
+        Args:
+            q: Queue for sending heartbeat signals to StreamAlive thread
+
+        Returns:
+            float: Time spent waiting (seconds), 0 if no wait needed
+        """
+        if self.max_batches_per_minute is None:
+            return 0.0  # No rate limiting
+
+        now = time.time()
+        cutoff_time = now - 60.0  # 60 seconds ago
+
+        # Remove timestamps older than 60 seconds (sliding window cleanup)
+        while self.batch_timestamps and self.batch_timestamps[0] < cutoff_time:
+            self.batch_timestamps.popleft()
+
+        # Check if we've reached the rate limit
+        wait_time = 0.0
+        if len(self.batch_timestamps) >= self.max_batches_per_minute:
+            # Calculate how long to wait until oldest batch expires
+            oldest_timestamp = self.batch_timestamps[0]
+            time_since_oldest = now - oldest_timestamp
+            wait_time = 60.0 - time_since_oldest
+
+            if wait_time > 0:
+                # Log the rate limit delay
+                self.helper.connector_logger.info(
+                    "Rate limit reached, delaying batch processing",
+                    {
+                        "max_batches_per_minute": self.max_batches_per_minute,
+                        "current_batch_count": len(self.batch_timestamps),
+                        "wait_seconds": round(wait_time, 2),
+                    }
+                )
+
+
+                chunk_size = 30.0
+                total_slept = 0.0
+
+                while total_slept < wait_time:
+                    sleep_duration = min(chunk_size, wait_time - total_slept)
+                    time.sleep(sleep_duration)
+                    total_slept += sleep_duration
+
+                    # Send heartbeat to keep StreamAlive alive during long waits
+                    q.put("rate_limit_heartbeat", block=False)
+
+                    if self.exit_event.is_set():
+                        break
+
+                # Cleanup again after sleep
+                now = time.time()
+                cutoff_time = now - 60.0
+                while self.batch_timestamps and self.batch_timestamps[0] < cutoff_time:
+                    self.batch_timestamps.popleft()
+
+        # Record this batch timestamp
+        self.batch_timestamps.append(now)
+
+        return wait_time
+
+    def run(self) -> None:
+        try:
+            self.helper.connector_logger.info("Starting ListenStreamBatch thread")
+            current_state = self.helper.get_state()
+            start_from = self.start_timestamp
+            recover_until = self.recover_iso_date
+            if current_state is None:
+                if start_from is None:
+                    start_from = "0-0"
+                if recover_until is None:
+                    recover_until = self.helper.date_now_z()
+                self.helper.set_state(
+                    {"start_from": start_from, "recover_until": recover_until}
+                )
+            else:
+                if "connectorLastEventId" in current_state:
+                    start_from = current_state["connectorLastEventId"]
+                else:
+                    start_from = current_state["start_from"]
+                if "connectorStartTime" in current_state:
+                    recover_until = current_state["connectorStartTime"]
+                else:
+                    recover_until = current_state["recover_until"]
+
+            q = Queue(maxsize=1)
+            stream_alive = StreamAlive(self.helper, q)
+            stream_alive.start()
+
+            live_stream_url = self.url
+            if recover_until is not False and recover_until not in [
+                "no",
+                "none",
+                "No",
+                "None",
+                "false",
+                "False",
+            ]:
+                live_stream_url = live_stream_url + "?recover=" + recover_until
+            listen_delete = str(self.listen_delete).lower()
+            no_dependencies = str(self.no_dependencies).lower()
+            with_inferences = str(self.with_inferences).lower()
+            self.helper.connector_logger.info(
+                "Starting to listen stream events in batch mode",
+                {
+                    "live_stream_url": live_stream_url,
+                    "listen_delete": listen_delete,
+                    "no_dependencies": no_dependencies,
+                    "with_inferences": with_inferences,
+                    "batch_size": self.batch_size,
+                    "batch_timeout": self.batch_timeout,
+                },
+            )
+            messages = SSEClient(
+                live_stream_url,
+                start_from,
+                headers={
+                    "authorization": "Bearer " + self.token,
+                    "listen-delete": listen_delete,
+                    "no-dependencies": no_dependencies,
+                    "with-inferences": with_inferences,
+                },
+                verify=self.verify_ssl,
+            )
+
+            batch = []
+            self.batch_start_time = time.time()
+            last_msg_id = None
+
+            for msg in messages:
+                if self.exit_event.is_set():
+                    stream_alive.stop()
+                    break
+                if msg.id is not None:
+                    try:
+                        q.put(msg.event, block=False)
+                    except queue.Full:
+                        pass
+
+                    if self.batch_timeout and len(batch) > 0:
+                        elapsed = time.time() - self.batch_start_time
+                        if elapsed >= self.batch_timeout:
+                            self._wait_for_rate_limit(q)
+                            should_continue = self._process_batch_and_update_state(
+                                batch, last_msg_id, trigger_reason="timeout"
+                            )
+                            if not should_continue:
+                                break
+                            batch = []
+                            self.batch_start_time = time.time()
+
+                    if msg.event == "heartbeat" or msg.event == "connected":
+                        state = self.helper.get_state()
+                        if state is None:
+                            self.exit_event.set()
+                        last_msg_id = msg.id
+                    else:
+                        batch.append(msg)
+                        last_msg_id = msg.id
+
+                        if self.batch_size and len(batch) >= self.batch_size:
+                            self._wait_for_rate_limit(q)
+                            should_continue = self._process_batch_and_update_state(
+                                batch, last_msg_id, trigger_reason="size_limit"
+                            )
+                            if not should_continue:
+                                break
+                            batch = []
+                            self.batch_start_time = time.time()
+
+            if len(batch) > 0:
+                self._process_batch_and_update_state(
+                    batch, last_msg_id, trigger_reason="final_batch"
+                )
+
+        except Exception as ex:
+            self.helper.connector_logger.error(
+                "Error in ListenStreamBatch loop, exit.", {"reason": str(ex)}
+            )
+            sys.excepthook(*sys.exc_info())
+
+    def stop(self):
+        self.helper.connector_logger.info(
+            "Preparing ListenStreamBatch for clean shutdown"
+        )
+        self.exit_event.set()
+
+
 class ListenStream(threading.Thread):
     def __init__(
         self,
@@ -1796,9 +2141,8 @@ def listen(
         self.listen_queue.start()
         self.listen_queue.join()
 
-    def listen_stream(
+    def _resolve_stream_parameters(
         self,
-        message_callback,
         url=None,
         token=None,
         verify_ssl=None,
@@ -1808,10 +2152,23 @@
         no_dependencies=None,
         recover_iso_date=None,
         with_inferences=None,
-    ) -> ListenStream:
-        """listen for messages and register callback function
-
-        :param message_callback: callback function to process messages
+    ) -> dict:
+        """Resolve stream connection parameters from arguments or configuration.
+
+        This helper method consolidates parameter resolution logic used by both
+        listen_stream() and listen_stream_batch() to eliminate code duplication.
+
+        :param url: OpenCTI URL (defaults to configured URL)
+        :param token: Authentication token (defaults to configured token)
+        :param verify_ssl: SSL verification flag (defaults to configured value)
+        :param start_timestamp: Starting timestamp for stream (optional)
+        :param live_stream_id: Stream ID to consume from (optional)
+        :param listen_delete: Whether to listen for delete events (defaults to configured value)
+        :param no_dependencies: Whether to exclude dependencies (defaults to configured value)
+        :param recover_iso_date: ISO date to recover from (optional)
+        :param with_inferences: Whether to include inferences (defaults to configured value)
+        :return: Dictionary with resolved parameters
+        :rtype: dict
         """
         # URL
         if url is None:
@@ -1861,25 +2218,170 @@ def listen_stream(
         ):
             recover_iso_date = self.connect_live_stream_recover_iso_date
         # Generate the stream URL
-        url = url + "/stream"
+        stream_url = url + "/stream"
         if live_stream_id is not None:
-            url = url + "/" + live_stream_id
+            stream_url = stream_url + "/" + live_stream_id
+
+        return {
+            "url": stream_url,
+            "token": token,
+            "verify_ssl": verify_ssl,
+            "start_timestamp": start_timestamp,
+            "live_stream_id": live_stream_id,
+            "listen_delete": listen_delete,
+            "no_dependencies": no_dependencies,
+            "recover_iso_date": recover_iso_date,
+            "with_inferences": with_inferences,
+        }
+
+    def listen_stream(
+        self,
+        message_callback,
+        url=None,
+        token=None,
+        verify_ssl=None,
+        start_timestamp=None,
+        live_stream_id=None,
+        listen_delete=None,
+        no_dependencies=None,
+        recover_iso_date=None,
+        with_inferences=None,
+    ) -> ListenStream:
+        """listen for messages and register callback function
+
+        :param message_callback: callback function to process messages
+        """
+        # Resolve all stream parameters using helper method
+        params = self._resolve_stream_parameters(
+            url=url,
+            token=token,
+            verify_ssl=verify_ssl,
+            start_timestamp=start_timestamp,
+            live_stream_id=live_stream_id,
+            listen_delete=listen_delete,
+            no_dependencies=no_dependencies,
+            recover_iso_date=recover_iso_date,
+            with_inferences=with_inferences,
+        )
+
+        self.listen_stream = ListenStream(
+            self,
+            message_callback,
-            url,
-            token,
-            verify_ssl,
-            start_timestamp,
-            live_stream_id,
-            listen_delete,
-            no_dependencies,
-            recover_iso_date,
-            with_inferences,
+            params["url"],
+            params["token"],
+            params["verify_ssl"],
+            params["start_timestamp"],
+            params["live_stream_id"],
+            params["listen_delete"],
+            params["no_dependencies"],
+            params["recover_iso_date"],
+            params["with_inferences"],
         )
         self.listen_stream.start()
         return self.listen_stream
 
+    def listen_stream_batch(
+        self,
+        message_callback,
+        batch_size=None,
+        batch_timeout=None,
+        max_batches_per_minute=None,
+        url=None,
+        token=None,
+        verify_ssl=None,
+        start_timestamp=None,
+        live_stream_id=None,
+        listen_delete=None,
+        no_dependencies=None,
+        recover_iso_date=None,
+        with_inferences=None,
+    ) -> ListenStreamBatch:
+        """Listen for stream messages and process them in batches.
+
+        Collects events and processes them in batches based on:
+        - Number of events (batch_size): Process when batch contains N events
+        - Time window (batch_timeout): Process batch every X seconds
+        - Both: Process when either condition is met (whichever comes first)
+
+        :param message_callback: Callback function to process a batch of messages.
+            Receives a dict with "events" (the list of SSE message objects)
+            and "batch_metadata" (batch size, trigger reason, timing).
+            If the callback raises an exception, it is caught and logged,
+            and the connector continues processing with the next batch.
+        :type message_callback: Callable[[dict], None]
+        :param batch_size: Number of events to collect before processing batch (optional)
+        :type batch_size: int or None
+        :param batch_timeout: Time in seconds to wait before processing batch (optional)
+        :type batch_timeout: float or None
+        :param max_batches_per_minute: Maximum batches to process per 60-second window (optional)
+        :type max_batches_per_minute: int or None
+        :param url: OpenCTI URL (defaults to configured URL)
+        :param token: Authentication token (defaults to configured token)
+        :param verify_ssl: SSL verification flag (defaults to configured value)
+        :param start_timestamp: Starting timestamp for stream (optional)
+        :param live_stream_id: Stream ID to consume from (optional)
+        :param listen_delete: Whether to listen for delete events (defaults to configured value)
+        :param no_dependencies: Whether to exclude dependencies (defaults to configured value)
+        :param recover_iso_date: ISO date to recover from (optional)
+        :param with_inferences: Whether to include inferences (defaults to configured value)
+        :return: ListenStreamBatch thread instance
+        :rtype: ListenStreamBatch
+        :raises ValueError: If neither batch_size nor batch_timeout is specified
+        """
+        if batch_size is None and batch_timeout is None:
+            raise ValueError(
+                "At least one of batch_size or batch_timeout must be specified"
+            )
+
+        if max_batches_per_minute is not None:
+            if not isinstance(max_batches_per_minute, int):
+                raise ValueError("max_batches_per_minute must be an integer")
+            if max_batches_per_minute <= 0:
+                raise ValueError("max_batches_per_minute must be > 0")
+            if max_batches_per_minute > 10000:
+                self.connector_logger.warning(
+                    "Very high max_batches_per_minute configured",
+                    {"max_batches_per_minute": max_batches_per_minute}
+                )
+
+        params = self._resolve_stream_parameters(
+            url=url,
+            token=token,
+            verify_ssl=verify_ssl,
+            start_timestamp=start_timestamp,
+            live_stream_id=live_stream_id,
+            listen_delete=listen_delete,
+            no_dependencies=no_dependencies,
+            recover_iso_date=recover_iso_date,
+            with_inferences=with_inferences,
+        )
+
+        if max_batches_per_minute is not None:
+            self.connector_logger.info(
+                "Batch rate limiting enabled",
+                {
+                    "max_batches_per_minute": max_batches_per_minute
+                }
+            )
+
+        self._listen_stream_batch_thread = ListenStreamBatch(
+            self,
+            message_callback,
+            params["url"],
+            params["token"],
+            params["verify_ssl"],
+            params["start_timestamp"],
+            params["live_stream_id"],
+            params["listen_delete"],
+            params["no_dependencies"],
+            params["recover_iso_date"],
+            params["with_inferences"],
+            batch_size,
+            batch_timeout,
+            max_batches_per_minute,
+        )
+        self._listen_stream_batch_thread.start()
+        return self._listen_stream_batch_thread
+
     def get_opencti_url(self) -> Optional[Union[bool, int, str]]:
         """Get the OpenCTI URL.
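
Before the follow-up patches, a minimal usage sketch of the API this first patch introduces — illustrative only, not part of the diff. The connector bootstrap (`config`) and the downstream write are hypothetical placeholders; `listen_stream_batch` and the `batch_data` shape ("events" plus "batch_metadata") are taken from the code above, with the patch-1 semantics where a failing callback is logged and the stream keeps advancing.

```python
import json

from pycti import OpenCTIConnectorHelper


def process_batch(batch_data):
    # batch_data is the dict built in _process_batch_and_update_state:
    # "events" holds the raw SSE messages, "batch_metadata" the batch stats.
    meta = batch_data["batch_metadata"]
    print(
        "batch of %d events (trigger=%s, elapsed=%.1fs)"
        % (meta["batch_size"], meta["trigger_reason"], meta["elapsed_time"])
    )
    for msg in batch_data["events"]:
        event = json.loads(msg.data)  # each SSE message carries a JSON payload
        # ... bulk-write `event` to the downstream system here ...


config = {}  # hypothetical: load the connector configuration as usual
helper = OpenCTIConnectorHelper(config)

# Flush every 500 events or every 30 seconds, whichever comes first,
# and process at most 10 batches in any sliding 60-second window.
thread = helper.listen_stream_batch(
    process_batch,
    batch_size=500,
    batch_timeout=30.0,
    max_batches_per_minute=10,
)
thread.join()
```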
From 69b627299c27d1460b762cb25513c5f5ce6289cf Mon Sep 17 00:00:00 2001
From: Renizmy
Date: Thu, 30 Oct 2025 08:09:26 +0100
Subject: [PATCH 2/3] align state management between ListenStream and
 ListenStreamBatch (retry on error)

---
 pycti/connector/opencti_connector_helper.py | 44 +++++----------------
 1 file changed, 10 insertions(+), 34 deletions(-)

diff --git a/pycti/connector/opencti_connector_helper.py b/pycti/connector/opencti_connector_helper.py
index a49c11248..6d5f01d99 100644
--- a/pycti/connector/opencti_connector_helper.py
+++ b/pycti/connector/opencti_connector_helper.py
@@ -777,7 +777,9 @@ def _process_batch_and_update_state(self, batch, last_msg_id, trigger_reason="un
         - Invoking user callback with batch
         - Updating connector state with last processed message ID
         - Handling state reset scenarios
-        - ALWAYS updating state, even if callback fails
+
+        Note: If callback fails, state will NOT be updated and exception will propagate.
+        This matches ListenStream behavior where failed messages are retried on restart.
 
         Args:
             batch: List of SSE message objects to process
@@ -821,28 +823,7 @@ def _process_batch_and_update_state(self, batch, last_msg_id, trigger_reason="un
             }
         }
 
-        # Invoke user's callback function with exception handling
-        callback_failed = False
-        callback_error = None
-        try:
-            self.callback(batch_data)
-        except Exception as ex:
-            callback_failed = True
-            callback_error = ex
-            self.helper.connector_logger.error(
-                "Batch callback failed - continuing with next batch",
-                {
-                    "error": str(ex),
-                    "error_type": type(ex).__name__,
-                    "batch_size": len(batch),
-                    "trigger": trigger_reason,
-                    "last_msg_id": last_msg_id,
-                },
-            )
-            self.helper.metric.inc("error_count")
-
-        # Update state with last processed message EVEN IF CALLBACK FAILED
-        # This ensures the connector progresses through the stream
+        self.callback(batch_data)
         state = self.helper.get_state()
         if state is None:
             # State was reset from UI during processing
@@ -851,7 +832,6 @@ def _process_batch_and_update_state(self, batch, last_msg_id, trigger_reason="un
                 {
                     "batch_size": len(batch),
                     "trigger": trigger_reason,
-                    "callback_failed": callback_failed,
                 }
             )
             self.exit_event.set()
@@ -861,16 +841,6 @@ def _process_batch_and_update_state(self, batch, last_msg_id, trigger_reason="un
         state["start_from"] = str(last_msg_id)
         self.helper.set_state(state)
 
-        if callback_failed:
-            self.helper.connector_logger.warning(
-                "State updated despite callback failure - batch will NOT be retried",
-                {
-                    "batch_size": len(batch),
-                    "last_msg_id": last_msg_id,
-                    "error": str(callback_error),
-                }
-            )
-
         return True
 
     def _wait_for_rate_limit(self, q):
@@ -1038,6 +1008,12 @@ def run(self) -> None:
                         state = self.helper.get_state()
                         if state is None:
                             self.exit_event.set()
+                        else:
+                            # Only update state if batch is empty to prevent message loss
+                            # If batch has unprocessed messages, state will be updated after batch processing
+                            if len(batch) == 0:
+                                state["start_from"] = str(msg.id)
+                                self.helper.set_state(state)
                         last_msg_id = msg.id
                     else:
                         batch.append(msg)
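
This patch flips the failure contract: a callback exception now propagates out of `_process_batch_and_update_state`, the thread exits without advancing `start_from`, and the same batch is redelivered when the connector restarts. A short sketch of what that implies for callback design — `upsert_document` is a hypothetical idempotent writer, and the `event["data"]["id"]` access assumes the usual OpenCTI stream payload shape:

```python
import json


def process_batch(batch_data):
    # With this patch, raising here leaves state["start_from"] untouched,
    # so the whole batch is replayed on the next connector start. Writes
    # should therefore be idempotent so a replay cannot create duplicates.
    for msg in batch_data["events"]:
        event = json.loads(msg.data)
        upsert_document(event["data"]["id"], event)


def upsert_document(doc_id, event):
    # Hypothetical insert-or-replace keyed on the STIX ID, e.g. an
    # Elasticsearch index(id=doc_id, ...) call or an SQL upsert.
    pass
```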
From 9ad7b3393bb069eba759c27a7931e1f55670cc62 Mon Sep 17 00:00:00 2001
From: Renizmy
Date: Thu, 30 Oct 2025 11:48:30 +0100
Subject: [PATCH 3/3] fix linter

---
 pycti/connector/opencti_connector_helper.py | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)

diff --git a/pycti/connector/opencti_connector_helper.py b/pycti/connector/opencti_connector_helper.py
index 6d5f01d99..0f243b72e 100644
--- a/pycti/connector/opencti_connector_helper.py
+++ b/pycti/connector/opencti_connector_helper.py
@@ -765,11 +765,14 @@ def __init__(
         self.max_batches_per_minute = max_batches_per_minute
         if max_batches_per_minute is not None:
             from collections import deque
+
             self.batch_timestamps = deque()
         else:
             self.batch_timestamps = None
 
-    def _process_batch_and_update_state(self, batch, last_msg_id, trigger_reason="unknown"):
+    def _process_batch_and_update_state(
+        self, batch, last_msg_id, trigger_reason="unknown"
+    ):
         """Process a batch of messages and update connector state.
 
         This method handles:
@@ -820,7 +823,7 @@ def _process_batch_and_update_state(
                 "trigger_reason": trigger_reason,
                 "elapsed_time": elapsed,
                 "timestamp": time.time(),
-            }
+            },
         }
 
         self.callback(batch_data)
@@ -832,7 +835,7 @@ def _process_batch_and_update_state(
                 {
                     "batch_size": len(batch),
                     "trigger": trigger_reason,
-                }
+                },
             )
             self.exit_event.set()
             return False
@@ -884,10 +887,9 @@ def _wait_for_rate_limit(self, q):
                         "max_batches_per_minute": self.max_batches_per_minute,
                         "current_batch_count": len(self.batch_timestamps),
                         "wait_seconds": round(wait_time, 2),
-                    }
+                    },
                 )
 
-
                 chunk_size = 30.0
                 total_slept = 0.0
 
@@ -2316,7 +2318,7 @@ def listen_stream_batch(
             if max_batches_per_minute > 10000:
                 self.connector_logger.warning(
                     "Very high max_batches_per_minute configured",
-                    {"max_batches_per_minute": max_batches_per_minute}
+                    {"max_batches_per_minute": max_batches_per_minute},
                 )
 
         params = self._resolve_stream_parameters(
@@ -2334,9 +2336,7 @@ def listen_stream_batch(
         if max_batches_per_minute is not None:
             self.connector_logger.info(
                 "Batch rate limiting enabled",
-                {
-                    "max_batches_per_minute": max_batches_per_minute
-                }
+                {"max_batches_per_minute": max_batches_per_minute},
            )
 
         self._listen_stream_batch_thread = ListenStreamBatch(
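
For reference, the rate limiter that patch 1 added and this patch reformats is a sliding-window counter. Stripped of the chunked sleep, the StreamAlive heartbeat, and the exit-event checks, the core algorithm reduces to this standalone sketch (the class name is illustrative, not from the patch):

```python
import time
from collections import deque


class SlidingWindowLimiter:
    """Allow at most `limit` calls per sliding 60-second window."""

    def __init__(self, limit):
        self.limit = limit
        self.timestamps = deque()  # call times still inside the window

    def wait(self):
        now = time.time()
        # Evict timestamps that have aged out of the 60 s window.
        while self.timestamps and self.timestamps[0] < now - 60.0:
            self.timestamps.popleft()
        if len(self.timestamps) >= self.limit:
            # Block until the oldest recorded call leaves the window.
            time.sleep(60.0 - (now - self.timestamps[0]))
            now = time.time()
            while self.timestamps and self.timestamps[0] < now - 60.0:
                self.timestamps.popleft()
        self.timestamps.append(now)


limiter = SlidingWindowLimiter(limit=10)  # mirrors max_batches_per_minute=10
for i in range(12):
    limiter.wait()  # the 11th call in a minute blocks until a slot frees
```

The version in the patch additionally sleeps in 30-second chunks and pushes a `rate_limit_heartbeat` into the StreamAlive queue between chunks, so the keep-alive watchdog does not tear down the connection during long waits.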