feat(ingest): add options for Airflow lineage backend (datahub-projec…
hsheth2 authored May 14, 2021
1 parent e2d8a93 commit 3dfe3d3
Showing 8 changed files with 291 additions and 192 deletions.
2 changes: 1 addition & 1 deletion docs/demo.md
@@ -1,6 +1,6 @@
# DataHub Demo Environment

We have a hosted demo environment available, kindly provided by [Acryl](https://acryl.io/).
We have a hosted demo environment available, kindly provided by [Acryl Data](https://acryl.io/).

<p>
<a
16 changes: 13 additions & 3 deletions metadata-ingestion/README.md
@@ -14,7 +14,7 @@ Before running any metadata ingestion job, you should make sure that DataHub bac

### Install from PyPI

The folks over at [Acryl](https://www.acryl.io/) maintain a PyPI package for DataHub metadata ingestion.
The folks over at [Acryl Data](https://www.acryl.io/) maintain a PyPI package for DataHub metadata ingestion.

```shell
# Requires Python 3.6+
@@ -43,7 +43,7 @@ We use a plugin architecture so that you can install only the dependencies you a
| mysql | `pip install 'acryl-datahub[mysql]'` | MySQL source |
| oracle | `pip install 'acryl-datahub[oracle]'` | Oracle source |
| postgres | `pip install 'acryl-datahub[postgres]'` | Postgres source |
| redshift | `pip install 'acryl-datahub[redshift]'` | Redshift source |
| sqlalchemy | `pip install 'acryl-datahub[sqlalchemy]'` | Generic SQLAlchemy source |
| snowflake | `pip install 'acryl-datahub[snowflake]'` | Snowflake source |
| superset | `pip install 'acryl-datahub[superset]'` | Superset source |
@@ -732,8 +732,18 @@ The Airflow lineage backend is only supported in Airflow 1.10.15+ and 2.0.2+.
```ini
[lineage]
backend = datahub_provider.lineage.datahub.DatahubLineageBackend
datahub_conn_id = datahub_rest_default # or datahub_kafka_default - whatever you named the connection in step 1
datahub_kwargs = {
"datahub_conn_id": "datahub_rest_default",
"capture_ownership_info": true,
"capture_tags_info": true,
"graceful_exceptions": true }
# The above indentation is important!
```
Configuration options:
- `datahub_conn_id` (required): Usually `datahub_rest_default` or `datahub_kafka_default`, depending on what you named the connection in step 1.
- `capture_ownership_info` (defaults to true): If true, the owners field of the DAG will be captured as a DataHub corpuser.
- `capture_tags_info` (defaults to true): If true, the tags field of the DAG will be captured as DataHub tags.
- `graceful_exceptions` (defaults to true): If set to true, most runtime errors in the lineage backend will be suppressed and will not cause the overall task to fail. Note that configuration issues will still throw exceptions.
3. Configure `inlets` and `outlets` for your Airflow operators. For reference, look at the sample DAG in [`lineage_backend_demo.py`](./src/datahub_provider/example_dags/lineage_backend_demo.py); a minimal sketch also appears after this list.
4. [optional] Learn more about [Airflow lineage](https://airflow.apache.org/docs/apache-airflow/stable/lineage.html), including shorthand notation and some automation.
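
For orientation, here is a minimal sketch of what step 3 can look like on Airflow 2.x. It is loosely modeled on the referenced `lineage_backend_demo.py`; the `Dataset(platform, name)` arguments come from `datahub_provider.entities` in this commit, and the DAG/task names are illustrative only.

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

from datahub_provider.entities import Dataset

with DAG(
    "datahub_lineage_backend_demo",  # illustrative DAG id
    start_date=datetime(2021, 1, 1),
    schedule_interval=timedelta(days=1),
    catchup=False,
) as dag:
    # The lineage backend reads inlets/outlets off each operator at runtime
    # and emits dataset -> task -> dataset lineage to DataHub.
    transform = BashOperator(
        task_id="run_data_task",
        bash_command="echo 'This is where you might run your data tooling.'",
        inlets=[
            Dataset("snowflake", "mydb.schema.tableA"),
            Dataset("snowflake", "mydb.schema.tableB"),
        ],
        outlets=[Dataset("snowflake", "mydb.schema.tableC")],
    )
```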

3 changes: 3 additions & 0 deletions metadata-ingestion/pyproject.toml
@@ -16,3 +16,6 @@ indent = ' '
profile = 'black'
sections = 'FUTURE,STDLIB,THIRDPARTY,FIRSTPARTY,LOCALFOLDER'
skip_glob = 'src/datahub/metadata'

[tool.pyright]
extraPaths = ['tests']
9 changes: 2 additions & 7 deletions metadata-ingestion/setup.py
@@ -71,7 +71,7 @@ def get_long_description():
"bigquery": sql_common | {"pybigquery >= 0.6.0"},
"hive": sql_common
| {
# Acryl maintains a fork of PyHive, which adds support for table comments
# Acryl Data maintains a fork of PyHive, which adds support for table comments
# and column comments, and also releases HTTP and HTTPS transport schemes.
"acryl-pyhive[hive]>=0.6.6"
},
@@ -174,12 +174,7 @@ def get_long_description():
"datahub-kafka = datahub.ingestion.sink.datahub_kafka:DatahubKafkaSink",
"datahub-rest = datahub.ingestion.sink.datahub_rest:DatahubRestSink",
],
"apache_airflow_provider": [
"provider_info=datahub.integrations.airflow.get_provider_info:get_provider_info"
],
"airflow.plugins": [
"datahub = datahub.integrations.airflow.get_provider_info:DatahubAirflowPlugin"
],
"apache_airflow_provider": ["provider_info=datahub_provider:get_provider_info"],
}

if is_py37_or_newer:
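
The new `apache_airflow_provider` entry point replaces the old `airflow.plugins` registration: Airflow calls the referenced `get_provider_info` callable to discover the provider. The actual `datahub_provider.get_provider_info` is not shown in this diff, so the sketch below only illustrates the general shape such a callable returns; every key and value in it is an assumption.

```python
# Hypothetical sketch of an apache_airflow_provider entry point; the real
# datahub_provider.get_provider_info is not part of this diff, so the keys
# and values below are illustrative assumptions, not the actual metadata.
def get_provider_info():
    return {
        "package-name": "acryl-datahub",
        "name": "DataHub",
        "description": "DataHub metadata collector plugin",
        "hook-class-names": [
            "datahub_provider.hooks.datahub.DatahubRestHook",
            "datahub_provider.hooks.datahub.DatahubKafkaHook",
        ],
    }
```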
183 changes: 183 additions & 0 deletions metadata-ingestion/src/datahub_provider/_lineage_core.py
@@ -0,0 +1,183 @@
import json
from typing import TYPE_CHECKING, Dict, List

import dateutil.parser
from airflow.configuration import conf

import datahub.emitter.mce_builder as builder
import datahub.metadata.schema_classes as models
from datahub.configuration.common import ConfigModel
from datahub_provider.entities import _Entity

if TYPE_CHECKING:
    from airflow import DAG
    from airflow.models.baseoperator import BaseOperator

    from datahub_provider.hooks.datahub import DatahubGenericHook


def _entities_to_urn_list(iolets: List[_Entity]) -> List[str]:
    return [let.urn for let in iolets]


class DatahubBasicLineageConfig(ConfigModel):
    # DataHub hook connection ID.
    datahub_conn_id: str

    # If true, the owners field of the DAG will be captured as a DataHub corpuser.
    capture_ownership_info: bool = True

    # If true, the tags field of the DAG will be captured as DataHub tags.
    capture_tags_info: bool = True

    def make_emitter_hook(self) -> "DatahubGenericHook":
        # This is necessary to avoid issues with circular imports.
        from datahub_provider.hooks.datahub import DatahubGenericHook

        return DatahubGenericHook(self.datahub_conn_id)


def send_lineage_to_datahub(
    config: DatahubBasicLineageConfig,
    operator: "BaseOperator",
    inlets: List[_Entity],
    outlets: List[_Entity],
    context: Dict,
) -> None:
    # This is necessary to avoid issues with circular imports.
    from airflow.serialization.serialized_objects import (
        SerializedBaseOperator,
        SerializedDAG,
    )

    dag: "DAG" = context["dag"]
    task = context["task"]

    # TODO: capture context
    # context dag_run
    # task_instance: "TaskInstance" = context["task_instance"]
    # TODO: capture raw sql from db operators

    flow_urn = builder.make_data_flow_urn("airflow", dag.dag_id)
    job_urn = builder.make_data_job_urn_with_flow(flow_urn, task.task_id)

    base_url = conf.get("webserver", "base_url")
    flow_url = f"{base_url}/tree?dag_id={dag.dag_id}"
    job_url = f"{base_url}/taskinstance/list/?flt1_dag_id_equals={dag.dag_id}&_flt_3_task_id={task.task_id}"
    # operator.log.info(f"{flow_url=}")
    # operator.log.info(f"{job_url=}")
    # operator.log.info(f"{dag.get_serialized_fields()=}")
    # operator.log.info(f"{task.get_serialized_fields()=}")
    # operator.log.info(f"{SerializedDAG.serialize_dag(dag)=}")

    flow_property_bag: Dict[str, str] = {
        key: repr(value) for (key, value) in SerializedDAG.serialize_dag(dag).items()
    }
    for key in dag.get_serialized_fields():
        if key not in flow_property_bag:
            flow_property_bag[key] = repr(getattr(dag, key))
    job_property_bag: Dict[str, str] = {
        key: repr(value)
        for (key, value) in SerializedBaseOperator.serialize_operator(task).items()
    }
    for key in task.get_serialized_fields():
        if key not in job_property_bag:
            job_property_bag[key] = repr(getattr(task, key))
    # operator.log.info(f"{flow_property_bag=}")
    # operator.log.info(f"{job_property_bag=}")

    if config.capture_ownership_info:
        timestamp = int(dateutil.parser.parse(context["ts"]).timestamp() * 1000)
        ownership = models.OwnershipClass(
            owners=[
                models.OwnerClass(
                    owner=builder.make_user_urn(dag.owner),
                    type=models.OwnershipTypeClass.DEVELOPER,
                    source=models.OwnershipSourceClass(
                        type=models.OwnershipSourceTypeClass.SERVICE,
                        url=dag.filepath,
                    ),
                )
            ],
            lastModified=models.AuditStampClass(
                time=timestamp, actor=builder.make_user_urn("airflow")
            ),
        )
        # operator.log.info(f"{ownership=}")
        ownership_aspect = [ownership]
    else:
        ownership_aspect = []

    if config.capture_tags_info:
        tags = models.GlobalTagsClass(
            tags=[
                models.TagAssociationClass(tag=builder.make_tag_urn(tag))
                for tag in (dag.tags or [])
            ]
        )
        # operator.log.info(f"{tags=}")
        tags_aspect = [tags]
    else:
        tags_aspect = []

    flow_mce = models.MetadataChangeEventClass(
        proposedSnapshot=models.DataFlowSnapshotClass(
            urn=flow_urn,
            aspects=[
                models.DataFlowInfoClass(
                    name=dag.dag_id,
                    description=f"{dag.description}\n\n{dag.doc_md or ''}",
                    customProperties=flow_property_bag,
                    externalUrl=flow_url,
                ),
                *ownership_aspect,
                *tags_aspect,
            ],
        )
    )

    job_mce = models.MetadataChangeEventClass(
        proposedSnapshot=models.DataJobSnapshotClass(
            urn=job_urn,
            aspects=[
                models.DataJobInfoClass(
                    name=task.task_id,
                    type=models.AzkabanJobTypeClass.COMMAND,
                    description=None,
                    customProperties=job_property_bag,
                    externalUrl=job_url,
                ),
                models.DataJobInputOutputClass(
                    inputDatasets=_entities_to_urn_list(inlets or []),
                    outputDatasets=_entities_to_urn_list(outlets or []),
                ),
                *ownership_aspect,
                *tags_aspect,
            ],
        )
    )

    force_entity_materialization = [
        models.MetadataChangeEventClass(
            proposedSnapshot=models.DatasetSnapshotClass(
                urn=iolet,
                aspects=[
                    models.StatusClass(removed=False),
                ],
            )
        )
        for iolet in _entities_to_urn_list((inlets or []) + (outlets or []))
    ]

    hook = config.make_emitter_hook()

    mces = [
        flow_mce,
        job_mce,
        *force_entity_materialization,
    ]
    operator.log.info(
        "DataHub lineage backend - emitting metadata:\n"
        + "\n".join(json.dumps(mce.to_obj()) for mce in mces)
    )
    hook.emit_mces(mces)
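
`send_lineage_to_datahub` is the workhorse, but the backend class that Airflow actually loads (`datahub_provider.lineage.datahub.DatahubLineageBackend`, configured in the README above) lives in one of the files not shown in this truncated diff. Below is a hedged sketch of how such a backend presumably wires Airflow's `LineageBackend` interface, the `datahub_kwargs` JSON blob, and the `graceful_exceptions` option to this helper; everything other than `send_lineage_to_datahub` and `DatahubBasicLineageConfig` is an assumption.

```python
# Hedged sketch only: the real DatahubLineageBackend is not shown in this diff,
# so names and details other than send_lineage_to_datahub and
# DatahubBasicLineageConfig are assumptions.
import json

from airflow.configuration import conf
from airflow.lineage.backend import LineageBackend

from datahub_provider._lineage_core import (
    DatahubBasicLineageConfig,
    send_lineage_to_datahub,
)


class DatahubLineageConfig(DatahubBasicLineageConfig):
    # If true, swallow runtime errors instead of failing the Airflow task.
    graceful_exceptions: bool = True


def get_lineage_config() -> DatahubLineageConfig:
    # datahub_kwargs in airflow.cfg is a JSON blob (see the README snippet above).
    kwargs_str = conf.get("lineage", "datahub_kwargs", fallback="{}")
    return DatahubLineageConfig.parse_obj(json.loads(kwargs_str))


class DatahubLineageBackend(LineageBackend):
    def send_lineage(self, operator, inlets=None, outlets=None, context=None) -> None:
        config = get_lineage_config()
        try:
            send_lineage_to_datahub(
                config, operator, operator.inlets, operator.outlets, context or {}
            )
        except Exception as e:
            if config.graceful_exceptions:
                operator.log.error(e)
                operator.log.info("Suppressing error; graceful_exceptions is enabled")
            else:
                raise
```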
7 changes: 7 additions & 0 deletions metadata-ingestion/src/datahub_provider/entities.py
@@ -1,3 +1,5 @@
from abc import abstractmethod

import attr

import datahub.emitter.mce_builder as builder
@@ -12,6 +14,11 @@ def as_dict(self):
        # Required for compat with Airflow 1.10.x
        return attr.asdict(self)

    @property
    @abstractmethod
    def urn(self) -> str:
        pass


@attr.s(auto_attribs=True, str=True)
class Dataset(_Entity):
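
The `Dataset` body is truncated in this view. For context, here is a sketch of how a concrete entity can satisfy the new abstract `urn` property, reusing the `attr` and `mce_builder` imports shown above; the exact field list and the `make_dataset_urn` call are assumptions about the truncated implementation.

```python
# Sketch of the truncated Dataset implementation; field names and the
# make_dataset_urn call are assumptions, not the verbatim file contents.
@attr.s(auto_attribs=True, str=True)
class Dataset(_Entity):
    platform: str
    name: str
    env: str = "PROD"

    @property
    def urn(self) -> str:
        return builder.make_dataset_urn(self.platform, self.name, self.env)
```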
