From e6714719d5f9bd0b9d60cdb8659040f51f7eea48 Mon Sep 17 00:00:00 2001 From: Mike Alfare Date: Fri, 13 Sep 2024 20:57:28 -0400 Subject: [PATCH 1/6] add generic snowplow tracker with file logger for testing --- dbt_common/events/cookie.py | 32 +++++++ dbt_common/events/event_manager.py | 9 ++ dbt_common/events/event_manager_client.py | 6 ++ dbt_common/events/functions.py | 111 +++++++++++++++++++--- dbt_common/events/tracker.py | 71 ++++++++++++++ dbt_common/events/types.proto | 9 ++ dbt_common/events/types.py | 12 +++ dbt_common/events/user.py | 37 ++++++++ 8 files changed, 275 insertions(+), 12 deletions(-) create mode 100644 dbt_common/events/cookie.py create mode 100644 dbt_common/events/tracker.py create mode 100644 dbt_common/events/user.py diff --git a/dbt_common/events/cookie.py b/dbt_common/events/cookie.py new file mode 100644 index 00000000..fb659c47 --- /dev/null +++ b/dbt_common/events/cookie.py @@ -0,0 +1,32 @@ +from pathlib import Path +import uuid +from typing import Any, Dict + +import yaml + +# the C version is faster, but it doesn't always exist +try: + from yaml import CSafeLoader as SafeLoader +except ImportError: + from yaml import SafeLoader + + +class Cookie: + def __init__(self, directory: Path) -> None: + self.id: str = str(uuid.uuid4()) + self.path: Path = directory / ".user.yml" + self.save() + + def as_dict(self) -> Dict[str, Any]: + return {"id": self.id} + + def save(self) -> None: + with open(self.path, "w") as fh: + yaml.dump(self.as_dict(), fh) + + def load(self) -> Dict[str, Any]: + with open(self.path, "r") as fh: + try: + return yaml.load(fh, Loader=SafeLoader) + except yaml.reader.ReaderError: + return {} diff --git a/dbt_common/events/event_manager.py b/dbt_common/events/event_manager.py index 507588f3..e647c9e5 100644 --- a/dbt_common/events/event_manager.py +++ b/dbt_common/events/event_manager.py @@ -4,11 +4,13 @@ from dbt_common.events.base_types import BaseEvent, EventLevel, msg_from_base_event, TCallback from dbt_common.events.logger import LoggerConfig, _Logger, _TextLogger, _JsonLogger, LineFormat +from dbt_common.events.tracker import TrackerConfig, _Tracker class EventManager: def __init__(self) -> None: self.loggers: List[_Logger] = [] + self.trackers: List[_Tracker] = [] self.callbacks: List[TCallback] = [] def fire_event(self, e: BaseEvent, level: Optional[EventLevel] = None) -> None: @@ -37,6 +39,9 @@ def add_logger(self, config: LoggerConfig) -> None: ) self.loggers.append(logger) + def add_tracker(self, config: TrackerConfig) -> None: + self.trackers.append(_Tracker(config)) + def add_callback(self, callback: TCallback) -> None: self.callbacks.append(callback) @@ -48,6 +53,7 @@ def flush(self) -> None: class IEventManager(Protocol): callbacks: List[TCallback] loggers: List[_Logger] + trackers: List[_Tracker] def fire_event(self, e: BaseEvent, level: Optional[EventLevel] = None) -> None: ... @@ -55,6 +61,9 @@ def fire_event(self, e: BaseEvent, level: Optional[EventLevel] = None) -> None: def add_logger(self, config: LoggerConfig) -> None: ... + def add_tracker(self, config: TrackerConfig) -> None: + ... + def add_callback(self, callback: TCallback) -> None: ... diff --git a/dbt_common/events/event_manager_client.py b/dbt_common/events/event_manager_client.py index 538d3199..c617d0fa 100644 --- a/dbt_common/events/event_manager_client.py +++ b/dbt_common/events/event_manager_client.py @@ -17,6 +17,11 @@ def add_logger_to_manager(logger) -> None: _EVENT_MANAGER.add_logger(logger) +def add_tracker_to_manager(tracker) -> None: + global _EVENT_MANAGER + _EVENT_MANAGER.add_tracker(tracker) + + def add_callback_to_manager(callback: TCallback) -> None: global _EVENT_MANAGER _EVENT_MANAGER.add_callback(callback) @@ -32,4 +37,5 @@ def cleanup_event_logger() -> None: # especially important for tests, since pytest replaces the stdout stream # during test runs, and closes the stream after the test is over. _EVENT_MANAGER.loggers.clear() + _EVENT_MANAGER.trackers.clear() _EVENT_MANAGER.callbacks.clear() diff --git a/dbt_common/events/functions.py b/dbt_common/events/functions.py index 4e055aa4..f7984dcf 100644 --- a/dbt_common/events/functions.py +++ b/dbt_common/events/functions.py @@ -1,20 +1,26 @@ -from pathlib import Path - -from dbt_common.events.event_manager_client import get_event_manager -from dbt_common.exceptions import EventCompilationError -from dbt_common.invocation import get_invocation_id -from dbt_common.helper_types import WarnErrorOptions -from dbt_common.utils.encoding import ForgivingJSONEncoder -from dbt_common.events.base_types import BaseEvent, EventLevel, EventMsg -from dbt_common.events.logger import LoggerConfig, LineFormat -from dbt_common.exceptions import scrub_secrets, env_secrets -from dbt_common.events.types import Note from functools import partial import json import os +from pathlib import Path import sys -from typing import Callable, Dict, Optional, TextIO, Union +from typing import Any, Callable, Dict, Optional, TextIO, Union + from google.protobuf.json_format import MessageToDict +from snowplow_tracker import Subject +from snowplow_tracker.typing import FailureCallback + +from dbt_common.helper_types import WarnErrorOptions +from dbt_common.invocation import get_invocation_id +from dbt_common.events.base_types import BaseEvent, EventLevel, EventMsg +from dbt_common.events.cookie import Cookie +from dbt_common.events.event_manager_client import get_event_manager +from dbt_common.events.logger import LoggerConfig, LineFormat +from dbt_common.events.tracker import TrackerConfig +from dbt_common.events.types import DisableTracking, Note +from dbt_common.events.user import User +from dbt_common.exceptions import EventCompilationError, scrub_secrets, env_secrets +from dbt_common.utils.encoding import ForgivingJSONEncoder + LOG_VERSION = 3 metadata_vars: Optional[Dict[str, str]] = None @@ -22,6 +28,7 @@ WARN_ERROR_OPTIONS = WarnErrorOptions(include=[], exclude=[]) WARN_ERROR = False + # This global, and the following two functions for capturing stdout logs are # an unpleasant hack we intend to remove as part of API-ification. The GitHub # issue #6350 was opened for that work. @@ -58,6 +65,24 @@ def get_stdout_config( ) +def get_logfile_config( + name: str, + log_path: str, + line_format: Optional[LineFormat] = LineFormat.PlainText, + use_colors: Optional[bool] = False, + log_file_max_bytes: Optional[int] = 10 * 1024 * 1024, +) -> LoggerConfig: + return LoggerConfig( + name=name, + line_format=line_format, + level=EventLevel.DEBUG, # File log is *always* debug level + use_colors=use_colors, + invocation_id=get_invocation_id(), + output_file_name=log_path, + output_file_max_bytes=log_file_max_bytes, + ) + + def make_log_dir_if_missing(log_path: Union[Path, str]) -> None: if isinstance(log_path, str): log_path = Path(log_path) @@ -153,3 +178,65 @@ def get_metadata_vars() -> Dict[str, str]: def reset_metadata_vars() -> None: global metadata_vars metadata_vars = None + + +def _default_on_failure(num_ok, unsent): + """ + num_ok will always be 0, unsent will always be 1 entry long + because the buffer is length 1, so not much to talk about + + TODO: add `disable_tracking` as a callback on `DisableTracking` + """ + fire_event(DisableTracking()) + + +def snowplow_config( + user: User, + endpoint: str, + protocol: Optional[str] = "https", + on_failure: Optional[FailureCallback] = _default_on_failure, +) -> TrackerConfig: + return TrackerConfig( + invocation_id=user.invocation_id, + endpoint=endpoint, + protocol=protocol, + on_failure=on_failure, + ) + + +def enable_tracking(tracker, user: User): + cookie = _get_cookie(user) + user.enable_tracking(cookie) + + subject = Subject() + subject.set_user_id(cookie.get("id")) + tracker.set_subject(subject) + + +def disable_tracking(tracker, user: User): + user.disable_tracking() + tracker.set_subject(None) + + +def _get_cookie(user: User) -> Dict[str, Any]: + if cookie := user.cookie: + return cookie + return _set_cookie(user) + + +def _set_cookie(user: User) -> Dict[str, Any]: + """ + If the user points dbt to a profile directory which exists AND + contains a profiles.yml file, then we can set a cookie. If the + specified folder does not exist, or if there is not a profiles.yml + file in this folder, then an inconsistent cookie can be used. This + will change in every dbt invocation until the user points to a + profile dir file which contains a valid profiles.yml file. + + See: https://github.com/dbt-labs/dbt-core/issues/1645 + """ + if user.profile.exists(): + cookie = Cookie(user.directory) + user.cookie = cookie.as_dict() + return user.cookie + return {} diff --git a/dbt_common/events/tracker.py b/dbt_common/events/tracker.py new file mode 100644 index 00000000..43b700aa --- /dev/null +++ b/dbt_common/events/tracker.py @@ -0,0 +1,71 @@ +from dataclasses import dataclass +import logging +from logging.handlers import RotatingFileHandler +from typing import Optional + +from snowplow_tracker import Emitter, Tracker +from snowplow_tracker.typing import FailureCallback + +from dbt_common.events.base_types import EventMsg + + +@dataclass +class TrackerConfig: + invocation_id: Optional[str] = None + endpoint: Optional[str] = None + protocol: Optional[str] = None + on_failure: Optional[FailureCallback] = None + name: Optional[str] = None + output_file_name: Optional[str] = None + output_file_max_bytes: Optional[int] = 10 * 1024 * 1024 # 10 mb + + +class _Tracker: + def __init__(self, config: TrackerConfig) -> None: + self.invocation_id: Optional[str] = config.invocation_id + + if all([config.name, config.output_file_name]): + file_handler = RotatingFileHandler( + filename=str(config.output_file_name), + encoding="utf8", + maxBytes=config.output_file_max_bytes, # type: ignore + backupCount=5, + ) + self._tracker = self._python_file_logger(config.name, file_handler) + + elif all([config.endpoint, config.protocol]): + self._tracker = self._snowplow_tracker(config.endpoint, config.protocol) + + def track(self, msg: EventMsg) -> str: + raise NotImplementedError() + + def _python_file_logger(self, name: str, handler: logging.Handler) -> logging.Logger: + log = logging.getLogger(name) + log.setLevel(logging.DEBUG) + handler.setFormatter(logging.Formatter(fmt="%(message)s")) + log.handlers.clear() + log.propagate = False + log.addHandler(handler) + return log + + def _snowplow_tracker( + self, + endpoint: str, + protocol: Optional[str] = "https", + on_failure: Optional[FailureCallback] = None, + ) -> Tracker: + emitter = Emitter( + endpoint, + protocol, + method="post", + batch_size=30, + on_failure=on_failure, + byte_limit=None, + request_timeout=5.0, + ) + tracker = Tracker( + emitters=emitter, + namespace="cf", + app_id="dbt", + ) + return tracker diff --git a/dbt_common/events/types.proto b/dbt_common/events/types.proto index d72d6b21..8352ed06 100644 --- a/dbt_common/events/types.proto +++ b/dbt_common/events/types.proto @@ -126,6 +126,15 @@ message FormattingMsg { Formatting data = 2; } +// Z039 +message DisableTracking { +} + +message DisableTrackingMsg { + CoreEventInfo info = 1; + DisableTracking data = 2; +} + // Z050 message Note { string msg = 1; diff --git a/dbt_common/events/types.py b/dbt_common/events/types.py index 02fc3ee2..16a55b52 100644 --- a/dbt_common/events/types.py +++ b/dbt_common/events/types.py @@ -151,6 +151,18 @@ def message(self) -> str: return self.msg +class DisableTracking(DebugLevel): + def code(self) -> str: + return "Z039" + + def message(self) -> str: + return ( + "Error sending anonymous usage statistics. Disabling tracking for this execution. " + "If you wish to permanently disable tracking, see: " + "https://docs.getdbt.com/reference/global-configs#send-anonymous-usage-stats." + ) + + class Note(InfoLevel): """Unstructured events. diff --git a/dbt_common/events/user.py b/dbt_common/events/user.py new file mode 100644 index 00000000..c7ee29a5 --- /dev/null +++ b/dbt_common/events/user.py @@ -0,0 +1,37 @@ +from datetime import datetime +from pathlib import Path +from typing import Any, Dict, Optional, Union + +import pytz + +from dbt_common.events.functions import get_invocation_id + + +class User: + def __init__(self, directory: Union[str, Path]) -> None: + self.cookie: Dict[str, Any] = {} + self.directory: Path = Path(directory) + self.invocation_id: str = get_invocation_id() + self.run_started_at: datetime = datetime.now(tz=pytz.utc) + + @property + def id(self) -> Optional[str]: + if self.cookie: + return self.cookie.get("id") + + @property + def do_not_track(self) -> bool: + return self.cookie != {} + + def state(self): + return "do not track" if self.do_not_track else "tracking" + + @property + def profile(self) -> Path: + return Path(self.directory) / "profiles.yml" + + def enable_tracking(self, cookie: Dict[str, Any]): + self.cookie = cookie + + def disable_tracking(self): + self.cookie = {} From b3494cc00666242aaaa00f7ff582c8b95122c99c Mon Sep 17 00:00:00 2001 From: Mike Alfare Date: Mon, 16 Sep 2024 13:32:08 -0400 Subject: [PATCH 2/6] add generic snowplow tracker with file logger for testing --- dbt_common/events/base_types.py | 16 ++++ dbt_common/events/functions.py | 88 +++++++++++++++------- dbt_common/events/tracker.py | 126 ++++++++++++++++++++++---------- tests/unit/test_tracker.py | 0 4 files changed, 164 insertions(+), 66 deletions(-) create mode 100644 tests/unit/test_tracker.py diff --git a/dbt_common/events/base_types.py b/dbt_common/events/base_types.py index 78b03682..f5a30f1b 100644 --- a/dbt_common/events/base_types.py +++ b/dbt_common/events/base_types.py @@ -151,6 +151,22 @@ def msg_from_base_event(event: BaseEvent, level: Optional[EventLevel] = None): return new_event +def msg_to_dict(msg: EventMsg) -> dict: + msg_dict = MessageToDict( + msg, + preserving_proto_field_name=True, + including_default_value_fields=True, # type: ignore + ) + # We don't want an empty NodeInfo in output + if ( + "data" in msg_dict + and "node_info" in msg_dict["data"] + and msg_dict["data"]["node_info"]["node_name"] == "" + ): + del msg_dict["data"]["node_info"] + return msg_dict + + # DynamicLevel requires that the level be supplied on the # event construction call using the "info" function from functions.py class DynamicLevel(BaseEvent): diff --git a/dbt_common/events/functions.py b/dbt_common/events/functions.py index f7984dcf..27e86cdd 100644 --- a/dbt_common/events/functions.py +++ b/dbt_common/events/functions.py @@ -5,17 +5,20 @@ import sys from typing import Any, Callable, Dict, Optional, TextIO, Union -from google.protobuf.json_format import MessageToDict -from snowplow_tracker import Subject from snowplow_tracker.typing import FailureCallback from dbt_common.helper_types import WarnErrorOptions from dbt_common.invocation import get_invocation_id -from dbt_common.events.base_types import BaseEvent, EventLevel, EventMsg +from dbt_common.events.base_types import ( + BaseEvent, + EventLevel, + EventMsg, + msg_to_dict as _msg_to_dict, +) from dbt_common.events.cookie import Cookie from dbt_common.events.event_manager_client import get_event_manager from dbt_common.events.logger import LoggerConfig, LineFormat -from dbt_common.events.tracker import TrackerConfig +from dbt_common.events.tracker import FileTracker, SnowplowTracker, Tracker, TrackerConfig from dbt_common.events.types import DisableTracking, Note from dbt_common.events.user import User from dbt_common.exceptions import EventCompilationError, scrub_secrets, env_secrets @@ -117,26 +120,14 @@ def msg_to_json(msg: EventMsg) -> str: def msg_to_dict(msg: EventMsg) -> dict: - msg_dict = dict() try: - msg_dict = MessageToDict( - msg, - preserving_proto_field_name=True, - including_default_value_fields=True, # type: ignore - ) + return _msg_to_dict(msg) except Exception as exc: event_type = type(msg).__name__ fire_event( Note(msg=f"type {event_type} is not serializable. {str(exc)}"), level=EventLevel.WARN ) - # We don't want an empty NodeInfo in output - if ( - "data" in msg_dict - and "node_info" in msg_dict["data"] - and msg_dict["data"]["node_info"]["node_name"] == "" - ): - del msg_dict["data"]["node_info"] - return msg_dict + return {} def warn_or_error(event, node=None) -> None: @@ -190,32 +181,61 @@ def _default_on_failure(num_ok, unsent): fire_event(DisableTracking()) -def snowplow_config( +def tracker_factory( + user: User, + endpoint: Optional[str], + protocol: Optional[str] = "https", + on_failure: Optional[FailureCallback] = _default_on_failure, + name: Optional[str] = None, + output_file_name: Optional[str] = None, + output_file_max_bytes: Optional[int] = None, +) -> Tracker: + if all([user, endpoint]): + return snowplow_tracker(user, endpoint, protocol, on_failure) + elif all([user, name, output_file_name]): + return file_tracker(user, name, output_file_name, output_file_max_bytes) + raise Exception("Invalid tracking configuration.") + + +def snowplow_tracker( user: User, endpoint: str, protocol: Optional[str] = "https", on_failure: Optional[FailureCallback] = _default_on_failure, -) -> TrackerConfig: - return TrackerConfig( +) -> Tracker: + config = TrackerConfig( invocation_id=user.invocation_id, endpoint=endpoint, protocol=protocol, on_failure=on_failure, ) + return SnowplowTracker.from_config(config) -def enable_tracking(tracker, user: User): +def file_tracker( + user: User, + name: str, + output_file_name: str, + output_file_max_bytes: Optional[int], +) -> Tracker: + config = TrackerConfig( + invocation_id=user.invocation_id, + name=name, + output_file_name=output_file_name, + output_file_max_bytes=output_file_max_bytes, + ) + return FileTracker.from_config(config) + + +def enable_tracking(tracker: Tracker, user: User): cookie = _get_cookie(user) user.enable_tracking(cookie) - - subject = Subject() - subject.set_user_id(cookie.get("id")) - tracker.set_subject(subject) + tracker.enable_tracking(cookie) -def disable_tracking(tracker, user: User): +def disable_tracking(tracker: Tracker, user: User): user.disable_tracking() - tracker.set_subject(None) + tracker.disable_tracking() def _get_cookie(user: User) -> Dict[str, Any]: @@ -240,3 +260,15 @@ def _set_cookie(user: User) -> Dict[str, Any]: user.cookie = cookie.as_dict() return user.cookie return {} + + +def track(tracker: Tracker, user: User, msg: EventMsg) -> None: + if user.do_not_track: + return + + # fire_event(SendingEvent(kwargs=str(**msg_to_dict(msg)))) + try: + tracker.track(msg) + except Exception: + # fire_event(SendEventFailure()) + pass diff --git a/dbt_common/events/tracker.py b/dbt_common/events/tracker.py index 43b700aa..a99f3af8 100644 --- a/dbt_common/events/tracker.py +++ b/dbt_common/events/tracker.py @@ -1,71 +1,121 @@ from dataclasses import dataclass import logging from logging.handlers import RotatingFileHandler -from typing import Optional +from typing import Any, Dict, Optional, Protocol, Self -from snowplow_tracker import Emitter, Tracker +import snowplow_tracker from snowplow_tracker.typing import FailureCallback -from dbt_common.events.base_types import EventMsg +from dbt_common.events.base_types import EventMsg, msg_to_dict +from dbt_common.events.format import timestamp_to_datetime_string @dataclass class TrackerConfig: invocation_id: Optional[str] = None + msg_schemas: Optional[Dict[str, str]] = None endpoint: Optional[str] = None - protocol: Optional[str] = None + protocol: Optional[str] = "https" on_failure: Optional[FailureCallback] = None name: Optional[str] = None output_file_name: Optional[str] = None output_file_max_bytes: Optional[int] = 10 * 1024 * 1024 # 10 mb -class _Tracker: - def __init__(self, config: TrackerConfig) -> None: - self.invocation_id: Optional[str] = config.invocation_id +class Tracker(Protocol): + @classmethod + def from_config(cls, config: TrackerConfig) -> Self: + ... - if all([config.name, config.output_file_name]): - file_handler = RotatingFileHandler( - filename=str(config.output_file_name), - encoding="utf8", - maxBytes=config.output_file_max_bytes, # type: ignore - backupCount=5, - ) - self._tracker = self._python_file_logger(config.name, file_handler) + def track(self, msg: EventMsg) -> None: + ... - elif all([config.endpoint, config.protocol]): - self._tracker = self._snowplow_tracker(config.endpoint, config.protocol) + def enable_tracking(self, cookie: Dict[str, Any]) -> None: + ... - def track(self, msg: EventMsg) -> str: - raise NotImplementedError() + def disable_tracking(self) -> None: + ... - def _python_file_logger(self, name: str, handler: logging.Handler) -> logging.Logger: - log = logging.getLogger(name) - log.setLevel(logging.DEBUG) - handler.setFormatter(logging.Formatter(fmt="%(message)s")) - log.handlers.clear() - log.propagate = False - log.addHandler(handler) - return log - def _snowplow_tracker( +class FileTracker(Tracker): + def __init__(self, logger: logging.Logger, invocation_id: Optional[str]) -> None: + self.logger = logger + self.invocation_id = invocation_id + + @classmethod + def from_config(cls, config: TrackerConfig) -> Self: + file_handler = RotatingFileHandler( + filename=config.output_file_name, + maxBytes=config.output_file_max_bytes, # type: ignore + backupCount=5, + encoding="utf8", + ) + file_handler.setFormatter(logging.Formatter(fmt="%(message)s")) + + logger = logging.getLogger(config.name) + logger.setLevel(logging.DEBUG) + logger.handlers.clear() + logger.propagate = False + logger.addHandler(file_handler) + return cls(logger, config.invocation_id) + + def track(self, msg: EventMsg) -> None: + ts: str = timestamp_to_datetime_string(msg.info.ts) + log_line = f"{ts} | {msg.info.msg}" + self.logger.debug(log_line) + + def enable_tracking(self, cookie: Dict[str, Any]) -> None: + pass + + def disable_tracking(self) -> None: + pass + + +class SnowplowTracker(Tracker): + def __init__( self, - endpoint: str, - protocol: Optional[str] = "https", - on_failure: Optional[FailureCallback] = None, - ) -> Tracker: - emitter = Emitter( - endpoint, - protocol, + tracker: snowplow_tracker.Tracker, + msg_schemas: Dict[str, str], + invocation_id: Optional[str], + ) -> None: + self.tracker = tracker + self.msg_schemas = msg_schemas + self.invocation_id = invocation_id + + @classmethod + def from_config(cls, config: TrackerConfig) -> Self: + emitter = snowplow_tracker.Emitter( + config.endpoint, + config.protocol, method="post", batch_size=30, - on_failure=on_failure, + on_failure=config.on_failure, byte_limit=None, request_timeout=5.0, ) - tracker = Tracker( + tracker = snowplow_tracker.Tracker( emitters=emitter, namespace="cf", app_id="dbt", ) - return tracker + return cls(tracker, config.msg_schemas, config.invocation_id) + + def track(self, msg: EventMsg) -> None: + data = msg_to_dict(msg) + schema = self.msg_schemas.get(msg.info.name) + context = [snowplow_tracker.SelfDescribingJson(schema, data)] + event = snowplow_tracker.StructuredEvent( + category="dbt", + action=msg.info.name, + label=self.invocation_id, + context=context, + ) + self.tracker.track(event) + + def enable_tracking(self, cookie: Dict[str, Any]) -> None: + subject = snowplow_tracker.Subject() + subject.set_user_id(cookie.get("id")) + self.tracker.set_subject(subject) + + def disable_tracking(self) -> None: + self.tracker.set_subject(None) diff --git a/tests/unit/test_tracker.py b/tests/unit/test_tracker.py new file mode 100644 index 00000000..e69de29b From 1e5a7be42bf6282aa9019275648da90f688501b8 Mon Sep 17 00:00:00 2001 From: Mike Alfare Date: Mon, 16 Sep 2024 13:34:51 -0400 Subject: [PATCH 3/6] add generic snowplow tracker with file logger for testing --- dbt_common/events/functions.py | 7 +++---- dbt_common/events/types.py | 16 ++++++++++++++++ 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/dbt_common/events/functions.py b/dbt_common/events/functions.py index 27e86cdd..0eec1f53 100644 --- a/dbt_common/events/functions.py +++ b/dbt_common/events/functions.py @@ -19,7 +19,7 @@ from dbt_common.events.event_manager_client import get_event_manager from dbt_common.events.logger import LoggerConfig, LineFormat from dbt_common.events.tracker import FileTracker, SnowplowTracker, Tracker, TrackerConfig -from dbt_common.events.types import DisableTracking, Note +from dbt_common.events.types import DisableTracking, Note, SendingEvent, SendEventFailure from dbt_common.events.user import User from dbt_common.exceptions import EventCompilationError, scrub_secrets, env_secrets from dbt_common.utils.encoding import ForgivingJSONEncoder @@ -266,9 +266,8 @@ def track(tracker: Tracker, user: User, msg: EventMsg) -> None: if user.do_not_track: return - # fire_event(SendingEvent(kwargs=str(**msg_to_dict(msg)))) + fire_event(SendingEvent(kwargs=str(**msg_to_dict(msg)))) try: tracker.track(msg) except Exception: - # fire_event(SendEventFailure()) - pass + fire_event(SendEventFailure()) diff --git a/dbt_common/events/types.py b/dbt_common/events/types.py index 16a55b52..48451bea 100644 --- a/dbt_common/events/types.py +++ b/dbt_common/events/types.py @@ -163,6 +163,22 @@ def message(self) -> str: ) +class SendingEvent(DebugLevel): + def code(self) -> str: + return "Z040" + + def message(self) -> str: + return f"Sending event: {self.kwargs}" + + +class SendEventFailure(DebugLevel): + def code(self) -> str: + return "Z041" + + def message(self) -> str: + return "An error was encountered while trying to send an event" + + class Note(InfoLevel): """Unstructured events. From 37f4209758cefd5a30c2cac2f3376d2e096b6c2d Mon Sep 17 00:00:00 2001 From: Mike Alfare Date: Mon, 16 Sep 2024 13:35:39 -0400 Subject: [PATCH 4/6] add generic snowplow tracker with file logger for testing --- dbt_common/events/types.proto | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/dbt_common/events/types.proto b/dbt_common/events/types.proto index 8352ed06..c4b572f1 100644 --- a/dbt_common/events/types.proto +++ b/dbt_common/events/types.proto @@ -135,6 +135,25 @@ message DisableTrackingMsg { DisableTracking data = 2; } +// Z040 +message SendingEvent { + string kwargs = 1; +} + +message SendingEventMsg { + CoreEventInfo info = 1; + SendingEvent data = 2; +} + +// Z041 +message SendEventFailure { +} + +message SendEventFailureMsg { + CoreEventInfo info = 1; + SendEventFailure data = 2; +} + // Z050 message Note { string msg = 1; From 1b80338b1237696ee7307c27106090f6587a1aae Mon Sep 17 00:00:00 2001 From: Mike Alfare Date: Mon, 16 Sep 2024 13:38:46 -0400 Subject: [PATCH 5/6] add generic snowplow tracker with file logger for testing --- dbt_common/events/functions.py | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/dbt_common/events/functions.py b/dbt_common/events/functions.py index 0eec1f53..5cd04635 100644 --- a/dbt_common/events/functions.py +++ b/dbt_common/events/functions.py @@ -68,24 +68,6 @@ def get_stdout_config( ) -def get_logfile_config( - name: str, - log_path: str, - line_format: Optional[LineFormat] = LineFormat.PlainText, - use_colors: Optional[bool] = False, - log_file_max_bytes: Optional[int] = 10 * 1024 * 1024, -) -> LoggerConfig: - return LoggerConfig( - name=name, - line_format=line_format, - level=EventLevel.DEBUG, # File log is *always* debug level - use_colors=use_colors, - invocation_id=get_invocation_id(), - output_file_name=log_path, - output_file_max_bytes=log_file_max_bytes, - ) - - def make_log_dir_if_missing(log_path: Union[Path, str]) -> None: if isinstance(log_path, str): log_path = Path(log_path) From 1b315241bb7589f2742300dcee45dd580f7a7461 Mon Sep 17 00:00:00 2001 From: Mike Alfare Date: Mon, 16 Sep 2024 14:05:10 -0400 Subject: [PATCH 6/6] add generic snowplow tracker with file logger for testing --- dbt_common/events/event_manager.py | 17 ++++++-- dbt_common/events/functions.py | 62 +++--------------------------- 2 files changed, 19 insertions(+), 60 deletions(-) diff --git a/dbt_common/events/event_manager.py b/dbt_common/events/event_manager.py index e647c9e5..b2eb5b72 100644 --- a/dbt_common/events/event_manager.py +++ b/dbt_common/events/event_manager.py @@ -3,15 +3,18 @@ from typing import List, Optional, Protocol, Tuple from dbt_common.events.base_types import BaseEvent, EventLevel, msg_from_base_event, TCallback +from dbt_common.events.functions import track, tracker_factory from dbt_common.events.logger import LoggerConfig, _Logger, _TextLogger, _JsonLogger, LineFormat -from dbt_common.events.tracker import TrackerConfig, _Tracker +from dbt_common.events.tracker import TrackerConfig, Tracker +from dbt_common.events.user import User class EventManager: def __init__(self) -> None: self.loggers: List[_Logger] = [] - self.trackers: List[_Tracker] = [] + self.trackers: List[Tracker] = [] self.callbacks: List[TCallback] = [] + self.user: Optional[User] = None def fire_event(self, e: BaseEvent, level: Optional[EventLevel] = None) -> None: msg = msg_from_base_event(e, level=level) @@ -30,6 +33,9 @@ def fire_event(self, e: BaseEvent, level: Optional[EventLevel] = None) -> None: if logger.filter(msg): # type: ignore logger.write_line(msg) + for tracker in self.trackers: + track(tracker, self.user, msg) + for callback in self.callbacks: callback(msg) @@ -40,11 +46,14 @@ def add_logger(self, config: LoggerConfig) -> None: self.loggers.append(logger) def add_tracker(self, config: TrackerConfig) -> None: - self.trackers.append(_Tracker(config)) + self.trackers.append(tracker_factory(config)) def add_callback(self, callback: TCallback) -> None: self.callbacks.append(callback) + def add_user(self, user: User) -> None: + self.user = user + def flush(self) -> None: for logger in self.loggers: logger.flush() @@ -53,7 +62,7 @@ def flush(self) -> None: class IEventManager(Protocol): callbacks: List[TCallback] loggers: List[_Logger] - trackers: List[_Tracker] + trackers: List[Tracker] def fire_event(self, e: BaseEvent, level: Optional[EventLevel] = None) -> None: ... diff --git a/dbt_common/events/functions.py b/dbt_common/events/functions.py index 5cd04635..5126fd6f 100644 --- a/dbt_common/events/functions.py +++ b/dbt_common/events/functions.py @@ -5,8 +5,6 @@ import sys from typing import Any, Callable, Dict, Optional, TextIO, Union -from snowplow_tracker.typing import FailureCallback - from dbt_common.helper_types import WarnErrorOptions from dbt_common.invocation import get_invocation_id from dbt_common.events.base_types import ( @@ -19,7 +17,7 @@ from dbt_common.events.event_manager_client import get_event_manager from dbt_common.events.logger import LoggerConfig, LineFormat from dbt_common.events.tracker import FileTracker, SnowplowTracker, Tracker, TrackerConfig -from dbt_common.events.types import DisableTracking, Note, SendingEvent, SendEventFailure +from dbt_common.events.types import Note, SendingEvent, SendEventFailure from dbt_common.events.user import User from dbt_common.exceptions import EventCompilationError, scrub_secrets, env_secrets from dbt_common.utils.encoding import ForgivingJSONEncoder @@ -153,62 +151,14 @@ def reset_metadata_vars() -> None: metadata_vars = None -def _default_on_failure(num_ok, unsent): - """ - num_ok will always be 0, unsent will always be 1 entry long - because the buffer is length 1, so not much to talk about - - TODO: add `disable_tracking` as a callback on `DisableTracking` - """ - fire_event(DisableTracking()) - - -def tracker_factory( - user: User, - endpoint: Optional[str], - protocol: Optional[str] = "https", - on_failure: Optional[FailureCallback] = _default_on_failure, - name: Optional[str] = None, - output_file_name: Optional[str] = None, - output_file_max_bytes: Optional[int] = None, -) -> Tracker: - if all([user, endpoint]): - return snowplow_tracker(user, endpoint, protocol, on_failure) - elif all([user, name, output_file_name]): - return file_tracker(user, name, output_file_name, output_file_max_bytes) +def tracker_factory(config: TrackerConfig) -> Tracker: + if all([config.invocation_id, config.endpoint, config.msg_schemas]): + return SnowplowTracker.from_config(config) + elif all([config.invocation_id, config.name, config.output_file_name]): + return FileTracker.from_config(config) raise Exception("Invalid tracking configuration.") -def snowplow_tracker( - user: User, - endpoint: str, - protocol: Optional[str] = "https", - on_failure: Optional[FailureCallback] = _default_on_failure, -) -> Tracker: - config = TrackerConfig( - invocation_id=user.invocation_id, - endpoint=endpoint, - protocol=protocol, - on_failure=on_failure, - ) - return SnowplowTracker.from_config(config) - - -def file_tracker( - user: User, - name: str, - output_file_name: str, - output_file_max_bytes: Optional[int], -) -> Tracker: - config = TrackerConfig( - invocation_id=user.invocation_id, - name=name, - output_file_name=output_file_name, - output_file_max_bytes=output_file_max_bytes, - ) - return FileTracker.from_config(config) - - def enable_tracking(tracker: Tracker, user: User): cookie = _get_cookie(user) user.enable_tracking(cookie)