diff --git a/tap_google_analytics/__init__.py b/tap_google_analytics/__init__.py index 3775de3..aba89d1 100644 --- a/tap_google_analytics/__init__.py +++ b/tap_google_analytics/__init__.py @@ -2,6 +2,7 @@ import datetime import json import sys +from os import getenv from pathlib import Path @@ -21,7 +22,7 @@ ] LOGGER = singer.get_logger() - +LOGGER.setLevel(getenv("LOGGER_LEVEL", "INFO")) def discover(config): # Load the reports json file @@ -78,7 +79,8 @@ def sync(config, state, catalog): # Loop over streams in catalog for stream in catalog['streams']: view_id = stream['view_id'] - client = GAClient(view_id, config, state) + name = stream['stream'] + client = GAClient(name, view_id, config, state) stream_id = stream['tap_stream_id'] stream_schema = stream['schema'] @@ -101,7 +103,7 @@ def sync(config, state, catalog): singer.write_schema(stream_id, stream_schema, key_properties) singer.write_records(stream_id, results) state = singer.write_bookmark( - state, view_id, 'end_date', client.end_date) + state, name, 'end_date', client.end_date) singer.write_state(state) except TapGaInvalidArgumentError as e: errors_encountered = True @@ -177,6 +179,9 @@ def process_args(): if 'end_date' in args.config and not args.config.get('end_date'): del args.config['end_date'] + + if 'sliced' in args.config and not args.config.get('sliced'): + del args.config['sliced'] # Process the [start_date, end_date) so that they define an open date # window that ends yesterday if end_date is not defined @@ -184,7 +189,7 @@ def process_args(): args.config['start_date'] = utils.strftime(start_date, '%Y-%m-%d') end_date = args.config.get('end_date', utils.strftime(utils.now())) - end_date = utils.strptime_to_utc(end_date) - datetime.timedelta(days=1) + end_date = utils.strptime_to_utc(end_date) args.config['end_date'] = utils.strftime(end_date, '%Y-%m-%d') if end_date < start_date: diff --git a/tap_google_analytics/ga_client.py b/tap_google_analytics/ga_client.py index 457f7cc..63ef4de 100644 --- a/tap_google_analytics/ga_client.py +++ b/tap_google_analytics/ga_client.py @@ -1,5 +1,6 @@ from datetime import timedelta, datetime import sys +from os import getenv import backoff import logging import json @@ -29,7 +30,7 @@ # Silence the discovery_cache errors logging.getLogger('googleapiclient.discovery_cache').setLevel(logging.ERROR) LOGGER = singer.get_logger() - +LOGGER.setLevel(getenv("LOGGER_LEVEL", "INFO")) def error_reason(e): # For a given HttpError object from the googleapiclient package, this @@ -71,21 +72,23 @@ def is_fatal_error(error): class GAClient: - def __init__(self, view_id, config, state): + def __init__(self, name, view_id, config, state): yesterday = (datetime.today() - timedelta(days=1) ).strftime(DATE_PATTERN) state_date = state.get('bookmarks', - {}).get(view_id, {}).get('end_date') + {}).get(name, {}).get('end_date') if state_date is not None: state_date = (datetime.strptime(state_date, DATE_PATTERN) + timedelta(days=1)).strftime(DATE_PATTERN) + self.name = name self.view_id = view_id self.start_date = state_date or config['start_date'] self.end_date = config['end_date'] or yesterday self.quota_user = config.get('quota_user', None) + self.is_sliced = config.get('sliced', False) self.credentials = self.initialize_credentials(config) self.analytics = self.initialize_analyticsreporting() @@ -93,8 +96,10 @@ def __init__(self, view_id, config, state): (self.dimensions_ref, self.metrics_ref) = self.fetch_metadata() LOGGER.debug( - f"GAClient time interval from {self.start_date} to \ + f"GAClient view_id: {view_id}, time interval from {self.start_date} to \ {self.end_date}") + LOGGER.debug(f'TEMP DEBUG GA_CLIENTE: view_id:{view_id}, type(view_id): {type(view_id)}, state_date: {state_date}, state: {state}') + def initialize_credentials(self, config): if config.get('oauth_credentials', {}).get('access_token', None): @@ -216,6 +221,13 @@ def lookup_data_type(self, type, attribute): return data_type + def get_dates(self): + date = datetime.strptime(self.start_date, "%Y-%m-%d") + while date < datetime.strptime(self.end_date, "%Y-%m-%d"): + yield date.strftime("%Y-%m-%d") + date += timedelta(days=1) + + def process_stream(self, stream): """ Retrives data from Google Analytics @@ -241,9 +253,15 @@ def process_stream(self, stream): nextPageToken = None while True: - response = self.query_api(report_definition, nextPageToken) - (nextPageToken, results) = self.process_response(response) - records.extend(results) + if self.is_sliced: + for d in self.get_dates(): + response = self.query_api(report_definition, nextPageToken, d) + (nextPageToken, results) = self.process_response(response) + records.extend(results) + else: + response = self.query_api(report_definition, nextPageToken) + (nextPageToken, results) = self.process_response(response) + records.extend(results) # Keep on looping as long as we have a nextPageToken if nextPageToken is None: @@ -282,19 +300,21 @@ def generate_report_definition(self, stream): (HttpError, socket.timeout), max_tries=9, giveup=is_fatal_error) - def query_api(self, report_definition, pageToken=None): + def query_api(self, report_definition, pageToken=None, date=None): """Queries the Analytics Reporting API V4. Returns: The Analytics Reporting API V4 response. """ + start_date = date or self.start_date + end_date = date or self.end_date return self.analytics.reports().batchGet( body={ 'reportRequests': [ { 'viewId': self.view_id, - 'dateRanges': [{'startDate': self.start_date, - 'endDate': self.end_date}], + 'dateRanges': [{'startDate': start_date, + 'endDate': end_date}], 'pageSize': '1000', 'pageToken': pageToken, 'metrics': report_definition['metrics'], diff --git a/tap_google_analytics/reports_helper.py b/tap_google_analytics/reports_helper.py index f7cbf65..8b4c666 100644 --- a/tap_google_analytics/reports_helper.py +++ b/tap_google_analytics/reports_helper.py @@ -12,7 +12,7 @@ def __init__(self, config, reports_definition): # Fetch the valid (dimension, metric) # names and their types from GAClient - self.client = GAClient(config.get("view_id"), config, dict()) + self.client = GAClient(None, config.get("view_id"), config, dict()) def generate_catalog(self): """