Skip to content

Commit

Permalink
Firstdrop of ingest (#1)
Browse files Browse the repository at this point in the history
  • Loading branch information
shirshanka committed Feb 16, 2021
1 parent 90b635f commit 1287819
Show file tree
Hide file tree
Showing 38 changed files with 1,477 additions and 0 deletions.
4 changes: 4 additions & 0 deletions metadata-ingestion/.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
.git
.cache
env
venv
3 changes: 3 additions & 0 deletions metadata-ingestion/CHANGELOG
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
0.0.1
-----
* Modernizing python scripts and creating first package
49 changes: 49 additions & 0 deletions metadata-ingestion/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#FROM openjdk:8-jre-alpine as base
FROM python:3.7-slim AS base

#Setup env
ENV LANG C.UTF-8
ENV LC_ALL C.UTF-8
ENV PYTHONDONTWRITEBYTECODE 1
ENV PYTHONFAULTHANDLER 1

ENV DOCKERIZE_VERSION v0.6.1

FROM base AS python-deps


# Install compilation dependencies (gcc)
RUN apt-get update && apt-get install -y --no-install-recommends gcc

#RUN apk --no-cache add curl tar \
# && curl -L https://github.com/jwilder/dockerize/releases/download/$DOCKERIZE_VERSION/dockerize-linux-amd64-$DOCKERIZE_VERSION.tar.gz | tar -C /usr/local/bin -xzv

# Workaround alpine issue with /lib64 not being in the ld library path
# https://gitlab.alpinelinux.org/alpine/aports/-/issues/10140
#ENV LD_LIBRARY_PATH=/lib64

# Add glibc compat layer into alpine linux, needed by java-snappy if kafka topics are compressed with snappy
#RUN apk add libc6-compat

#FROM openjdk:8 as prod-build


# Copy the build context into the image and install the package in editable mode

COPY . /datahub-ingest
WORKDIR /datahub-ingest
RUN pip install -e .

# Debugging conveniences. The package index is refreshed in this same layer so that a
# cached, stale index from the earlier `apt-get update` layer cannot break the install;
# the index is removed afterwards because nothing below uses apt again.
RUN apt-get update \
    && apt-get install -y --no-install-recommends telnet vim \
    && rm -rf /var/lib/apt/lists/*


#FROM ${APP_ENV}-install as final

RUN useradd --create-home datahub
USER datahub
#RUN addgroup -S datahub && adduser -S datahub -G datahub
#USER datahub

#EXPOSE PORT_NUM
ENTRYPOINT ["gometa-ingest"]
CMD ["-c","/datahub-ingest/recipes/kafka_to_console.yaml"]
20 changes: 20 additions & 0 deletions metadata-ingestion/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Dev
## Set up dev environment
- On MacOS: brew install librdkafka
- python3 -m venv venv
- source venv/bin/activate
- pip install -e .

# Run tests
- pip install -r test_requirements.txt
- pytest

# Run recipe
- ./recipes/kafka_to_console.sh

# Using Docker
## Build the image
- docker build . --tag dhub-ingest

## Run the ingestion script (recipes/kafka_to_console.yaml)
- docker run --rm --network host dhub-ingest:latest
2 changes: 2 additions & 0 deletions metadata-ingestion/recipes/kafka_to_console.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
#gometa-ingest -c recipes/kafka_to_console.yaml
gometa-ingest -c recipes/kafka_to_console.toml
8 changes: 8 additions & 0 deletions metadata-ingestion/recipes/kafka_to_console.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
[source]
type="kafka"
extractor="gometa.ingestion.extractor.kafka.KafkaMetadataExtractor"
[source.kafka.connection]
bootstrap="localhost:9092"

[sink]
type = "console"
9 changes: 9 additions & 0 deletions metadata-ingestion/recipes/kafka_to_console.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
source:
type: "kafka"
extractor: "gometa.ingestion.extractor.kafka.KafkaMetadataExtractor"
kafka:
connection.bootstrap: "broker:9092"

sink:
type: "console"
11 changes: 11 additions & 0 deletions metadata-ingestion/recipes/kafka_to_datahub.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
source:
type: "kafka"
extractor: "gometa.ingestion.extractor.kafka.KafkaMetadataExtractor"
kafka:
connection.bootstrap: "localhost:9092"

sink:
type: "datahub"
datahub:
transport: "kafka"
29 changes: 29 additions & 0 deletions metadata-ingestion/setup.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
[flake8]
max-line-length = 130
max-complexity = 15

[mypy]
mypy_path = src
namespace_packages = true
strict_optional = yes
disallow_untyped_defs = no

[isort]
line_length = 120
indent=' '
multi_line_output = 3
lines_between_types = 1
include_trailing_comma = true
use_parentheses = true
sections = FUTURE,STDLIB,THIRDPARTY,FIRSTPARTY,LOCALFOLDER

[tool:pytest]
addopts = --cov src --cov-report term --cov-config setup.cfg
testpaths = test

[coverage:report]
fail_under = 96
show_missing = true
exclude_lines =
pragma: no cover
@abstract
75 changes: 75 additions & 0 deletions metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import os
import setuptools


def get_version():
    """Return the package version, read from the first line of ``CHANGELOG``.

    ``CHANGELOG`` is looked up in the directory that contains this script.

    Returns:
        The first line of ``CHANGELOG`` with surrounding whitespace removed.
    """
    root = os.path.dirname(__file__)
    changelog = os.path.join(root, "CHANGELOG")
    # Decode explicitly as UTF-8 so the result does not depend on the locale
    # of the machine running the build.
    with open(changelog, encoding="utf-8") as f:
        return f.readline().strip()


def get_long_description():
    """Build the long description shown on the package index.

    The text is the content of ``README.md`` followed by a ``Changelog``
    heading and the content of ``CHANGELOG``. Both files are looked up in the
    directory that contains this script.

    Returns:
        The combined README and changelog text.
    """
    root = os.path.dirname(__file__)
    # Both files are decoded explicitly as UTF-8 so that non-ASCII characters
    # do not make the build depend on the locale of the machine running it.
    with open(os.path.join(root, "README.md"), encoding="utf-8") as f:
        description = f.read()

    description += "\n\nChangelog\n=========\n\n"

    with open(os.path.join(root, "CHANGELOG"), encoding="utf-8") as f:
        description += f.read()

    return description


# Package definition for the `gometa` distribution. Sources live under src/.
setuptools.setup(
    name="gometa",
    version=get_version(),
    url="https://github.com/linkedin/datahub",
    author="DataHub Committers",
    license="Apache License 2.0",
    description="A CLI to work with DataHub metadata",
    long_description=get_long_description(),
    long_description_content_type="text/markdown",
    classifiers=[
        "Development Status :: 5 - Production/Stable",
        "Programming Language :: Python",
        "Programming Language :: Python :: 3",
        "Programming Language :: Python :: 3 :: Only",
        "Programming Language :: Python :: 3.6",
        "Programming Language :: Python :: 3.7",
        "Programming Language :: Python :: 3.8",
        "Intended Audience :: Developers",
        "Intended Audience :: Information Technology",
        "Intended Audience :: System Administrators",
        "License :: OSI Approved",
        "License :: OSI Approved :: Apache Software License",
        "Operating System :: Unix",
        "Operating System :: POSIX :: Linux",
        "Environment :: Console",
        "Environment :: MacOS X",
        "Topic :: Software Development",
    ],
    python_requires=">=3.6",
    package_dir={"": "src"},
    # Discover every package under src/ rather than listing them by hand: the
    # console script below imports gometa.ingestion.*, which a hand-written
    # two-entry list left out of non-editable installs. The namespace-aware
    # finder also picks up directories that have no __init__.py.
    packages=setuptools.find_namespace_packages(where="src"),
    include_package_data=True,
    package_data={"gometa": ["py.typed"]},
    entry_points={
        "console_scripts": [
            "gometa-ingest = gometa.entrypoints:gometa_ingest"
        ],
    },
    install_requires=[
        'dataclasses; python_version<="3.6"',
        "click>=7.1.1",
        "pyyaml>=5.4.1",
        "toml>=0.10.0",
        "pydantic>=1.5.1",
        "watchdog>=0.10.3",
        "confluent_kafka>=1.5.0",
        "requests>=2.25.1",
        "fastavro>=1.3.0",
        "avro-python3==1.8.2",
    ],
)
Empty file.
3 changes: 3 additions & 0 deletions metadata-ingestion/src/gometa/configuration/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Default Kafka topic name. NOTE(review): nothing in this module reads it; presumably
# it is the fallback topic for code elsewhere in the package - confirm against its users.
DEFAULT_KAFKA_TOPIC="MetadataChangeEvent_v4"
from .common import ConfigModel, DynamicTypedConfig, DynamicFactory, ConfigurationMechanism
from .kafka import KafkaConnectionConfig
76 changes: 76 additions & 0 deletions metadata-ingestion/src/gometa/configuration/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
from abc import ABC, abstractmethod
from typing import TypeVar, Type
from pydantic import BaseModel, ValidationError
from pathlib import Path


class ConfigModel(BaseModel):
    """Base class for the configuration models in this package."""

    class Config:
        # Keep keys that are not declared as fields instead of rejecting them.
        # DynamicFactory.load_config relies on this: it reads an undeclared
        # section back out of the model's dict().
        extra = "allow"


class DynamicTypedConfig(ConfigModel):
    """A configuration section that carries a ``type`` discriminator.

    DynamicFactory.load_config uses ``type`` twice: to pick the registered
    config class, and as the key of the sub-section that holds that class's
    settings.
    """

    # Name that selects which registered config class applies.
    type: str


class MetaError(Exception):
    """Base class for this package's own exceptions (e.g. ConfigurationError)."""


class ConfigurationError(MetaError):
    """Raised when configuration cannot be parsed or fails validation."""


T = TypeVar("T", bound=ConfigModel)


class ConfigurationMechanism(ABC):
    """Interface for loading a configuration file into a ConfigModel subclass."""

    @abstractmethod
    def load_config(self, cls: Type[T], config_file: Path) -> T:
        """Load ``config_file`` and return its content as an instance of ``cls``.

        Args:
            cls: The ConfigModel subclass to build.
            config_file: Path of the file to read.

        Returns:
            An instance of ``cls``.
        """
        pass


class DynamicFactory:
    """Registry mapping a ``type`` name to the config class that parses its section."""

    def __init__(self):
        # type name -> config class registered for that name
        self.factory = {}

    def register(self, type, cfg_cls: Type[T]):
        """Register ``cfg_cls`` as the config class for ``type``.

        Registering the same ``type`` again replaces the earlier class.
        """
        self.factory[type] = cfg_cls

    def load_config(self, dyn_config: DynamicTypedConfig) -> ConfigModel:
        """Build the typed config selected by ``dyn_config.type``.

        The settings are read from the key of ``dyn_config`` that is named
        after its ``type`` (for type ``"kafka"``, the ``kafka`` section) and
        parsed with the class registered for that type.

        Args:
            dyn_config: The section holding the ``type`` name and, under a key
                of the same name, the type-specific settings.

        Returns:
            An instance of the class registered for ``dyn_config.type``.

        Raises:
            ConfigurationError: If no class is registered for the type, if the
                section named after the type is absent, or if the section
                fails validation.
        """
        type_name = dyn_config.type

        # Report an unregistered type as a configuration problem rather than
        # letting a bare KeyError escape from the registry lookup.
        try:
            config_class = self.factory[type_name]
        except KeyError as e:
            known = ", ".join(sorted(str(name) for name in self.factory)) or "<none>"
            raise ConfigurationError(f"Unknown type '{type_name}'; registered types: {known}") from e

        # The type-specific settings are an undeclared ("extra") key on the model.
        try:
            section = dyn_config.dict()[type_name]
        except KeyError as e:
            raise ConfigurationError(f"Missing configuration section '{type_name}' for type '{type_name}'") from e

        try:
            return config_class.parse_obj(section)
        except ValidationError as e:
            msg = self._describe(e)
            raise ConfigurationError(f"Invalid value in configuration : \n{msg}") from e

    @staticmethod
    def _describe(error) -> str:
        """Render each validation problem as one ``" - location: reason"`` line."""
        messages = []
        for err in error.errors():
            location = ".".join(str(x) for x in err["loc"])
            reason = err["msg"]
            messages.append(f" - {location}: {reason}")
        return "\n".join(messages)


def generic_load_file(cls: Type[T], path: Path, loader_func) -> T:
    """Read ``path`` with ``loader_func`` and validate the result as ``cls``.

    Args:
        cls: The config model class to build.
        path: Location of the configuration file.
        loader_func: Callable that takes the open text file and returns the
            parsed content.

    Returns:
        ``cls()`` built from defaults when ``path`` does not exist, otherwise
        ``cls`` built from the parsed file content.

    Raises:
        ConfigurationError: If ``loader_func`` raises ``ValueError`` or the
            parsed content fails validation against ``cls``.
    """
    # A missing file is not an error here: fall back to the model's defaults.
    if not path.exists():
        return cls()

    with path.open() as stream:
        try:
            parsed = loader_func(stream)
        except ValueError as e:
            raise ConfigurationError(f'File {path} is unparseable: {e}') from e

    try:
        return cls.parse_obj(parsed)
    except ValidationError as e:
        # One " - <dotted location>: <reason>" line per validation problem.
        msg = "\n".join(
            " - {}: {}".format(".".join(str(part) for part in issue["loc"]), issue["msg"])
            for issue in e.errors()
        )
        raise ConfigurationError(f"Invalid value in configuration file {path}: \n{msg}") from e
13 changes: 13 additions & 0 deletions metadata-ingestion/src/gometa/configuration/kafka.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from dataclasses import dataclass
from typing import Optional


@dataclass
class KafkaConnectionConfig:
    """Configuration class for holding connectivity information for Kafka"""

    # NOTE(review): this is a plain dataclass, not a ConfigModel, although the
    # package __init__ exports it next to the ConfigModel classes - confirm intended.

    # Kafka bootstrap servers; defaults to a broker on localhost:9092.
    bootstrap: Optional[str] = "localhost:9092"

    # Schema registry location; defaults to http://localhost:8081.
    schema_registry_url: Optional[str] = "http://localhost:8081"
14 changes: 14 additions & 0 deletions metadata-ingestion/src/gometa/configuration/toml.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from typing import Type, TypeVar
from pathlib import Path
import toml
from .common import ConfigModel, ConfigurationMechanism, generic_load_file

T = TypeVar("T", bound=ConfigModel)


class TomlConfigurationMechanism(ConfigurationMechanism):
    """Configuration mechanism that reads TOML files."""

    def load_config(self, cls: Type[T], config_file: Path) -> T:
        """Parse ``config_file`` as TOML and return it as an instance of ``cls``."""
        return generic_load_file(cls, config_file, toml.load)
17 changes: 17 additions & 0 deletions metadata-ingestion/src/gometa/configuration/yaml.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from typing import Type, TypeVar
from pathlib import Path
import yaml


from .common import ConfigModel, ConfigurationMechanism, generic_load_file


T = TypeVar("T", bound=ConfigModel)


class YamlConfigurationMechanism(ConfigurationMechanism):
    """Ability to load configuration from yaml files"""

    @staticmethod
    def _safe_load(stream):
        """Parse YAML from ``stream``, reporting syntax errors as ``ValueError``.

        generic_load_file only turns ``ValueError`` into a ConfigurationError,
        and ``yaml.YAMLError`` is not a ``ValueError``; without this
        translation a malformed YAML file would escape as a raw parser error.
        """
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as e:
            raise ValueError(str(e)) from e

    def load_config(self, cls: Type[T], config_file: Path) -> T:
        """Parse ``config_file`` as YAML and return it as an instance of ``cls``.

        Args:
            cls: The config model class to build.
            config_file: Path of the YAML file to read.

        Returns:
            An instance of ``cls``.
        """
        config = generic_load_file(cls, config_file, self._safe_load)
        return config
32 changes: 32 additions & 0 deletions metadata-ingestion/src/gometa/entrypoints.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import click

from gometa.configuration.yaml import YamlConfigurationMechanism
from gometa.configuration.toml import TomlConfigurationMechanism
from gometa.ingestion.run.pipeline import Pipeline, PipelineConfig

# Logging format string that emits only the message text.
# NOTE(review): not referenced in the visible part of this module - confirm it is still needed.
BASE_LOGGING_FORMAT = "%(message)s"
#CONNECTION_STRING_FORMAT_REGEX = re.compile(f"^{HOST_REGEX}(:{PATH_REGEX})?$")
# click context settings for commands: accept -h as well as --help.
DEFAULT_CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"])
# Same help options, plus: unknown options are passed through instead of rejected, and
# options are not parsed once the first positional argument has been seen.
# NOTE(review): not referenced in the visible part of this module.
EXECUTION_CONTEXT_SETTINGS = dict(
    help_option_names=["-h", "--help"], ignore_unknown_options=True, allow_interspersed_args=False
)

import pathlib

@click.command(context_settings=DEFAULT_CONTEXT_SETTINGS)
@click.option("-c", "--config", help="Config file in .toml or .yaml format", required=True)
def gometa_ingest(config: str):
    """Main command for ingesting metadata into DataHub"""
    # The docstring above doubles as the command's --help text.
    #
    # Flow: pick the loader from the file suffix, load the file into a
    # PipelineConfig, then configure and run the pipeline.

    config_file = pathlib.Path(config)
    if config_file.suffix == ".yaml":
        config_mech = YamlConfigurationMechanism()
    elif config_file.suffix == ".toml":
        config_mech = TomlConfigurationMechanism()
    else:
        # Stop here: no loader exists for this suffix, so continuing would only
        # fail later on an unbound config_mech. click prints a UsageError to
        # stderr and exits with a non-zero status.
        raise click.UsageError(
            f"Cannot process this file type: '{config_file.suffix}' (expected .yaml or .toml)"
        )

    pipeline_config = config_mech.load_config(PipelineConfig, config_file)
    Pipeline().configure(pipeline_config).run()


1 change: 1 addition & 0 deletions metadata-ingestion/src/gometa/ingestion/api/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .common import RecordEnvelope
8 changes: 8 additions & 0 deletions metadata-ingestion/src/gometa/ingestion/api/closeable.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from abc import abstractmethod, ABCMeta

class Closeable(metaclass=ABCMeta):
    """Abstract base for objects that expose a ``close()`` method."""

    @abstractmethod
    def close(self):
        """Close this object; subclasses must override and decide what is released."""
        pass


Loading

0 comments on commit 1287819

Please sign in to comment.