Add kafka tool #229

Status: Open · wants to merge 30 commits into master

Changes from 1 commit

Commits (30)
f113b54
feat: add kafka toolset (wip)
nherment Dec 12, 2024
b5b45a9
feat: add kafka toolset (wip)
nherment Dec 13, 2024
67d8906
Merge remote-tracking branch 'origin/master' into add_kafka_tool
nherment Dec 13, 2024
d7c331e
feat: add kafka toolset
nherment Dec 13, 2024
7cce0be
feat: remove unused kafka test
nherment Dec 13, 2024
75de978
feat: kafka
nherment Dec 13, 2024
d2e2603
feat: kafka + fix ask_holmes test 21
nherment Dec 13, 2024
fc72c84
updated lockfiles
Avi-Robusta Dec 30, 2024
6925898
Merge branch 'master' into add_kafka_tool
Avi-Robusta Dec 30, 2024
6abcc9a
rebase fix
Avi-Robusta Dec 30, 2024
b8fab59
Merge branch 'master' into add_kafka_tool
nherment Jan 16, 2025
f19c9c8
Merge branch 'master' into add_kafka_tool
nherment Jan 20, 2025
34be2d2
Merge branch 'master' into add_kafka_tool
nherment Feb 7, 2025
5e6829e
chore: ruff
nherment Feb 7, 2025
4c0e0f5
feat: align kafka toolset config + readme
nherment Feb 7, 2025
865aaec
feat: align kafka toolset config + readme
nherment Feb 7, 2025
9004bbf
Merge branch 'master' into add_kafka_tool
nherment Feb 10, 2025
1e47de9
Merge branch 'master' into add_kafka_tool
nherment Feb 12, 2025
24ce312
fix: properly handle list consumer groups errors
nherment Feb 13, 2025
7e8adb4
fix: kafka toolset improve error handling, update tests
nherment Feb 13, 2025
8fe7784
chore: ruff
nherment Feb 13, 2025
afbc93e
Merge branch 'master' into add_kafka_tool
nherment Feb 13, 2025
57578a7
chore: remove files unrelated to Kafka PR
nherment Feb 13, 2025
be290ea
feat: add link to robusta docs for kafka toolset
nherment Feb 13, 2025
4a3534d
Merge branch 'master' into add_kafka_tool
nherment Feb 19, 2025
35cff73
Merge branch 'master' into add_kafka_tool
nherment Feb 25, 2025
74e978a
fix: simplify waiting for docker up in kafka integration test
nherment Feb 27, 2025
283c1c7
Merge branch 'master' into add_kafka_tool
nherment Feb 27, 2025
96c9e38
Merge branch 'master' into add_kafka_tool
nherment Feb 27, 2025
6db9bf3
fix: update toolset tools to override _invoke() instead of invoke()
nherment Feb 27, 2025
feat: add kafka toolset
nherment committed Dec 13, 2024
commit d7c331efcc2a6799be32c54d2b164d2e9ea02e7d
14 changes: 14 additions & 0 deletions README.md
@@ -713,6 +713,20 @@ Configure Slack to send notifications to specific channels. Provide your Slack t

</details>

<details>

<summary>Kafka Integration</summary>

Enable Kafka as a tool so Holmes can fetch Kafka metadata such as topics and consumer groups.

```bash
export KAFKA_BROKERS="localhost:9092"      # Required. Comma-separated list
export KAFKA_SECURITY_PROTOCOL="PLAINTEXT" # Optional
export KAFKA_SASL_MECHANISM="..."          # Optional
export KAFKA_USERNAME="..."                # Optional
export KAFKA_PASSWORD="..."                # Optional
export KAFKA_CLIENT_ID="..."               # Optional
```

</details>
<details>

<summary>Custom Runbooks</summary>
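Aside: a minimal sketch of how the README variables above map onto the librdkafka-style settings this PR builds in `holmes/plugins/toolsets/kafka.py`. This is illustrative only, not code from the PR; the loop and variable names are hypothetical.

```python
import os

# Required: comma-separated broker list, e.g. "broker1:9092,broker2:9092"
admin_config = {"bootstrap.servers": os.environ["KAFKA_BROKERS"]}

# Optional settings are skipped entirely when the variable is unset.
for env_var, conf_key in [
    ("KAFKA_SECURITY_PROTOCOL", "security.protocol"),
    ("KAFKA_SASL_MECHANISM", "sasl.mechanism"),
    ("KAFKA_USERNAME", "sasl.username"),
    ("KAFKA_PASSWORD", "sasl.password"),
    ("KAFKA_CLIENT_ID", "client.id"),
]:
    value = os.environ.get(env_var)
    if value:
        admin_config[conf_key] = value
```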
55 changes: 40 additions & 15 deletions holmes/config.py
@@ -13,7 +13,7 @@

from holmes.core.runbooks import RunbookManager
from holmes.core.supabase_dal import SupabaseDal
from holmes.core.tool_calling_llm import (IssueInvestigator,
ToolCallingLLM,
ToolExecutor)
from holmes.core.tools import ToolsetPattern, get_matching_toolsets, ToolsetStatusEnum, ToolsetTag
@@ -27,6 +27,7 @@
from holmes.plugins.sources.prometheus.plugin import AlertManagerSource
from holmes.plugins.toolsets import (load_builtin_toolsets,
load_toolsets_from_file)
from holmes.plugins.toolsets.kafka import KafkaConfig
from holmes.utils.pydantic_utils import RobustaBaseConfig, load_model_from_file
from holmes.utils.definitions import CUSTOM_TOOLSET_LOCATION
from pydantic import ValidationError
@@ -75,9 +76,16 @@ class Config(RobustaBaseConfig):
opsgenie_team_integration_key: Optional[SecretStr] = None
opsgenie_query: Optional[str] = None

kafka_brokers: Optional[str] = None # comma separated values
kafka_security_protocol: Optional[str] = None
kafka_sasl_mechanism: Optional[str] = None
kafka_username: Optional[str] = None
kafka_password: Optional[str] = None
kafka_client_id: Optional[str] = None

custom_runbooks: List[FilePath] = []
custom_toolsets: List[FilePath] = []

enabled_toolsets_names: List[str] = Field(default_factory=list)

@classmethod
@@ -101,6 +109,12 @@ def load_from_env(cls):
"github_repository",
"github_pat",
"github_query",
"kafka_brokers",
"kafka_security_protocol",
"kafka_sasl_mechanism",
"kafka_username",
"kafka_password",
"kafka_client_id"
# TODO
# custom_runbooks
]:
@@ -109,7 +123,7 @@ def load_from_env(cls):
kwargs[field_name] = val
kwargs["cluster_name"] = Config.__get_cluster_name()
return cls(**kwargs)

@staticmethod
def __get_cluster_name() -> Optional[str]:
config_file_path = ROBUSTA_CONFIG_PATH
@@ -129,21 +143,32 @@ def __get_cluster_name() -> Optional[str]:

return None

def get_kafka_config(self) -> Optional[KafkaConfig]:
if self.kafka_brokers:
return KafkaConfig(
brokers=self.kafka_brokers.split(","),
security_protocol=self.kafka_security_protocol,
sasl_mechanism=self.kafka_sasl_mechanism,
username=self.kafka_username,
password=self.kafka_password,
client_id=self.kafka_client_id,
)

def create_console_tool_executor(
self, console: Console, allowed_toolsets: ToolsetPattern, dal:Optional[SupabaseDal]
) -> ToolExecutor:
"""
Creates ToolExecutor for the cli
"""
default_toolsets = [toolset for toolset in load_builtin_toolsets(dal) if any(tag in (ToolsetTag.CORE, ToolsetTag.CLI) for tag in toolset.tags)]
default_toolsets = [toolset for toolset in load_builtin_toolsets(dal=dal, kafka_config=self.get_kafka_config()) if any(tag in (ToolsetTag.CORE, ToolsetTag.CLI) for tag in toolset.tags)]

if allowed_toolsets == "*":
matching_toolsets = default_toolsets
else:
matching_toolsets = get_matching_toolsets(
default_toolsets, allowed_toolsets.split(",")
)

# Enable all matching toolsets that have CORE or CLI tag
for toolset in matching_toolsets:
toolset.enabled = True
@@ -155,7 +180,7 @@ def create_console_tool_executor(
toolsets_loaded_from_config,
matched_default_toolsets_by_name,
)

for toolset in filtered_toolsets_by_name.values():
if toolset.enabled:
toolset.check_prerequisites()
@@ -169,11 +194,11 @@ def create_console_tool_executor(
logging.info(f"Disabled toolset: {ts.name} from {ts.get_path()})")
elif ts.get_status() == ToolsetStatusEnum.ERROR:
logging.info(f"Failed loading toolset {ts.name} from {ts.get_path()}: ({ts.get_error()})")

for ts in default_toolsets:
if ts.name not in filtered_toolsets_by_name.keys():
logging.debug(f"Toolset {ts.name} from {ts.get_path()} was filtered out due to allowed_toolsets value")

enabled_tools = concat(*[ts.tools for ts in enabled_toolsets])
logging.debug(
f"Starting AI session with tools: {[t.name for t in enabled_tools]}"
@@ -184,10 +209,10 @@ def create_tool_executor(
self, console: Console, dal:Optional[SupabaseDal]
) -> ToolExecutor:
"""
Creates ToolExecutor for the server endpoints
"""

all_toolsets = load_builtin_toolsets(dal=dal)
all_toolsets = load_builtin_toolsets(dal=dal, kafka_config=self.get_kafka_config())

if os.path.isfile(CUSTOM_TOOLSET_LOCATION):
try:
@@ -201,7 +226,7 @@ def create_tool_executor(
f"Starting AI session with tools: {[t.name for t in enabled_tools]}"
)
return ToolExecutor(enabled_toolsets)

def create_console_toolcalling_llm(
self, console: Console, allowed_toolsets: ToolsetPattern, dal:Optional[SupabaseDal] = None
) -> ToolCallingLLM:
@@ -239,7 +264,7 @@ def create_issue_investigator(
self.max_steps,
self._get_llm()
)

def create_console_issue_investigator(
self,
console: Console,
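For reviewers, a short usage sketch of the new `get_kafka_config()` helper; the values flow in from the `KAFKA_*` environment variables via `load_from_env()`. The snippet is illustrative, not part of the diff:

```python
config = Config.load_from_env()

kafka_config = config.get_kafka_config()
if kafka_config is None:
    # kafka_brokers was unset, so the Kafka toolset will be created
    # unconfigured and its prerequisite will report it as disabled.
    pass
else:
    # brokers is split on commas: "a:9092,b:9092" -> ["a:9092", "b:9092"]
    print(kafka_config.brokers)
```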
13 changes: 8 additions & 5 deletions holmes/plugins/toolsets/__init__.py
@@ -14,6 +14,8 @@
from typing import Optional
import yaml

from holmes.plugins.toolsets.kafka import KafkaConfig, KafkaToolset

THIS_DIR = os.path.abspath(os.path.dirname(__file__))


@@ -39,12 +41,13 @@ def load_toolsets_from_file(path: str, silent_fail: bool = False) -> List[YAMLTo
return file_toolsets


def load_python_toolsets(dal:Optional[SupabaseDal]) -> List[Toolset]:
def load_python_toolsets(dal:Optional[SupabaseDal], kafka_config: Optional[KafkaConfig] = None) -> List[Toolset]:
logging.debug("loading python toolsets")
return [InternetToolset(), FindingsToolset(dal)]

return [InternetToolset(), FindingsToolset(dal), KafkaToolset(kafka_config)]

def load_builtin_toolsets(dal:Optional[SupabaseDal] = None) -> List[Toolset]:
# TODO: separate the Config logic from the Config structure so that config can be passed directly to this func
# It will simplify the Config logic and the code here to avoid passing specific configs to toolsets
def load_builtin_toolsets(dal:Optional[SupabaseDal] = None, kafka_config: Optional[KafkaConfig] = None) -> List[Toolset]:
all_toolsets = []
logging.debug(f"loading toolsets from {THIS_DIR}")
for filename in os.listdir(THIS_DIR):
@@ -53,5 +56,5 @@ def load_builtin_toolsets(dal:Optional[SupabaseDal] = None) -> List[Toolset]:
path = os.path.join(THIS_DIR, filename)
all_toolsets.extend(load_toolsets_from_file(path))

all_toolsets.extend(load_python_toolsets(dal))
all_toolsets.extend(load_python_toolsets(dal, kafka_config))
return all_toolsets
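A sketch of the resulting call shape (the `KafkaConfig` literal is a placeholder; in the PR the config comes from `Config.get_kafka_config()`):

```python
from holmes.plugins.toolsets import load_builtin_toolsets
from holmes.plugins.toolsets.kafka import KafkaConfig

# kafka_config is optional: when omitted, KafkaToolset is constructed with
# config=None and its prerequisite reports "Kafka client not configured".
toolsets = load_builtin_toolsets(
    dal=None,
    kafka_config=KafkaConfig(brokers=["localhost:9092"]),
)
kafka_toolset = next(ts for ts in toolsets if ts.name == "kafka_tools")
```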
6 changes: 3 additions & 3 deletions holmes/plugins/toolsets/findings.py
@@ -4,7 +4,7 @@
from typing import Optional
from typing_extensions import Dict
from holmes.core.supabase_dal import SupabaseDal
from holmes.core.tools import StaticPrerequisite, Tool, ToolParameter, Toolset
from holmes.core.tools import StaticPrerequisite, Tool, ToolParameter, Toolset, ToolsetTag

PARAM_FINDING_ID = "id"

@@ -52,7 +52,7 @@ def invoke(self, params: Dict) -> str:
return f"There was an internal error while fetching finding {finding_id}"

def get_parameterized_one_liner(self, params:Dict) -> str:
return f"Fetch metadata and history"
return "Fetch metadata and history"


class FindingsToolset(Toolset):
@@ -72,5 +72,5 @@ def __init__(self, dal: Optional[SupabaseDal]):
name="robusta",
prerequisites=[dal_prereq],
tools=[FetchRobustaFinding(dal)],
tags=["core",]
tags=[ToolsetTag.CORE]
)
4 changes: 2 additions & 2 deletions holmes/plugins/toolsets/internet.py
@@ -2,7 +2,7 @@
import logging

from typing import Any
from holmes.core.tools import Tool, ToolParameter, Toolset, ToolsetCommandPrerequisite
from holmes.core.tools import Tool, ToolParameter, Toolset, ToolsetCommandPrerequisite, ToolsetTag
from markdownify import markdownify
from playwright.sync_api import Error as PlaywrightError
from playwright.sync_api import TimeoutError as PlaywrightTimeoutError
@@ -167,5 +167,5 @@ def __init__(self):
),
],
tools=[FetchWebpage()],
tags=["core",]
tags=[ToolsetTag.CORE]
)
88 changes: 37 additions & 51 deletions holmes/plugins/toolsets/kafka.py
@@ -1,12 +1,12 @@

import logging
from pydantic import BaseModel
import yaml
from typing import Any, Dict, List, Optional
from holmes.core.tools import StaticPrerequisite, Tool, ToolParameter, Toolset
from confluent_kafka.admin import AdminClient, BrokerMetadata, ClusterMetadata, ConfigResource, GroupMember, GroupMetadata, ListConsumerGroupsResult, MemberAssignment, MemberDescription, NewTopic, ConsumerGroupDescription, PartitionMetadata, TopicMetadata
from holmes.core.tools import StaticPrerequisite, Tool, ToolParameter, Toolset, ToolsetTag
from confluent_kafka.admin import AdminClient, BrokerMetadata, ClusterMetadata, ConfigResource, GroupMember, GroupMetadata, ListConsumerGroupsResult, MemberAssignment, MemberDescription, ConsumerGroupDescription, PartitionMetadata, TopicMetadata
from confluent_kafka import KafkaException


def convert_to_dict(obj:Any):
if isinstance(obj, (ClusterMetadata, BrokerMetadata, TopicMetadata,
PartitionMetadata, GroupMember, GroupMetadata,
@@ -24,36 +24,13 @@ def convert_to_dict(obj:Any):
return result
return obj


class KafkaAuthConfig:
def __init__(
self,
security_protocol: str = "SASL_SSL",
sasl_mechanism: str = "PLAIN",
username: str = None,
password: str = None,
ssl_cafile: str = None
):
self.security_protocol = security_protocol
self.sasl_mechanism = sasl_mechanism
self.username = username
self.password = password
self.ssl_cafile = ssl_cafile

def get_config(self) -> Dict:
config = {
'security.protocol': self.security_protocol,
'sasl.mechanism': self.sasl_mechanism,
}

if self.username and self.password:
config['sasl.username'] = self.username
config['sasl.password'] = self.password

if self.ssl_cafile:
config['ssl.ca.location'] = self.ssl_cafile

return config
class KafkaConfig(BaseModel):
brokers: List[str]
security_protocol: Optional[str] = None
sasl_mechanism: Optional[str] = None
username: Optional[str] = None
password: Optional[str] = None
client_id: Optional[str] = None

class ListKafkaConsumers(Tool):
def __init__(self, admin_client: AdminClient):
@@ -146,7 +123,7 @@ def __init__(self, admin_client: AdminClient):
),
"fetch_configuration": ToolParameter(
description="If true, also fetches the topic configuration. defaults to false",
type="bool",
type="boolean",
required=False,
)
},
@@ -237,24 +214,31 @@ def get_parameterized_one_liner(self, params: Dict) -> str:
class KafkaToolset(Toolset):
def __init__(
self,
bootstrap_servers: str,
auth_config: Optional[KafkaAuthConfig] = None
config: Optional[KafkaConfig] = None
):
try:
admin_config = {
'bootstrap.servers': bootstrap_servers,
'client.id': 'kafka_tools_admin'
}

if auth_config:
admin_config.update(auth_config.get_config())

admin_client = AdminClient(admin_config)

kafka_prereq = StaticPrerequisite(
enabled=True,
disabled_reason="Kafka admin client is available"
)
if config:
admin_config = {
'bootstrap.servers': ",".join(config.brokers),
'client.id': config.client_id or "holmes-kafka-core-toolset",
'security.protocol': config.security_protocol,
'sasl.mechanisms': config.sasl_mechanism,
'sasl.username': config.username,
'sasl.password': config.password
}
# librdkafka rejects None values; drop optional settings that were not set
admin_config = {k: v for k, v in admin_config.items() if v is not None}

admin_client = AdminClient(admin_config)

kafka_prereq = StaticPrerequisite(
enabled=True,
disabled_reason="Kafka admin client is available"
)
else:
admin_client = None
kafka_prereq = StaticPrerequisite(
enabled=False,
disabled_reason=f"Kafka client not configured"
)
except Exception as e:
admin_client = None
kafka_prereq = StaticPrerequisite(
@@ -264,7 +248,10 @@ def __init__(

super().__init__(
name="kafka_tools",
description="Fetches metadata from Kafka",
prerequisites=[kafka_prereq],
icon_url="https://en.wikipedia.org/wiki/Apache_Kafka#/media/File:Apache_Kafka_logo.svg",
tags=[ToolsetTag.CORE],
tools=[
ListKafkaConsumers(admin_client),
DescribeConsumerGroup(admin_client),
@@ -273,4 +260,3 @@ def __init__(
FindConsumerGroupsByTopic(admin_client),
],
)
self.check_prerequisites()
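Finally, a minimal sketch of constructing the toolset directly. The broker address is a placeholder for a local plaintext broker, so the SASL fields stay unset and are dropped from the AdminClient config:

```python
from holmes.plugins.toolsets.kafka import KafkaConfig, KafkaToolset

toolset = KafkaToolset(
    config=KafkaConfig(
        brokers=["localhost:9092"],
        security_protocol="PLAINTEXT",
    )
)

# As of this commit the constructor no longer calls check_prerequisites();
# callers (see create_console_tool_executor in config.py) invoke it themselves.
toolset.check_prerequisites()
```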