- 
                Notifications
    You must be signed in to change notification settings 
- Fork 1.5k
Create shared schemas collector for DBM integrations #21720
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
6b80e5a
              96e5260
              04f8163
              4624b88
              a68f875
              aa0e0dd
              3c64896
              da84647
              9c2daa0
              File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1 @@ | ||
| Create shared schemas collector for the Postgres, MySQL, and SQL Server integrations | 
| Original file line number | Diff line number | Diff line change | ||
|---|---|---|---|---|
| @@ -0,0 +1,197 @@ | ||||
| # (C) Datadog, Inc. 2025-present | ||||
| # All rights reserved | ||||
| # Licensed under a 3-clause BSD style license (see LICENSE) | ||||
|  | ||||
| from __future__ import annotations | ||||
|  | ||||
| from abc import ABC, abstractmethod | ||||
| from typing import TYPE_CHECKING, TypedDict | ||||
|  | ||||
| from datadog_checks.base.utils.serialization import json | ||||
|  | ||||
| from .utils import now_ms | ||||
|  | ||||
| if TYPE_CHECKING: | ||||
| from datadog_checks.base.checks.db import DatabaseCheck | ||||
|  | ||||
| try: | ||||
| import datadog_agent # type: ignore | ||||
| except ImportError: | ||||
| from datadog_checks.base.stubs import datadog_agent | ||||
|  | ||||
|  | ||||
| class DatabaseInfo(TypedDict): | ||||
| name: str | ||||
|  | ||||
|  | ||||
| # The schema collector sends lists of DatabaseObjects to the agent | ||||
| # DBMS subclasses may add additional fields to the dictionary | ||||
| class DatabaseObject(TypedDict): | ||||
| name: str | ||||
|  | ||||
|  | ||||
| # Common configuration for schema collector | ||||
| # Individual DBMS implementations should map their specific | ||||
| # configuration to this type | ||||
| class SchemaCollectorConfig: | ||||
| def __init__(self): | ||||
| self.collection_interval = 3600 | ||||
| self.payload_chunk_size = 10_000 | ||||
|  | ||||
|  | ||||
| class SchemaCollector(ABC): | ||||
| """ | ||||
| Abstract base class for DBM schema collectors. | ||||
| Attributes: | ||||
| _collection_started_at (int): Timestamp in whole milliseconds | ||||
| when the current collection started. | ||||
| """ | ||||
|  | ||||
| _collection_started_at: int | None = None | ||||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 
        Suggested change
       
 This should be able to be removed. We're using this on the class instance level and not the class level so this gets overwritten on  | ||||
|  | ||||
| def __init__(self, check: DatabaseCheck, config: SchemaCollectorConfig): | ||||
| self._check = check | ||||
| self._log = check.log | ||||
| self._config = config | ||||
| self._dbms = check.__class__.__name__.lower() | ||||
| if self._dbms == 'postgresql': | ||||
| # Backwards compatibility for metrics namespacing | ||||
| self._dbms = 'postgres' | ||||
| 
      Comment on lines
    
      +57
     to 
      +60
    
   There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we can move this onto  | ||||
| self._reset() | ||||
|  | ||||
| def _reset(self): | ||||
| self._collection_started_at = None | ||||
| self._collection_payloads_count = 0 | ||||
| self._queued_rows = [] | ||||
| self._total_rows_count = 0 | ||||
|  | ||||
| def collect_schemas(self) -> bool: | ||||
| """ | ||||
| Collects and submits all applicable schema metadata to the agent. | ||||
| This class relies on the owning check to handle scheduling this method. | ||||
| This method will enforce non-overlapping invocations and | ||||
| returns False if the previous collection was still in progress when invoked again. | ||||
| """ | ||||
| if self._collection_started_at is not None: | ||||
| return False | ||||
| 
      Comment on lines
    
      +74
     to 
      +78
    
   There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's chat more about this one offline. I might be missing something, but I don't immediately see how this function is ever called in a non-blocking way, including in your reference PR where this would occur today | ||||
