diff --git a/sources/hubspot/README.md b/sources/hubspot/README.md index 357170ef3..18139da46 100644 --- a/sources/hubspot/README.md +++ b/sources/hubspot/README.md @@ -2,17 +2,29 @@ HubSpot is a customer relationship management (CRM) software and inbound marketing platform that helps businesses attract visitors, engage customers, and close leads. -The `dlt` HubSpot verified source allows you to automatically load data from HubSpot into a [destination](https://dlthub.com/docs/dlt-ecosystem/destinations/) of your choice. It loads data from the following endpoints: - -| API | Data | -| --- | --- | -| Contacts | visitors, potential customers, leads | -| Companies | information about organizations | -| Deals | deal records, deal tracking | -| Products | goods, services | -| Tickets | requests for help from customers or users | -| Quotes | pricing information of a product | -| Web analytics | events | +The `dlt` HubSpot verified source allows you to automatically load data from HubSpot into a [destination](https://dlthub.com/docs/dlt-ecosystem/destinations/) of your choice. 
It loads data to the following resources: + +| resource | data | +|----------------------------|--------------------------------------------------------------------------| +| contacts | visitors, potential customers, leads | +| contacts_property_history | information about historical changes in contacts properties | +| companies | information about organizations | +| companies_property_history | information about historical changes in companies properties | +| deals | deal records, deal tracking | +| deals_property_history | information about historical changes in deals properties | +| products | goods, services | +| products_property_history | information about historical changes in products properties | +| tickets | requests for help from customers or users | +| tickets_property_history | information about historical changes in tickets properties | +| quotes | pricing information of a product | +| quotes_property_history | information about historical changes in quotes properties | +| Web analytics | events | +| owners | information about account managers or users | +| pipelines_deals | stages and progress tracking for deals | +| stages_timing_deals | history of entering and exiting different stages for the deals pipelines | +| pipelines_tickets | stages and progress tracking for tickets | +| stages_timing_tickets | history of entering and exiting different stages for the tickets pipelines | +| properties | custom labels for properties with multiple choice | ## Initialize the pipeline with Hubspot verified source ```bash diff --git a/sources/hubspot/__init__.py b/sources/hubspot/__init__.py index 5e09fc7f1..1edbc1591 100644 --- a/sources/hubspot/__init__.py +++ b/sources/hubspot/__init__.py @@ -1,5 +1,6 @@ """ -This is a module that provides a DLT source to retrieve data from multiple endpoints of the HubSpot API using a specified API key. The retrieved data is returned as a tuple of Dlt resources, one for each endpoint. 
+This is a module that provides a dlt source to retrieve data from multiple endpoints of the HubSpot API +using a specified API key. The retrieved data is returned as a tuple of Dlt resources, one for each endpoint. The source retrieves data from the following endpoints: - CRM Companies @@ -8,6 +9,8 @@ - CRM Tickets - CRM Products - CRM Quotes +- CRM Owners +- CRM Pipelines - Web Analytics Events For each endpoint, a resource and transformer function are defined to retrieve data and transform it to a common format. @@ -18,13 +21,9 @@ Example: To retrieve data from all endpoints, use the following code: - -python - ->>> resources = hubspot(api_key="your_api_key") """ -from typing import Any, Dict, Iterator, List, Literal, Sequence +from typing import Any, Dict, Iterator, List, Literal, Optional, Sequence, Union from urllib.parse import quote import dlt @@ -32,37 +31,231 @@ from dlt.common.typing import TDataItems from dlt.sources import DltResource -from .helpers import _get_property_names, fetch_data, fetch_property_history +from .helpers import ( + _get_property_names, + fetch_data, + fetch_property_history, + get_properties_labels, +) from .settings import ( ALL, + ALL_OBJECTS, + ARCHIVED_PARAM, CRM_OBJECT_ENDPOINTS, - DEFAULT_COMPANY_PROPS, - DEFAULT_CONTACT_PROPS, - DEFAULT_DEAL_PROPS, - DEFAULT_PRODUCT_PROPS, - DEFAULT_QUOTE_PROPS, - DEFAULT_TICKET_PROPS, + CRM_PIPELINES_ENDPOINT, + ENTITY_PROPERTIES, + MAX_PROPS_LENGTH, OBJECT_TYPE_PLURAL, + OBJECT_TYPE_SINGULAR, + PIPELINES_OBJECTS, + PROPERTIES_WITH_CUSTOM_LABELS, + SOFT_DELETE_KEY, + STAGE_PROPERTY_PREFIX, STARTDATE, WEB_ANALYTICS_EVENTS_ENDPOINT, ) +from .utils import chunk_properties THubspotObjectType = Literal["company", "contact", "deal", "ticket", "product", "quote"] +def extract_properties_list(props: Sequence[Any]) -> List[str]: + """ + Flatten a list of property dictionaries to extract property names. + + Args: + props (Sequence[Any]): List of property names or property dictionaries. 
+ + Returns: + List[str]: List of property names. + """ + return [prop if isinstance(prop, str) else prop.get("name") for prop in props] + + +def fetch_data_for_properties( + props: Sequence[str], + api_key: str, + object_type: str, + soft_delete: bool, +) -> Iterator[TDataItems]: + """ + Fetch data for a given set of properties from the HubSpot API. + + Args: + props (Sequence[str]): List of property names to fetch. + api_key (str): HubSpot API key for authentication. + object_type (str): The type of HubSpot object (e.g., 'company', 'contact'). + soft_delete (bool): Flag to fetch soft-deleted (archived) records. + + Yields: + Iterator[TDataItems]: Data retrieved from the HubSpot API. + """ + + params: Dict[str, Any] = {"properties": props, "limit": 100} + context: Optional[Dict[str, Any]] = ( + {SOFT_DELETE_KEY: False} if soft_delete else None + ) + + yield from fetch_data( + CRM_OBJECT_ENDPOINTS[object_type], api_key, params=params, context=context + ) + if soft_delete: + yield from fetch_data( + CRM_OBJECT_ENDPOINTS[object_type], + api_key, + params={**params, **ARCHIVED_PARAM}, + context={SOFT_DELETE_KEY: True}, + ) + + +def crm_objects( + object_type: str, + api_key: str, + props: Optional[Sequence[str]] = None, + include_custom_props: bool = True, + archived: bool = False, +) -> Iterator[TDataItems]: + """ + Fetch CRM object data (e.g., companies, contacts) from the HubSpot API. + + Args: + object_type (str): Type of HubSpot object (e.g., 'company', 'contact'). + api_key (str, optional): API key for HubSpot authentication. + props (Optional[Sequence[str]], optional): List of properties to retrieve. Defaults to None. + include_custom_props (bool, optional): Include custom properties in the result. Defaults to True. + archived (bool, optional): Fetch archived (soft-deleted) objects. Defaults to False. + + Yields: + Iterator[TDataItems]: Data items retrieved from the API. 
+ """ + props_entry: Sequence[str] = props or ENTITY_PROPERTIES.get(object_type, []) + props_fetched = fetch_props(object_type, api_key, props_entry, include_custom_props) + yield from fetch_data_for_properties(props_fetched, api_key, object_type, archived) + + +def crm_object_history( + object_type: THubspotObjectType, + api_key: str, + props: Optional[Sequence[str]] = None, + include_custom_props: bool = True, +) -> Iterator[TDataItems]: + """ + Fetch the history of property changes for a given CRM object type. + + Args: + object_type (THubspotObjectType): Type of HubSpot object (e.g., 'company', 'contact'). + api_key (str, optional): API key for HubSpot authentication. + props (Optional[Sequence[str]], optional): List of properties to retrieve. Defaults to None. + include_custom_props (bool, optional): Include custom properties in the result. Defaults to True. + + Yields: + Iterator[TDataItems]: Historical property data. + """ + + # Fetch the properties from ENTITY_PROPERTIES or default to "All" + props_entry: Union[Sequence[str], str] = props or ENTITY_PROPERTIES.get( + object_type, ALL + ) + + # Fetch the properties with the option to include custom properties + props_fetched: str = fetch_props( + object_type, api_key, props_entry, include_custom_props + ) + + # Yield the property history + yield from fetch_property_history( + CRM_OBJECT_ENDPOINTS[object_type], + api_key, + props_fetched, + ) + + +def pivot_stages_properties( + data: List[Dict[str, Any]], + property_prefix: str = STAGE_PROPERTY_PREFIX, + id_prop: str = "id", +) -> List[Dict[str, Any]]: + """ + Transform the data by pivoting stage properties. + + Args: + data (List[Dict[str, Any]]): Data containing stage properties. + property_prefix (str, optional): Prefix for stage properties. Defaults to STAGE_PROPERTY_PREFIX. + id_prop (str, optional): Name of the ID property. Defaults to "id". + + Returns: + List[Dict[str, Any]]: Transformed data with pivoted stage properties. 
+ """ + new_data: List[Dict[str, Any]] = [] + for record in data: + record_not_null: Dict[str, Any] = { + k: v for k, v in record.items() if v is not None + } + if id_prop not in record_not_null: + continue + id_val = record_not_null.pop(id_prop) + new_data += [ + {id_prop: id_val, property_prefix: v, "stage": k.split(property_prefix)[1]} + for k, v in record_not_null.items() + if k.startswith(property_prefix) + ] + return new_data + + +def stages_timing( + object_type: str, + api_key: str, + soft_delete: bool = False, +) -> Iterator[TDataItems]: + """ + Fetch stage timing data for a specific object type from the HubSpot API. Some entities, like + deals and tickets, actually have pipelines with multiple stages, which they can enter and exit. This function fetches + the history of entering and exiting different stages for the given object. + + We have to request them separately, because these properties have the pipeline stage_id in the name. + For example, "hs_date_entered_12345678", where 12345678 is the stage_id. + + + Args: + object_type (str): Type of HubSpot object (e.g., 'deal', 'ticket'). + api_key (str, optional): HubSpot API key for authentication. + soft_delete (bool, optional): Fetch soft-deleted (archived) records. Defaults to False. + + Yields: + Iterator[TDataItems]: Stage timing data. + """ + + all_properties: List[str] = list(_get_property_names(api_key, object_type)) + date_entered_properties: List[str] = [ + prop for prop in all_properties if prop.startswith(STAGE_PROPERTY_PREFIX) + ] + + # Since the length of request should be less than MAX_PROPS_LENGTH, we cannot request + # data for the whole properties list. Therefore, in the following lines we request + # data iteratively for chunks of the properties list. 
+ for chunk in chunk_properties(date_entered_properties, MAX_PROPS_LENGTH): + props_part = ",".join(chunk) + for data in fetch_data_for_properties( + props_part, api_key, object_type, soft_delete + ): + yield pivot_stages_properties(data) + + @dlt.source(name="hubspot") def hubspot( api_key: str = dlt.secrets.value, include_history: bool = False, + soft_delete: bool = False, include_custom_props: bool = True, -) -> Sequence[DltResource]: + properties: Optional[Dict[str, Any]] = None, +) -> Iterator[DltResource]: """ - A DLT source that retrieves data from the HubSpot API using the + A dlt source that retrieves data from the HubSpot API using the specified API key. This function retrieves data for several HubSpot API endpoints, including companies, contacts, deals, tickets, products and web - analytics events. It returns a tuple of Dlt resources, one for + analytics events. It yields dlt resources, one for each endpoint. Args: @@ -72,6 +265,14 @@ def hubspot( include_history (Optional[bool]): Whether to load history of property changes along with entities. The history entries are loaded to separate tables. + soft_delete (bool): + Whether to also fetch deleted (archived) records and mark them with `is_deleted`. + include_custom_props (bool): + Whether to include custom properties. + properties (Optional[Dict[str, Any]]): + A dictionary mapping an object type to the list of properties to load for it. + Overrides the default properties (ENTITY_PROPERTIES from settings) for that object type. + For example: {"contact": ["createdate", "email", "firstname", "hs_object_id", "lastmodifieddate", "lastname",]} Returns: Sequence[DltResource]: Dlt resources, one for each HubSpot API endpoint. @@ -82,147 +283,196 @@ def hubspot( `api_key` argument. 
""" - @dlt.resource(name="companies", write_disposition="replace") - def companies( - api_key: str = api_key, - include_history: bool = include_history, - props: Sequence[str] = DEFAULT_COMPANY_PROPS, - include_custom_props: bool = include_custom_props, + @dlt.resource(name="owners", write_disposition="merge", primary_key="id") + def owners( + api_key: str = api_key, soft_delete: bool = soft_delete ) -> Iterator[TDataItems]: - """Hubspot companies resource""" - yield from crm_objects( - "company", - api_key, - include_history=include_history, - props=props, - include_custom_props=include_custom_props, - ) + """Fetch HubSpot owners data. The owners resource implemented separately, + because it doesn't have endpoint for properties requesting - @dlt.resource(name="contacts", write_disposition="replace") - def contacts( - api_key: str = api_key, - include_history: bool = include_history, - props: Sequence[str] = DEFAULT_CONTACT_PROPS, - include_custom_props: bool = include_custom_props, - ) -> Iterator[TDataItems]: - """Hubspot contacts resource""" - yield from crm_objects( - "contact", - api_key, - include_history, - props, - include_custom_props, - ) + Args: + api_key (str): HubSpot API key for authentication. + soft_delete (bool, optional): Fetch soft-deleted (archived) owners. Defaults to False. - @dlt.resource(name="deals", write_disposition="replace") - def deals( - api_key: str = api_key, - include_history: bool = include_history, - props: Sequence[str] = DEFAULT_DEAL_PROPS, - include_custom_props: bool = include_custom_props, - ) -> Iterator[TDataItems]: - """Hubspot deals resource""" - yield from crm_objects( - "deal", - api_key, - include_history, - props, - include_custom_props, - ) + Yields: + Iterator[TDataItems]: Owner data. 
+ """ - @dlt.resource(name="tickets", write_disposition="replace") - def tickets( - api_key: str = api_key, - include_history: bool = include_history, - props: Sequence[str] = DEFAULT_TICKET_PROPS, - include_custom_props: bool = include_custom_props, - ) -> Iterator[TDataItems]: - """Hubspot tickets resource""" - yield from crm_objects( - "ticket", - api_key, - include_history, - props, - include_custom_props, - ) + # Fetch data for owners + for page in fetch_data(endpoint=CRM_OBJECT_ENDPOINTS["owner"], api_key=api_key): + yield page - @dlt.resource(name="products", write_disposition="replace") - def products( - api_key: str = api_key, - include_history: bool = include_history, - props: Sequence[str] = DEFAULT_PRODUCT_PROPS, - include_custom_props: bool = include_custom_props, - ) -> Iterator[TDataItems]: - """Hubspot products resource""" - yield from crm_objects( - "product", - api_key, - include_history, - props, - include_custom_props, - ) + # Fetch soft-deleted owners if requested + if soft_delete: + for page in fetch_data( + endpoint=CRM_OBJECT_ENDPOINTS["owner"], + params=ARCHIVED_PARAM, + api_key=api_key, + context={SOFT_DELETE_KEY: True}, + ): + yield page - @dlt.resource(name="quotes", write_disposition="replace") - def quotes( - api_key: str = api_key, - include_history: bool = include_history, - props: Sequence[str] = DEFAULT_QUOTE_PROPS, - include_custom_props: bool = include_custom_props, - ) -> Iterator[TDataItems]: - """Hubspot quotes resource""" - yield from crm_objects( - "quote", - api_key, - include_history, - props, - include_custom_props, + @dlt.resource(name="properties", write_disposition="replace") + def properties_custom_labels(api_key: str = api_key) -> Iterator[TDataItems]: + """ + A dlt resource that retrieves custom labels for given list of properties. + + Args: + api_key (str, optional): HubSpot API key for authentication. + + Yields: + DltResource: A dlt resource containing properties for HubSpot objects. 
+ """ + + def get_properties_description( + properties_list_inner: List[Dict[str, Any]] + ) -> Iterator[Dict[str, Any]]: + """Fetch properties.""" + for property_info in properties_list_inner: + yield from get_properties_labels( + api_key=api_key, + object_type=property_info["object_type"], + property_name=property_info["property_name"], + ) + + if PROPERTIES_WITH_CUSTOM_LABELS: + yield from get_properties_description(PROPERTIES_WITH_CUSTOM_LABELS) + else: + return + + def pipelines_for_objects( + pipelines_objects: List[str], + api_key_inner: str = api_key, + ) -> Iterator[DltResource]: + """ + Function that yields all resources for HubSpot objects, which have pipelines. + (could be deals or/and tickets, specified in PIPELINES_OBJECTS) + + Args: + pipelines_objects (list of strings): The list of objects, which have pipelines. + api_key_inner (str, optional): The API key used to authenticate with the HubSpot API. Defaults to dlt.secrets.value. + + Yields: + Iterator[DltResource]: dlt resources for pipelines and stages. 
+ """ + + def get_pipelines(object_type: THubspotObjectType) -> Iterator[TDataItems]: + yield from fetch_data( + CRM_PIPELINES_ENDPOINT.format(objectType=object_type), + api_key=api_key_inner, + ) + + # get the pipelines data + for obj_type in pipelines_objects: + name = f"pipelines_{obj_type}" + yield dlt.resource( + get_pipelines, + name=name, + write_disposition="merge", + merge_key="id", + table_name=name, + primary_key="id", + )(obj_type) + + # get the history of entering for pipeline stages + name = f"stages_timing_{obj_type}" + if obj_type in OBJECT_TYPE_SINGULAR: + yield dlt.resource( + stages_timing, + name=name, + write_disposition="merge", + primary_key=["id", "stage"], + )( + OBJECT_TYPE_SINGULAR[obj_type], + api_key=api_key, + soft_delete=soft_delete, + ) + + # resources for all objects + for obj in ALL_OBJECTS: + yield dlt.resource( + crm_objects, + name=OBJECT_TYPE_PLURAL[obj], + write_disposition="merge", + primary_key="id", + )( + object_type=obj, + api_key=api_key, + props=properties.get(obj) if properties else None, + include_custom_props=include_custom_props, + archived=soft_delete, ) - return companies, contacts, deals, tickets, products, quotes + # corresponding history resources + if include_history: + for obj in ALL_OBJECTS: + yield dlt.resource( + crm_object_history, + name=f"{OBJECT_TYPE_PLURAL[obj]}_property_history", + write_disposition="append", + )( + object_type=obj, + api_key=api_key, + props=properties.get(obj) if properties else None, + include_custom_props=include_custom_props, + ) + # owners resource + yield owners -def crm_objects( + # pipelines resources + yield from pipelines_for_objects(PIPELINES_OBJECTS, api_key) + + # custom properties labels resource + yield properties_custom_labels + + +def fetch_props( object_type: str, - api_key: str = dlt.secrets.value, - include_history: bool = False, - props: Sequence[str] = None, + api_key: str, + props: Optional[Sequence[str]] = None, include_custom_props: bool = True, -) -> 
Iterator[TDataItems]: - """Building blocks for CRM resources.""" +) -> str: + """ + Fetch the list of properties for a HubSpot object type. + + Args: + object_type (str): Type of HubSpot object (e.g., 'company', 'contact'). + api_key (str): HubSpot API key for authentication. + props (Optional[Sequence[str]], optional): List of properties to fetch. Defaults to None. + include_custom_props (bool, optional): Include custom properties in the result. Defaults to True. + + Returns: + str: Comma-separated list of properties. + """ if props == ALL: - props = list(_get_property_names(api_key, object_type)) + # Fetch all property names + props_list = list(_get_property_names(api_key, object_type)) + elif isinstance(props, str): + # If props are passed as a single string, convert it to a list + props_list = [props] + else: + # Ensure it's a list of strings, if not already + props_list = extract_properties_list(props or []) if include_custom_props: - all_props = _get_property_names(api_key, object_type) - custom_props = [prop for prop in all_props if not prop.startswith("hs_")] - props = props + custom_props # type: ignore + all_props: List[str] = _get_property_names(api_key, object_type) + custom_props: List[str] = [ + prop for prop in all_props if not prop.startswith("hs_") + ] + props_list += custom_props - props = ",".join(sorted(list(set(props)))) + props_str = ",".join(sorted(set(props_list))) - if len(props) > 2000: + if len(props_str) > MAX_PROPS_LENGTH: raise ValueError( "Your request to Hubspot is too long to process. " - "Maximum allowed query length is 2000 symbols, while " - f"your list of properties `{props[:200]}`... is {len(props)} " + f"Maximum allowed query length is {MAX_PROPS_LENGTH} symbols, while " + f"your list of properties `{props_str[:200]}`... is {len(props_str)} " "symbols long. Use the `props` argument of the resource to " "set the list of properties to extract from the endpoint." 
) - - params = {"properties": props, "limit": 100} - - yield from fetch_data(CRM_OBJECT_ENDPOINTS[object_type], api_key, params=params) - if include_history: - # Get history separately, as requesting both all properties and history together - # is likely to hit hubspot's URL length limit - for history_entries in fetch_property_history( - CRM_OBJECT_ENDPOINTS[object_type], - api_key, - props, - ): - yield dlt.mark.with_table_name( - history_entries, - OBJECT_TYPE_PLURAL[object_type] + "_property_history", - ) + return props_str @dlt.resource @@ -233,20 +483,20 @@ def hubspot_events_for_objects( start_date: pendulum.DateTime = STARTDATE, ) -> DltResource: """ - A standalone DLT resources that retrieves web analytics events from the HubSpot API for a particular object type and list of object ids. + A standalone dlt resource that retrieves web analytics events from the HubSpot API for a particular object type and list of object ids. Args: - object_type(THubspotObjectType, required): One of the hubspot object types see definition of THubspotObjectType literal - object_ids: (List[THubspotObjectType], required): List of object ids to track events + object_type (THubspotObjectType): One of the hubspot object types see definition of THubspotObjectType literal. + object_ids (List[str]): List of object ids to track events. api_key (str, optional): The API key used to authenticate with the HubSpot API. Defaults to dlt.secrets.value. - start_date (datetime, optional): The initial date time from which start getting events, default to STARTDATE + start_date (pendulum.DateTime, optional): The initial date time from which start getting events, default to STARTDATE. Returns: - incremental dlt resource to track events for objects from the list + DltResource: Incremental dlt resource to track events for objects from the list. 
""" - end_date = pendulum.now().isoformat() - name = object_type + "_events" + end_date: str = pendulum.now().isoformat() + name: str = object_type + "_events" def get_web_analytics_events( occurred_at: dlt.sources.incremental[str], @@ -255,10 +505,10 @@ def get_web_analytics_events( A helper function that retrieves web analytics events for a given object type from the HubSpot API. Args: - object_type (str): The type of object for which to retrieve web analytics events. + occurred_at (dlt.sources.incremental[str]): Incremental source for event occurrence time. Yields: - dict: A dictionary representing a web analytics event. + Iterator[List[Dict[str, Any]]]: Web analytics event data. """ for object_id in object_ids: yield from fetch_data( diff --git a/sources/hubspot/helpers.py b/sources/hubspot/helpers.py index 3ab42f70e..f58fe9499 100644 --- a/sources/hubspot/helpers.py +++ b/sources/hubspot/helpers.py @@ -1,7 +1,7 @@ """Hubspot source helpers""" import urllib.parse -from typing import Any, Dict, Iterator, List, Optional +from typing import Any, Dict, Generator, Iterator, List, Optional from dlt.sources.helpers import requests @@ -31,6 +31,20 @@ def _get_headers(api_key: str) -> Dict[str, str]: return dict(authorization=f"Bearer {api_key}") +def pagination( + _data: Dict[str, Any], headers: Dict[str, Any] +) -> Optional[Dict[str, Any]]: + _next = _data.get("paging", {}).get("next", None) + # _next = False + if _next: + next_url = _next["link"] + # Get the next page response + r = requests.get(next_url, headers=headers) + return r.json() # type: ignore + else: + return None + + def extract_property_history(objects: List[Dict[str, Any]]) -> Iterator[Dict[str, Any]]: for item in objects: history = item.get("propertiesWithHistory") @@ -90,7 +104,10 @@ def fetch_property_history( def fetch_data( - endpoint: str, api_key: str, params: Optional[Dict[str, Any]] = None + endpoint: str, + api_key: str, + params: Optional[Dict[str, Any]] = None, + context: Optional[Dict[str, 
Any]] = None, ) -> Iterator[List[Dict[str, Any]]]: """ Fetch data from HUBSPOT endpoint using a specified API key and yield the properties of each result. @@ -99,7 +116,8 @@ def fetch_data( Args: endpoint (str): The endpoint to fetch data from, as a string. api_key (str): The API key to use for authentication, as a string. - params: Optional dict of query params to include in the request + params: Optional dict of query params to include in the request. + context (Optional[Dict[str, Any]]): Additional data which need to be added in the resulting page. Yields: A List of CRM object dicts @@ -152,18 +170,13 @@ def fetch_data( ] _obj[association] = __values + if context: + _obj.update(context) _objects.append(_obj) yield _objects # Follow pagination links if they exist - _next = _data.get("paging", {}).get("next", None) - if _next: - next_url = _next["link"] - # Get the next page response - r = requests.get(next_url, headers=headers) - _data = r.json() - else: - _data = None + _data = pagination(_data, headers) def _get_property_names(api_key: str, object_type: str) -> List[str]: @@ -186,3 +199,16 @@ def _get_property_names(api_key: str, object_type: str) -> List[str]: properties.extend([prop["name"] for prop in page]) return properties + + +def get_properties_labels( + api_key: str, object_type: str, property_name: str +) -> Iterator[Dict[str, Any]]: + endpoint = f"/crm/v3/properties/{object_type}/{property_name}" + url = get_url(endpoint) + headers = _get_headers(api_key) + r = requests.get(url, headers=headers) + _data: Optional[Dict[str, Any]] = r.json() + while _data is not None: + yield _data + _data = pagination(_data, headers) diff --git a/sources/hubspot/settings.py b/sources/hubspot/settings.py index 05fe4d9d0..09e126514 100644 --- a/sources/hubspot/settings.py +++ b/sources/hubspot/settings.py @@ -1,8 +1,7 @@ """Hubspot source settings and constants""" - from dlt.common import pendulum -STARTDATE = pendulum.datetime(year=2000, month=1, day=1) +STARTDATE = 
pendulum.datetime(year=2024, month=2, day=10) CRM_CONTACTS_ENDPOINT = ( "/crm/v3/objects/contacts?associations=deals,products,tickets,quotes" @@ -14,6 +13,9 @@ CRM_PRODUCTS_ENDPOINT = "/crm/v3/objects/products" CRM_TICKETS_ENDPOINT = "/crm/v3/objects/tickets" CRM_QUOTES_ENDPOINT = "/crm/v3/objects/quotes" +CRM_OWNERS_ENDPOINT = "/crm/v3/owners/" +CRM_PROPERTIES_ENDPOINT = "/crm/v3/properties/{objectType}/{property_name}" +CRM_PIPELINES_ENDPOINT = "/crm/v3/pipelines/{objectType}" CRM_OBJECT_ENDPOINTS = { "contact": CRM_CONTACTS_ENDPOINT, @@ -22,6 +24,7 @@ "product": CRM_PRODUCTS_ENDPOINT, "ticket": CRM_TICKETS_ENDPOINT, "quote": CRM_QUOTES_ENDPOINT, + "owner": CRM_OWNERS_ENDPOINT, } WEB_ANALYTICS_EVENTS_ENDPOINT = "/events/v3/events?objectType={objectType}&objectId={objectId}&occurredAfter={occurredAfter}&occurredBefore={occurredBefore}&sort=-occurredAt" @@ -36,17 +39,8 @@ } OBJECT_TYPE_PLURAL = {v: k for k, v in OBJECT_TYPE_SINGULAR.items()} +ALL_OBJECTS = OBJECT_TYPE_PLURAL.keys() -DEFAULT_DEAL_PROPS = [ - "amount", - "closedate", - "createdate", - "dealname", - "dealstage", - "hs_lastmodifieddate", - "hs_object_id", - "pipeline", -] DEFAULT_COMPANY_PROPS = [ "createdate", @@ -65,6 +59,17 @@ "lastname", ] +DEFAULT_DEAL_PROPS = [ + "amount", + "closedate", + "createdate", + "dealname", + "dealstage", + "hs_lastmodifieddate", + "hs_object_id", + "pipeline", +] + DEFAULT_TICKET_PROPS = [ "createdate", "content", @@ -96,4 +101,23 @@ "hs_title", ] -ALL = ("ALL",) +ENTITY_PROPERTIES = { + "company": DEFAULT_COMPANY_PROPS, + "contact": DEFAULT_CONTACT_PROPS, + "deal": DEFAULT_DEAL_PROPS, + "ticket": DEFAULT_TICKET_PROPS, + "product": DEFAULT_PRODUCT_PROPS, + "quote": DEFAULT_QUOTE_PROPS, +} + + +# 'ALL' represents a list of all available properties for all types +ALL = "All" + +PIPELINES_OBJECTS = ["deals", "tickets"] +SOFT_DELETE_KEY = "is_deleted" +ARCHIVED_PARAM = {"archived": True} +PREPROCESSING = {"split": ["hs_merged_object_ids"]} +STAGE_PROPERTY_PREFIX = 
"hs_date_entered_" +MAX_PROPS_LENGTH = 2000 +PROPERTIES_WITH_CUSTOM_LABELS = () diff --git a/sources/hubspot/utils.py b/sources/hubspot/utils.py new file mode 100644 index 000000000..ad8e81794 --- /dev/null +++ b/sources/hubspot/utils.py @@ -0,0 +1,29 @@ +from typing import Any, Dict, Iterator, List + +from .settings import PREPROCESSING + + +def split_data(doc: Dict[str, Any]) -> Dict[str, Any]: + for prop in PREPROCESSING["split"]: + if prop in doc and doc[prop] is not None: + if isinstance(doc[prop], str): + doc[prop] = doc[prop].split(";") + return doc + + +def chunk_properties(properties: List[str], max_length: int) -> Iterator[List[str]]: + """Function which yields chunk of properties list, making sure that + for each chunk, len(",".join(chunk)) =< max_length. + """ + chunk: List[str] = [] + length = 0 + for prop in properties: + prop_len = len(prop) + (1 if chunk else 0) # include comma length if not first + if length + prop_len > max_length: + yield chunk + chunk, length = [prop], len(prop) + else: + chunk.append(prop) + length += prop_len + if chunk: + yield chunk diff --git a/sources/hubspot_pipeline.py b/sources/hubspot_pipeline.py index 27d9f7352..0f2e7c5ea 100644 --- a/sources/hubspot_pipeline.py +++ b/sources/hubspot_pipeline.py @@ -1,4 +1,5 @@ from typing import List + import dlt from hubspot import hubspot, hubspot_events_for_objects, THubspotObjectType @@ -13,7 +14,7 @@ def load_crm_data() -> None: """ # Create a DLT pipeline object with the pipeline name, dataset name, and destination database type - # Add dev_mode=(True or False) if you need your pipeline to create the dataset in your destination + # Add full_refresh=(True or False) if you need your pipeline to create the dataset in your destination p = dlt.pipeline( pipeline_name="hubspot", dataset_name="hubspot_dataset", @@ -37,7 +38,7 @@ def load_crm_data_with_history() -> None: """ # Create a DLT pipeline object with the pipeline name, dataset name, and destination database type - # Add 
dev_mode=(True or False) if you need your pipeline to create the dataset in your destination + # Add full_refresh=(True or False) if you need your pipeline to create the dataset in your destination p = dlt.pipeline( pipeline_name="hubspot", dataset_name="hubspot_dataset", @@ -54,6 +55,33 @@ def load_crm_data_with_history() -> None: print(info) +def load_crm_data_with_soft_delete() -> None: + """ + Loads all HubSpot CRM resources, including soft-deleted (archived) records for each entity. + By default, only the current state of the records is loaded; property change history is not included unless explicitly enabled. + + Soft-deleted records are retrieved and marked appropriately, allowing both active and archived data to be processed. + """ + + # Create a DLT pipeline object with the pipeline name, dataset name, and destination database type. + # You can add `full_refresh=True` if the pipeline should recreate the dataset at the destination. + p = dlt.pipeline( + pipeline_name="hubspot", + dataset_name="hubspot_dataset", + destination="duckdb", + ) + + # Configure the source to load soft-deleted (archived) records. + # Property change history is disabled by default unless configured separately. + data = hubspot(soft_delete=True) + + # Run the pipeline with the HubSpot source connector. + info = p.run(data) + + # Print information about the pipeline run. + print(info) + + def load_crm_objects_with_custom_properties() -> None: """ Loads CRM objects, reading only properties defined by the user. 
def load_pipelines() -> None:
    """
    Loads only the deals pipeline resources of the HubSpot source:
    `pipelines_deals` and `stages_timing_deals`.

    Returns:
        None
    """

    # Create a DLT pipeline object with the pipeline name, dataset name, and destination database type.
    # dev_mode=False keeps loading into the same dataset on every run.
    pipeline = dlt.pipeline(
        pipeline_name="hubspot",
        dataset_name="hubspot_dataset",
        destination="duckdb",
        dev_mode=False,
    )

    # Restrict the source to the resources describing the "deals" pipelines.
    load_data = hubspot().with_resources("pipelines_deals", "stages_timing_deals")
    load_info = pipeline.run(load_data)
    print(load_info)
@pytest.mark.parametrize("destination_name", ALL_DESTINATIONS)
def test_too_many_properties(destination_name: str) -> None:
    """Extraction must fail when the contact property list is far too long."""
    # 500 copies of the same property name stand in for an oversized property set.
    oversized_props = {"contact": ["property"] * 500}
    with pytest.raises(ResourceExtractionError), patch(
        "sources.hubspot.ENTITY_PROPERTIES", oversized_props
    ):
        source = hubspot(api_key="fake_key", include_history=True)
        # Consuming the resource triggers the extraction, and so the error.
        list(source.with_resources("contacts"))
source.contacts.bind(props=props) - - with patch("dlt.sources.helpers.requests.get", side_effect=fake_get) as m: - load_info = pipeline.run(source.with_resources("contacts")) - - assert_load_info(load_info) - - m.assert_has_calls( - [ - call( - urljoin(BASE_URL, "/crm/v3/properties/contacts"), - headers=ANY, - params=None, - ), - call( - urljoin(BASE_URL, CRM_CONTACTS_ENDPOINT), - headers=ANY, - params={ - "properties": expected_props, - "limit": 100, - }, - ), - ] - ) + with patch("sources.hubspot.ENTITY_PROPERTIES", {"contact": props}): + with patch("dlt.sources.helpers.requests.get", side_effect=fake_get) as m: + load_info = pipeline.run(source.with_resources("contacts")) + + assert_load_info(load_info) + + m.assert_has_calls( + [ + call( + urljoin(BASE_URL, "/crm/v3/properties/contacts"), + headers=ANY, + params=None, + ), + call( + urljoin(BASE_URL, CRM_CONTACTS_ENDPOINT), + headers=ANY, + params={ + "properties": expected_props, + "limit": 100, + }, + ), + ] + ) @pytest.mark.parametrize("destination_name", ALL_DESTINATIONS) @@ -339,7 +345,7 @@ def test_all_resources(destination_name: str) -> None: dataset_name="hubspot_data", dev_mode=True, ) - load_info = pipeline.run(hubspot(include_history=True)) + load_info = pipeline.run(hubspot(include_history=True).with_resources("contacts", "deals", "companies", "contacts_property_history")) assert_load_info(load_info) table_names = [ @@ -352,7 +358,7 @@ def test_all_resources(destination_name: str) -> None: assert ( load_table_counts(pipeline, *table_names) == load_table_distinct_counts(pipeline, "hs_object_id", *table_names) - == {"companies": 200, "deals": 500, "contacts": 402} + == {'companies': 4, 'contacts': 3, 'deals': 2} ) history_table_names = [ @@ -363,11 +369,9 @@ def test_all_resources(destination_name: str) -> None: table_counts = load_table_counts(pipeline, *history_table_names) # Check history tables # NOTE: this value is increasing... 
def test_chunk_properties():
    """Check chunk_properties against a table of (properties, max_length, expected)."""
    cases = [
        # Two properties fit per chunk: len("prop1,prop2") == 11 <= 12.
        (
            ["prop1", "prop2", "prop3", "prop4"],
            12,
            [["prop1", "prop2"], ["prop3", "prop4"]],
        ),
        # The joined length exactly equals the limit, so everything fits in one chunk.
        (
            ["prop1", "prop2"],
            len("prop1,prop2"),
            [["prop1", "prop2"]],
        ),
        # A limit of 5 accommodates 'p1,p2'; the odd property ends up alone.
        (
            ["p1", "p2", "p3", "p4", "p5"],
            5,
            [["p1", "p2"], ["p3", "p4"], ["p5"]],
        ),
    ]
    for properties, max_length, expected in cases:
        result = list(chunk_properties(properties, max_length))
        assert result == expected