Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add kafka tool #229

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -762,6 +762,20 @@ Configure Slack to send notifications to specific channels. Provide your Slack t

</details>

<summary>Kafka Integration</summary>

Enable Kafka as a tool for Holmes to fetch kafka metadata like the topics or consumer groups.

```bash
export KAFKA_BROKERS="localhost:9092" # Required. comma separated list
export KAFKA_SECURITY_PROTOCOL = "PLAINTEXT" # optional
export KAFKA_SASL_MECHANISM= "..." # optional
export KAFKA_USERNAME = "..." # optional
export KAFKA_PASSWORD = "..." # optional
export KAFKA_CLIENT_ID = "..." # optional
```

</details>
<details>

<summary>Custom Runbooks</summary>
Expand Down
29 changes: 27 additions & 2 deletions holmes/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from holmes.plugins.sources.prometheus.plugin import AlertManagerSource
from holmes.plugins.toolsets import (load_builtin_toolsets,
load_toolsets_from_file)
from holmes.plugins.toolsets.kafka import KafkaConfig
from holmes.utils.pydantic_utils import RobustaBaseConfig, load_model_from_file
from holmes.utils.definitions import CUSTOM_TOOLSET_LOCATION
from holmes.utils.holmes_sync_toolsets import load_custom_toolsets_config, merge_and_override_bultin_toolsets_with_toolsets_config
Expand Down Expand Up @@ -72,6 +73,13 @@ class Config(RobustaBaseConfig):
opsgenie_team_integration_key: Optional[SecretStr] = None
opsgenie_query: Optional[str] = None

kafka_brokers: Optional[str] = None # comma separated values
kafka_security_protocol: Optional[str] = None
kafka_sasl_mechanism: Optional[str] = None
kafka_username: Optional[str] = None
kafka_password: Optional[str] = None
kafka_client_id: Optional[str] = None

custom_runbooks: List[FilePath] = []
custom_toolsets: List[FilePath] = []

Expand All @@ -98,6 +106,12 @@ def load_from_env(cls):
"github_repository",
"github_pat",
"github_query",
"kafka_brokers",
"kafka_security_protocol",
"kafka_sasl_mechanism",
"kafka_username",
"kafka_password",
"kafka_client_id"
# TODO
# custom_runbooks
]:
Expand Down Expand Up @@ -126,13 +140,24 @@ def __get_cluster_name() -> Optional[str]:

return None

def get_kafka_config(self) -> Optional[KafkaConfig]:
if self.kafka_brokers:
return KafkaConfig(
brokers = self.kafka_brokers.split(","),
security_protocol = self.kafka_security_protocol,
sasl_mechanism = self.kafka_sasl_mechanism,
username = self.kafka_username,
password = self.kafka_password,
client_id = self.kafka_client_id
)

def create_console_tool_executor(
self, allowed_toolsets: ToolsetPattern, dal:Optional[SupabaseDal]
) -> ToolExecutor:
"""
Creates ToolExecutor for the cli
"""
default_toolsets = [toolset for toolset in load_builtin_toolsets(dal) if any(tag in (ToolsetTag.CORE, ToolsetTag.CLI) for tag in toolset.tags)]
default_toolsets = [toolset for toolset in load_builtin_toolsets(dal=dal, kafka_config=self.get_kafka_config()) if any(tag in (ToolsetTag.CORE, ToolsetTag.CLI) for tag in toolset.tags)]

if allowed_toolsets == "*":
matching_toolsets = default_toolsets
Expand Down Expand Up @@ -184,7 +209,7 @@ def create_tool_executor(
Creates ToolExecutor for the server endpoints
"""

all_toolsets = load_builtin_toolsets(dal=dal)
all_toolsets = load_builtin_toolsets(dal=dal, kafka_config=self.get_kafka_config())

if os.path.isfile(CUSTOM_TOOLSET_LOCATION):
try:
Expand Down
13 changes: 8 additions & 5 deletions holmes/plugins/toolsets/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
from typing import Optional
import yaml

from holmes.plugins.toolsets.kafka import KafkaConfig, KafkaToolset

THIS_DIR = os.path.abspath(os.path.dirname(__file__))


Expand All @@ -39,12 +41,13 @@ def load_toolsets_from_file(path: str, silent_fail: bool = False) -> List[YAMLTo
return file_toolsets


def load_python_toolsets(dal:Optional[SupabaseDal]) -> List[Toolset]:
def load_python_toolsets(dal:Optional[SupabaseDal], kafka_config: Optional[KafkaConfig] = None) -> List[Toolset]:
logging.debug("loading python toolsets")
return [InternetToolset(), FindingsToolset(dal)]

return [InternetToolset(), FindingsToolset(dal), KafkaToolset(kafka_config)]

def load_builtin_toolsets(dal:Optional[SupabaseDal] = None) -> List[Toolset]:
# TODO: separate the Config logic from the Config structure so that config can be passed directly to this func
# It will simplify the Config logic and the code here to avoid passing specific configs to toolsets
def load_builtin_toolsets(dal:Optional[SupabaseDal] = None, kafka_config: Optional[KafkaConfig] = None) -> List[Toolset]:
all_toolsets = []
logging.debug(f"loading toolsets from {THIS_DIR}")
for filename in os.listdir(THIS_DIR):
Expand All @@ -53,5 +56,5 @@ def load_builtin_toolsets(dal:Optional[SupabaseDal] = None) -> List[Toolset]:
path = os.path.join(THIS_DIR, filename)
all_toolsets.extend(load_toolsets_from_file(path))

all_toolsets.extend(load_python_toolsets(dal))
all_toolsets.extend(load_python_toolsets(dal, kafka_config))
return all_toolsets
6 changes: 3 additions & 3 deletions holmes/plugins/toolsets/findings.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from typing import Optional
from typing_extensions import Dict
from holmes.core.supabase_dal import SupabaseDal
from holmes.core.tools import StaticPrerequisite, Tool, ToolParameter, Toolset
from holmes.core.tools import StaticPrerequisite, Tool, ToolParameter, Toolset, ToolsetTag

PARAM_FINDING_ID = "id"

Expand Down Expand Up @@ -52,7 +52,7 @@ def invoke(self, params: Dict) -> str:
return f"There was an internal error while fetching finding {finding_id}"

def get_parameterized_one_liner(self, params:Dict) -> str:
return f"Fetch metadata and history"
return "Fetch metadata and history"


class FindingsToolset(Toolset):
Expand All @@ -72,5 +72,5 @@ def __init__(self, dal: Optional[SupabaseDal]):
name="robusta",
prerequisites=[dal_prereq],
tools=[FetchRobustaFinding(dal)],
tags=["core",]
tags=[ToolsetTag.CORE]
)
4 changes: 2 additions & 2 deletions holmes/plugins/toolsets/internet.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import logging

from typing import Any
from holmes.core.tools import Tool, ToolParameter, Toolset, ToolsetCommandPrerequisite
from holmes.core.tools import Tool, ToolParameter, Toolset, ToolsetCommandPrerequisite, ToolsetTag
from markdownify import markdownify
from playwright.sync_api import Error as PlaywrightError
from playwright.sync_api import TimeoutError as PlaywrightTimeoutError
Expand Down Expand Up @@ -167,5 +167,5 @@ def __init__(self):
),
],
tools=[FetchWebpage()],
tags=["core",]
tags=[ToolsetTag.CORE]
)
Loading
Loading