From 327b658e946e9ed68c25d32720c84f1c9b6b1a93 Mon Sep 17 00:00:00 2001 From: dat-a-man <98139823+dat-a-man@users.noreply.github.com> Date: Wed, 9 Oct 2024 06:39:56 +0000 Subject: [PATCH 1/9] Updated Hubspot VFS --- sources/hubspot/README.md | 2 + sources/hubspot/__init__.py | 519 +++++++++++++++++++++++++++--------- sources/hubspot/helpers.py | 44 ++- sources/hubspot/settings.py | 49 +++- sources/hubspot/utils.py | 7 + sources/hubspot_pipeline.py | 84 ++++-- 6 files changed, 534 insertions(+), 171 deletions(-) create mode 100644 sources/hubspot/utils.py diff --git a/sources/hubspot/README.md b/sources/hubspot/README.md index 357170ef3..17ca44e1d 100644 --- a/sources/hubspot/README.md +++ b/sources/hubspot/README.md @@ -13,6 +13,8 @@ The `dlt` HubSpot verified source allows you to automatically load data from Hub | Tickets | requests for help from customers or users | | Quotes | pricing information of a product | | Web analytics | events | +| Owners | information about account managers or users | +| Pipelines | stages and progress tracking | ## Initialize the pipeline with Hubspot verified source ```bash diff --git a/sources/hubspot/__init__.py b/sources/hubspot/__init__.py index 5e09fc7f1..09b46fbf3 100644 --- a/sources/hubspot/__init__.py +++ b/sources/hubspot/__init__.py @@ -1,5 +1,6 @@ """ -This is a module that provides a DLT source to retrieve data from multiple endpoints of the HubSpot API using a specified API key. The retrieved data is returned as a tuple of Dlt resources, one for each endpoint. +This is a module that provides a DLT source to retrieve data from multiple endpoints of the HubSpot API +using a specified API key. The retrieved data is returned as a tuple of Dlt resources, one for each endpoint. The source retrieves data from the following endpoints: - CRM Companies @@ -8,6 +9,8 @@ - CRM Tickets - CRM Products - CRM Quotes +- CRM Owners +- CRM Pipelines - Web Analytics Events For each endpoint, a resource and transformer function are defined to retrieve data and transform it to a common format. @@ -19,9 +22,6 @@ Example: To retrieve data from all endpoints, use the following code: -python - ->>> resources = hubspot(api_key="your_api_key") """ from typing import Any, Dict, Iterator, List, Literal, Sequence @@ -32,17 +32,25 @@ from dlt.common.typing import TDataItems from dlt.sources import DltResource -from .helpers import _get_property_names, fetch_data, fetch_property_history +from .helpers import ( + _get_property_names, + fetch_data, + fetch_property_history, + get_properties_labels, +) from .settings import ( ALL, + ALL_OBJECTS, + ARCHIVED_PARAM, CRM_OBJECT_ENDPOINTS, - DEFAULT_COMPANY_PROPS, - DEFAULT_CONTACT_PROPS, - DEFAULT_DEAL_PROPS, - DEFAULT_PRODUCT_PROPS, - DEFAULT_QUOTE_PROPS, - DEFAULT_TICKET_PROPS, + CRM_PIPELINES_ENDPOINT, + ENTITY_PROPERTIES, + MAX_PROPS_LENGTH, OBJECT_TYPE_PLURAL, + OBJECT_TYPE_SINGULAR, + PIPELINES_OBJECTS, + SOFT_DELETE_KEY, + STAGE_PROPERTY_PREFIX, STARTDATE, WEB_ANALYTICS_EVENTS_ENDPOINT, ) @@ -50,11 +58,290 @@ THubspotObjectType = Literal["company", "contact", "deal", "ticket", "product", "quote"] +def extract_properties_list(props): + """ + Flatten a list of property dictionaries to extract property names. + + Args: + props (List[Dict[str, Any]]): List of property dictionaries. + + Returns: + List[str]: List of property names. + """ + return [prop if isinstance(prop, str) else prop.get("name") for prop in props] + + + +def fetch_data_for_properties(props, api_key, object_type, soft_delete): + """ + Fetch data for a given set of properties from the HubSpot API. + + Args: + props (Sequence[str]): List of property names to fetch. + api_key (str): HubSpot API key for authentication. + object_type (str): The type of HubSpot object (e.g., 'company', 'contact'). + soft_delete (bool): Flag to fetch soft-deleted (archived) records. + + Yields: + Iterator[TDataItems]: Data retrieved from the HubSpot API. + """ + + params = {"properties": props, "limit": 100} + context = {SOFT_DELETE_KEY: False} if soft_delete else None + + yield from fetch_data( + CRM_OBJECT_ENDPOINTS[object_type], api_key, params=params, context=context + ) + if soft_delete: + yield from fetch_data( + CRM_OBJECT_ENDPOINTS[object_type], + api_key, + params={**params, **ARCHIVED_PARAM}, + context={SOFT_DELETE_KEY: True}, + ) + + +def crm_objects( + object_type: str, + api_key: str = dlt.secrets.value, + props: Sequence[str] = None, + include_custom_props: bool = True, + archived: bool = False, +) -> Iterator[TDataItems]: + """ + Fetch CRM object data (e.g., companies, contacts) from the HubSpot API. + + Args: + object_type (str): Type of HubSpot object (e.g., 'company', 'contact'). + api_key (str, optional): API key for HubSpot authentication. + props (Sequence[str], optional): List of properties to retrieve. Defaults to None. + include_custom_props (bool, optional): Include custom properties in the result. Defaults to True. + archived (bool, optional): Fetch archived (soft-deleted) objects. Defaults to False. + + Yields: + Iterator[TDataItems]: Data items retrieved from the API. + """ + props = fetch_props(object_type, api_key, props, include_custom_props) + yield from fetch_data_for_properties(props, api_key, object_type, archived) + + +def crm_object_history( + object_type: THubspotObjectType, + api_key: str = dlt.secrets.value, + include_custom_props: bool = True, +): + """ + Fetch the history of property changes for a given CRM object type. + + Args: + object_type (THubspotObjectType): Type of HubSpot object (e.g., 'company', 'contact'). + api_key (str, optional): API key for HubSpot authentication. + include_custom_props (bool, optional): Include custom properties in the result. Defaults to True. + + Yields: + Iterator[TDataItems]: Historical property data. + """ + + # Fetch the properties from ENTITY_PROPERTIES or default to "All" + props = ENTITY_PROPERTIES.get(object_type, "All") + + # Fetch the properties with the option to include custom properties + props = fetch_props(object_type, api_key, props, include_custom_props) + + # Yield the property history + yield from fetch_property_history( + CRM_OBJECT_ENDPOINTS[object_type], + api_key, + props, + ) + +def resource_template( + entity: THubspotObjectType, + api_key: str = dlt.config.value, + props: Sequence[str] = None, # Add props as an argument + include_custom_props: bool = False, + soft_delete: bool = False, +): + """ + Template function to yield CRM resources for a specific HubSpot entity. + + Args: + entity (THubspotObjectType): Type of HubSpot object (e.g., 'company', 'contact'). + api_key (str, optional): HubSpot API key for authentication. + props (Sequence[str], optional): List of properties to retrieve. Defaults to None. + include_custom_props (bool, optional): Include custom properties in the result. Defaults to False. + soft_delete (bool, optional): Fetch soft-deleted (archived) records. Defaults to False. + + Yields: + Iterator[TDataItems]: CRM object data retrieved from the API. + """ + + # Use provided props or fetch from ENTITY_PROPERTIES if not provided + properties = ENTITY_PROPERTIES.get(entity, props or []) + + # Use these properties to yield the crm_objects + yield from crm_objects( + entity, + api_key, + props=properties, # Pass the properties to the crm_objects function + include_custom_props=include_custom_props, + archived=soft_delete, + ) + + + +def resource_history_template( + entity: THubspotObjectType, + api_key: str = dlt.config.value, + include_custom_props: bool = False, +): + """ + Template function to yield historical CRM resource data for a specific HubSpot entity. + + Args: + entity (THubspotObjectType): Type of HubSpot object (e.g., 'company', 'contact'). + api_key (str, optional): HubSpot API key for authentication. + include_custom_props (bool, optional): Include custom properties in the result. Defaults to False. + + Yields: + Iterator[TDataItems]: Historical data for the CRM object. + """ + yield from crm_object_history( + entity, api_key, include_custom_props=include_custom_props + ) + + +@dlt.resource(name="properties", write_disposition="replace") +def hubspot_properties( + properties_list: List[Dict[str, Any]] = None, + api_key: str = dlt.secrets.value, +) -> DltResource: + """ + A DLT resource that retrieves HubSpot properties for a given list of objects. + + Args: + properties_list (List[Dict[str, Any]], optional): List of properties to retrieve. + api_key (str, optional): HubSpot API key for authentication. + + Yields: + DltResource: A DLT resource containing properties for HubSpot objects. + """ + + def get_properties_description(properties_list): + """Fetch properties.""" + for property_info in properties_list: + yield get_properties_labels( + api_key=api_key, + object_type=property_info["object_type"], + property_name=property_info["property_name"], + ) + + # Ensure properties_list is defined + properties_list = properties_list or [] + yield from get_properties_description(properties_list) + + +def pivot_stages_properties(data, property_prefix=STAGE_PROPERTY_PREFIX, id_prop="id"): + """ + Transform the data by pivoting stage properties. + + Args: + data (List[Dict[str, Any]]): Data containing stage properties. + property_prefix (str, optional): Prefix for stage properties. Defaults to STAGE_PROPERTY_PREFIX. + id_prop (str, optional): Name of the ID property. Defaults to "id". + + Returns: + List[Dict[str, Any]]: Transformed data with pivoted stage properties. + """ + new_data = [] + for record in data: + record_not_null = {k: v for k, v in record.items() if v is not None} + if id_prop not in record_not_null: + continue + id_val = record_not_null.pop(id_prop) + new_data += [ + {id_prop: id_val, property_prefix: v, "stage": k.split(property_prefix)[1]} + for k, v in record_not_null.items() + if k.startswith(property_prefix) + ] + return new_data + + +def stages_timing( + object_type: str, + api_key: str = dlt.config.value, + soft_delete: bool = False, + limit: int = None +) -> Iterator[TDataItems]: + """ + Fetch stage timing data for a specific object type from the HubSpot API. + + Args: + object_type (str): Type of HubSpot object (e.g., 'deal', 'ticket'). + api_key (str, optional): HubSpot API key for authentication. + soft_delete (bool, optional): Fetch soft-deleted (archived) records. Defaults to False. + limit (int, optional): Limit the number of properties to fetch. Defaults to None. + + Yields: + Iterator[TDataItems]: Stage timing data. + """ + all_properties = list(_get_property_names(api_key, object_type)) + date_entered_properties = [ + prop for prop in all_properties if prop.startswith(STAGE_PROPERTY_PREFIX) + ] + props = ",".join(date_entered_properties) + idx = 0 + if limit is None: + limit = len(date_entered_properties) + while idx < limit: + if len(props) - idx < MAX_PROPS_LENGTH: + props_part = ",".join(props[idx: idx + MAX_PROPS_LENGTH].split(",")[:-1]) + else: + props_part = props[idx: idx + MAX_PROPS_LENGTH] + idx += len(props_part) + for data in fetch_data_for_properties( + props_part, api_key, object_type, soft_delete + ): + yield pivot_stages_properties(data) + + +def owners( + api_key: str, # Define api_key as a required argument + soft_delete: bool = False # Add soft_delete as a parameter +) -> Iterator[TDataItems]: + """ + Fetch HubSpot owners data. + + Args: + api_key (str): HubSpot API key for authentication. + soft_delete (bool, optional): Fetch soft-deleted (archived) owners. Defaults to False. + + Yields: + Iterator[TDataItems]: Owner data. + """ + + # Fetch data for owners + for page in fetch_data(endpoint=CRM_OBJECT_ENDPOINTS["owner"], api_key=api_key): + yield page + + # Fetch soft-deleted owners if requested + if soft_delete: + for page in fetch_data( + endpoint=CRM_OBJECT_ENDPOINTS["owner"], + params=ARCHIVED_PARAM, + api_key=api_key, + context={SOFT_DELETE_KEY: True}, + ): + yield page + + @dlt.source(name="hubspot") def hubspot( api_key: str = dlt.secrets.value, include_history: bool = False, + soft_delete: bool = False, include_custom_props: bool = True, + props: Sequence[str] = None, # Add props argument here ) -> Sequence[DltResource]: """ A DLT source that retrieves data from the HubSpot API using the @@ -72,6 +359,10 @@ def hubspot( include_history (Optional[bool]): Whether to load history of property changes along with entities. The history entries are loaded to separate tables. + soft_delete (bool): + Whether to fetch deleted properties and mark them as `is_deleted`. + include_custom_props (bool): + Whether to include custom properties. Returns: Sequence[DltResource]: Dlt resources, one for each HubSpot API endpoint. @@ -82,115 +373,109 @@ def hubspot( `api_key` argument. """ - @dlt.resource(name="companies", write_disposition="replace") - def companies( - api_key: str = api_key, - include_history: bool = include_history, - props: Sequence[str] = DEFAULT_COMPANY_PROPS, - include_custom_props: bool = include_custom_props, - ) -> Iterator[TDataItems]: - """Hubspot companies resource""" - yield from crm_objects( - "company", - api_key, - include_history=include_history, - props=props, - include_custom_props=include_custom_props, - ) + def hubspot_pipelines_for_objects( + api_key: str = dlt.secrets.value, + ) -> DltResource: + """ + A standalone DLT resources that retrieves properties. - @dlt.resource(name="contacts", write_disposition="replace") - def contacts( - api_key: str = api_key, - include_history: bool = include_history, - props: Sequence[str] = DEFAULT_CONTACT_PROPS, - include_custom_props: bool = include_custom_props, - ) -> Iterator[TDataItems]: - """Hubspot contacts resource""" - yield from crm_objects( - "contact", - api_key, - include_history, - props, - include_custom_props, - ) + Args: + object_type(List[THubspotObjectType], required): List of the hubspot object types see definition of THubspotObjectType literal. + api_key (str, optional): The API key used to authenticate with the HubSpot API. Defaults to dlt.secrets.value. - @dlt.resource(name="deals", write_disposition="replace") - def deals( - api_key: str = api_key, - include_history: bool = include_history, - props: Sequence[str] = DEFAULT_DEAL_PROPS, - include_custom_props: bool = include_custom_props, - ) -> Iterator[TDataItems]: - """Hubspot deals resource""" - yield from crm_objects( - "deal", - api_key, - include_history, - props, - include_custom_props, - ) + Returns: + Incremental dlt resource to track properties for objects from the list + """ - @dlt.resource(name="tickets", write_disposition="replace") - def tickets( - api_key: str = api_key, - include_history: bool = include_history, - props: Sequence[str] = DEFAULT_TICKET_PROPS, - include_custom_props: bool = include_custom_props, - ) -> Iterator[TDataItems]: - """Hubspot tickets resource""" - yield from crm_objects( - "ticket", - api_key, - include_history, - props, - include_custom_props, - ) + def get_pipelines(object_type: THubspotObjectType): + yield from fetch_data( + CRM_PIPELINES_ENDPOINT.format(objectType=object_type), + api_key=api_key, + ) - @dlt.resource(name="products", write_disposition="replace") - def products( - api_key: str = api_key, - include_history: bool = include_history, - props: Sequence[str] = DEFAULT_PRODUCT_PROPS, - include_custom_props: bool = include_custom_props, - ) -> Iterator[TDataItems]: - """Hubspot products resource""" - yield from crm_objects( - "product", - api_key, - include_history, - props, - include_custom_props, + for obj_type in PIPELINES_OBJECTS: + name = f"pipelines_{obj_type}" + yield dlt.resource( + get_pipelines, + name=name, + write_disposition="merge", + merge_key="id", + table_name=name, + primary_key="id", + )(obj_type) + + name = f"stages_timing_{obj_type}" + if obj_type in OBJECT_TYPE_SINGULAR: + yield dlt.resource( + stages_timing, + name=name, + write_disposition="merge", + primary_key=["id", "stage"], + )(OBJECT_TYPE_SINGULAR[obj_type], soft_delete=soft_delete) + + yield dlt.resource( + owners, + name="owners", + write_disposition="merge", + primary_key="id", + )( + api_key=api_key, # Pass the API key here + soft_delete=soft_delete # Pass the soft_delete flag here + ) + + for obj in ALL_OBJECTS: + yield dlt.resource( + resource_template, + name=OBJECT_TYPE_PLURAL[obj], + write_disposition="merge", + primary_key="id", + )( + entity=obj, + props=props, # Pass the props argument here + include_custom_props=include_custom_props, + soft_delete=soft_delete, ) - @dlt.resource(name="quotes", write_disposition="replace") - def quotes( - api_key: str = api_key, - include_history: bool = include_history, - props: Sequence[str] = DEFAULT_QUOTE_PROPS, - include_custom_props: bool = include_custom_props, - ) -> Iterator[TDataItems]: - """Hubspot quotes resource""" - yield from crm_objects( - "quote", - api_key, - include_history, - props, - include_custom_props, - ) + if include_history: + for obj in ALL_OBJECTS: + yield dlt.resource( + resource_history_template, + name=f"{OBJECT_TYPE_PLURAL[obj]}_property_history", + write_disposition="merge", + primary_key="object_id", + )(entity=obj, include_custom_props=include_custom_props) - return companies, contacts, deals, tickets, products, quotes + yield from hubspot_pipelines_for_objects(api_key) + yield hubspot_properties -def crm_objects( +def fetch_props( object_type: str, - api_key: str = dlt.secrets.value, - include_history: bool = False, + api_key: str, props: Sequence[str] = None, include_custom_props: bool = True, -) -> Iterator[TDataItems]: - """Building blocks for CRM resources.""" +) -> str: + """ + Fetch the list of properties for a HubSpot object type. + + Args: + object_type (str): Type of HubSpot object (e.g., 'company', 'contact'). + api_key (str): HubSpot API key for authentication. + props (Sequence[str], optional): List of properties to fetch. Defaults to None. + include_custom_props (bool, optional): Include custom properties in the result. Defaults to True. + + Returns: + str: Comma-separated list of properties. + """ if props == ALL: + # Fetch all property names props = list(_get_property_names(api_key, object_type)) + elif isinstance(props, str): + # If props are passed as a single string, convert it to a list + props = [props] + else: + # Ensure it's a list of strings, if not already + props = extract_properties_list(props) if include_custom_props: all_props = _get_property_names(api_key, object_type) @@ -199,30 +484,16 @@ def crm_objects( props = ",".join(sorted(list(set(props)))) - if len(props) > 2000: + if len(props) > MAX_PROPS_LENGTH: raise ValueError( "Your request to Hubspot is too long to process. " - "Maximum allowed query length is 2000 symbols, while " + f"Maximum allowed query length is {MAX_PROPS_LENGTH} symbols, while " f"your list of properties `{props[:200]}`... is {len(props)} " "symbols long. Use the `props` argument of the resource to " "set the list of properties to extract from the endpoint." ) + return props - params = {"properties": props, "limit": 100} - - yield from fetch_data(CRM_OBJECT_ENDPOINTS[object_type], api_key, params=params) - if include_history: - # Get history separately, as requesting both all properties and history together - # is likely to hit hubspot's URL length limit - for history_entries in fetch_property_history( - CRM_OBJECT_ENDPOINTS[object_type], - api_key, - props, - ): - yield dlt.mark.with_table_name( - history_entries, - OBJECT_TYPE_PLURAL[object_type] + "_property_history", - ) @dlt.resource @@ -278,4 +549,4 @@ def get_web_analytics_events( write_disposition="append", selected=True, table_name=lambda e: name + "_" + str(e["eventType"]), - )(dlt.sources.incremental("occurredAt", initial_value=start_date.isoformat())) + )(dlt.sources.incremental("occurredAt", initial_value=start_date.isoformat())) \ No newline at end of file diff --git a/sources/hubspot/helpers.py b/sources/hubspot/helpers.py index 3ab42f70e..2c2c5d415 100644 --- a/sources/hubspot/helpers.py +++ b/sources/hubspot/helpers.py @@ -31,6 +31,18 @@ def _get_headers(api_key: str) -> Dict[str, str]: return dict(authorization=f"Bearer {api_key}") +def pagination(_data: Dict[str, Any], headers: Dict[str, Any]): + _next = _data.get("paging", {}).get("next", None) + # _next = False + if _next: + next_url = _next["link"] + # Get the next page response + r = requests.get(next_url, headers=headers) + return r.json() + else: + return None + + def extract_property_history(objects: List[Dict[str, Any]]) -> Iterator[Dict[str, Any]]: for item in objects: history = item.get("propertiesWithHistory") @@ -90,7 +102,10 @@ def fetch_property_history( def fetch_data( - endpoint: str, api_key: str, params: Optional[Dict[str, Any]] = None + endpoint: str, + api_key: str, + params: Optional[Dict[str, Any]] = None, + context: Optional[Dict[str, Any]] = None, ) -> Iterator[List[Dict[str, Any]]]: """ Fetch data from HUBSPOT endpoint using a specified API key and yield the properties of each result. @@ -99,7 +114,8 @@ def fetch_data( Args: endpoint (str): The endpoint to fetch data from, as a string. api_key (str): The API key to use for authentication, as a string. - params: Optional dict of query params to include in the request + params: Optional dict of query params to include in the request. + context (Optional[Dict[str, Any]]): Additional data which need to be added in the resulting page. Yields: A List of CRM object dicts @@ -132,6 +148,8 @@ def fetch_data( if "results" in _data: _objects: List[Dict[str, Any]] = [] for _result in _data["results"]: + # if _result["properties"]["hs_merged_object_ids"] is not None: + # _result["properties"]["hs_merged_object_ids"] = _result["properties"]["hs_merged_object_ids"].split(";") _obj = _result.get("properties", _result) if "id" not in _obj and "id" in _result: # Move id from properties to top level @@ -152,18 +170,13 @@ def fetch_data( ] _obj[association] = __values + if context: + _obj.update(context) _objects.append(_obj) yield _objects # Follow pagination links if they exist - _next = _data.get("paging", {}).get("next", None) - if _next: - next_url = _next["link"] - # Get the next page response - r = requests.get(next_url, headers=headers) - _data = r.json() - else: - _data = None + _data = pagination(_data, headers) def _get_property_names(api_key: str, object_type: str) -> List[str]: @@ -186,3 +199,14 @@ def _get_property_names(api_key: str, object_type: str) -> List[str]: properties.extend([prop["name"] for prop in page]) return properties + + +def get_properties_labels(api_key: str, object_type: str, property_name: str): + endpoint = f"/crm/v3/properties/{object_type}/{property_name}" + url = get_url(endpoint) + headers = _get_headers(api_key) + r = requests.get(url, headers=headers) + _data = r.json() + while _data is not None: + yield _data + _data = pagination(_data, headers) \ No newline at end of file diff --git a/sources/hubspot/settings.py b/sources/hubspot/settings.py index 05fe4d9d0..e5ab05588 100644 --- a/sources/hubspot/settings.py +++ b/sources/hubspot/settings.py @@ -1,8 +1,7 @@ """Hubspot source settings and constants""" - from dlt.common import pendulum -STARTDATE = pendulum.datetime(year=2000, month=1, day=1) +STARTDATE = pendulum.datetime(year=2024, month=2, day=10) CRM_CONTACTS_ENDPOINT = ( "/crm/v3/objects/contacts?associations=deals,products,tickets,quotes" @@ -14,6 +13,9 @@ CRM_PRODUCTS_ENDPOINT = "/crm/v3/objects/products" CRM_TICKETS_ENDPOINT = "/crm/v3/objects/tickets" CRM_QUOTES_ENDPOINT = "/crm/v3/objects/quotes" +CRM_OWNERS_ENDPOINT = "/crm/v3/owners/" +CRM_PROPERTIES_ENDPOINT = "/crm/v3/properties/{objectType}/{property_name}" +CRM_PIPELINES_ENDPOINT = "/crm/v3/pipelines/{objectType}" CRM_OBJECT_ENDPOINTS = { "contact": CRM_CONTACTS_ENDPOINT, @@ -22,6 +24,7 @@ "product": CRM_PRODUCTS_ENDPOINT, "ticket": CRM_TICKETS_ENDPOINT, "quote": CRM_QUOTES_ENDPOINT, + "owner": CRM_OWNERS_ENDPOINT, } WEB_ANALYTICS_EVENTS_ENDPOINT = "/events/v3/events?objectType={objectType}&objectId={objectId}&occurredAfter={occurredAfter}&occurredBefore={occurredBefore}&sort=-occurredAt" @@ -36,17 +39,8 @@ } OBJECT_TYPE_PLURAL = {v: k for k, v in OBJECT_TYPE_SINGULAR.items()} +ALL_OBJECTS = OBJECT_TYPE_PLURAL.keys() -DEFAULT_DEAL_PROPS = [ - "amount", - "closedate", - "createdate", - "dealname", - "dealstage", - "hs_lastmodifieddate", - "hs_object_id", - "pipeline", -] DEFAULT_COMPANY_PROPS = [ "createdate", @@ -65,6 +59,17 @@ "lastname", ] +DEFAULT_DEAL_PROPS = [ + #"amount", + #"closedate", + #"createdate", + "dealname", + "dealstage", + #"hs_lastmodifieddate", + #"hs_object_id", + #"pipeline", +] + DEFAULT_TICKET_PROPS = [ "createdate", "content", @@ -96,4 +101,22 @@ "hs_title", ] -ALL = ("ALL",) +ENTITY_PROPERTIES = { + "company": DEFAULT_COMPANY_PROPS, + "contact": DEFAULT_CONTACT_PROPS, + "deal": DEFAULT_DEAL_PROPS, + "ticket": DEFAULT_TICKET_PROPS, + "product": DEFAULT_PRODUCT_PROPS, + "quote": DEFAULT_QUOTE_PROPS, +} + + +# 'ALL' represents a list of all available properties for all types +ALL = [{"properties": "All"}] + +PIPELINES_OBJECTS = ["deals"] +SOFT_DELETE_KEY = "is_deleted" +ARCHIVED_PARAM = {"archived": True} +PREPROCESSING = {"split": ["hs_merged_object_ids"]} +STAGE_PROPERTY_PREFIX = "hs_date_entered_" +MAX_PROPS_LENGTH = 2000 \ No newline at end of file diff --git a/sources/hubspot/utils.py b/sources/hubspot/utils.py new file mode 100644 index 000000000..598afbfcf --- /dev/null +++ b/sources/hubspot/utils.py @@ -0,0 +1,7 @@ +from .settings import PREPROCESSING + +def split_data(doc): + for prop in PREPROCESSING["split"]: + if (prop in doc) and (doc[prop] is not None): + doc[prop] = doc[prop].split(";") + return doc \ No newline at end of file diff --git a/sources/hubspot_pipeline.py b/sources/hubspot_pipeline.py index 27d9f7352..ba16d027b 100644 --- a/sources/hubspot_pipeline.py +++ b/sources/hubspot_pipeline.py @@ -1,4 +1,5 @@ from typing import List + import dlt from hubspot import hubspot, hubspot_events_for_objects, THubspotObjectType @@ -13,11 +14,11 @@ def load_crm_data() -> None: """ # Create a DLT pipeline object with the pipeline name, dataset name, and destination database type - # Add dev_mode=(True or False) if you need your pipeline to create the dataset in your destination + # Add full_refresh=(True or False) if you need your pipeline to create the dataset in your destination p = dlt.pipeline( pipeline_name="hubspot", dataset_name="hubspot_dataset", - destination="duckdb", + destination="bigquery", ) # Run the pipeline with the HubSpot source connector @@ -26,7 +27,6 @@ def load_crm_data() -> None: # Print information about the pipeline run print(info) - def load_crm_data_with_history() -> None: """ Loads all HubSpot CRM resources and property change history for each entity. @@ -37,11 +37,11 @@ def load_crm_data_with_history() -> None: """ # Create a DLT pipeline object with the pipeline name, dataset name, and destination database type - # Add dev_mode=(True or False) if you need your pipeline to create the dataset in your destination + # Add full_refresh=(True or False) if you need your pipeline to create the dataset in your destination p = dlt.pipeline( pipeline_name="hubspot", dataset_name="hubspot_dataset", - destination="duckdb", + destination="bigquery", ) # Configure the source with `include_history` to enable property history load, history is disabled by default @@ -54,6 +54,33 @@ def load_crm_data_with_history() -> None: print(info) +def load_crm_data_with_soft_delete() -> None: + """ + Loads all HubSpot CRM resources, including soft-deleted (archived) records for each entity. + By default, only the current state of the records is loaded; property change history is not included unless explicitly enabled. + + Soft-deleted records are retrieved and marked appropriately, allowing both active and archived data to be processed. + """ + + # Create a DLT pipeline object with the pipeline name, dataset name, and destination database type. + # You can add `full_refresh=True` if the pipeline should recreate the dataset at the destination. + p = dlt.pipeline( + pipeline_name="hubspot", + dataset_name="hubspot_dataset", + destination="bigquery", + ) + + # Configure the source to load soft-deleted (archived) records. + # Property change history is disabled by default unless configured separately. + data = hubspot(soft_delete=True) + + # Run the pipeline with the HubSpot source connector. + info = p.run(data) + + # Print information about the pipeline run. + print(info) + + def load_crm_objects_with_custom_properties() -> None: """ Loads CRM objects, reading only properties defined by the user. @@ -61,31 +88,39 @@ def load_crm_objects_with_custom_properties() -> None: # Create a DLT pipeline object with the pipeline name, # dataset name, properties to read and destination database - # type Add dev_mode=(True or False) if you need your + # type Add full_refresh=(True or False) if you need your # pipeline to create the dataset in your destination - p = dlt.pipeline( + pipeline = dlt.pipeline( pipeline_name="hubspot", dataset_name="hubspot_dataset", - destination="duckdb", + destination="bigquery", ) - source = hubspot() - - # By default, all the custom properties of a CRM object are extracted, - # ignoring those driven by Hubspot (prefixed with `hs_`). - - # To read fields in addition to the custom ones: - # source.contacts.bind(props=["date_of_birth", "degree"]) + load_data = hubspot() + #load_data.contacts.bind(props=["date_of_birth", "degree"], include_custom_props=True) + load_info = pipeline.run(load_data) + print(load_info) - # To read only two particular fields: - source.contacts.bind(props=["date_of_birth", "degree"], include_custom_props=False) - # Run the pipeline with the HubSpot source connector - info = p.run(source) +def load_pipelines() -> None: + """ + This function loads web analytics events for a list objects in `object_ids` of type `object_type` - # Print information about the pipeline run - print(info) + Returns: + None + """ + # Create a DLT pipeline object with the pipeline name, dataset name, and destination database type + p = dlt.pipeline( + pipeline_name="hubspot", + dataset_name="hubspot_dataset", + destination="bigquery", + dev_mode=False, + ) + # To load data from pipelines in "deals" endpoint + load_data = hubspot().with_resources("pipelines_deals", "stages_timing_deals") + load_info = p.run(load_data) + print(load_info) def load_web_analytics_events( object_type: THubspotObjectType, object_ids: List[str] @@ -101,7 +136,7 @@ def load_web_analytics_events( p = dlt.pipeline( pipeline_name="hubspot", dataset_name="hubspot_dataset", - destination="duckdb", + destination="bigquery", dev_mode=False, ) @@ -115,8 +150,9 @@ def load_web_analytics_events( if __name__ == "__main__": - # Call the functions to load HubSpot data into the database with and without company events enabled load_crm_data() load_crm_data_with_history() - load_web_analytics_events("company", ["7086461639", "7086464459"]) load_crm_objects_with_custom_properties() + load_pipelines() + load_crm_data_with_soft_delete() + load_web_analytics_events("company", ["7086461639", "7086464459"]) \ No newline at end of file From 104afc5f8be8de046a33e9b28f5f799a5ae655d2 Mon Sep 17 00:00:00 2001 From: dat-a-man <98139823+dat-a-man@users.noreply.github.com> Date: Wed, 9 Oct 2024 10:00:23 +0000 Subject: [PATCH 2/9] Updated for lint errors --- sources/hubspot/__init__.py | 153 +++++++++++++++++++----------------- sources/hubspot/helpers.py | 4 +- sources/hubspot/utils.py | 10 ++- 3 files changed, 88 insertions(+), 79 deletions(-) diff --git a/sources/hubspot/__init__.py b/sources/hubspot/__init__.py index 09b46fbf3..daf22a2b4 100644 --- a/sources/hubspot/__init__.py +++ b/sources/hubspot/__init__.py @@ -21,10 +21,9 @@ Example: To retrieve data from all endpoints, use the following code: - """ -from typing import Any, Dict, Iterator, List, Literal, Sequence +from typing import Any, Dict, Iterator, List, Literal, Optional, Sequence, Tuple from urllib.parse import quote import dlt @@ -58,12 +57,12 @@ THubspotObjectType = Literal["company", "contact", "deal", "ticket", "product", "quote"] -def extract_properties_list(props): +def extract_properties_list(props: Sequence[Any]) -> List[str]: """ Flatten a list of property dictionaries to extract property names. Args: - props (List[Dict[str, Any]]): List of property dictionaries. + props (Sequence[Any]): List of property names or property dictionaries. Returns: List[str]: List of property names. @@ -71,8 +70,12 @@ def extract_properties_list(props): return [prop if isinstance(prop, str) else prop.get("name") for prop in props] - -def fetch_data_for_properties(props, api_key, object_type, soft_delete): +def fetch_data_for_properties( + props: Sequence[str], + api_key: str, + object_type: str, + soft_delete: bool, +) -> Iterator[TDataItems]: """ Fetch data for a given set of properties from the HubSpot API. @@ -86,8 +89,8 @@ def fetch_data_for_properties(props, api_key, object_type, soft_delete): Iterator[TDataItems]: Data retrieved from the HubSpot API. """ - params = {"properties": props, "limit": 100} - context = {SOFT_DELETE_KEY: False} if soft_delete else None + params: Dict[str, Any] = {"properties": props, "limit": 100} + context: Optional[Dict[str, Any]] = {SOFT_DELETE_KEY: False} if soft_delete else None yield from fetch_data( CRM_OBJECT_ENDPOINTS[object_type], api_key, params=params, context=context @@ -104,7 +107,7 @@ def fetch_data_for_properties(props, api_key, object_type, soft_delete): def crm_objects( object_type: str, api_key: str = dlt.secrets.value, - props: Sequence[str] = None, + props: Optional[Sequence[str]] = None, include_custom_props: bool = True, archived: bool = False, ) -> Iterator[TDataItems]: @@ -114,7 +117,7 @@ def crm_objects( Args: object_type (str): Type of HubSpot object (e.g., 'company', 'contact'). api_key (str, optional): API key for HubSpot authentication. - props (Sequence[str], optional): List of properties to retrieve. Defaults to None. + props (Optional[Sequence[str]], optional): List of properties to retrieve. Defaults to None. include_custom_props (bool, optional): Include custom properties in the result. Defaults to True. archived (bool, optional): Fetch archived (soft-deleted) objects. Defaults to False. @@ -126,10 +129,10 @@ def crm_objects( def crm_object_history( - object_type: THubspotObjectType, - api_key: str = dlt.secrets.value, - include_custom_props: bool = True, -): + object_type: THubspotObjectType, + api_key: str = dlt.secrets.value, + include_custom_props: bool = True, +) -> Iterator[TDataItems]: """ Fetch the history of property changes for a given CRM object type. @@ -143,7 +146,7 @@ def crm_object_history( """ # Fetch the properties from ENTITY_PROPERTIES or default to "All" - props = ENTITY_PROPERTIES.get(object_type, "All") + props: str = ENTITY_PROPERTIES.get(object_type, "All") # Fetch the properties with the option to include custom properties props = fetch_props(object_type, api_key, props, include_custom_props) @@ -155,20 +158,21 @@ def crm_object_history( props, ) + def resource_template( - entity: THubspotObjectType, - api_key: str = dlt.config.value, - props: Sequence[str] = None, # Add props as an argument - include_custom_props: bool = False, - soft_delete: bool = False, -): + entity: THubspotObjectType, + api_key: str = dlt.config.value, + props: Optional[Sequence[str]] = None, # Add props as an argument + include_custom_props: bool = False, + soft_delete: bool = False, +) -> Iterator[TDataItems]: """ Template function to yield CRM resources for a specific HubSpot entity. Args: entity (THubspotObjectType): Type of HubSpot object (e.g., 'company', 'contact'). api_key (str, optional): HubSpot API key for authentication. - props (Sequence[str], optional): List of properties to retrieve. Defaults to None. + props (Optional[Sequence[str]], optional): List of properties to retrieve. Defaults to None. include_custom_props (bool, optional): Include custom properties in the result. Defaults to False. soft_delete (bool, optional): Fetch soft-deleted (archived) records. Defaults to False. @@ -177,7 +181,7 @@ def resource_template( """ # Use provided props or fetch from ENTITY_PROPERTIES if not provided - properties = ENTITY_PROPERTIES.get(entity, props or []) + properties: List[str] = ENTITY_PROPERTIES.get(entity, props or []) # Use these properties to yield the crm_objects yield from crm_objects( @@ -189,12 +193,11 @@ def resource_template( ) - def resource_history_template( entity: THubspotObjectType, api_key: str = dlt.config.value, include_custom_props: bool = False, -): +) -> Iterator[TDataItems]: """ Template function to yield historical CRM resource data for a specific HubSpot entity. @@ -213,23 +216,25 @@ def resource_history_template( @dlt.resource(name="properties", write_disposition="replace") def hubspot_properties( - properties_list: List[Dict[str, Any]] = None, + properties_list: Optional[List[Dict[str, Any]]] = None, api_key: str = dlt.secrets.value, ) -> DltResource: """ A DLT resource that retrieves HubSpot properties for a given list of objects. Args: - properties_list (List[Dict[str, Any]], optional): List of properties to retrieve. + properties_list (Optional[List[Dict[str, Any]]], optional): List of properties to retrieve. api_key (str, optional): HubSpot API key for authentication. Yields: DltResource: A DLT resource containing properties for HubSpot objects. """ - def get_properties_description(properties_list): + def get_properties_description( + properties_list_inner: List[Dict[str, Any]] + ) -> Iterator[Dict[str, Any]]: """Fetch properties.""" - for property_info in properties_list: + for property_info in properties_list_inner: yield get_properties_labels( api_key=api_key, object_type=property_info["object_type"], @@ -237,11 +242,15 @@ def get_properties_description(properties_list): ) # Ensure properties_list is defined - properties_list = properties_list or [] - yield from get_properties_description(properties_list) + properties_list_inner: List[Dict[str, Any]] = properties_list or [] + yield from get_properties_description(properties_list_inner) -def pivot_stages_properties(data, property_prefix=STAGE_PROPERTY_PREFIX, id_prop="id"): +def pivot_stages_properties( + data: List[Dict[str, Any]], + property_prefix: str = STAGE_PROPERTY_PREFIX, + id_prop: str = "id", +) -> List[Dict[str, Any]]: """ Transform the data by pivoting stage properties. @@ -253,9 +262,9 @@ def pivot_stages_properties(data, property_prefix=STAGE_PROPERTY_PREFIX, id_prop Returns: List[Dict[str, Any]]: Transformed data with pivoted stage properties. """ - new_data = [] + new_data: List[Dict[str, Any]] = [] for record in data: - record_not_null = {k: v for k, v in record.items() if v is not None} + record_not_null: Dict[str, Any] = {k: v for k, v in record.items() if v is not None} if id_prop not in record_not_null: continue id_val = record_not_null.pop(id_prop) @@ -271,7 +280,7 @@ def stages_timing( object_type: str, api_key: str = dlt.config.value, soft_delete: bool = False, - limit: int = None + limit: Optional[int] = None, ) -> Iterator[TDataItems]: """ Fetch stage timing data for a specific object type from the HubSpot API. @@ -280,24 +289,24 @@ def stages_timing( object_type (str): Type of HubSpot object (e.g., 'deal', 'ticket'). api_key (str, optional): HubSpot API key for authentication. soft_delete (bool, optional): Fetch soft-deleted (archived) records. Defaults to False. - limit (int, optional): Limit the number of properties to fetch. Defaults to None. + limit (Optional[int], optional): Limit the number of properties to fetch. Defaults to None. Yields: Iterator[TDataItems]: Stage timing data. """ - all_properties = list(_get_property_names(api_key, object_type)) - date_entered_properties = [ + all_properties: List[str] = list(_get_property_names(api_key, object_type)) + date_entered_properties: List[str] = [ prop for prop in all_properties if prop.startswith(STAGE_PROPERTY_PREFIX) ] - props = ",".join(date_entered_properties) - idx = 0 + props: str = ",".join(date_entered_properties) + idx: int = 0 if limit is None: limit = len(date_entered_properties) while idx < limit: if len(props) - idx < MAX_PROPS_LENGTH: - props_part = ",".join(props[idx: idx + MAX_PROPS_LENGTH].split(",")[:-1]) + props_part: str = ",".join(props[idx: idx + MAX_PROPS_LENGTH].split(",")[:-1]) else: - props_part = props[idx: idx + MAX_PROPS_LENGTH] + props_part: str = props[idx: idx + MAX_PROPS_LENGTH] idx += len(props_part) for data in fetch_data_for_properties( props_part, api_key, object_type, soft_delete @@ -306,8 +315,8 @@ def stages_timing( def owners( - api_key: str, # Define api_key as a required argument - soft_delete: bool = False # Add soft_delete as a parameter + api_key: str, + soft_delete: bool = False, ) -> Iterator[TDataItems]: """ Fetch HubSpot owners data. @@ -327,10 +336,10 @@ def owners( # Fetch soft-deleted owners if requested if soft_delete: for page in fetch_data( - endpoint=CRM_OBJECT_ENDPOINTS["owner"], - params=ARCHIVED_PARAM, - api_key=api_key, - context={SOFT_DELETE_KEY: True}, + endpoint=CRM_OBJECT_ENDPOINTS["owner"], + params=ARCHIVED_PARAM, + api_key=api_key, + context={SOFT_DELETE_KEY: True}, ): yield page @@ -341,7 +350,7 @@ def hubspot( include_history: bool = False, soft_delete: bool = False, include_custom_props: bool = True, - props: Sequence[str] = None, # Add props argument here + props: Optional[Sequence[str]] = None, # Add props argument here ) -> Sequence[DltResource]: """ A DLT source that retrieves data from the HubSpot API using the @@ -374,23 +383,22 @@ def hubspot( """ def hubspot_pipelines_for_objects( - api_key: str = dlt.secrets.value, - ) -> DltResource: + api_key_inner: str = dlt.secrets.value, + ) -> Iterator[DltResource]: """ - A standalone DLT resources that retrieves properties. + A standalone DLT resource that retrieves pipelines for HubSpot objects. Args: - object_type(List[THubspotObjectType], required): List of the hubspot object types see definition of THubspotObjectType literal. - api_key (str, optional): The API key used to authenticate with the HubSpot API. Defaults to dlt.secrets.value. + api_key_inner (str, optional): The API key used to authenticate with the HubSpot API. Defaults to dlt.secrets.value. - Returns: - Incremental dlt resource to track properties for objects from the list + Yields: + Iterator[DltResource]: DLT resources for pipelines and stages. """ - def get_pipelines(object_type: THubspotObjectType): + def get_pipelines(object_type: THubspotObjectType) -> Iterator[TDataItems]: yield from fetch_data( CRM_PIPELINES_ENDPOINT.format(objectType=object_type), - api_key=api_key, + api_key=api_key_inner, ) for obj_type in PIPELINES_OBJECTS: @@ -452,7 +460,7 @@ def get_pipelines(object_type: THubspotObjectType): def fetch_props( object_type: str, api_key: str, - props: Sequence[str] = None, + props: Optional[Sequence[str]] = None, include_custom_props: bool = True, ) -> str: """ @@ -461,7 +469,7 @@ def fetch_props( Args: object_type (str): Type of HubSpot object (e.g., 'company', 'contact'). api_key (str): HubSpot API key for authentication. - props (Sequence[str], optional): List of properties to fetch. Defaults to None. + props (Optional[Sequence[str]], optional): List of properties to fetch. Defaults to None. include_custom_props (bool, optional): Include custom properties in the result. Defaults to True. Returns: @@ -478,8 +486,8 @@ def fetch_props( props = extract_properties_list(props) if include_custom_props: - all_props = _get_property_names(api_key, object_type) - custom_props = [prop for prop in all_props if not prop.startswith("hs_")] + all_props: List[str] = _get_property_names(api_key, object_type) + custom_props: List[str] = [prop for prop in all_props if not prop.startswith("hs_")] props = props + custom_props # type: ignore props = ",".join(sorted(list(set(props)))) @@ -495,7 +503,6 @@ def fetch_props( return props - @dlt.resource def hubspot_events_for_objects( object_type: THubspotObjectType, @@ -504,20 +511,20 @@ def hubspot_events_for_objects( start_date: pendulum.DateTime = STARTDATE, ) -> DltResource: """ - A standalone DLT resources that retrieves web analytics events from the HubSpot API for a particular object type and list of object ids. + A standalone DLT resource that retrieves web analytics events from the HubSpot API for a particular object type and list of object ids. Args: - object_type(THubspotObjectType, required): One of the hubspot object types see definition of THubspotObjectType literal - object_ids: (List[THubspotObjectType], required): List of object ids to track events + object_type (THubspotObjectType): One of the hubspot object types see definition of THubspotObjectType literal. + object_ids (List[str]): List of object ids to track events. api_key (str, optional): The API key used to authenticate with the HubSpot API. Defaults to dlt.secrets.value. - start_date (datetime, optional): The initial date time from which start getting events, default to STARTDATE + start_date (pendulum.DateTime, optional): The initial date time from which start getting events, default to STARTDATE. Returns: - incremental dlt resource to track events for objects from the list + DltResource: Incremental DLT resource to track events for objects from the list. """ - end_date = pendulum.now().isoformat() - name = object_type + "_events" + end_date: str = pendulum.now().isoformat() + name: str = object_type + "_events" def get_web_analytics_events( occurred_at: dlt.sources.incremental[str], @@ -526,10 +533,10 @@ def get_web_analytics_events( A helper function that retrieves web analytics events for a given object type from the HubSpot API. Args: - object_type (str): The type of object for which to retrieve web analytics events. + occurred_at (dlt.sources.incremental[str]): Incremental source for event occurrence time. Yields: - dict: A dictionary representing a web analytics event. + Iterator[List[Dict[str, Any]]]: Web analytics event data. """ for object_id in object_ids: yield from fetch_data( @@ -549,4 +556,4 @@ def get_web_analytics_events( write_disposition="append", selected=True, table_name=lambda e: name + "_" + str(e["eventType"]), - )(dlt.sources.incremental("occurredAt", initial_value=start_date.isoformat())) \ No newline at end of file + )(dlt.sources.incremental("occurredAt", initial_value=start_date.isoformat())) diff --git a/sources/hubspot/helpers.py b/sources/hubspot/helpers.py index 2c2c5d415..05fe9040a 100644 --- a/sources/hubspot/helpers.py +++ b/sources/hubspot/helpers.py @@ -31,7 +31,7 @@ def _get_headers(api_key: str) -> Dict[str, str]: return dict(authorization=f"Bearer {api_key}") -def pagination(_data: Dict[str, Any], headers: Dict[str, Any]): +def pagination(_data: Dict[str, Any], headers: Dict[str, Any]) -> Optional[Dict[str, Any]]: _next = _data.get("paging", {}).get("next", None) # _next = False if _next: @@ -201,7 +201,7 @@ def _get_property_names(api_key: str, object_type: str) -> List[str]: return properties -def get_properties_labels(api_key: str, object_type: str, property_name: str): +def get_properties_labels(api_key: str, object_type: str, property_name: str) -> Iterator[Dict[str, Any]]: endpoint = f"/crm/v3/properties/{object_type}/{property_name}" url = get_url(endpoint) headers = _get_headers(api_key) diff --git a/sources/hubspot/utils.py b/sources/hubspot/utils.py index 598afbfcf..246d06d43 100644 --- a/sources/hubspot/utils.py +++ b/sources/hubspot/utils.py @@ -1,7 +1,9 @@ +from typing import Dict, Any from .settings import PREPROCESSING -def split_data(doc): +def split_data(doc: Dict[str, Any]) -> Dict[str, Any]: for prop in PREPROCESSING["split"]: - if (prop in doc) and (doc[prop] is not None): - doc[prop] = doc[prop].split(";") - return doc \ No newline at end of file + if prop in doc and doc[prop] is not None: + if isinstance(doc[prop], str): + doc[prop] = doc[prop].split(";") + return doc From a063f3eb619f9eb1c0bcf9968532adf3db0bad8a Mon Sep 17 00:00:00 2001 From: dat-a-man <98139823+dat-a-man@users.noreply.github.com> Date: Wed, 9 Oct 2024 11:25:27 +0000 Subject: [PATCH 3/9] Fix errors --- sources/hubspot/__init__.py | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/sources/hubspot/__init__.py b/sources/hubspot/__init__.py index daf22a2b4..625fea7d5 100644 --- a/sources/hubspot/__init__.py +++ b/sources/hubspot/__init__.py @@ -181,7 +181,7 @@ def resource_template( """ # Use provided props or fetch from ENTITY_PROPERTIES if not provided - properties: List[str] = ENTITY_PROPERTIES.get(entity, props or []) + properties: List[str] = ENTITY_PROPERTIES.get(entity, list(props or [])) # Use these properties to yield the crm_objects yield from crm_objects( @@ -218,7 +218,7 @@ def resource_history_template( def hubspot_properties( properties_list: Optional[List[Dict[str, Any]]] = None, api_key: str = dlt.secrets.value, -) -> DltResource: +) -> Iterator[TDataItems]: """ A DLT resource that retrieves HubSpot properties for a given list of objects. @@ -304,9 +304,9 @@ def stages_timing( limit = len(date_entered_properties) while idx < limit: if len(props) - idx < MAX_PROPS_LENGTH: - props_part: str = ",".join(props[idx: idx + MAX_PROPS_LENGTH].split(",")[:-1]) + props_part = ",".join(props[idx: idx + MAX_PROPS_LENGTH].split(",")[:-1]) else: - props_part: str = props[idx: idx + MAX_PROPS_LENGTH] + props_part = props[idx: idx + MAX_PROPS_LENGTH] idx += len(props_part) for data in fetch_data_for_properties( props_part, api_key, object_type, soft_delete @@ -351,7 +351,7 @@ def hubspot( soft_delete: bool = False, include_custom_props: bool = True, props: Optional[Sequence[str]] = None, # Add props argument here -) -> Sequence[DltResource]: +) -> Iterator[DltResource]: """ A DLT source that retrieves data from the HubSpot API using the specified API key. @@ -477,30 +477,30 @@ def fetch_props( """ if props == ALL: # Fetch all property names - props = list(_get_property_names(api_key, object_type)) + props_list = list(_get_property_names(api_key, object_type)) elif isinstance(props, str): # If props are passed as a single string, convert it to a list - props = [props] + props_list = [props] else: # Ensure it's a list of strings, if not already - props = extract_properties_list(props) + props_list = extract_properties_list(props or []) if include_custom_props: all_props: List[str] = _get_property_names(api_key, object_type) custom_props: List[str] = [prop for prop in all_props if not prop.startswith("hs_")] - props = props + custom_props # type: ignore + props_list += custom_props - props = ",".join(sorted(list(set(props)))) + props_str = ",".join(sorted(set(props_list))) - if len(props) > MAX_PROPS_LENGTH: + if len(props_str) > MAX_PROPS_LENGTH: raise ValueError( "Your request to Hubspot is too long to process. " f"Maximum allowed query length is {MAX_PROPS_LENGTH} symbols, while " - f"your list of properties `{props[:200]}`... is {len(props)} " + f"your list of properties `{props_str[:200]}`... is {len(props_str)} " "symbols long. Use the `props` argument of the resource to " "set the list of properties to extract from the endpoint." ) - return props + return props_str @dlt.resource From 3661e22f4a7f6d3f5697e57271df73b9ff9a96b4 Mon Sep 17 00:00:00 2001 From: dat-a-man <98139823+dat-a-man@users.noreply.github.com> Date: Thu, 10 Oct 2024 01:58:58 +0000 Subject: [PATCH 4/9] Corrected for mypy errors --- sources/hubspot/__init__.py | 9 ++++----- sources/hubspot/helpers.py | 10 +++++----- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/sources/hubspot/__init__.py b/sources/hubspot/__init__.py index 625fea7d5..0d9af7a61 100644 --- a/sources/hubspot/__init__.py +++ b/sources/hubspot/__init__.py @@ -23,7 +23,7 @@ To retrieve data from all endpoints, use the following code: """ -from typing import Any, Dict, Iterator, List, Literal, Optional, Sequence, Tuple +from typing import Any, Dict, Iterator, List, Literal, Optional, Sequence, Tuple, Union from urllib.parse import quote import dlt @@ -146,10 +146,10 @@ def crm_object_history( """ # Fetch the properties from ENTITY_PROPERTIES or default to "All" - props: str = ENTITY_PROPERTIES.get(object_type, "All") + props_entry: Union[List[str], str] = ENTITY_PROPERTIES.get(object_type, "All") # Fetch the properties with the option to include custom properties - props = fetch_props(object_type, api_key, props, include_custom_props) + props: str = fetch_props(object_type, api_key, props_entry, include_custom_props) # Yield the property history yield from fetch_property_history( @@ -158,7 +158,6 @@ def crm_object_history( props, ) - def resource_template( entity: THubspotObjectType, api_key: str = dlt.config.value, @@ -235,7 +234,7 @@ def get_properties_description( ) -> Iterator[Dict[str, Any]]: """Fetch properties.""" for property_info in properties_list_inner: - yield get_properties_labels( + yield from get_properties_labels( api_key=api_key, object_type=property_info["object_type"], property_name=property_info["property_name"], diff --git a/sources/hubspot/helpers.py b/sources/hubspot/helpers.py index 05fe9040a..aeec133c7 100644 --- a/sources/hubspot/helpers.py +++ b/sources/hubspot/helpers.py @@ -1,7 +1,7 @@ """Hubspot source helpers""" import urllib.parse -from typing import Any, Dict, Iterator, List, Optional +from typing import Any, Dict, Iterator, List, Optional, Generator from dlt.sources.helpers import requests @@ -38,7 +38,7 @@ def pagination(_data: Dict[str, Any], headers: Dict[str, Any]) -> Optional[Dict[ next_url = _next["link"] # Get the next page response r = requests.get(next_url, headers=headers) - return r.json() + return r.json() # type: ignore else: return None @@ -206,7 +206,7 @@ def get_properties_labels(api_key: str, object_type: str, property_name: str) -> url = get_url(endpoint) headers = _get_headers(api_key) r = requests.get(url, headers=headers) - _data = r.json() + _data: Optional[Dict[str, Any]] = r.json() while _data is not None: - yield _data - _data = pagination(_data, headers) \ No newline at end of file + yield _data + _data = pagination(_data, headers) From e0b596db620c6edd97216860bbcdbdf1e03058ef Mon Sep 17 00:00:00 2001 From: dat-a-man <98139823+dat-a-man@users.noreply.github.com> Date: Thu, 10 Oct 2024 02:27:21 +0000 Subject: [PATCH 5/9] corrected for flake8 --- sources/hubspot/helpers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sources/hubspot/helpers.py b/sources/hubspot/helpers.py index aeec133c7..0b5a1c833 100644 --- a/sources/hubspot/helpers.py +++ b/sources/hubspot/helpers.py @@ -208,5 +208,5 @@ def get_properties_labels(api_key: str, object_type: str, property_name: str) -> r = requests.get(url, headers=headers) _data: Optional[Dict[str, Any]] = r.json() while _data is not None: - yield _data + yield _data _data = pagination(_data, headers) From 310c42c2eb2c93f23e041b0a821121ee6b6fde3e Mon Sep 17 00:00:00 2001 From: Violetta Mishechkina Date: Thu, 28 Nov 2024 18:28:02 +0100 Subject: [PATCH 6/9] Refactor and fix hubspot source --- sources/hubspot/README.md | 23 +-- sources/hubspot/__init__.py | 213 ++++++++++++++------------- sources/hubspot/helpers.py | 14 +- sources/hubspot/settings.py | 15 +- sources/hubspot/utils.py | 22 ++- sources/hubspot_pipeline.py | 6 +- tests/hubspot/test_hubspot_source.py | 22 ++- 7 files changed, 183 insertions(+), 132 deletions(-) diff --git a/sources/hubspot/README.md b/sources/hubspot/README.md index 17ca44e1d..e55a5ac13 100644 --- a/sources/hubspot/README.md +++ b/sources/hubspot/README.md @@ -4,17 +4,18 @@ HubSpot is a customer relationship management (CRM) software and inbound marketi The `dlt` HubSpot verified source allows you to automatically load data from HubSpot into a [destination](https://dlthub.com/docs/dlt-ecosystem/destinations/) of your choice. It loads data from the following endpoints: -| API | Data | -| --- | --- | -| Contacts | visitors, potential customers, leads | -| Companies | information about organizations | -| Deals | deal records, deal tracking | -| Products | goods, services | -| Tickets | requests for help from customers or users | -| Quotes | pricing information of a product | -| Web analytics | events | -| Owners | information about account managers or users | -| Pipelines | stages and progress tracking | +| API | Data | +| --- |-----------------------------------------------------------------| +| Contacts | visitors, potential customers, leads | +| Companies | information about organizations | +| Deals | deal records, deal tracking | +| Products | goods, services | +| Tickets | requests for help from customers or users | +| Quotes | pricing information of a product | +| Web analytics | events | +| Owners | information about account managers or users | +| Pipelines | stages and progress tracking, separate resource for each object | +| Properties | custom labels for properties with multiple choice | ## Initialize the pipeline with Hubspot verified source ```bash diff --git a/sources/hubspot/__init__.py b/sources/hubspot/__init__.py index 0d9af7a61..3097c1efb 100644 --- a/sources/hubspot/__init__.py +++ b/sources/hubspot/__init__.py @@ -1,5 +1,5 @@ """ -This is a module that provides a DLT source to retrieve data from multiple endpoints of the HubSpot API +This is a module that provides a dlt source to retrieve data from multiple endpoints of the HubSpot API using a specified API key. The retrieved data is returned as a tuple of Dlt resources, one for each endpoint. The source retrieves data from the following endpoints: @@ -23,7 +23,7 @@ To retrieve data from all endpoints, use the following code: """ -from typing import Any, Dict, Iterator, List, Literal, Optional, Sequence, Tuple, Union +from typing import Any, Dict, Iterator, List, Literal, Optional, Sequence, Union from urllib.parse import quote import dlt @@ -48,11 +48,13 @@ OBJECT_TYPE_PLURAL, OBJECT_TYPE_SINGULAR, PIPELINES_OBJECTS, + PROPERTIES_WITH_CUSTOM_LABELS, SOFT_DELETE_KEY, STAGE_PROPERTY_PREFIX, STARTDATE, WEB_ANALYTICS_EVENTS_ENDPOINT, ) +from .utils import chunk_properties THubspotObjectType = Literal["company", "contact", "deal", "ticket", "product", "quote"] @@ -90,7 +92,9 @@ def fetch_data_for_properties( """ params: Dict[str, Any] = {"properties": props, "limit": 100} - context: Optional[Dict[str, Any]] = {SOFT_DELETE_KEY: False} if soft_delete else None + context: Optional[Dict[str, Any]] = ( + {SOFT_DELETE_KEY: False} if soft_delete else None + ) yield from fetch_data( CRM_OBJECT_ENDPOINTS[object_type], api_key, params=params, context=context @@ -158,10 +162,10 @@ def crm_object_history( props, ) + def resource_template( entity: THubspotObjectType, api_key: str = dlt.config.value, - props: Optional[Sequence[str]] = None, # Add props as an argument include_custom_props: bool = False, soft_delete: bool = False, ) -> Iterator[TDataItems]: @@ -180,7 +184,7 @@ def resource_template( """ # Use provided props or fetch from ENTITY_PROPERTIES if not provided - properties: List[str] = ENTITY_PROPERTIES.get(entity, list(props or [])) + properties: List[str] = ENTITY_PROPERTIES.get(entity, []) # Use these properties to yield the crm_objects yield from crm_objects( @@ -213,38 +217,6 @@ def resource_history_template( ) -@dlt.resource(name="properties", write_disposition="replace") -def hubspot_properties( - properties_list: Optional[List[Dict[str, Any]]] = None, - api_key: str = dlt.secrets.value, -) -> Iterator[TDataItems]: - """ - A DLT resource that retrieves HubSpot properties for a given list of objects. - - Args: - properties_list (Optional[List[Dict[str, Any]]], optional): List of properties to retrieve. - api_key (str, optional): HubSpot API key for authentication. - - Yields: - DltResource: A DLT resource containing properties for HubSpot objects. - """ - - def get_properties_description( - properties_list_inner: List[Dict[str, Any]] - ) -> Iterator[Dict[str, Any]]: - """Fetch properties.""" - for property_info in properties_list_inner: - yield from get_properties_labels( - api_key=api_key, - object_type=property_info["object_type"], - property_name=property_info["property_name"], - ) - - # Ensure properties_list is defined - properties_list_inner: List[Dict[str, Any]] = properties_list or [] - yield from get_properties_description(properties_list_inner) - - def pivot_stages_properties( data: List[Dict[str, Any]], property_prefix: str = STAGE_PROPERTY_PREFIX, @@ -263,7 +235,9 @@ def pivot_stages_properties( """ new_data: List[Dict[str, Any]] = [] for record in data: - record_not_null: Dict[str, Any] = {k: v for k, v in record.items() if v is not None} + record_not_null: Dict[str, Any] = { + k: v for k, v in record.items() if v is not None + } if id_prop not in record_not_null: continue id_val = record_not_null.pop(id_prop) @@ -279,80 +253,50 @@ def stages_timing( object_type: str, api_key: str = dlt.config.value, soft_delete: bool = False, - limit: Optional[int] = None, ) -> Iterator[TDataItems]: """ - Fetch stage timing data for a specific object type from the HubSpot API. + Fetch stage timing data for a specific object type from the HubSpot API. Some entities, like, + deals and tickets actually have pipelines with multiple stages, which they can enter and exit. This function fetches + history of entering and exiting different stages for the given object. + + We have to request them separately, because these properties has the pipeline stage_id in the name. + For example, "hs_date_entered_12345678", where 12345678 is the stage_id. + Args: object_type (str): Type of HubSpot object (e.g., 'deal', 'ticket'). api_key (str, optional): HubSpot API key for authentication. soft_delete (bool, optional): Fetch soft-deleted (archived) records. Defaults to False. - limit (Optional[int], optional): Limit the number of properties to fetch. Defaults to None. Yields: Iterator[TDataItems]: Stage timing data. """ + all_properties: List[str] = list(_get_property_names(api_key, object_type)) date_entered_properties: List[str] = [ prop for prop in all_properties if prop.startswith(STAGE_PROPERTY_PREFIX) ] - props: str = ",".join(date_entered_properties) - idx: int = 0 - if limit is None: - limit = len(date_entered_properties) - while idx < limit: - if len(props) - idx < MAX_PROPS_LENGTH: - props_part = ",".join(props[idx: idx + MAX_PROPS_LENGTH].split(",")[:-1]) - else: - props_part = props[idx: idx + MAX_PROPS_LENGTH] - idx += len(props_part) + + # Since the length of request should be less than MAX_PROPS_LENGTH, we cannot request + # data for the whole properties list. Therefore, in the following lines we request + # data iteratively for chunks of the properties list. + for chunk in chunk_properties(date_entered_properties, MAX_PROPS_LENGTH): + props_part = ",".join(chunk) for data in fetch_data_for_properties( props_part, api_key, object_type, soft_delete ): yield pivot_stages_properties(data) -def owners( - api_key: str, - soft_delete: bool = False, -) -> Iterator[TDataItems]: - """ - Fetch HubSpot owners data. - - Args: - api_key (str): HubSpot API key for authentication. - soft_delete (bool, optional): Fetch soft-deleted (archived) owners. Defaults to False. - - Yields: - Iterator[TDataItems]: Owner data. - """ - - # Fetch data for owners - for page in fetch_data(endpoint=CRM_OBJECT_ENDPOINTS["owner"], api_key=api_key): - yield page - - # Fetch soft-deleted owners if requested - if soft_delete: - for page in fetch_data( - endpoint=CRM_OBJECT_ENDPOINTS["owner"], - params=ARCHIVED_PARAM, - api_key=api_key, - context={SOFT_DELETE_KEY: True}, - ): - yield page - - @dlt.source(name="hubspot") def hubspot( api_key: str = dlt.secrets.value, include_history: bool = False, soft_delete: bool = False, include_custom_props: bool = True, - props: Optional[Sequence[str]] = None, # Add props argument here ) -> Iterator[DltResource]: """ - A DLT source that retrieves data from the HubSpot API using the + A dlt source that retrieves data from the HubSpot API using the specified API key. This function retrieves data for several HubSpot API endpoints, @@ -381,17 +325,77 @@ def hubspot( `api_key` argument. """ - def hubspot_pipelines_for_objects( - api_key_inner: str = dlt.secrets.value, + @dlt.resource(name="owners", write_disposition="merge", primary_key="id") + def owners( + api_key: str = api_key, soft_delete: bool = soft_delete + ) -> Iterator[TDataItems]: + """Fetch HubSpot owners data. The owners resource implemented separately, + because it doesn't have endpoint for properties requesting + + Args: + api_key (str): HubSpot API key for authentication. + soft_delete (bool, optional): Fetch soft-deleted (archived) owners. Defaults to False. + + Yields: + Iterator[TDataItems]: Owner data. + """ + + # Fetch data for owners + for page in fetch_data(endpoint=CRM_OBJECT_ENDPOINTS["owner"], api_key=api_key): + yield page + + # Fetch soft-deleted owners if requested + if soft_delete: + for page in fetch_data( + endpoint=CRM_OBJECT_ENDPOINTS["owner"], + params=ARCHIVED_PARAM, + api_key=api_key, + context={SOFT_DELETE_KEY: True}, + ): + yield page + + @dlt.resource(name="properties", write_disposition="replace") + def properties_custom_labels(api_key: str = api_key) -> Iterator[TDataItems]: + """ + A dlt resource that retrieves custom labels for given list of properties. + + Args: + api_key (str, optional): HubSpot API key for authentication. + + Yields: + DltResource: A dlt resource containing properties for HubSpot objects. + """ + + def get_properties_description( + properties_list_inner: List[Dict[str, Any]] + ) -> Iterator[Dict[str, Any]]: + """Fetch properties.""" + for property_info in properties_list_inner: + yield from get_properties_labels( + api_key=api_key, + object_type=property_info["object_type"], + property_name=property_info["property_name"], + ) + + if PROPERTIES_WITH_CUSTOM_LABELS: + yield from get_properties_description(PROPERTIES_WITH_CUSTOM_LABELS) + else: + return + + def pipelines_for_objects( + pipelines_objects: List[str], + api_key_inner: str = api_key, ) -> Iterator[DltResource]: """ - A standalone DLT resource that retrieves pipelines for HubSpot objects. + Function that yields all resources for HubSpot objects, which have pipelines. + (could be deals or/and tickets, specified in PIPELINES_OBJECTS) Args: + pipelines_objects (list of strings): The list of objects, which have pipelines. api_key_inner (str, optional): The API key used to authenticate with the HubSpot API. Defaults to dlt.secrets.value. Yields: - Iterator[DltResource]: DLT resources for pipelines and stages. + Iterator[DltResource]: dlt resources for pipelines and stages. """ def get_pipelines(object_type: THubspotObjectType) -> Iterator[TDataItems]: @@ -400,7 +404,8 @@ def get_pipelines(object_type: THubspotObjectType) -> Iterator[TDataItems]: api_key=api_key_inner, ) - for obj_type in PIPELINES_OBJECTS: + # get the pipelines data + for obj_type in pipelines_objects: name = f"pipelines_{obj_type}" yield dlt.resource( get_pipelines, @@ -411,6 +416,7 @@ def get_pipelines(object_type: THubspotObjectType) -> Iterator[TDataItems]: primary_key="id", )(obj_type) + # get the history of entering for pipeline stages name = f"stages_timing_{obj_type}" if obj_type in OBJECT_TYPE_SINGULAR: yield dlt.resource( @@ -420,16 +426,7 @@ def get_pipelines(object_type: THubspotObjectType) -> Iterator[TDataItems]: primary_key=["id", "stage"], )(OBJECT_TYPE_SINGULAR[obj_type], soft_delete=soft_delete) - yield dlt.resource( - owners, - name="owners", - write_disposition="merge", - primary_key="id", - )( - api_key=api_key, # Pass the API key here - soft_delete=soft_delete # Pass the soft_delete flag here - ) - + # resources for all objects for obj in ALL_OBJECTS: yield dlt.resource( resource_template, @@ -438,11 +435,11 @@ def get_pipelines(object_type: THubspotObjectType) -> Iterator[TDataItems]: primary_key="id", )( entity=obj, - props=props, # Pass the props argument here include_custom_props=include_custom_props, soft_delete=soft_delete, ) + # corresponding history resources if include_history: for obj in ALL_OBJECTS: yield dlt.resource( @@ -452,8 +449,14 @@ def get_pipelines(object_type: THubspotObjectType) -> Iterator[TDataItems]: primary_key="object_id", )(entity=obj, include_custom_props=include_custom_props) - yield from hubspot_pipelines_for_objects(api_key) - yield hubspot_properties + # owners resource + yield owners + + # pipelines resources + yield from pipelines_for_objects(PIPELINES_OBJECTS, api_key) + + # custom properties labels resource + yield properties_custom_labels def fetch_props( @@ -486,7 +489,9 @@ def fetch_props( if include_custom_props: all_props: List[str] = _get_property_names(api_key, object_type) - custom_props: List[str] = [prop for prop in all_props if not prop.startswith("hs_")] + custom_props: List[str] = [ + prop for prop in all_props if not prop.startswith("hs_") + ] props_list += custom_props props_str = ",".join(sorted(set(props_list))) @@ -510,7 +515,7 @@ def hubspot_events_for_objects( start_date: pendulum.DateTime = STARTDATE, ) -> DltResource: """ - A standalone DLT resource that retrieves web analytics events from the HubSpot API for a particular object type and list of object ids. + A standalone dlt resource that retrieves web analytics events from the HubSpot API for a particular object type and list of object ids. Args: object_type (THubspotObjectType): One of the hubspot object types see definition of THubspotObjectType literal. @@ -519,7 +524,7 @@ def hubspot_events_for_objects( start_date (pendulum.DateTime, optional): The initial date time from which start getting events, default to STARTDATE. Returns: - DltResource: Incremental DLT resource to track events for objects from the list. + DltResource: Incremental dlt resource to track events for objects from the list. """ end_date: str = pendulum.now().isoformat() diff --git a/sources/hubspot/helpers.py b/sources/hubspot/helpers.py index 0b5a1c833..f58fe9499 100644 --- a/sources/hubspot/helpers.py +++ b/sources/hubspot/helpers.py @@ -1,7 +1,7 @@ """Hubspot source helpers""" import urllib.parse -from typing import Any, Dict, Iterator, List, Optional, Generator +from typing import Any, Dict, Generator, Iterator, List, Optional from dlt.sources.helpers import requests @@ -31,14 +31,16 @@ def _get_headers(api_key: str) -> Dict[str, str]: return dict(authorization=f"Bearer {api_key}") -def pagination(_data: Dict[str, Any], headers: Dict[str, Any]) -> Optional[Dict[str, Any]]: +def pagination( + _data: Dict[str, Any], headers: Dict[str, Any] +) -> Optional[Dict[str, Any]]: _next = _data.get("paging", {}).get("next", None) # _next = False if _next: next_url = _next["link"] # Get the next page response r = requests.get(next_url, headers=headers) - return r.json() # type: ignore + return r.json() # type: ignore else: return None @@ -148,8 +150,6 @@ def fetch_data( if "results" in _data: _objects: List[Dict[str, Any]] = [] for _result in _data["results"]: - # if _result["properties"]["hs_merged_object_ids"] is not None: - # _result["properties"]["hs_merged_object_ids"] = _result["properties"]["hs_merged_object_ids"].split(";") _obj = _result.get("properties", _result) if "id" not in _obj and "id" in _result: # Move id from properties to top level @@ -201,7 +201,9 @@ def _get_property_names(api_key: str, object_type: str) -> List[str]: return properties -def get_properties_labels(api_key: str, object_type: str, property_name: str) -> Iterator[Dict[str, Any]]: +def get_properties_labels( + api_key: str, object_type: str, property_name: str +) -> Iterator[Dict[str, Any]]: endpoint = f"/crm/v3/properties/{object_type}/{property_name}" url = get_url(endpoint) headers = _get_headers(api_key) diff --git a/sources/hubspot/settings.py b/sources/hubspot/settings.py index e5ab05588..a73529ea5 100644 --- a/sources/hubspot/settings.py +++ b/sources/hubspot/settings.py @@ -60,14 +60,14 @@ ] DEFAULT_DEAL_PROPS = [ - #"amount", - #"closedate", - #"createdate", + # "amount", + # "closedate", + # "createdate", "dealname", "dealstage", - #"hs_lastmodifieddate", - #"hs_object_id", - #"pipeline", + # "hs_lastmodifieddate", + # "hs_object_id", + # "pipeline", ] DEFAULT_TICKET_PROPS = [ @@ -119,4 +119,5 @@ ARCHIVED_PARAM = {"archived": True} PREPROCESSING = {"split": ["hs_merged_object_ids"]} STAGE_PROPERTY_PREFIX = "hs_date_entered_" -MAX_PROPS_LENGTH = 2000 \ No newline at end of file +MAX_PROPS_LENGTH = 2000 +PROPERTIES_WITH_CUSTOM_LABELS = () diff --git a/sources/hubspot/utils.py b/sources/hubspot/utils.py index 246d06d43..ad8e81794 100644 --- a/sources/hubspot/utils.py +++ b/sources/hubspot/utils.py @@ -1,9 +1,29 @@ -from typing import Dict, Any +from typing import Any, Dict, Iterator, List + from .settings import PREPROCESSING + def split_data(doc: Dict[str, Any]) -> Dict[str, Any]: for prop in PREPROCESSING["split"]: if prop in doc and doc[prop] is not None: if isinstance(doc[prop], str): doc[prop] = doc[prop].split(";") return doc + + +def chunk_properties(properties: List[str], max_length: int) -> Iterator[List[str]]: + """Function which yields chunk of properties list, making sure that + for each chunk, len(",".join(chunk)) =< max_length. + """ + chunk: List[str] = [] + length = 0 + for prop in properties: + prop_len = len(prop) + (1 if chunk else 0) # include comma length if not first + if length + prop_len > max_length: + yield chunk + chunk, length = [prop], len(prop) + else: + chunk.append(prop) + length += prop_len + if chunk: + yield chunk diff --git a/sources/hubspot_pipeline.py b/sources/hubspot_pipeline.py index ba16d027b..7f5ac0af0 100644 --- a/sources/hubspot_pipeline.py +++ b/sources/hubspot_pipeline.py @@ -27,6 +27,7 @@ def load_crm_data() -> None: # Print information about the pipeline run print(info) + def load_crm_data_with_history() -> None: """ Loads all HubSpot CRM resources and property change history for each entity. @@ -97,7 +98,7 @@ def load_crm_objects_with_custom_properties() -> None: ) load_data = hubspot() - #load_data.contacts.bind(props=["date_of_birth", "degree"], include_custom_props=True) + # load_data.contacts.bind(props=["date_of_birth", "degree"], include_custom_props=True) load_info = pipeline.run(load_data) print(load_info) @@ -122,6 +123,7 @@ def load_pipelines() -> None: load_info = p.run(load_data) print(load_info) + def load_web_analytics_events( object_type: THubspotObjectType, object_ids: List[str] ) -> None: @@ -155,4 +157,4 @@ def load_web_analytics_events( load_crm_objects_with_custom_properties() load_pipelines() load_crm_data_with_soft_delete() - load_web_analytics_events("company", ["7086461639", "7086464459"]) \ No newline at end of file + load_web_analytics_events("company", ["7086461639", "7086464459"]) diff --git a/tests/hubspot/test_hubspot_source.py b/tests/hubspot/test_hubspot_source.py index a0415e01b..7e4c6b36f 100644 --- a/tests/hubspot/test_hubspot_source.py +++ b/tests/hubspot/test_hubspot_source.py @@ -2,7 +2,6 @@ import dlt import pytest -from itertools import chain from typing import Any from urllib.parse import urljoin @@ -20,6 +19,7 @@ CRM_TICKETS_ENDPOINT, CRM_QUOTES_ENDPOINT, ) +from sources.hubspot.utils import chunk_properties from tests.hubspot.mock_data import ( mock_contacts_data, mock_companies_data, @@ -416,3 +416,23 @@ def test_event_resources(destination_name: str) -> None: ) print(load_info) assert_load_info(load_info) + + +def test_chunk_properties(): + properties = ["prop1", "prop2", "prop3", "prop4"] + max_length = 12 + expected = [["prop1", "prop2"], ["prop3", "prop4"]] + result = list(chunk_properties(properties, max_length)) + assert result == expected + + properties = ["prop1", "prop2"] + max_length = len("prop1,prop2") + expected = [["prop1", "prop2"]] + result = list(chunk_properties(properties, max_length)) + assert result == expected + + properties = ["p1", "p2", "p3", "p4", "p5"] + max_length = 5 # Should accommodate 'p1,p2' + expected = [["p1", "p2"], ["p3", "p4"], ["p5"]] + result = list(chunk_properties(properties, max_length)) + assert result == expected From 17254a431b4e96977285bfd6143bf0c4b58adb33 Mon Sep 17 00:00:00 2001 From: Violetta Mishechkina Date: Thu, 5 Dec 2024 18:54:59 +0100 Subject: [PATCH 7/9] Fix the api_key, simplify, docs --- sources/hubspot/README.md | 37 +++++---- sources/hubspot/__init__.py | 110 ++++++++++----------------- sources/hubspot/settings.py | 4 +- tests/hubspot/test_hubspot_source.py | 108 +++++++++++++------------- 4 files changed, 121 insertions(+), 138 deletions(-) diff --git a/sources/hubspot/README.md b/sources/hubspot/README.md index e55a5ac13..18139da46 100644 --- a/sources/hubspot/README.md +++ b/sources/hubspot/README.md @@ -2,20 +2,29 @@ HubSpot is a customer relationship management (CRM) software and inbound marketing platform that helps businesses attract visitors, engage customers, and close leads. -The `dlt` HubSpot verified source allows you to automatically load data from HubSpot into a [destination](https://dlthub.com/docs/dlt-ecosystem/destinations/) of your choice. It loads data from the following endpoints: - -| API | Data | -| --- |-----------------------------------------------------------------| -| Contacts | visitors, potential customers, leads | -| Companies | information about organizations | -| Deals | deal records, deal tracking | -| Products | goods, services | -| Tickets | requests for help from customers or users | -| Quotes | pricing information of a product | -| Web analytics | events | -| Owners | information about account managers or users | -| Pipelines | stages and progress tracking, separate resource for each object | -| Properties | custom labels for properties with multiple choice | +The `dlt` HubSpot verified source allows you to automatically load data from HubSpot into a [destination](https://dlthub.com/docs/dlt-ecosystem/destinations/) of your choice. It loads data to the following resources: + +| resource | data | +|----------------------------|--------------------------------------------------------------------------| +| contacts | visitors, potential customers, leads | +| contacts_property_history | information about historical changes in contacts properties | +| companies | information about organizations | +| companies_property_history | information about historical changes in companies properties | +| deals | deal records, deal tracking | +| deals_property_history | information about historical changes in deals properties | +| products | goods, services | +| products_property_history | information about historical changes in products properties | +| tickets | requests for help from customers or users | +| tickets_property_history | information about historical changes in tickets properties | +| quotes | pricing information of a product | +| quotes_property_history | information about historical changes in quotes properties | +| Web analytics | events | +| owners | information about account managers or users | +| pipelines_deals | stages and progress tracking for deals | +| stages_timing_deals | history of entering and exiting different stages for the deals pipelines | +| pipelines_tickets | stages and progress tracking for tickets | +| stages_timing_tickets | history of entering and exiting different stages for the tickets pipelines | +| properties | custom labels for properties with multiple choice | ## Initialize the pipeline with Hubspot verified source ```bash diff --git a/sources/hubspot/__init__.py b/sources/hubspot/__init__.py index 3097c1efb..1edbc1591 100644 --- a/sources/hubspot/__init__.py +++ b/sources/hubspot/__init__.py @@ -110,7 +110,7 @@ def fetch_data_for_properties( def crm_objects( object_type: str, - api_key: str = dlt.secrets.value, + api_key: str, props: Optional[Sequence[str]] = None, include_custom_props: bool = True, archived: bool = False, @@ -128,13 +128,15 @@ def crm_objects( Yields: Iterator[TDataItems]: Data items retrieved from the API. """ - props = fetch_props(object_type, api_key, props, include_custom_props) - yield from fetch_data_for_properties(props, api_key, object_type, archived) + props_entry: Sequence[str] = props or ENTITY_PROPERTIES.get(object_type, []) + props_fetched = fetch_props(object_type, api_key, props_entry, include_custom_props) + yield from fetch_data_for_properties(props_fetched, api_key, object_type, archived) def crm_object_history( object_type: THubspotObjectType, - api_key: str = dlt.secrets.value, + api_key: str, + props: Optional[Sequence[str]] = None, include_custom_props: bool = True, ) -> Iterator[TDataItems]: """ @@ -143,6 +145,7 @@ def crm_object_history( Args: object_type (THubspotObjectType): Type of HubSpot object (e.g., 'company', 'contact'). api_key (str, optional): API key for HubSpot authentication. + props (Optional[Sequence[str]], optional): List of properties to retrieve. Defaults to None. include_custom_props (bool, optional): Include custom properties in the result. Defaults to True. Yields: @@ -150,70 +153,20 @@ def crm_object_history( """ # Fetch the properties from ENTITY_PROPERTIES or default to "All" - props_entry: Union[List[str], str] = ENTITY_PROPERTIES.get(object_type, "All") + props_entry: Union[Sequence[str], str] = props or ENTITY_PROPERTIES.get( + object_type, ALL + ) # Fetch the properties with the option to include custom properties - props: str = fetch_props(object_type, api_key, props_entry, include_custom_props) + props_fetched: str = fetch_props( + object_type, api_key, props_entry, include_custom_props + ) # Yield the property history yield from fetch_property_history( CRM_OBJECT_ENDPOINTS[object_type], api_key, - props, - ) - - -def resource_template( - entity: THubspotObjectType, - api_key: str = dlt.config.value, - include_custom_props: bool = False, - soft_delete: bool = False, -) -> Iterator[TDataItems]: - """ - Template function to yield CRM resources for a specific HubSpot entity. - - Args: - entity (THubspotObjectType): Type of HubSpot object (e.g., 'company', 'contact'). - api_key (str, optional): HubSpot API key for authentication. - props (Optional[Sequence[str]], optional): List of properties to retrieve. Defaults to None. - include_custom_props (bool, optional): Include custom properties in the result. Defaults to False. - soft_delete (bool, optional): Fetch soft-deleted (archived) records. Defaults to False. - - Yields: - Iterator[TDataItems]: CRM object data retrieved from the API. - """ - - # Use provided props or fetch from ENTITY_PROPERTIES if not provided - properties: List[str] = ENTITY_PROPERTIES.get(entity, []) - - # Use these properties to yield the crm_objects - yield from crm_objects( - entity, - api_key, - props=properties, # Pass the properties to the crm_objects function - include_custom_props=include_custom_props, - archived=soft_delete, - ) - - -def resource_history_template( - entity: THubspotObjectType, - api_key: str = dlt.config.value, - include_custom_props: bool = False, -) -> Iterator[TDataItems]: - """ - Template function to yield historical CRM resource data for a specific HubSpot entity. - - Args: - entity (THubspotObjectType): Type of HubSpot object (e.g., 'company', 'contact'). - api_key (str, optional): HubSpot API key for authentication. - include_custom_props (bool, optional): Include custom properties in the result. Defaults to False. - - Yields: - Iterator[TDataItems]: Historical data for the CRM object. - """ - yield from crm_object_history( - entity, api_key, include_custom_props=include_custom_props + props_fetched, ) @@ -251,7 +204,7 @@ def pivot_stages_properties( def stages_timing( object_type: str, - api_key: str = dlt.config.value, + api_key: str, soft_delete: bool = False, ) -> Iterator[TDataItems]: """ @@ -294,6 +247,7 @@ def hubspot( include_history: bool = False, soft_delete: bool = False, include_custom_props: bool = True, + properties: Optional[Dict[str, Any]] = None, ) -> Iterator[DltResource]: """ A dlt source that retrieves data from the HubSpot API using the @@ -301,7 +255,7 @@ def hubspot( This function retrieves data for several HubSpot API endpoints, including companies, contacts, deals, tickets, products and web - analytics events. It returns a tuple of Dlt resources, one for + analytics events. It returns a tuple of dlt resources, one for each endpoint. Args: @@ -315,6 +269,10 @@ def hubspot( Whether to fetch deleted properties and mark them as `is_deleted`. include_custom_props (bool): Whether to include custom properties. + properties (Optional(dict)): + A dictionary containing lists of properties for all the resources. + Will override the default properties ENTITY_PROPERTIES from settings + For ex., {"contact": ["createdate", "email", "firstname", "hs_object_id", "lastmodifieddate", "lastname",]} Returns: Sequence[DltResource]: Dlt resources, one for each HubSpot API endpoint. @@ -424,30 +382,40 @@ def get_pipelines(object_type: THubspotObjectType) -> Iterator[TDataItems]: name=name, write_disposition="merge", primary_key=["id", "stage"], - )(OBJECT_TYPE_SINGULAR[obj_type], soft_delete=soft_delete) + )( + OBJECT_TYPE_SINGULAR[obj_type], + api_key=api_key, + soft_delete=soft_delete, + ) # resources for all objects for obj in ALL_OBJECTS: yield dlt.resource( - resource_template, + crm_objects, name=OBJECT_TYPE_PLURAL[obj], write_disposition="merge", primary_key="id", )( - entity=obj, + object_type=obj, + api_key=api_key, + props=properties.get(obj) if properties else None, include_custom_props=include_custom_props, - soft_delete=soft_delete, + archived=soft_delete, ) # corresponding history resources if include_history: for obj in ALL_OBJECTS: yield dlt.resource( - resource_history_template, + crm_object_history, name=f"{OBJECT_TYPE_PLURAL[obj]}_property_history", - write_disposition="merge", - primary_key="object_id", - )(entity=obj, include_custom_props=include_custom_props) + write_disposition="append", + )( + object_type=obj, + api_key=api_key, + props=properties.get(obj) if properties else None, + include_custom_props=include_custom_props, + ) # owners resource yield owners diff --git a/sources/hubspot/settings.py b/sources/hubspot/settings.py index a73529ea5..6c4e2698a 100644 --- a/sources/hubspot/settings.py +++ b/sources/hubspot/settings.py @@ -112,9 +112,9 @@ # 'ALL' represents a list of all available properties for all types -ALL = [{"properties": "All"}] +ALL = "All" -PIPELINES_OBJECTS = ["deals"] +PIPELINES_OBJECTS = ["deals", "tickets"] SOFT_DELETE_KEY = "is_deleted" ARCHIVED_PARAM = {"archived": True} PREPROCESSING = {"split": ["hs_merged_object_ids"]} diff --git a/tests/hubspot/test_hubspot_source.py b/tests/hubspot/test_hubspot_source.py index 7e4c6b36f..262d2b27a 100644 --- a/tests/hubspot/test_hubspot_source.py +++ b/tests/hubspot/test_hubspot_source.py @@ -143,11 +143,13 @@ def fake_get(url: str, *args, **kwargs) -> Any: # type: ignore[no-untyped-def] api_key="fake_key", include_history=True, ) - load_info = pipeline.run(source.with_resources("contacts")) + load_info = pipeline.run( + source.with_resources("contacts", "contacts_property_history") + ) assert_load_info(load_info) - assert m.call_count == 3 + assert m.call_count == 4 # Check that API is called with all properties listed m.assert_has_calls( @@ -165,6 +167,11 @@ def fake_get(url: str, *args, **kwargs) -> Any: # type: ignore[no-untyped-def] "limit": 100, }, ), + call( + urljoin(BASE_URL, "/crm/v3/properties/contacts"), + headers=ANY, + params=None, + ), call( urljoin(BASE_URL, CRM_CONTACTS_ENDPOINT), headers=ANY, @@ -184,9 +191,11 @@ def fake_get(url: str, *args, **kwargs) -> Any: # type: ignore[no-untyped-def] @pytest.mark.parametrize("destination_name", ALL_DESTINATIONS) def test_too_many_properties(destination_name: str) -> None: with pytest.raises(ResourceExtractionError): - source = hubspot(api_key="fake_key", include_history=True) - source.contacts.bind(props=["property"] * 500) - list(source.with_resources("contacts")) + with patch( + "sources.hubspot.ENTITY_PROPERTIES", {"contact": ["property"] * 500} + ): + source = hubspot(api_key="fake_key", include_history=True) + list(source.with_resources("contacts")) @pytest.mark.parametrize("destination_name", ALL_DESTINATIONS) @@ -205,26 +214,25 @@ def fake_get(url: str, *args, **kwargs) -> Any: # type: ignore[no-untyped-def] dataset_name="hubspot_data", dev_mode=True, ) - source = hubspot(api_key="fake_key") - source.contacts.bind(props=props, include_custom_props=False) - - with patch("dlt.sources.helpers.requests.get", side_effect=fake_get) as m: - load_info = pipeline.run(source.with_resources("contacts")) - - assert_load_info(load_info) - - m.assert_has_calls( - [ - call( - urljoin(BASE_URL, CRM_CONTACTS_ENDPOINT), - headers=ANY, - params={ - "properties": expected_props, - "limit": 100, - }, - ), - ] - ) + source = hubspot(api_key="fake_key", include_custom_props=False) + with patch("sources.hubspot.ENTITY_PROPERTIES", {"contact": props}): + with patch("dlt.sources.helpers.requests.get", side_effect=fake_get) as m: + load_info = pipeline.run(source.with_resources("contacts")) + + assert_load_info(load_info) + + m.assert_has_calls( + [ + call( + urljoin(BASE_URL, CRM_CONTACTS_ENDPOINT), + headers=ANY, + params={ + "properties": expected_props, + "limit": 100, + }, + ), + ] + ) @pytest.mark.parametrize("destination_name", ALL_DESTINATIONS) @@ -242,8 +250,7 @@ def fake_get(url: str, *args, **kwargs) -> Any: # type: ignore[no-untyped-def] dataset_name="hubspot_data", dev_mode=True, ) - source = hubspot(api_key="fake_key") - source.contacts.bind(include_custom_props=False) + source = hubspot(api_key="fake_key", include_custom_props=False) with patch("dlt.sources.helpers.requests.get", side_effect=fake_get) as m: load_info = pipeline.run(source.with_resources("contacts")) @@ -281,30 +288,29 @@ def fake_get(url: str, *args, **kwargs) -> Any: # type: ignore[no-untyped-def] dev_mode=True, ) source = hubspot(api_key="fake_key") - source.contacts.bind(props=props) - - with patch("dlt.sources.helpers.requests.get", side_effect=fake_get) as m: - load_info = pipeline.run(source.with_resources("contacts")) - - assert_load_info(load_info) - - m.assert_has_calls( - [ - call( - urljoin(BASE_URL, "/crm/v3/properties/contacts"), - headers=ANY, - params=None, - ), - call( - urljoin(BASE_URL, CRM_CONTACTS_ENDPOINT), - headers=ANY, - params={ - "properties": expected_props, - "limit": 100, - }, - ), - ] - ) + with patch("sources.hubspot.ENTITY_PROPERTIES", {"contact": props}): + with patch("dlt.sources.helpers.requests.get", side_effect=fake_get) as m: + load_info = pipeline.run(source.with_resources("contacts")) + + assert_load_info(load_info) + + m.assert_has_calls( + [ + call( + urljoin(BASE_URL, "/crm/v3/properties/contacts"), + headers=ANY, + params=None, + ), + call( + urljoin(BASE_URL, CRM_CONTACTS_ENDPOINT), + headers=ANY, + params={ + "properties": expected_props, + "limit": 100, + }, + ), + ] + ) @pytest.mark.parametrize("destination_name", ALL_DESTINATIONS) From 60fe52bb38960d1e73150410bfef9c7144f963c9 Mon Sep 17 00:00:00 2001 From: Violetta Mishechkina Date: Wed, 18 Dec 2024 17:12:09 +0100 Subject: [PATCH 8/9] Fix hubspot tests --- tests/hubspot/test_hubspot_source.py | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/tests/hubspot/test_hubspot_source.py b/tests/hubspot/test_hubspot_source.py index 262d2b27a..4913793e6 100644 --- a/tests/hubspot/test_hubspot_source.py +++ b/tests/hubspot/test_hubspot_source.py @@ -345,7 +345,7 @@ def test_all_resources(destination_name: str) -> None: dataset_name="hubspot_data", dev_mode=True, ) - load_info = pipeline.run(hubspot(include_history=True)) + load_info = pipeline.run(hubspot(include_history=True).with_resources("contacts", "deals", "companies", "contacts_property_history")) assert_load_info(load_info) table_names = [ @@ -358,7 +358,7 @@ def test_all_resources(destination_name: str) -> None: assert ( load_table_counts(pipeline, *table_names) == load_table_distinct_counts(pipeline, "hs_object_id", *table_names) - == {"companies": 200, "deals": 500, "contacts": 402} + == {'companies': 4, 'contacts': 3, 'deals': 2} ) history_table_names = [ @@ -369,11 +369,9 @@ def test_all_resources(destination_name: str) -> None: table_counts = load_table_counts(pipeline, *history_table_names) # Check history tables # NOTE: this value is increasing... maybe we should start testing ranges - assert table_counts["companies_property_history"] >= 4018 - assert table_counts["contacts_property_history"] >= 5935 - assert table_counts["deals_property_history"] >= 5162 + assert table_counts["contacts_property_history"] >= 76 - # Check property from couple of contacts against known data + # Check property from a couple of contacts against known data with pipeline.sql_client() as client: rows = [ list(row) @@ -397,14 +395,20 @@ def test_all_resources(destination_name: str) -> None: "email", "emailmaria@hubspot.com", "API", - pendulum.parse("2022-06-15 08:51:51.399"), + pendulum.parse("2023-06-28 13:55:47.558"), + ), + ( + "email", + "thisisnewemail@hubspot.com", + "CRM_UI", + pendulum.parse("2023-07-01 23:34:57.837"), ), ( "email", "bh@hubspot.com", "API", - pendulum.parse("2022-06-15 08:51:51.399"), - ), + pendulum.parse("2023-06-28 13:55:47.558"), + ) ] ) From 7794ad15487c2687fb7c74e22cf3f82959ad7d24 Mon Sep 17 00:00:00 2001 From: Violetta Mishechkina Date: Wed, 18 Dec 2024 17:16:57 +0100 Subject: [PATCH 9/9] Minor --- sources/hubspot/settings.py | 12 ++++++------ sources/hubspot_pipeline.py | 15 +++++++-------- 2 files changed, 13 insertions(+), 14 deletions(-) diff --git a/sources/hubspot/settings.py b/sources/hubspot/settings.py index 6c4e2698a..09e126514 100644 --- a/sources/hubspot/settings.py +++ b/sources/hubspot/settings.py @@ -60,14 +60,14 @@ ] DEFAULT_DEAL_PROPS = [ - # "amount", - # "closedate", - # "createdate", + "amount", + "closedate", + "createdate", "dealname", "dealstage", - # "hs_lastmodifieddate", - # "hs_object_id", - # "pipeline", + "hs_lastmodifieddate", + "hs_object_id", + "pipeline", ] DEFAULT_TICKET_PROPS = [ diff --git a/sources/hubspot_pipeline.py b/sources/hubspot_pipeline.py index 7f5ac0af0..0f2e7c5ea 100644 --- a/sources/hubspot_pipeline.py +++ b/sources/hubspot_pipeline.py @@ -18,7 +18,7 @@ def load_crm_data() -> None: p = dlt.pipeline( pipeline_name="hubspot", dataset_name="hubspot_dataset", - destination="bigquery", + destination="duckdb", ) # Run the pipeline with the HubSpot source connector @@ -42,7 +42,7 @@ def load_crm_data_with_history() -> None: p = dlt.pipeline( pipeline_name="hubspot", dataset_name="hubspot_dataset", - destination="bigquery", + destination="duckdb", ) # Configure the source with `include_history` to enable property history load, history is disabled by default @@ -68,7 +68,7 @@ def load_crm_data_with_soft_delete() -> None: p = dlt.pipeline( pipeline_name="hubspot", dataset_name="hubspot_dataset", - destination="bigquery", + destination="duckdb", ) # Configure the source to load soft-deleted (archived) records. @@ -94,11 +94,10 @@ def load_crm_objects_with_custom_properties() -> None: pipeline = dlt.pipeline( pipeline_name="hubspot", dataset_name="hubspot_dataset", - destination="bigquery", + destination="duckdb", ) - load_data = hubspot() - # load_data.contacts.bind(props=["date_of_birth", "degree"], include_custom_props=True) + load_data = hubspot(properties={"contact": ("date_of_birth", "degree")}, include_custom_props=True) load_info = pipeline.run(load_data) print(load_info) @@ -115,7 +114,7 @@ def load_pipelines() -> None: p = dlt.pipeline( pipeline_name="hubspot", dataset_name="hubspot_dataset", - destination="bigquery", + destination="duckdb", dev_mode=False, ) # To load data from pipelines in "deals" endpoint @@ -138,7 +137,7 @@ def load_web_analytics_events( p = dlt.pipeline( pipeline_name="hubspot", dataset_name="hubspot_dataset", - destination="bigquery", + destination="duckdb", dev_mode=False, )