feat(ingest): data-lake - remove spark requirement if not profiling (d…
kevinhu authored Feb 25, 2022
1 parent 4b4c7d5 commit 02fe05e
Showing 19 changed files with 1,829 additions and 501 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/metadata-ingestion.yml
@@ -19,6 +19,8 @@ jobs:

  metadata-ingestion-general:
    runs-on: ubuntu-latest
    env:
      SPARK_VERSION: 3.0.3
    strategy:
      matrix:
        python-version: ["3.6", "3.9.9"]
@@ -46,6 +48,8 @@ jobs:
  metadata-ingestion-by-version:
    runs-on: ubuntu-latest
    env:
      SPARK_VERSION: 3.0.3
    strategy:
      matrix:
        python-version: ["3.6", "3.9.9"]
18 changes: 17 additions & 1 deletion metadata-ingestion/setup.py
@@ -99,6 +99,21 @@ def get_long_description():
"cryptography",
}

data_lake_base = {
*aws_common,
"parse>=1.19.0",
"pyarrow>=6.0.1",
"tableschema>=1.20.2",
"ujson>=4.3.0",
"types-ujson>=4.2.1",
"smart-open[s3]>=5.2.1",
}

data_lake_profiling = {
"pydeequ==1.0.1",
"pyspark==3.0.3",
}

# Note: for all of these, framework_common will be added.
plugins: Dict[str, Set[str]] = {
# Sink plugins.
@@ -118,7 +133,7 @@ def get_long_description():
"clickhouse-usage": sql_common | {"clickhouse-sqlalchemy==0.1.8"},
"datahub-lineage-file": set(),
"datahub-business-glossary": set(),
"data-lake": {*aws_common, "pydeequ==1.0.1", "pyspark==3.0.3", "parse==1.19.0"},
"data-lake": {*data_lake_base, *data_lake_profiling},
"dbt": {"requests"},
"druid": sql_common | {"pydruid>=0.6.2"},
# Starting with 7.14.0 python client is checking if it is connected to elasticsearch client. If its not it throws
@@ -196,6 +211,7 @@ def get_long_description():
    *base_requirements,
    *framework_common,
    *mypy_stubs,
    *data_lake_base,
    "black>=21.12b0",
    "coverage>=5.1",
    "flake8>=3.8.3",
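The extras split above is what lets the base `data-lake` install skip Spark: `pydeequ` and `pyspark` now live only in `data_lake_profiling`. A minimal sketch of how a source can defer those heavy imports until profiling is actually requested (illustrative only, not the actual DataHub source; `_load_profiler` and `profiling_enabled` are hypothetical names):

```python
# Illustrative sketch: defer Spark/PyDeequ imports until profiling is requested,
# so a plain `pip install 'acryl-datahub[data-lake]'` works without Spark.
from typing import Any, Optional


def _load_profiler() -> Any:
    try:
        import pydeequ  # noqa: F401
        from pyspark.sql import SparkSession
    except ImportError as e:
        raise ImportError(
            "Profiling requires pydeequ and pyspark; install the profiling "
            "dependencies and a Spark 3.0.3 distribution to enable it."
        ) from e
    return SparkSession.builder.getOrCreate()


def ingest(profiling_enabled: bool) -> None:
    spark: Optional[Any] = None
    if profiling_enabled:
        spark = _load_profiler()  # Spark is only touched on this branch
    # ...schema extraction continues without Spark when profiling is off
```

With a guard like this, the `pydeequ`/`pyspark` pins only matter for users who turn profiling on.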
24 changes: 14 additions & 10 deletions metadata-ingestion/source_docs/data_lake.md
@@ -10,7 +10,7 @@ This source is in **Beta** and under active development. Not yet considered read

## Setup

-To install this plugin, run `pip install 'acryl-datahub[data-lake]'`. Because the files are read using PySpark, we require Spark 3.0.3 with Hadoop 3.2 to be installed.
+To install this plugin, run `pip install 'acryl-datahub[data-lake]'`. Note that because the profiling is run with PySpark, we require Spark 3.0.3 with Hadoop 3.2 to be installed (see [compatibility](#compatibility) for more details).

The data lake connector extracts schemas and profiles from a variety of file formats (see below for an exhaustive list).
Individual files are ingested as tables, and profiles are computed similar to the [SQL profiler](./sql_profiles.md).
@@ -37,7 +37,7 @@ If you would like to write a more complicated function for resolving file names,
Extracts:

- Row and column counts for each table
-- For each column, if applicable:
+- For each column, if profiling is enabled:
- null counts and proportions
- distinct counts and proportions
- minimum, maximum, mean, median, standard deviation, some quantile values
@@ -47,20 +47,25 @@ This connector supports both local files as well as those stored on AWS S3 (whic

- CSV
- TSV
-- Parquet
- JSON
+- Parquet
- Apache Avro

Schemas for Parquet and Avro files are extracted as provided.

Schemas for schemaless formats (CSV, TSV, JSON) are inferred. For CSV and TSV files, we consider the first 100 rows by default, which can be controlled via the `max_rows` recipe parameter (see [below](#config-details)).
JSON file schemas are inferred on the basis of the entire file (given the difficulty in extracting only the first few objects of the file), which may impact performance.
We are working on using iterator-based JSON parsers to avoid reading in the entire JSON object.
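For intuition, the first-`max_rows` sampling for CSV/TSV can be sketched with the standard library alone (illustrative only — the connector itself relies on the libraries pinned in `data_lake_base`, and `infer_delimited_schema` below is a hypothetical helper, not part of the plugin):

```python
# Sketch of first-N-rows schema inference for delimited files (illustrative only).
import csv
from typing import Dict


def _guess_type(value: str) -> str:
    for type_name, caster in (("integer", int), ("number", float)):
        try:
            caster(value)
            return type_name
        except ValueError:
            continue
    return "string"


def _widen(current: str, guessed: str) -> str:
    # integer < number < string: keep the most general type seen so far
    order = {"integer": 0, "number": 1, "string": 2}
    return current if order[current] >= order[guessed] else guessed


def infer_delimited_schema(path: str, max_rows: int = 100, delimiter: str = ",") -> Dict[str, str]:
    with open(path, newline="") as f:
        reader = csv.DictReader(f, delimiter=delimiter)
        schema = {name: "integer" for name in reader.fieldnames or []}
        for i, row in enumerate(reader):
            if i >= max_rows:  # only the first max_rows rows are sampled
                break
            for column, value in row.items():
                if column in schema:
                    schema[column] = _widen(schema[column], _guess_type(value or ""))
    return schema
```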

:::caution

If you are ingesting datasets from AWS S3, we recommend running the ingestion on a server in the same region to avoid high egress costs.

:::

-| Capability | Status | Details |
-| -----------| ------ | ---- |
-| Platform Instance | 🛑 | [link](../../docs/platform-instances.md) |

+| Capability | Status | Details |
+| ----------------- | ------ | ---------------------------------------- |
+| Platform Instance | 🛑 | [link](../../docs/platform-instances.md) |

## Quickstart recipe

@@ -99,6 +104,7 @@ Note that a `.` is used to denote nested fields in the YAML recipe.
| `aws_config.aws_access_key_id` | | Autodetected | See https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html |
| `aws_config.aws_secret_access_key` | | Autodetected | See https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html |
| `aws_config.aws_session_token` | | Autodetected | See https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html |
| `max_rows` | | `100` | Maximum number of rows to use when inferring schemas for TSV and CSV files. |
| `schema_patterns.allow` | | `*` | List of regex patterns for tables to ingest. Defaults to all. |
| `schema_patterns.deny` | | | List of regex patterns for tables to not ingest. Defaults to none. |
| `schema_patterns.ignoreCase` | | `True` | Whether to ignore case sensitivity during pattern matching of tables to ingest. |
@@ -121,9 +127,7 @@ Note that a `.` is used to denote nested fields in the YAML recipe.

## Compatibility

-Files are read using PySpark and profiles are computed with PyDeequ.
-We currently require Spark 3.0.3 with Hadoop 3.2 to be installed and the `SPARK_HOME` environment variable to be set for PySpark.
-The Spark+Hadoop binary can be downloaded [here](https://www.apache.org/dyn/closer.lua/spark/spark-3.0.3/spark-3.0.3-bin-hadoop3.2.tgz).
+Profiles are computed with PyDeequ, which relies on PySpark. Therefore, for computing profiles, we currently require Spark 3.0.3 with Hadoop 3.2 to be installed and the `SPARK_HOME` and `SPARK_VERSION` environment variables to be set. The Spark+Hadoop binary can be downloaded [here](https://www.apache.org/dyn/closer.lua/spark/spark-3.0.3/spark-3.0.3-bin-hadoop3.2.tgz).

For an example guide on setting up PyDeequ on AWS, see [this guide](https://aws.amazon.com/blogs/big-data/testing-data-quality-at-scale-with-pydeequ/).
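As a rough sketch of what that setup enables (assuming Spark 3.0.3 with Hadoop 3.2 is installed and `SPARK_HOME`/`SPARK_VERSION` are exported; the S3 path and `user_id` column are placeholders, and this mirrors the linked AWS guide rather than the connector's internal code):

```python
# Standalone PyDeequ profiling sketch (placeholder paths/columns, not connector code).
import pydeequ
from pydeequ.analyzers import AnalysisRunner, AnalyzerContext, Completeness, Size
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.config("spark.jars.packages", pydeequ.deequ_maven_coord)
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .getOrCreate()
)

df = spark.read.csv("s3a://my-bucket/my-table.csv", header=True)  # placeholder path

result = (
    AnalysisRunner(spark)
    .onData(df)
    .addAnalyzer(Size())
    .addAnalyzer(Completeness("user_id"))  # placeholder column
    .run()
)
AnalyzerContext.successMetricsAsDataFrame(spark, result).show()
```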

3 changes: 1 addition & 2 deletions metadata-ingestion/src/datahub/ingestion/source/aws/glue.py
@@ -256,9 +256,8 @@ def process_dataflow_node(
            # append S3 format if different ones exist
            if len(s3_formats[s3_uri]) > 1:
                node_urn = make_s3_urn(
-                    s3_uri,
+                    f"{s3_uri}.{node_args.get('format')}",
                    self.env,
-                    suffix=node_args.get("format"),
                )

            else:
32 changes: 25 additions & 7 deletions metadata-ingestion/src/datahub/ingestion/source/aws/s3_util.py
@@ -1,17 +1,35 @@
-from typing import Optional
+import os

+S3_PREFIXES = ["s3://", "s3n://", "s3a://"]

-def make_s3_urn(s3_uri: str, env: str, suffix: Optional[str] = None) -> str:

-    if not s3_uri.startswith("s3://"):
-        raise ValueError("S3 URIs should begin with 's3://'")
+def is_s3_uri(uri: str) -> bool:
+    return any(uri.startswith(prefix) for prefix in S3_PREFIXES)


+def strip_s3_prefix(s3_uri: str) -> str:
    # remove S3 prefix (s3://)
-    s3_name = s3_uri[5:]
+    for s3_prefix in S3_PREFIXES:
+        if s3_uri.startswith(s3_prefix):
+            plain_base_path = s3_uri[len(s3_prefix) :]
+            return plain_base_path

+    raise ValueError(
+        f"Not an S3 URI. Must start with one of the following prefixes: {str(S3_PREFIXES)}"
+    )


+def make_s3_urn(s3_uri: str, env: str) -> str:

+    s3_name = strip_s3_prefix(s3_uri)

    if s3_name.endswith("/"):
        s3_name = s3_name[:-1]

-    if suffix is not None:
-        return f"urn:li:dataset:(urn:li:dataPlatform:s3,{s3_name}_{suffix},{env})"
+    name, extension = os.path.splitext(s3_name)

+    if extension != "":
+        extension = extension[1:]  # remove the dot
+        return f"urn:li:dataset:(urn:li:dataPlatform:s3,{name}_{extension},{env})"

    return f"urn:li:dataset:(urn:li:dataPlatform:s3,{s3_name},{env})"