Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions tap_google_analytics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import datetime
import json
import sys
from os import getenv

from pathlib import Path

Expand All @@ -21,7 +22,7 @@
]

LOGGER = singer.get_logger()

LOGGER.setLevel(getenv("LOGGER_LEVEL", "INFO"))

def discover(config):
# Load the reports json file
Expand Down Expand Up @@ -78,7 +79,8 @@ def sync(config, state, catalog):
# Loop over streams in catalog
for stream in catalog['streams']:
view_id = stream['view_id']
client = GAClient(view_id, config, state)
name = stream['stream']
client = GAClient(name, view_id, config, state)

stream_id = stream['tap_stream_id']
stream_schema = stream['schema']
Expand All @@ -101,7 +103,7 @@ def sync(config, state, catalog):
singer.write_schema(stream_id, stream_schema, key_properties)
singer.write_records(stream_id, results)
state = singer.write_bookmark(
state, view_id, 'end_date', client.end_date)
state, name, 'end_date', client.end_date)
singer.write_state(state)
except TapGaInvalidArgumentError as e:
errors_encountered = True
Expand Down Expand Up @@ -177,14 +179,17 @@ def process_args():

if 'end_date' in args.config and not args.config.get('end_date'):
del args.config['end_date']

if 'sliced' in args.config and not args.config.get('sliced'):
del args.config['sliced']

# Process the [start_date, end_date) so that they define an open date
# window that ends yesterday if end_date is not defined
start_date = utils.strptime_to_utc(args.config['start_date'])
args.config['start_date'] = utils.strftime(start_date, '%Y-%m-%d')

end_date = args.config.get('end_date', utils.strftime(utils.now()))
end_date = utils.strptime_to_utc(end_date) - datetime.timedelta(days=1)
end_date = utils.strptime_to_utc(end_date)
args.config['end_date'] = utils.strftime(end_date, '%Y-%m-%d')

if end_date < start_date:
Expand Down
40 changes: 30 additions & 10 deletions tap_google_analytics/ga_client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from datetime import timedelta, datetime
import sys
from os import getenv
import backoff
import logging
import json
Expand Down Expand Up @@ -29,7 +30,7 @@
# Silence the discovery_cache errors
logging.getLogger('googleapiclient.discovery_cache').setLevel(logging.ERROR)
LOGGER = singer.get_logger()

LOGGER.setLevel(getenv("LOGGER_LEVEL", "INFO"))

def error_reason(e):
# For a given HttpError object from the googleapiclient package, this
Expand Down Expand Up @@ -71,30 +72,34 @@ def is_fatal_error(error):


class GAClient:
def __init__(self, view_id, config, state):
def __init__(self, name, view_id, config, state):
yesterday = (datetime.today() - timedelta(days=1)
).strftime(DATE_PATTERN)

state_date = state.get('bookmarks',
{}).get(view_id, {}).get('end_date')
{}).get(name, {}).get('end_date')

if state_date is not None:
state_date = (datetime.strptime(state_date, DATE_PATTERN)
+ timedelta(days=1)).strftime(DATE_PATTERN)

self.name = name
self.view_id = view_id
self.start_date = state_date or config['start_date']
self.end_date = config['end_date'] or yesterday
self.quota_user = config.get('quota_user', None)
self.is_sliced = config.get('sliced', False)

self.credentials = self.initialize_credentials(config)
self.analytics = self.initialize_analyticsreporting()

(self.dimensions_ref, self.metrics_ref) = self.fetch_metadata()

LOGGER.debug(
f"GAClient time interval from {self.start_date} to \
f"GAClient view_id: {view_id}, time interval from {self.start_date} to \
{self.end_date}")
LOGGER.debug(f'TEMP DEBUG GA_CLIENTE: view_id:{view_id}, type(view_id): {type(view_id)}, state_date: {state_date}, state: {state}')


def initialize_credentials(self, config):
if config.get('oauth_credentials', {}).get('access_token', None):
Expand Down Expand Up @@ -216,6 +221,13 @@ def lookup_data_type(self, type, attribute):

return data_type

def get_dates(self):
    """Yield every day in the client's date window as a ``YYYY-MM-DD`` string.

    The window is half-open: iteration starts at ``self.start_date`` and
    stops *before* ``self.end_date``, so ``end_date`` itself is never
    yielded. Nothing is yielded when ``start_date >= end_date``.

    Both attributes are expected to be ``YYYY-MM-DD`` strings; a value in
    any other format makes ``datetime.strptime`` raise ``ValueError`` when
    the generator is first advanced.

    Yields:
        str: One date per day, in ascending order.
    """
    # NOTE(review): end_date is excluded here — confirm that is intended
    # for callers that treat end_date as an inclusive bound.
    day_format = "%Y-%m-%d"
    current = datetime.strptime(self.start_date, day_format)
    # Parse the upper bound once instead of on every loop iteration.
    end = datetime.strptime(self.end_date, day_format)

    while current < end:
        yield current.strftime(day_format)
        current += timedelta(days=1)


def process_stream(self, stream):
"""
Retrieves data from Google Analytics
Expand All @@ -241,9 +253,15 @@ def process_stream(self, stream):
nextPageToken = None

while True:
response = self.query_api(report_definition, nextPageToken)
(nextPageToken, results) = self.process_response(response)
records.extend(results)
if self.is_sliced:
for d in self.get_dates():
response = self.query_api(report_definition, nextPageToken, d)
(nextPageToken, results) = self.process_response(response)
records.extend(results)
else:
response = self.query_api(report_definition, nextPageToken)
(nextPageToken, results) = self.process_response(response)
records.extend(results)

# Keep on looping as long as we have a nextPageToken
if nextPageToken is None:
Expand Down Expand Up @@ -282,19 +300,21 @@ def generate_report_definition(self, stream):
(HttpError, socket.timeout),
max_tries=9,
giveup=is_fatal_error)
def query_api(self, report_definition, pageToken=None):
def query_api(self, report_definition, pageToken=None, date=None):
"""Queries the Analytics Reporting API V4.

Returns:
The Analytics Reporting API V4 response.
"""
start_date = date or self.start_date
end_date = date or self.end_date
return self.analytics.reports().batchGet(
body={
'reportRequests': [
{
'viewId': self.view_id,
'dateRanges': [{'startDate': self.start_date,
'endDate': self.end_date}],
'dateRanges': [{'startDate': start_date,
'endDate': end_date}],
'pageSize': '1000',
'pageToken': pageToken,
'metrics': report_definition['metrics'],
Expand Down
2 changes: 1 addition & 1 deletion tap_google_analytics/reports_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def __init__(self, config, reports_definition):

# Fetch the valid (dimension, metric)
# names and their types from GAClient
self.client = GAClient(config.get("view_id"), config, dict())
self.client = GAClient(None, config.get("view_id"), config, dict())

def generate_catalog(self):
"""
Expand Down