added support for Google Cloud Storage #1

Open · wants to merge 2 commits into base: master
27 changes: 20 additions & 7 deletions README.md
@@ -1,14 +1,11 @@
# Plugin - Intercom to S3
# Plugin - Intercom to S3/GCS

This plugin moves data from the [Intercom](https://developers.intercom.com/v2.0/docs) API to S3 based on the specified object
This plugin moves data from the [Intercom](https://developers.intercom.com/v2.0/docs) API to S3/GCS based on the specified object.

## Hooks
### IntercomHook
This hook handles the authentication and requests to Intercom. It is based on the [python-intercom](https://github.com/jkeyes/python-intercom) module.
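
A minimal sketch of using the hook directly, assuming an Airflow connection id of `intercom_default`; the connection id and the queried object are illustrative:

```python
from intercom_plugin.hooks.intercom_hook import IntercomHook

# "intercom_default" is an illustrative Airflow connection id
hook = IntercomHook(conn_id="intercom_default")
hook.get_conn()  # authenticates against Intercom; raises on failure

# fetch all users via python-intercom's "all" method
users = hook.run_query("users", "all")
```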

### S3Hook
[Core Airflow S3Hook](https://pythonhosted.org/airflow/_modules/S3_hook.html) with the standard boto dependency.

## Operators
### IntercomToS3Operator
This operator composes the logic for this plugin. It fetches the specified Intercom object and saves the result in an S3 bucket, under a specified key, in
@@ -23,5 +20,21 @@ ndjson format. The parameters it can accept include the following.
- `fields`: *optional* list of fields that you want to get from the object. If *None*, then this will get all fields for the object
- `replication_key`: *optional* name of the replication key, if needed.
- `replication_key_value`: *(optional)* value of the replication key, if needed. The operator will import only results whose replication_key property is greater than the value of this param.
- `intercom_method`: *(optional)* method to call from python-intercom. Defaults to "all".
- `**kwargs`: replication key and value, if the replication_key parameter is given, and extra params for the intercom method if needed.

### IntercomToGCSOperator
This operator composes the logic for this plugin. It fetches the specified Intercom object and saves the result in a GCS bucket, under a specified object name, in
ndjson format. The parameters it can accept include the following; a usage sketch appears after the list.

- `intercom_conn_id`: The Intercom connection id from Airflow
- `intercom_obj`: Intercom object to query
- `gcs_conn_id`: GCS connection id from Airflow.
- `gcs_bucket`: The destination GCS bucket.
- `gcs_object`: The destination GCS object (file name inside the bucket).
- `fields`: *optional* list of fields that you want to get from the object. If *None*, then this will get all fields for the object
- `replication_key_name`: *optional* name of the replication key, if needed.
- `replication_key_value`: *(optional)* value of the replication key, if needed. The operator will import only results whose replication_key_name property is greater than the value of this param.
- `intercom_method`: *(optional)* method to call from python-intercom. Defaults to "all".
- `**kwargs`: replication key and value, if the replication_key_name parameter is given, and extra params for the intercom method if needed.
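
A minimal usage sketch, assuming an existing `dag` object and Airflow connections named `intercom_default` and `google_cloud_default`; the connection ids, bucket, object path, and replication key below are illustrative:

```python
from airflow.operators.intercom_plugin import IntercomToGCSOperator

intercom_users_to_gcs = IntercomToGCSOperator(
    task_id="intercom_users_to_gcs",
    intercom_conn_id="intercom_default",          # illustrative connection id
    intercom_obj="users",                         # illustrative Intercom object
    intercom_method="all",
    gcs_conn_id="google_cloud_default",           # illustrative connection id
    gcs_bucket="my-intercom-exports",             # illustrative bucket
    gcs_object="intercom/users/{{ ds }}.ndjson",  # gcs_object is a templated field
    replication_key_name="updated_at",            # illustrative replication key
    replication_key_value=1514764800,             # keep records with updated_at >= this value
    dag=dag,
)
```

Because `gcs_object` is a templated field, the execution date (`{{ ds }}`) can be used to write one object per DAG run.
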
3 changes: 2 additions & 1 deletion __init__.py
@@ -1,12 +1,13 @@
from airflow.plugins_manager import AirflowPlugin
from intercom_plugin.operators.intercom_to_s3_operator import IntercomToS3Operator
from intercom_plugin.operators.intercom_to_gcs_operator import IntercomToGCSOperator
from intercom_plugin.hooks.intercom_hook import IntercomHook


class IntercomToS3Plugin(AirflowPlugin):
name = "intercom_plugin"
hooks = [IntercomHook]
operators = [IntercomToS3Operator]
operators = [IntercomToS3Operator, IntercomToGCSOperator]
executors = []
macros = []
admin_views = []
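
Because the plugin registers under the name `intercom_plugin`, Airflow 1.x's legacy plugin mechanism should expose the hook and operators under that namespace; a sketch of the assumed import paths (on newer Airflow versions, import the classes directly from the plugin package instead):

```python
# Assumed legacy Airflow 1.x plugin imports
from airflow.operators.intercom_plugin import IntercomToS3Operator, IntercomToGCSOperator
from airflow.hooks.intercom_plugin import IntercomHook
```
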
147 changes: 147 additions & 0 deletions operators/intercom_to_gcs_operator.py
@@ -0,0 +1,147 @@
import logging
import json
import collections.abc
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from intercom_plugin.hooks.intercom_hook import IntercomHook
from tempfile import NamedTemporaryFile


class IntercomToGCSOperator(BaseOperator):
"""
    Make a query against Intercom and write the resulting data to GCS.
    """
    template_fields = ('gcs_object', )

@apply_defaults
def __init__(
self,
intercom_conn_id,
intercom_obj,
intercom_method='all',
gcs_conn_id='',
gcs_bucket='',
gcs_object='',
fields=None,
replication_key_name=None,
replication_key_value=0,
*args,
**kwargs
):
"""
Initialize the operator
:param intercom_conn_id: name of the Airflow connection that has
your Intercom tokens
:param intercom_obj: name of the Intercom object we are
fetching data from
:param gcs_conn_id: name of the Airflow connection that has
                            your GCS connection params
:param gcs_bucket: name of the destination GCS bucket
        :param gcs_object: name of the destination object inside the bucket
:param fields: *(optional)* list of fields that you want
to get from the object.
If *None*, then this will get all fields
for the object
:param replication_key_name: *(optional)* name of the replication key,
if needed.
:param replication_key_value: *(optional)* value of the replication key,
if needed. The operator will import only
                                      results whose replication_key_name property is
                                      greater than the value of this param.
        :param intercom_method: *(optional)* method to call from python-intercom.
                                Defaults to "all".
:param \**kwargs: Extra params for the intercom query, based on python
intercom module
"""

super().__init__(*args, **kwargs)

self.intercom_conn_id = intercom_conn_id
self.intercom_obj = intercom_obj
self.intercom_method = intercom_method

self.gcs_conn_id = gcs_conn_id
self.gcs_bucket = gcs_bucket
self.gcs_object = gcs_object

self.fields = fields
self.replication_key_name = replication_key_name
self.replication_key_value = replication_key_value
self._kwargs = kwargs

def filter_fields(self, result):
"""
        Filter the fields from a resulting object.

        This will return an object containing only the fields given
        as a parameter in the constructor.

All fields are returned when "fields" param is None.
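
        Example (illustrative): with fields=["email", "name"], a result
        like {"email": "a@b.c", "name": "Ada", "id": 1} is reduced to
        {"email": "a@b.c", "name": "Ada"}.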
"""
if not self.fields:
return result
obj = {}
for field in self.fields:
obj[field] = result[field]
return obj

def filter(self, results):
"""
        Filter the results.
        If a replication key was given as a param, only results whose
        replication key value is greater than or equal to the given value are kept.
"""
        if not isinstance(results, collections.abc.Iterable):
return json.loads((json.dumps(results, default=lambda o: o.__dict__)))

filtered = []
for result in results:
result_json = json.loads((json.dumps(result,
default=lambda o: o.__dict__)))

if not self.replication_key_name or \
int(result_json[self.replication_key_name]) >= int(self.replication_key_value):
filtered.append(self.filter_fields(result_json))
logging.info(filtered)

return filtered

def execute(self, context):
"""
Execute the operator.
This will get all the data for a particular Intercom model
and write it to a file.
"""
logging.info("Prepping to gather data from Intercom")
hook = IntercomHook(
conn_id=self.intercom_conn_id,
)

# attempt to login to Intercom
# if this process fails, it will raise an error and die right here
# we could wrap it
hook.get_conn()

logging.info(
"Making request for"
" {0} object".format(self.intercom_obj)
)

# fetch the results from intercom and filter them

results = hook.run_query(self.intercom_obj, self.intercom_method)
        filtered_results = self.filter(results)

# write the results to a temporary file and save that file to gcs
with NamedTemporaryFile("w") as tmp:
            for result in filtered_results:
tmp.write(json.dumps(result) + '\n')

tmp.flush()

gcs_conn = GoogleCloudStorageHook(self.gcs_conn_id)
gcs_conn.upload(self.gcs_bucket, self.gcs_object, tmp.name)

tmp.close()

logging.info("Query finished!")