
Commit

INIT of repo dauGA
Microsheep committed Jul 13, 2016
0 parents commit 713b469
Showing 5 changed files with 450 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -0,0 +1,2 @@
config.py
**.swp
55 changes: 55 additions & 0 deletions README.md
@@ -0,0 +1,55 @@
# Pull the raw GA log data and upload it to BigQuery

## How to run
1. Copy sample_config.py to config.py
2. Edit PROJECT_ID, DATA_SET, VIEW_ID, and DATE_INIT in config.py
3. Run dauGA.py (Python 2). You must pass the path of a credential that has access to both Google Analytics and Google BigQuery; see the example invocation below.
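
For example (a minimal sketch; the credential path is a placeholder for your own service-account JSON file):

```
python dauGA.py --credential_path /path/to/service_account.json
```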


## Important resources
Google's [demo site](https://ga-dev-tools.appspot.com/dimensions-metrics-explorer/) for exploring dimensions and metrics. It is genuinely important but hard to find, and you will almost certainly need it.


## Overview
This code is based on a blog [post](http://daynebatten.com/2015/07/raw-data-google-analytics/).
The post briefly explains how to dump the raw data out of GA and store it in BigQuery. (If you bought this as an official Google service it would cost at least USD 500 per month, so let's use this feature with a grateful heart.)

The key to making this work: every field of the data you pull from GA through the [Report API](https://developers.google.com/admin-sdk/reports/v1/reference/) corresponds to <b>one record</b>. In principle, the Report API expects you to request aggregated statistics, for example: for each city and each user type (new vs. returning users), some aggregate measure (time on page, number of clicks, ...). The former are called Dimensions and the latter Metrics; in other words, the Report API reports the Metrics aggregated under each combination of Dimensions. See [here](https://developers.google.com/analytics/devguides/reporting/core/dimsmets) for the available Dimensions and Metrics.

But the blog describes a clever and simple trick: GA itself lets you send customized dimensions along with each hit. If the customized dimensions we send to GA never repeat (ex. unique_user__id and timestamp), then when we request data from the Report API it keeps every record separate, and we get GA's raw log data!!!
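
The tracking side is not part of this repo. Purely to illustrate the idea, a hit with never-repeating custom dimensions could be sent through the GA Measurement Protocol roughly as sketched below; the property ID, page path, and user key are placeholders, and the dimension indices 8 to 10 simply mirror what columns_conversion in dauGA.py expects (a real site would more likely set the same dimensions in its client-side tracking code):

```
import time
import uuid

import requests

# Hypothetical sketch: send one pageview whose custom dimensions never repeat
requests.post("https://www.google-analytics.com/collect", data={
    "v": 1,                               # Measurement Protocol version
    "tid": "UA-XXXXXXXX-Y",               # placeholder GA property ID
    "cid": str(uuid.uuid4()),             # anonymous client ID
    "t": "pageview",
    "dp": "/some/page",                   # placeholder page path
    "cd8": "user_123",                    # -> user_key_name in dauGA.py
    "cd9": str(int(time.time() * 1000)),  # -> browser_utc_time (ms since epoch)
    "cd10": str(uuid.uuid4()),            # -> cookie_uuid
})
```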

The Report API currently allows 50,000 requests per day, each returning at most 10,000 rows, so unless your site already logs 500 million records a day this is more than enough: we can pull GA's raw log data and upload it to BigQuery, where it is easy to run whatever analysis we need. Even better, you can bring along some of the excellent user data GA provides (e.g. region, operating system, device model, browser) for later use.


## Prerequisites and how the program runs
The goal is to automate dumping the data from GA, automatically upload it to BigQuery to build the tables we need, and keep them updated on schedule (appending new data to BigQuery), so that from then on all we have to do is happily analyze the data in BigQuery.


- Things to prepare before running dauGA.py...
- Create your own Service Account on [Google Cloud Platform](https://console.cloud.google.com/iam-admin/serviceaccounts/project), then point CREDENTIAL_PATH in the code to the credential file.
- Install the special pandas build: the current pandas cannot append to a BigQuery table it did not create itself ([related issue](https://github.com/pydata/pandas/issues/13086)), and we need exactly that, so we forked pandas for now; the issue says this will be fixed in the next release (0.18.2).

```
pip install git+https://github.com/junyiacademy/pandas.git@tmp_solution_to_append_existed_bq_table
```

- Adjust the parameters in config.py as needed
- The most important one is DATA_SET: if you run the script locally, be sure to change DATA_SET so you do not pollute or destroy the BigQuery tables we actually use
- To add a table of your own, add it to the ga\_bq\_config variable, which determines what you dump from the GA Report API and what you ultimately upload. See [here](https://developers.google.com/analytics/devguides/reporting/core/v4/rest/v4/reports/batchGet) for how to write the report\_request, and also specify "destination\_table" (the BigQuery table you want the data stored in). A sketch of such an entry is shown right after this list.
- How it works: on every run the program goes through the {{DATA\_SET}}.upload\_progress\_log table and, for each day from 2016-6-4 (DATE_INIT) up to the day before the run, checks whether every table to be updated has a "success" record for that day. If not, it deletes that day's data from the corresponding table (so that loading the data again cannot create duplicates) and then dumps that day's data once more. This keeps the system and the data consistent.
- Whether or not a dump succeeds, the result is appended to {{DATA\_SET}}.upload\_progress\_log, both as a reference for later runs and for engineers to review.
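
For reference, here is a hedged sketch of what one ga\_bq\_config entry in config.py might look like, inferred from how dauGA.py reads it; the config name, dimensions, metrics, and table name are placeholders, and dateRanges and pageToken are filled in by the script itself:

```
# Hypothetical config.py excerpt -- field names follow how dauGA.py reads them
ga_bq_config = {
    "page_log": {  # placeholder config name
        "request_body": {
            "reportRequests": [{
                "viewId": VIEW_ID,  # your GA view ID, defined earlier in config.py
                "dimensions": [
                    {"name": "ga:dimension8"},   # becomes user_key_name
                    {"name": "ga:dimension9"},   # becomes browser_utc_time
                    {"name": "ga:dimension10"},  # becomes cookie_uuid
                    {"name": "ga:dateHour"},     # merged with ga:minute into ga_session_time
                    {"name": "ga:minute"},
                ],
                "metrics": [{"expression": "ga:timeOnPage"}],
                "pageSize": 10000,
                # dateRanges and pageToken are set by dauGA.py for each day and page
            }]
        },
        "destination_table": DATA_SET + ".ga_page_log",  # placeholder "dataset.table"
    }
}
```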

## How to test
#### The pipeline does four main things: 1. pull the data from GA, 2. transform the data, 3. upload the data to BigQuery, 4. stay robust

1. Inside the function request\_df\_from\_ga you can check that the data pulled from GA matches what you configured in ga\_bq\_config.
2. The data transformation lives in the function columns\_conversion; inspect the dataframe it returns to make sure the table about to be uploaded is what you need.
3. After a table uploads successfully, check that the corresponding table on BigQuery matches your local dataframe.
4. Robustness is the part that needs the most careful testing. The code is expected to run automatically every day and talks to several external APIs, so failures are expected. We do not want every failure to ruin all the data and force a restart; ideally, whenever a run fails, the next run simply repairs the earlier mistake. The implementation is described in <b>How it works</b> above.
5. The most important robustness test: raise an error at different stages of the program, then check: a. did any partial data get uploaded to BigQuery? If not, is that day's data for that table filled in on the next run? If partial data did reach BigQuery, is it deleted first on the next run?
6. Check that, for each table and each day, the uploaded_data_size recorded in {{DATA\_SET}}.upload\_progress\_log matches the count of rows with that ga_session_date in the corresponding table, to confirm that no data was loaded twice or missed, for example with a spot check like the one below.
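
A hedged example of such a spot check, reusing the legacy-SQL style dauGA.py itself uses (the project, dataset, table, and date below are placeholders):

```
import pandas as pd  # the forked pandas installed above

# Count the rows stored for one day in a destination table (placeholder names),
# then compare the number with uploaded_data_size in upload_progress_log
count_query = 'SELECT COUNT(*) FROM [my_dataset.ga_page_log] WHERE DATE(ga_session_date) = "2016-06-04"'
day_count = pd.read_gbq(count_query, project_id="my-project-id",
                        private_key="/path/to/service_account.json").iloc[0, 0]
print(day_count)
```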

## Future Work

1. The uploaded BrowserUTC time carries millisecond precision, but the pd.to_gbq function appears to upload only down to the second. If higher resolution is really needed, there are two options: a. patch pandas, or b. upload the value as a string and handle it later in queries.
2. ga_session_time & backup_date are not actually UTC times, yet the column metadata still says UTC. This does not look easy to fix either (it probably requires touching pandas), so it is left for later.
231 changes: 231 additions & 0 deletions dauGA.py
@@ -0,0 +1,231 @@
import argparse
import datetime
'''
The pandas package is currently installed from git+https://github.com/junyiacademy/pandas.git@tmp_solution_to_append_existed_bq_table
It is a fork of pandas that lets us append to an existing BigQuery table.
Please use the following command to install pandas:
pip install git+https://github.com/junyiacademy/pandas.git@tmp_solution_to_append_existed_bq_table
'''
import pandas as pd
# A self-written library that handles Google authorization
from google_auth import google_auth
# Import our config file
import config

def bq_query_to_table(query, table): # run a query on BigQuery and write the result into the given "dataset.table"
dataset = table.split('.')[0]
table = table.split('.')[1]
job = bigquery.jobs().insert(projectId=config.PROJECT_ID,
body={"projectId": config.PROJECT_ID,
"configuration":{
"query": {
"query": query,
"destinationTable": {
"projectId": config.PROJECT_ID,
"datasetId": dataset,
"tableId": table
},
"writeDisposition":"WRITE_TRUNCATE",
"createDisposition":"CREATE_IF_NEEDED"
}
}
}).execute()
return job['id']


def check_table_exist(table):
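    # Return True only if the given "dataset.table" already exists in this BigQuery project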
dataset = table.split('.')[0]
table = table.split('.')[1]
result = bigquery.tables().list(projectId=config.PROJECT_ID, datasetId=dataset).execute()
if not 'tables' in result:
return False
table_list = [i['tableReference']['tableId'] for i in result['tables']]
return table in table_list


def check_ga_session_date_exist(destination_table, date, credential_path): # check if destination table has data of certain date
if not check_table_exist(destination_table): # has no certain date if the table not exist
return False
    query = 'SELECT count(*) FROM [%s] WHERE DATE(ga_session_date) = "%s"' % (destination_table, date.strftime("%Y-%m-%d"))
return (pd.read_gbq(query, project_id=config.PROJECT_ID, verbose=False, private_key=credential_path).iloc[0, 0] > 0)


def remove_certain_ga_session_date_data(destination_table, date): # remove data of certain date
query = 'SELECT * FROM [%s] WHERE DATE(ga_session_date) != "%s"' % (destination_table, date.strftime("%Y-%m-%d"))
return bq_query_to_table(query, destination_table)


def parse_result_to_df(result): # convert ga request response to df
columns_list = []
columns_list.extend(result['reports'][0]['columnHeader']['dimensions'])
columns_list.extend([i['name'] for i in result['reports'][0]['columnHeader']['metricHeader']['metricHeaderEntries']])

row_num = len(result['reports'][0]['data']['rows'])
df = pd.DataFrame(columns = columns_list, index=range(row_num))
for i, row in enumerate(result['reports'][0]['data']['rows']):
list_to_append = []
list_to_append.extend(row['dimensions'])
list_to_append.extend(row['metrics'][0]['values'])
for j in range(len(list_to_append)):
df.iat[i, j] = list_to_append[j] # df.append(my_dict, ignore_index=True)
return df


def unix_time_millis(dt):
epoch = datetime.datetime.utcfromtimestamp(0)
return (dt - epoch).total_seconds() * 1000.0


def convert_js_date_format(date_str): # convert a JS-style date string to unix time in milliseconds (pass through values that cannot be parsed)
if date_str.isdigit():
return date_str
date_str = date_str.replace(' GMT', '').replace(' UTC', '')
if date_str.count(":") == 3:
try:
return unix_time_millis(datetime.datetime.strptime(date_str, "%a, %d %b %Y %H:%M:%S:%f"))
except:
# print "date_str: %s cannot be converted" % date_str
return date_str
elif date_str.count(":") == 2:
try:
return unix_time_millis(datetime.datetime.strptime(date_str, "%a, %d %b %Y %H:%M:%S"))
except:
# print "date_str: %s cannot be converted" % date_str
return date_str
else:
return date_str


def columns_conversion(df, date): # change the df we request from ga to the one we upload to bq
columns = [c.replace(':', '_') for c in df.columns]
for i, c in enumerate(columns):
if c == "ga_dimension8":
columns[i] = "user_key_name"
elif c == "ga_dimension9":
columns[i] = "browser_utc_time"
df.iloc[:, i] = df.iloc[:, i].apply(convert_js_date_format).astype(str)
elif c == "ga_dimension10":
columns[i] = "cookie_uuid"
elif c == "ga_timeOnPage" or c == "ga_pageviews" or c == "ga_hits":
df.iloc[:, i] = df.iloc[:, i].apply(lambda x: int(float(x)))
elif c == "ga_exits":
df.iloc[:, i] = df.iloc[:, i].astype(bool)
df.columns = columns
if 'ga_dateHour' in df.columns and 'ga_minute' in df.columns:
df.loc[:, 'ga_session_time'] = pd.to_datetime((df.loc[:, 'ga_dateHour'] + df.loc[:, 'ga_minute']), format="%Y%m%d%H%M")
df.drop(['ga_dateHour', 'ga_minute'], inplace=True, axis=1)
df['ga_session_date'] = pd.to_datetime(date) # we always add ga session date to data
return df


def request_df_from_ga(request, page_token=""):
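    # Fetch one page of the GA report; returns (df, nextPageToken), (df, -1) on the last page, or (0, -1) if GA returns no rows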
request["reportRequests"][0]["pageToken"] = page_token
result = analytics.reports().batchGet(body=request).execute()
if 'rows' not in result['reports'][0]['data']: # get no data from GA
        print 'Request from GA returned no data; row count is 0'
return (0, -1)
df = parse_result_to_df(result)
if 'nextPageToken' in result['reports'][0]:
return (df, result['reports'][0]['nextPageToken'])
else:
return (df, -1)


def daterange(start_date, end_date):
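    # Yield each date from start_date (inclusive) up to end_date (exclusive)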
for n in range(int((end_date - start_date).days)):
yield start_date + datetime.timedelta(n)


def ga_upload_to_bq_by_day(ga_to_bg_config_name, date, credential_path):
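    # Dump one day of GA data for the given config entry and append it to its BigQuery destination table; returns {"status": ..., "data_size": ...}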
if not isinstance(date, datetime.date):
        print 'The date parameter must be a datetime.date object'
return None

request_body = config.ga_bq_config[ga_to_bg_config_name]["request_body"]
destination_table = config.ga_bq_config[ga_to_bg_config_name]["destination_table"]

if len(request_body["reportRequests"]) > 1:
        print 'Only one reportRequest is allowed at this time'
return None

request_body["reportRequests"][0]['dateRanges'] = [{"startDate": date.strftime("%Y-%m-%d"), "endDate": date.strftime("%Y-%m-%d")}]

    cont_page_token = ''
    total_row = 0
    finish_flag = False
    retry_limit_flag = False
    retry_count = 0
    print 'Start loading data from GA and uploading to %s for %s' % (destination_table, date)
    for i in range(1000):  # GA Report API pagination: loop over up to 1,000 result pages per day
try:
(df, cont_page_token) = request_df_from_ga(request_body, cont_page_token)
df = columns_conversion(df, date)
df.to_gbq(destination_table=destination_table, project_id=config.PROJECT_ID, if_exists='append', private_key=credential_path)
# df.to_csv("%s-%s-data" % (ga_to_bg_config_name, date))
row_num = len(df.index)
total_row = total_row + row_num
if cont_page_token == -1:
finish_flag = True

except Exception as e:
print "Failing download response from Ga or upload to %s" % destination_table
print str(e)
retry_count += 1
print "already tried %s times" % retry_count
if retry_count == 10:
retry_limit_flag = True

if finish_flag:
            print 'Successfully downloaded the GA response and uploaded it to %s' % destination_table
return {"status": "success", "data_size": total_row}
elif retry_limit_flag:
print "Reach retry limit, Script Closed"
return {"status": "failure", "data_size": total_row}
print "Download GA data exceed row limit!!! Need to increase the GA report API request limit"
return {"status": "failure", "data_size": total_row}


if __name__ == "__main__":
# Parse the argument to get the credential_path
    parser = argparse.ArgumentParser(description='Input the secret json path and corresponding dataset')
    parser.add_argument('--credential_path', type=str, dest='credential_path', required=True, help='path of the service account credential from GCP; use $gcp_service_account in Jenkins')
args = vars(parser.parse_args())
credential_path = args["credential_path"]
# Use google_auth library to get access to google
Auth = google_auth(credential_path)
bigquery = Auth.get_auth('bigquery_v2')
analytics = Auth.get_auth('analytics_v4')
# Check if the GA_BQ_UPLOAD_STATUS_LOG table exist in gbq
if check_table_exist(config.GA_BQ_UPLOAD_STATUS_LOG):
ga_bq_upload_status_log = pd.read_gbq(query="SELECT * FROM [%s]" % config.GA_BQ_UPLOAD_STATUS_LOG, project_id=config.PROJECT_ID, private_key=credential_path)
else:
ga_bq_upload_status_log = pd.DataFrame(columns=['config_name', 'ga_session_date', 'status', 'backup_date', "uploaded_data_size"])
# Set the time region
d = config.DATE_INIT.split("-")
date_init = datetime.date(int(d[0]),int(d[1]),int(d[2]))
date_now = datetime.datetime.now().date()

for config_name in config.ga_bq_config:
for date in daterange(date_init, date_now):
destination_table = config.ga_bq_config[config_name]["destination_table"]
print "start checking (%s, %s) pair for GA to BQ" % (config_name, date)
condition = (ga_bq_upload_status_log["config_name"]==config_name) & (ga_bq_upload_status_log["ga_session_date"]==date.strftime("%Y-%m-%d"))
if ga_bq_upload_status_log[condition].empty: # no such condition, totally new table-date pair
                print 'found no record for this pair; trying to upload data for (%s, %s)' % (config_name, date)
if check_ga_session_date_exist(destination_table, date, credential_path):
                    print 'found existing data for this date in the BQ table; removing it.'
remove_certain_ga_session_date_data(destination_table, date)
upload_result = ga_upload_to_bq_by_day(config_name, date, credential_path)
current_result = pd.DataFrame(data={"config_name": config_name, "ga_session_date": date.strftime("%Y-%m-%d"), "status": upload_result['status'], "backup_date": date_now.strftime("%Y-%m-%d"), "uploaded_data_size": upload_result['data_size']}, index=[0])
print "update corresponding result of (%s, %s) to %s" % (config_name, date, config.GA_BQ_UPLOAD_STATUS_LOG)
current_result.to_gbq(destination_table=config.GA_BQ_UPLOAD_STATUS_LOG, project_id=config.PROJECT_ID, if_exists='append', private_key=credential_path)
elif 'success' in ga_bq_upload_status_log[condition]['status'].values:
print "already success in such pair"
else: # if failure, remove the data of that date/table and re-upload again
                print 'found a pair with failure status; removing existing data and re-uploading'
remove_certain_ga_session_date_data(destination_table, date)
upload_result = ga_upload_to_bq_by_day(config_name, date, credential_path)
current_result = pd.DataFrame(data={"config_name": config_name, "ga_session_date": date.strftime("%Y-%m-%d"), "status": upload_result['status'], "backup_date": date_now.strftime("%Y-%m-%d"), "uploaded_data_size": upload_result['data_size']}, index=[0])
print "update corresponding result of (%s, %s) to %s" % (config_name, date, config.GA_BQ_UPLOAD_STATUS_LOG)
current_result.to_gbq(destination_table=config.GA_BQ_UPLOAD_STATUS_LOG, project_id=config.PROJECT_ID, if_exists='append', private_key=credential_path)