| status = "success" | ||||
| try: | ||||
| self._collection_started_at = now_ms() | ||||
| databases = self._get_databases() | ||||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we add a debug log here and report the number of databases we'll be collecting | ||||
| for database in databases: | ||||
| database_name = database['name'] | ||||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Similarly here, we don't need to log the DB name to avoid customer data, but let's emit a debug log like "collecting schemas for database ". I think this will be helpful reference markers if needing to debug stalls/issues on large collections | ||||
| if not database_name: | ||||
| self._log.warning("database has no name %v", database) | ||||
| continue | ||||
| with self._get_cursor(database_name) as cursor: | ||||
| # Get the next row from the cursor | ||||
| next = self._get_next(cursor) | ||||
| while next: | ||||
| self._queued_rows.append(self._map_row(database, next)) | ||||
| self._total_rows_count += 1 | ||||
| # Because we're iterating over a cursor we need to try to get | ||||
| # the next row to see if we've reached the last row | ||||
| next = self._get_next(cursor) | ||||
| is_last_payload = database is databases[-1] and next is None | ||||
| self.maybe_flush(is_last_payload) | ||||
| except Exception as e: | ||||
| status = "error" | ||||
| self._log.error("Error collecting schema: %s", e) | ||||
| raise e | ||||
| finally: | ||||
| self._check.histogram( | ||||
| f"dd.{self._dbms}.schema.time", | ||||
| now_ms() - self._collection_started_at, | ||||
| tags=self._check.tags + ["status:" + status], | ||||
| hostname=self._check.reported_hostname, | ||||
| raw=True, | ||||
| ) | ||||
| self._check.gauge( | ||||
| f"dd.{self._dbms}.schema.tables_count", | ||||
| self._total_rows_count, | ||||
| tags=self._check.tags + ["status:" + status], | ||||
| hostname=self._check.reported_hostname, | ||||
| raw=True, | ||||
| ) | ||||
| self._check.gauge( | ||||
| f"dd.{self._dbms}.schema.payloads_count", | ||||
| self._collection_payloads_count, | ||||
| tags=self._check.tags + ["status:" + status], | ||||
| hostname=self._check.reported_hostname, | ||||
| raw=True, | ||||
| ) | ||||
|  | ||||
| self._reset() | ||||
| return True | ||||
|  | ||||
| @property | ||||
| def base_event(self): | ||||
| return { | ||||
| "host": self._check.reported_hostname, | ||||
| "database_instance": self._check.database_identifier, | ||||
| "kind": self.kind, | ||||
| "agent_version": datadog_agent.get_version(), | ||||
| "collection_interval": self._config.collection_interval, | ||||
| "dbms": self._dbms, | ||||
| "dbms_version": str(self._check.dbms_version), | ||||
| "tags": self._check.tags, | ||||
| "cloud_metadata": self._check.cloud_metadata, | ||||
| "collection_started_at": self._collection_started_at, | ||||
| } | ||||
|  | ||||
| def maybe_flush(self, is_last_payload): | ||||
| if is_last_payload or len(self._queued_rows) >= self._config.payload_chunk_size: | ||||
| event = self.base_event.copy() | ||||
| event["timestamp"] = now_ms() | ||||
| # DBM backend expects metadata to be an array of database objects | ||||
| event["metadata"] = self._queued_rows | ||||
| self._collection_payloads_count += 1 | ||||
| if is_last_payload: | ||||
| # For the last payload, we need to include the total number of payloads collected | ||||
| # This is used for snapshotting to ensure that all payloads have been received | ||||
| event["collection_payloads_count"] = self._collection_payloads_count | ||||
| self._check.database_monitoring_metadata(json.dumps(event)) | ||||
|  | ||||
| self._queued_rows = [] | ||||
|  | ||||
| @property | ||||
| @abstractmethod | ||||
| def kind(self) -> str: | ||||
| """ | ||||
| Returns the kind property of the schema metadata event. | ||||
| Subclasses should override this property to return the kind of schema being collected. | ||||
| """ | ||||
| raise NotImplementedError("Subclasses must implement kind") | ||||
|  | ||||
| def _get_databases(self) -> list[DatabaseInfo]: | ||||
| """ | ||||
| Returns a list of database dictionaries. | ||||
| Subclasses should override this method to return the list of databases to collect schema metadata for. | ||||
| """ | ||||
| raise NotImplementedError("Subclasses must implement _get_databases") | ||||
|  | ||||
| @abstractmethod | ||||
| def _get_cursor(self, database): | ||||
| """ | ||||
| Returns a cursor for the given database. | ||||
| Subclasses should override this method to return the cursor for the given database. | ||||
| """ | ||||
| raise NotImplementedError("Subclasses must implement _get_cursor") | ||||
|  | ||||
| @abstractmethod | ||||
| def _get_next(self, cursor): | ||||
| """ | ||||
| Returns the next row from the cursor. | ||||
| Subclasses should override this method to return the next row from the cursor. | ||||
| """ | ||||
| raise NotImplementedError("Subclasses must implement _get_next") | ||||
|  | ||||
| def _map_row(self, database: DatabaseInfo, _cursor_row) -> DatabaseObject: | ||||
| """ | ||||
| Maps a cursor row to a dict that matches the schema expected by DBM. | ||||
| The base implementation of this method returns just the database dictionary. | ||||
| Subclasses should override this method to add schema and table data based on the cursor row. | ||||
| """ | ||||
| return {**database} | ||||
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,102 @@ | ||
| # (C) Datadog, Inc. 2023-present | ||
| # All rights reserved | ||
| # Licensed under a 3-clause BSD style license (see LICENSE) | ||
| from contextlib import contextmanager | ||
|  | ||
| import pytest | ||
|  | ||
| from datadog_checks.base.checks.db import DatabaseCheck | ||
| from datadog_checks.base.utils.db.schemas import SchemaCollector, SchemaCollectorConfig | ||
|  | ||
| try: | ||
| import datadog_agent # type: ignore | ||
| except ImportError: | ||
| from datadog_checks.base.stubs import datadog_agent | ||
|  | ||
|  | ||
| class TestDatabaseCheck(DatabaseCheck): | ||
| __test__ = False | ||
|  | ||
| def __init__(self): | ||
| super().__init__() | ||
| self._reported_hostname = "test_hostname" | ||
| self._database_identifier = "test_database_identifier" | ||
| self._dbms_version = "test_dbms_version" | ||
| self._agent_version = "test_agent_version" | ||
| self._tags = ["test_tag"] | ||
| self._cloud_metadata = {"test_cloud_metadata": "test_cloud_metadata"} | ||
|  | ||
| @property | ||
| def reported_hostname(self): | ||
| return self._reported_hostname | ||
|  | ||
| @property | ||
| def database_identifier(self): | ||
| return self._database_identifier | ||
|  | ||
| @property | ||
| def dbms_version(self): | ||
| return self._dbms_version | ||
|  | ||
| @property | ||
| def agent_version(self): | ||
| return self._agent_version | ||
|  | ||
| @property | ||
| def tags(self): | ||
| return self._tags | ||
|  | ||
| @property | ||
| def cloud_metadata(self): | ||
| return self._cloud_metadata | ||
|  | ||
|  | ||
| class TestSchemaCollector(SchemaCollector): | ||
| __test__ = False | ||
|  | ||
| def __init__(self, check: DatabaseCheck, config: SchemaCollectorConfig): | ||
| super().__init__(check, config) | ||
| self._row_index = 0 | ||
| self._rows = [{'table_name': 'test_table'}] | ||
|  | ||
| def _get_databases(self): | ||
| return [{'name': 'test_database'}] | ||
|  | ||
| @contextmanager | ||
| def _get_cursor(self, database: str): | ||
| yield {} | ||
|  | ||
| def _get_next(self, _cursor): | ||
| if self._row_index < len(self._rows): | ||
| row = self._rows[self._row_index] | ||
| self._row_index += 1 | ||
| return row | ||
| return None | ||
|  | ||
| def _map_row(self, database: str, cursor_row: dict): | ||
| return {**database, "tables": [cursor_row]} | ||
|  | ||
| @property | ||
| def kind(self): | ||
| return "test_databases" | ||
|  | ||
|  | ||
| @pytest.mark.unit | ||
| def test_schema_collector(aggregator): | ||
| check = TestDatabaseCheck() | ||
| collector = TestSchemaCollector(check, SchemaCollectorConfig()) | ||
| collector.collect_schemas() | ||
|  | ||
| events = aggregator.get_event_platform_events("dbm-metadata") | ||
| assert len(events) == 1 | ||
| event = events[0] | ||
| assert event['kind'] == collector.kind | ||
| assert event['host'] == check.reported_hostname | ||
| assert event['database_instance'] == check.database_identifier | ||
| assert event['agent_version'] == datadog_agent.get_version() | ||
| assert event['collection_interval'] == collector._config.collection_interval | ||
| assert event['dbms_version'] == check.dbms_version | ||
| assert event['tags'] == check.tags | ||
| assert event['cloud_metadata'] == check.cloud_metadata | ||
| assert event['metadata'][0]['name'] == 'test_database' | ||
| assert event['metadata'][0]['tables'][0]['table_name'] == 'test_table' | 
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
|  | @@ -40,6 +40,8 @@ def cache_id(check: AgentCheck) -> str: | |
|  | ||
|  | ||
| class TestCheck(AgentCheck): | ||
| __test__ = False | ||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixes a warning in pytest | ||
|  | ||
| def check(self, instance): | ||
| pass | ||
|  | ||
|  | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
An alternative to raising NotImplementedError would be to decorate these as abstractmethods (
from abc import abstractmethod). The benefit being unimplemented abstract methods will throw errors immediately upon class instantiation if missing vs the current approach which will lazily throw the error at first property access. Marking it abstractmethod also should get picked up in IDE's and linters I believe.