Commit c65609a: feat(datahub-client): add Java REST emitter (datahub-project#3781)
1 parent: 450cdc1
Showing 28 changed files with 1,690 additions and 414 deletions.
@@ -0,0 +1,128 @@

# Python Emitter

In some cases, you might want to construct metadata events directly and use programmatic ways to emit that metadata to DataHub. Use cases are typically push-based and include emitting metadata events from CI/CD pipelines, custom orchestrators, etc.

The `acryl-datahub` Python package offers REST and Kafka emitter APIs, which can easily be imported and called from your own code.

## Installation

Follow the installation guide for the main `acryl-datahub` package [here](./README.md#install-from-pypi). Read on for emitter-specific installation instructions.
## REST Emitter

The REST emitter is a thin wrapper on top of the `requests` module and offers a blocking interface for sending metadata events over HTTP. Use it when a simple acknowledgement that metadata has been persisted to DataHub's metadata store matters more than emission throughput, or when you have read-after-write scenarios, e.g. writing metadata and then immediately reading it back.

### Installation

```console
pip install -U 'acryl-datahub[datahub-rest]'
```
### Example Usage

```python
import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import ChangeTypeClass, DatasetPropertiesClass

# Create an emitter to DataHub over REST
emitter = DatahubRestEmitter(gms_server="http://localhost:8080", extra_headers={})

# Test the connection
emitter.test_connection()

# Construct a dataset properties object
dataset_properties = DatasetPropertiesClass(
    description="This table stores the canonical User profile",
    customProperties={"governance": "ENABLED"},
)

# Construct a MetadataChangeProposalWrapper object
metadata_event = MetadataChangeProposalWrapper(
    entityType="dataset",
    changeType=ChangeTypeClass.UPSERT,
    entityUrn=builder.make_dataset_urn("bigquery", "my-project.my-dataset.user-table"),
    aspectName="datasetProperties",
    aspect=dataset_properties,
)

# Emit metadata! This is a blocking call.
emitter.emit(metadata_event)
```
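
Because `emit` blocks until the server has acknowledged the write, metadata can be read back immediately afterwards. A minimal sketch of that read-after-write pattern, assuming a local GMS instance exposing the standard `GET /entities/{urn}` endpoint:

```python
import urllib.parse

import requests

urn = builder.make_dataset_urn("bigquery", "my-project.my-dataset.user-table")

# The emit() above has already returned, so the aspect is persisted;
# fetch the entity straight back (the URN must be URL-encoded).
response = requests.get(f"http://localhost:8080/entities/{urllib.parse.quote(urn, safe='')}")
response.raise_for_status()
print(response.json())
```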

Other examples:
- [lineage_emitter_mcpw_rest.py](./examples/library/lineage_emitter_mcpw_rest.py) - emits simple BigQuery table-to-table (dataset-to-dataset) lineage via REST as a MetadataChangeProposalWrapper; a condensed sketch of the same idea follows.
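
Under the same imports and emitter as the REST example above (plus the lineage aspect classes, which I'm assuming are exposed in `datahub.metadata.schema_classes` like the other generated classes), the lineage emission reduces to:

```python
from datahub.metadata.schema_classes import (
    DatasetLineageTypeClass,
    UpstreamClass,
    UpstreamLineageClass,
)

# Declare that the derived table is transformed from the source table.
upstream = UpstreamClass(
    dataset=builder.make_dataset_urn("bigquery", "my-project.my-dataset.user-table"),
    type=DatasetLineageTypeClass.TRANSFORMED,
)

lineage_event = MetadataChangeProposalWrapper(
    entityType="dataset",
    changeType=ChangeTypeClass.UPSERT,
    entityUrn=builder.make_dataset_urn("bigquery", "my-project.my-dataset.user-table-derived"),
    aspectName="upstreamLineage",
    aspect=UpstreamLineageClass(upstreams=[upstream]),
)

emitter.emit(lineage_event)  # same blocking REST emitter as above
```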
|
||
### Emitter Code | ||
|
||
If you're interested in looking at the REST emitter code, it is available [here](./src/datahub/emitter/rest_emitter.py) | ||
|
||
## Kafka Emitter | ||
|
||
The Kafka emitter is a thin wrapper on top of the SerializingProducer class from `confluent-kafka` and offers a non-blocking interface for sending metadata events to DataHub. Use this when you want to decouple your metadata producer from the uptime of your datahub metadata server by utilizing Kafka as a highly available message bus. For example, if your DataHub metadata service is down due to planned or unplanned outages, you can still continue to collect metadata from your mission critical systems by sending it to Kafka. Also use this emitter when throughput of metadata emission is more important than acknowledgement of metadata being persisted to DataHub's backend store. | ||
|
||
**_Note_**: The Kafka emitter uses Avro to serialize the Metadata events to Kafka. Changing the serializer will result in unprocessable events as DataHub currently expects the metadata events over Kafka to be serialized in Avro. | ||

### Installation

```console
# For emission over Kafka
pip install -U 'acryl-datahub[datahub-kafka]'
```
### Example Usage

```python
import datahub.emitter.mce_builder as builder
from datahub.emitter.kafka_emitter import DatahubKafkaEmitter, KafkaEmitterConfig
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.schema_classes import ChangeTypeClass, DatasetPropertiesClass

# Create an emitter to Kafka
kafka_config = {
    "connection": {
        "bootstrap": "localhost:9092",
        "schema_registry_url": "http://localhost:8081",
        "schema_registry_config": {},  # passed to the underlying schema registry client
        "producer_config": {},  # extra configs passed to the underlying Kafka producer
    }
}

emitter = DatahubKafkaEmitter(KafkaEmitterConfig.parse_obj(kafka_config))

# Construct a dataset properties object
dataset_properties = DatasetPropertiesClass(
    description="This table stores the canonical User profile",
    customProperties={"governance": "ENABLED"},
)

# Construct a MetadataChangeProposalWrapper object
metadata_event = MetadataChangeProposalWrapper(
    entityType="dataset",
    changeType=ChangeTypeClass.UPSERT,
    entityUrn=builder.make_dataset_urn("bigquery", "my-project.my-dataset.user-table"),
    aspectName="datasetProperties",
    aspect=dataset_properties,
)


# Emit metadata! This is a non-blocking call.
def report_status(err, msg):
    if msg:
        print(f"Sent to topic: {msg.topic()}, partition: {msg.partition()}, offset: {msg.offset()}")
    else:
        print(f"Failed to send with: {err}")


emitter.emit(metadata_event, callback=report_status)

# Send all pending events
emitter.flush()
```
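
Because emission is non-blocking, this emitter pays off when sending many events: queue them all, then flush once at the end. A short sketch under the same configuration as above (the table names are illustrative):

```python
# Queue several events without waiting for individual acknowledgements.
for table in ["user-table", "orders-table", "payments-table"]:
    event = MetadataChangeProposalWrapper(
        entityType="dataset",
        changeType=ChangeTypeClass.UPSERT,
        entityUrn=builder.make_dataset_urn("bigquery", f"my-project.my-dataset.{table}"),
        aspectName="datasetProperties",
        aspect=DatasetPropertiesClass(description=f"Properties for {table}"),
    )
    emitter.emit(event, callback=report_status)

# Block once, at the end, until every queued event has been handed to Kafka.
emitter.flush()
```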

### Emitter Code

If you're interested in looking at the Kafka emitter code, it is available [here](./src/datahub/emitter/kafka_emitter.py).

## Other Languages

Emitter APIs are also supported for:
- [Java](../metadata-integration/java/as-a-library.md)
@@ -1,4 +1,4 @@
-# Using transformers
+# Transformers

## What’s a transformer?
@@ -0,0 +1,112 @@

# Java Emitter

In some cases, you might want to construct metadata events directly and use programmatic ways to emit that metadata to DataHub. Use cases are typically push-based and include emitting metadata events from CI/CD pipelines, custom orchestrators, etc.

The [`io.acryl:datahub-client`](https://mvnrepository.com/artifact/io.acryl/datahub-client) Java package offers REST emitter APIs, which can easily be used to emit metadata from your JVM-based systems. For example, the Spark lineage integration uses the Java emitter to emit metadata events from Spark jobs.

## Installation

Follow the specific instructions for your build system to declare a dependency on the appropriate version of the package.

**_Note_**: Check the [Maven repository](https://mvnrepository.com/artifact/io.acryl/datahub-client) for the latest version of the package before following the instructions below.
### Gradle
Add the following to your `build.gradle`.
```gradle
implementation 'io.acryl:datahub-client:0.0.1'
```
### Maven
Add the following to your `pom.xml`.
```xml
<!-- https://mvnrepository.com/artifact/io.acryl/datahub-client -->
<dependency>
  <groupId>io.acryl</groupId>
  <artifactId>datahub-client</artifactId>
  <!-- replace with the latest version number -->
  <version>0.0.1</version>
</dependency>
```

## REST Emitter

The REST emitter is a thin wrapper on top of the [`Apache HttpClient`](https://hc.apache.org/httpcomponents-client-4.5.x/index.html) library. It supports non-blocking emission of metadata and handles the details of JSON serialization of metadata aspects over the wire.

Constructing a REST emitter follows a lambda-based fluent builder pattern. The config parameters mirror the Python emitter [configuration](../../metadata-ingestion/sink_docs/datahub.md#config-details) for the most part. In addition, you can customize the HttpClient that is constructed under the hood by passing in customizations to the HttpClient builder.
```java
import datahub.client.rest.RestEmitter;
//...
RestEmitter emitter = RestEmitter.create(b -> b
    .server("http://localhost:8080")
    // Auth token for Managed DataHub
    //.token(AUTH_TOKEN_IF_NEEDED)
    // Override the default timeout of 10 seconds
    //.timeoutSec(OVERRIDE_DEFAULT_TIMEOUT_IN_SECONDS)
    // Add additional headers
    //.extraHeaders(Collections.singletonMap("Session-token", "MY_SESSION"))
    // Customize the HttpClient's connection time-to-live
    //.customizeHttpAsyncClient(c -> c.setConnectionTimeToLive(30, TimeUnit.SECONDS))
);
```

### Usage

```java
import com.linkedin.dataset.DatasetProperties;
import com.linkedin.events.metadata.ChangeType;
import datahub.event.MetadataChangeProposalWrapper;
import datahub.client.rest.RestEmitter;
import datahub.client.Callback;
// ... followed by

// Creates the emitter with the default coordinates and settings
RestEmitter emitter = RestEmitter.createWithDefaults();

MetadataChangeProposalWrapper mcpw = MetadataChangeProposalWrapper.builder()
    .entityType("dataset")
    .changeType(ChangeType.UPSERT)
    .aspect(new DatasetProperties().setDescription("This is the canonical User profile dataset"))
    .entityUrn("urn:li:dataset:(urn:li:dataPlatform:bigquery,my-project.my-dataset.user-table,PROD)")
    .build();

// Blocking call using a Future: get() waits for and returns the response
Future<MetadataWriteResponse> requestFuture = emitter.emit(mcpw, null);
MetadataWriteResponse response = requestFuture.get();

// Non-blocking call using a callback
emitter.emit(mcpw, new Callback() {
  @Override
  public void onCompletion(MetadataWriteResponse response) {
    if (response.isSuccess()) {
      System.out.println(String.format("Successfully emitted metadata event for %s", mcpw.getEntityUrn()));
    } else {
      // Get the underlying http response
      HttpResponse httpResponse = (HttpResponse) response.getUnderlyingResponse();
      System.out.println(String.format("Failed to emit metadata event for %s, aspect: %s with status code: %d",
          mcpw.getEntityUrn(), mcpw.getAspectName(), httpResponse.getStatusLine().getStatusCode()));
      // Print the server-side exception if it was captured
      if (response.getServerException() != null) {
        System.out.println(String.format("Server side exception was %s", response.getServerException()));
      }
    }
  }

  @Override
  public void onFailure(Throwable exception) {
    System.out.println(
        String.format("Failed to emit metadata event for %s, aspect: %s due to %s", mcpw.getEntityUrn(),
            mcpw.getAspectName(), exception.getMessage()));
  }
});
```
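
When blocking on the returned `Future`, it is often worth bounding the wait. A small sketch, assuming only the `emit(mcpw, callback)` signature shown above; the 10-second bound is illustrative:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import datahub.client.MetadataWriteResponse;

// Bound the wait instead of blocking indefinitely on get().
Future<MetadataWriteResponse> future = emitter.emit(mcpw, null);
try {
  MetadataWriteResponse response = future.get(10, TimeUnit.SECONDS);
  System.out.println("Emit succeeded: " + response.isSuccess());
} catch (TimeoutException e) {
  // No acknowledgement in time; the request may still complete server-side.
  future.cancel(true);
} catch (InterruptedException | ExecutionException e) {
  System.out.println("Emit failed: " + e.getMessage());
}
```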

### Emitter Code

If you're interested in looking at the REST emitter code, it is available [here](./datahub-client/src/main/java/datahub/client/rest/RestEmitter.java).

## Kafka Emitter

The Java package doesn't currently support a Kafka emitter, but this will be available shortly.

## Other Languages

Emitter APIs are also supported for:
- [Python](../../metadata-ingestion/as-a-library.md)
@@ -0,0 +1,27 @@
```gradle
apply plugin: 'java'
apply plugin: 'com.github.johnrengelman.shadow'

dependencies {
  compile project(':metadata-models')
  compile externalDependency.httpAsyncClient
  compile externalDependency.jacksonDataBind
  compileOnly externalDependency.lombok
  annotationProcessor externalDependency.lombok
  testCompile externalDependency.mockito
  testCompile externalDependency.mockServer
  testCompile externalDependency.mockServerClient
}

test {
  useJUnit()
}

shadowJar {
  zip64 = true
  classifier = ''
}

assemble {
  dependsOn shadowJar
}
```
22 changes: 22 additions & 0 deletions
metadata-integration/java/datahub-client/src/main/java/datahub/client/Callback.java
@@ -0,0 +1,22 @@
```java
package datahub.client;

import javax.annotation.Nullable;


public interface Callback {

  /**
   * Called when the client request has completed.
   * Completion does not imply success. Inspect the response object to understand if
   * this was a successfully processed request or not.
   * @param response the write response, possibly null if the request produced none
   */
  void onCompletion(@Nullable MetadataWriteResponse response);

  /**
   * Called when the client request has thrown an exception before completion.
   * @param exception the exception that aborted the request
   */
  void onFailure(Throwable exception);

}
```
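
A small illustrative implementation of this interface (a hypothetical class, not part of the commit) that logs outcomes and can be reused across `emit` calls:

```java
package datahub.client.examples; // hypothetical package, for illustration only

import javax.annotation.Nullable;

import datahub.client.Callback;
import datahub.client.MetadataWriteResponse;

/** Logs every write outcome; reusable across emitter.emit(event, callback) calls. */
public class LoggingCallback implements Callback {

  @Override
  public void onCompletion(@Nullable MetadataWriteResponse response) {
    // Completion does not imply success: inspect the response object.
    if (response != null && response.isSuccess()) {
      System.out.println("Metadata write succeeded");
    } else {
      System.out.println("Metadata write completed but did not succeed: " + response);
    }
  }

  @Override
  public void onFailure(Throwable exception) {
    System.out.println("Metadata write threw before completion: " + exception.getMessage());
  }
}
```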