
Commit 35b187a
feat(ingest): transformers - add support for processing MCP-s (datahu…
swaroopjagadish authored Mar 7, 2022
1 parent 128148e commit 35b187a
Showing 17 changed files with 1,337 additions and 209 deletions.
46 changes: 21 additions & 25 deletions metadata-ingestion/examples/transforms/custom_transform_example.py
@@ -1,15 +1,15 @@
 # see https://datahubproject.io/docs/metadata-ingestion/transformers for original tutorial
 import json
-from typing import Iterable
+from typing import List, Optional
 
 import datahub.emitter.mce_builder as builder
 from datahub.configuration.common import ConfigModel
-from datahub.ingestion.api.common import PipelineContext, RecordEnvelope
-from datahub.ingestion.api.transform import Transformer
+from datahub.ingestion.api.common import PipelineContext
 from datahub.ingestion.transformer.add_dataset_ownership import Semantics
+from datahub.ingestion.transformer.base_transformer import (
+    BaseTransformer,
+    SingleAspectTransformer,
+)
 from datahub.metadata.schema_classes import (
-    DatasetSnapshotClass,
-    MetadataChangeEventClass,
     OwnerClass,
     OwnershipClass,
     OwnershipTypeClass,
@@ -21,7 +21,7 @@ class AddCustomOwnershipConfig(ConfigModel):
     semantics: Semantics = Semantics.OVERWRITE
 
 
-class AddCustomOwnership(Transformer):
+class AddCustomOwnership(BaseTransformer, SingleAspectTransformer):
     """Transformer that adds owners to datasets according to a callback function."""
 
     # context param to generate run metadata such as a run ID
@@ -46,31 +46,27 @@ def create(cls, config_dict: dict, ctx: PipelineContext) -> "AddCustomOwnership"
         config = AddCustomOwnershipConfig.parse_obj(config_dict)
         return cls(config, ctx)
 
-    def transform(
-        self, record_envelopes: Iterable[RecordEnvelope]
-    ) -> Iterable[RecordEnvelope]:
-
-        # loop over envelopes
-        for envelope in record_envelopes:
+    def entity_types(self) -> List[str]:
+        return ["dataset"]
 
-            # if envelope is an MCE, add the ownership classes
-            if isinstance(envelope.record, MetadataChangeEventClass):
-                envelope.record = self.transform_one(envelope.record)
-            yield envelope
+    def aspect_name(self) -> str:
+        return "ownership"
 
-    def transform_one(self, mce: MetadataChangeEventClass) -> MetadataChangeEventClass:
-        if not isinstance(mce.proposedSnapshot, DatasetSnapshotClass):
-            return mce
+    def transform_aspect(  # type: ignore
+        self, entity_urn: str, aspect_name: str, aspect: Optional[OwnershipClass]
+    ) -> Optional[OwnershipClass]:
 
         owners_to_add = self.owners
+        assert aspect is None or isinstance(aspect, OwnershipClass)
 
         if owners_to_add:
-            ownership = builder.get_or_add_aspect(
-                mce,
-                OwnershipClass(
+            ownership = (
+                aspect
+                if aspect
+                else OwnershipClass(
                     owners=[],
-                ),
+                )
             )
             ownership.owners.extend(owners_to_add)
 
-        return mce
+        return ownership
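
With the new base classes, the example no longer iterates over RecordEnvelopes itself: the framework filters records to the entities named by entity_types(), looks up the aspect named by aspect_name(), and hands it to transform_aspect(). A minimal hand-driven sketch of that contract (the owners_json config key comes from the surrounding example; the URN and run ID are illustrative):

    from datahub.ingestion.api.common import PipelineContext

    ctx = PipelineContext(run_id="test-run")
    transformer = AddCustomOwnership.create({"owners_json": "owners.json"}, ctx)

    assert transformer.entity_types() == ["dataset"]
    assert transformer.aspect_name() == "ownership"

    # The framework invokes transform_aspect() once per matching entity,
    # passing the entity's current ownership aspect, or None if it has none.
    new_ownership = transformer.transform_aspect(
        entity_urn="urn:li:dataset:(urn:li:dataPlatform:hive,db.table,PROD)",
        aspect_name="ownership",
        aspect=None,
    )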
12 changes: 12 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/api/common.py
@@ -14,6 +14,18 @@ class RecordEnvelope(Generic[T]):
     metadata: dict
 
 
+class ControlRecord:
+    """A marker class to indicate records that are control signals from the framework"""
+
+    pass
+
+
+class EndOfStream(ControlRecord):
+    """A marker class to indicate an end of stream"""
+
+    pass
+
+
 @dataclass
 class _WorkUnitId(metaclass=ABCMeta):
     id: str
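
ControlRecord gives the framework an in-band way to send signals that are not metadata records, and EndOfStream is its first concrete use. A transformer that manages the stream itself can dispatch on the markers with a plain isinstance check; a sketch, where flush() is a hypothetical method on the transformer:

    from datahub.ingestion.api.common import ControlRecord, EndOfStream

    def transform(self, record_envelopes):
        for envelope in record_envelopes:
            if isinstance(envelope.record, EndOfStream):
                # last chance to emit anything buffered before the stream closes
                yield from self.flush()  # hypothetical helper
            elif not isinstance(envelope.record, ControlRecord):
                envelope.record = self.transform_one(envelope.record)
            yield envelope  # pass control records through to downstream consumers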
14 changes: 13 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/run/pipeline.py
@@ -14,7 +14,7 @@
     PipelineExecutionError,
 )
 from datahub.ingestion.api.committable import CommitPolicy
-from datahub.ingestion.api.common import PipelineContext, RecordEnvelope
+from datahub.ingestion.api.common import EndOfStream, PipelineContext, RecordEnvelope
 from datahub.ingestion.api.sink import Sink, WriteCallback
 from datahub.ingestion.api.source import Extractor, Source
 from datahub.ingestion.api.transform import Transformer
@@ -195,6 +195,18 @@ def run(self) -> None:
             if not self.dry_run:
                 self.sink.handle_work_unit_end(wu)
         self.source.close()
+        # no more data is coming, we need to let the transformers produce any additional records if they are holding on to state
+        for record_envelope in self.transform(
+            [
+                RecordEnvelope(
+                    record=EndOfStream(), metadata={"workunit_id": "end-of-stream"}
+                )
+            ]
+        ):
+            if not self.dry_run and not isinstance(record_envelope.record, EndOfStream):
+                # TODO: propagate EndOfStream and other control events to sinks, to allow them to flush etc.
+                self.sink.write_record_async(record_envelope, callback)
+
         self.sink.close()
         self.process_commits()
 
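
The pipeline now pushes one synthetic EndOfStream envelope through the transformer chain after the source closes, giving stateful transformers a final chance to emit. A sketch of a transformer that depends on this behavior (the buffering logic is illustrative, not framework API):

    from typing import Iterable, List

    from datahub.ingestion.api.common import EndOfStream, RecordEnvelope

    class BufferingTransformer:
        """Holds every record until the pipeline signals end of stream."""

        def __init__(self) -> None:
            self.buffer: List[RecordEnvelope] = []

        def transform(
            self, record_envelopes: Iterable[RecordEnvelope]
        ) -> Iterable[RecordEnvelope]:
            for envelope in record_envelopes:
                if isinstance(envelope.record, EndOfStream):
                    yield from self.buffer  # emit held records first
                    self.buffer.clear()
                    yield envelope  # then pass the marker along
                else:
                    self.buffer.append(envelope)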
metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_browse_path.py
@@ -3,26 +3,25 @@
 import datahub.emitter.mce_builder as builder
 from datahub.configuration.common import ConfigModel
 from datahub.ingestion.api.common import PipelineContext
-from datahub.ingestion.transformer.dataset_transformer import DatasetTransformer
-from datahub.metadata.schema_classes import (
-    BrowsePathsClass,
-    DatasetSnapshotClass,
-    MetadataChangeEventClass,
+from datahub.ingestion.transformer.dataset_transformer import (
+    DatasetBrowsePathsTransformer,
 )
+from datahub.metadata.schema_classes import BrowsePathsClass, MetadataChangeEventClass
 
 
 class AddDatasetBrowsePathConfig(ConfigModel):
     path_templates: List[str]
     replace_existing: bool = False
 
 
-class AddDatasetBrowsePathTransformer(DatasetTransformer):
+class AddDatasetBrowsePathTransformer(DatasetBrowsePathsTransformer):
     """Transformer that can be used to set browse paths through template replacement"""
 
     ctx: PipelineContext
     config: AddDatasetBrowsePathConfig
 
     def __init__(self, config: AddDatasetBrowsePathConfig, ctx: PipelineContext):
+        super().__init__()
         self.ctx = ctx
         self.config = config
 
@@ -34,8 +33,6 @@ def create(
         return cls(config, ctx)
 
     def transform_one(self, mce: MetadataChangeEventClass) -> MetadataChangeEventClass:
-        if not isinstance(mce.proposedSnapshot, DatasetSnapshotClass):
-            return mce
         platform_part, dataset_fqdn, env = (
             mce.proposedSnapshot.urn.replace("urn:li:dataset:(", "")
             .replace(")", "")
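
The transform_one above splits the dataset URN into platform, name, and environment parts and substitutes them into each configured template. A sketch of the corresponding config; the token names are inferred from the URN fields the code extracts and are not confirmed against the transformer's exact template syntax:

    # ctx: a PipelineContext, as in the earlier sketch
    config = {
        # one browse path per template; tokens are replaced per dataset
        "path_templates": ["/ENV/PLATFORM/DATASET_PARTS"],
        "replace_existing": False,  # keep browse paths the source already set
    }
    transformer = AddDatasetBrowsePathTransformer.create(config, ctx)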
metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_ownership.py
@@ -12,7 +12,9 @@
 from datahub.configuration.import_resolver import pydantic_resolve_key
 from datahub.ingestion.api.common import PipelineContext
 from datahub.ingestion.graph.client import DataHubGraph
-from datahub.ingestion.transformer.dataset_transformer import DatasetTransformer
+from datahub.ingestion.transformer.dataset_transformer import (
+    DatasetOwnershipTransformer,
+)
 from datahub.metadata.schema_classes import (
     DatasetSnapshotClass,
     MetadataChangeEventClass,
@@ -48,13 +50,14 @@ def ensure_semantics_is_upper_case(cls, v):
         return v
 
 
-class AddDatasetOwnership(DatasetTransformer):
+class AddDatasetOwnership(DatasetOwnershipTransformer):
     """Transformer that adds owners to datasets according to a callback function."""
 
     ctx: PipelineContext
     config: AddDatasetOwnershipConfig
 
     def __init__(self, config: AddDatasetOwnershipConfig, ctx: PipelineContext):
+        super().__init__()
         self.ctx = ctx
         self.config = config
         if self.config.semantics == Semantics.PATCH and self.ctx.graph is None:
@@ -105,8 +108,7 @@ def get_ownership_to_set(
         return mce_ownership
 
     def transform_one(self, mce: MetadataChangeEventClass) -> MetadataChangeEventClass:
-        if not isinstance(mce.proposedSnapshot, DatasetSnapshotClass):
-            return mce
+        assert isinstance(mce.proposedSnapshot, DatasetSnapshotClass)
         owners_to_add = self.config.get_owners_to_add(mce.proposedSnapshot)
         if owners_to_add:
             ownership = builder.get_or_add_aspect(
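
AddDatasetOwnership keeps its callback-driven config; the semantics field decides whether computed owners overwrite existing ownership (OVERWRITE) or are merged with what the DataHub server already holds (PATCH, which per the __init__ check above requires ctx.graph). A hedged sketch of a callback and config dict:

    from datahub.metadata.schema_classes import (
        DatasetSnapshotClass,
        OwnerClass,
        OwnershipTypeClass,
    )

    def my_owners(snapshot: DatasetSnapshotClass) -> list:
        # assign one owner to every dataset seen by the pipeline
        return [
            OwnerClass(
                owner="urn:li:corpuser:datapipelines",
                type=OwnershipTypeClass.DATAOWNER,
            )
        ]

    # PATCH merges with server-side ownership and needs a graph connection
    config = {"get_owners_to_add": my_owners, "semantics": "PATCH"}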
metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_properties.py
@@ -5,7 +5,9 @@
 from datahub.configuration.common import ConfigModel
 from datahub.configuration.import_resolver import pydantic_resolve_key
 from datahub.ingestion.api.common import PipelineContext
-from datahub.ingestion.transformer.dataset_transformer import DatasetTransformer
+from datahub.ingestion.transformer.dataset_transformer import (
+    DatasetPropertiesTransformer,
+)
 from datahub.metadata.schema_classes import (
     DatasetPropertiesClass,
     DatasetSnapshotClass,
@@ -28,7 +30,7 @@ class Config:
     _resolve_properties_class = pydantic_resolve_key("add_properties_resolver_class")
 
 
-class AddDatasetProperties(DatasetTransformer):
+class AddDatasetProperties(DatasetPropertiesTransformer):
     """Transformer that adds properties to datasets according to a callback function."""
 
     ctx: PipelineContext
@@ -40,6 +42,7 @@ def __init__(
         ctx: PipelineContext,
         **resolver_args: Dict[str, Any],
     ):
+        super().__init__()
         self.ctx = ctx
         self.config = config
         self.resolver_args = resolver_args
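
AddDatasetProperties resolves the add_properties_resolver_class key to a user-supplied resolver object. A sketch of what such a resolver might look like; the method name and signature are assumptions about the resolver contract, not taken from the diff:

    from typing import Dict

    from datahub.metadata.schema_classes import DatasetSnapshotClass

    class TeamPropertiesResolver:  # hypothetical resolver
        def get_properties_to_add(
            self, current: DatasetSnapshotClass
        ) -> Dict[str, str]:
            # stamp every dataset with the owning team
            return {"team": "data-platform"}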
metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_tags.py
@@ -4,7 +4,7 @@
 from datahub.configuration.common import ConfigModel, KeyValuePattern
 from datahub.configuration.import_resolver import pydantic_resolve_key
 from datahub.ingestion.api.common import PipelineContext
-from datahub.ingestion.transformer.dataset_transformer import DatasetTransformer
+from datahub.ingestion.transformer.dataset_transformer import DatasetTagsTransformer
 from datahub.metadata.schema_classes import (
     DatasetSnapshotClass,
     GlobalTagsClass,
@@ -24,13 +24,14 @@ class AddDatasetTagsConfig(ConfigModel):
     _resolve_tag_fn = pydantic_resolve_key("get_tags_to_add")
 
 
-class AddDatasetTags(DatasetTransformer):
+class AddDatasetTags(DatasetTagsTransformer):
     """Transformer that adds tags to datasets according to a callback function."""
 
     ctx: PipelineContext
     config: AddDatasetTagsConfig
 
     def __init__(self, config: AddDatasetTagsConfig, ctx: PipelineContext):
+        super().__init__()
         self.ctx = ctx
         self.config = config
 
@@ -40,8 +41,7 @@ def create(cls, config_dict: dict, ctx: PipelineContext) -> "AddDatasetTags":
         return cls(config, ctx)
 
     def transform_one(self, mce: MetadataChangeEventClass) -> MetadataChangeEventClass:
-        if not isinstance(mce.proposedSnapshot, DatasetSnapshotClass):
-            return mce
+        assert isinstance(mce.proposedSnapshot, DatasetSnapshotClass)
         tags_to_add = self.config.get_tags_to_add(mce.proposedSnapshot)
         if tags_to_add:
             tags = builder.get_or_add_aspect(
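
The get_tags_to_add callback named in the config above receives the dataset snapshot and returns tag associations to append; the return type is inferred from GlobalTagsClass.tags. A minimal sketch:

    from typing import List

    from datahub.metadata.schema_classes import (
        DatasetSnapshotClass,
        TagAssociationClass,
    )

    def my_tags(snapshot: DatasetSnapshotClass) -> List[TagAssociationClass]:
        # tag everything this ingestion run touches
        return [TagAssociationClass(tag="urn:li:tag:NeedsDocumentation")]

    config = {"get_tags_to_add": my_tags}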
metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_terms.py
@@ -4,7 +4,7 @@
 from datahub.configuration.common import ConfigModel, KeyValuePattern
 from datahub.configuration.import_resolver import pydantic_resolve_key
 from datahub.ingestion.api.common import PipelineContext
-from datahub.ingestion.transformer.dataset_transformer import DatasetTransformer
+from datahub.ingestion.transformer.dataset_transformer import DatasetTermsTransformer
 from datahub.metadata.schema_classes import (
     AuditStampClass,
     DatasetSnapshotClass,
@@ -25,13 +25,14 @@
     _resolve_term_fn = pydantic_resolve_key("get_terms_to_add")
 
 
-class AddDatasetTerms(DatasetTransformer):
+class AddDatasetTerms(DatasetTermsTransformer):
     """Transformer that adds glossary terms to datasets according to a callback function."""
 
     ctx: PipelineContext
     config: AddDatasetTermsConfig
 
     def __init__(self, config: AddDatasetTermsConfig, ctx: PipelineContext):
+        super().__init__()
         self.ctx = ctx
         self.config = config
 