-
Notifications
You must be signed in to change notification settings - Fork 908
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
109 changed files
with
12,923 additions
and
1,143 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
<!-- | ||
Suggested PR template: Fill/delete/add sections as needed. Optionally delete any commented block. | ||
--> | ||
What | ||
---- | ||
<!-- | ||
Briefly describe **what** you have changed and **why**. | ||
Optionally include implementation strategy. | ||
--> | ||
|
||
Checklist | ||
------------------ | ||
- [ ] Contains customer facing changes? Including API/behavior changes <!-- This can help identify if it has introduced any breaking changes --> | ||
- [ ] Did you add sufficient unit test and/or integration test coverage for this PR? | ||
- If not, please explain why it is not required | ||
|
||
References | ||
---------- | ||
JIRA: | ||
<!-- | ||
Copy&paste links: to Jira ticket, other PRs, issues, Slack conversations... | ||
For code bumps: link to PR, tag or GitHub `/compare/master...master` | ||
--> | ||
|
||
Test & Review | ||
------------ | ||
<!-- | ||
Has it been tested? how? | ||
Copy&paste any handy instructions, steps or requirements that can save time to the reviewer or any reader. | ||
--> | ||
|
||
Open questions / Follow-ups | ||
-------------------------- | ||
<!-- | ||
Optional: anything open to discussion for the reviewer, out of scope, or follow-ups. | ||
--> | ||
|
||
<!-- | ||
Review stakeholders | ||
------------------ | ||
<!-- | ||
Optional: mention stakeholders or if special context that is required to review. | ||
--> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,3 +29,4 @@ tmp-KafkaCluster | |
.venv | ||
venv_test | ||
venv_examples | ||
*Zone.Identifier |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,145 @@ | ||
#!/usr/bin/env python | ||
# -*- coding: utf-8 -*- | ||
# | ||
# Copyright 2024 Confluent Inc. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
|
||
# A simple example demonstrating use of AvroDeserializer. | ||
|
||
import argparse | ||
|
||
from confluent_kafka.schema_registry.rules.encryption.encrypt_executor import \ | ||
FieldEncryptionExecutor | ||
|
||
from confluent_kafka.schema_registry.rules.encryption.localkms.local_driver import \ | ||
LocalKmsDriver | ||
|
||
from confluent_kafka.schema_registry.rules.encryption.hcvault.hcvault_driver import \ | ||
HcVaultKmsDriver | ||
|
||
from confluent_kafka.schema_registry.rules.encryption.gcpkms.gcp_driver import \ | ||
GcpKmsDriver | ||
|
||
from confluent_kafka.schema_registry.rules.encryption.azurekms.azure_driver import \ | ||
AzureKmsDriver | ||
|
||
from confluent_kafka.schema_registry.rules.encryption.awskms.aws_driver import \ | ||
AwsKmsDriver | ||
|
||
from confluent_kafka import Consumer | ||
from confluent_kafka.serialization import SerializationContext, MessageField | ||
from confluent_kafka.schema_registry import SchemaRegistryClient | ||
from confluent_kafka.schema_registry.avro import AvroDeserializer | ||
|
||
|
||
class User(object): | ||
""" | ||
User record | ||
Args: | ||
name (str): User's name | ||
favorite_number (int): User's favorite number | ||
favorite_color (str): User's favorite color | ||
""" | ||
|
||
def __init__(self, name=None, favorite_number=None, favorite_color=None): | ||
self.name = name | ||
self.favorite_number = favorite_number | ||
self.favorite_color = favorite_color | ||
|
||
|
||
def dict_to_user(obj, ctx): | ||
""" | ||
Converts object literal(dict) to a User instance. | ||
Args: | ||
obj (dict): Object literal(dict) | ||
ctx (SerializationContext): Metadata pertaining to the serialization | ||
operation. | ||
""" | ||
|
||
if obj is None: | ||
return None | ||
|
||
return User(name=obj['name'], | ||
favorite_number=obj['favorite_number'], | ||
favorite_color=obj['favorite_color']) | ||
|
||
|
||
def main(args): | ||
# Register the KMS drivers and the field-level encryption executor | ||
AwsKmsDriver.register() | ||
AzureKmsDriver.register() | ||
GcpKmsDriver.register() | ||
HcVaultKmsDriver.register() | ||
LocalKmsDriver.register() | ||
FieldEncryptionExecutor.register() | ||
|
||
topic = args.topic | ||
|
||
# When using Data Contract rules, a schema should not be passed to the | ||
# AvroDeserializer. The schema is fetched from the Schema Registry. | ||
schema_str = None | ||
|
||
sr_conf = {'url': args.schema_registry} | ||
schema_registry_client = SchemaRegistryClient(sr_conf) | ||
|
||
avro_deserializer = AvroDeserializer(schema_registry_client, | ||
schema_str, | ||
dict_to_user) | ||
|
||
consumer_conf = {'bootstrap.servers': args.bootstrap_servers, | ||
'group.id': args.group, | ||
'auto.offset.reset': "earliest"} | ||
|
||
consumer = Consumer(consumer_conf) | ||
consumer.subscribe([topic]) | ||
|
||
while True: | ||
try: | ||
# SIGINT can't be handled when polling, limit timeout to 1 second. | ||
msg = consumer.poll(1.0) | ||
if msg is None: | ||
continue | ||
|
||
user = avro_deserializer(msg.value(), SerializationContext(msg.topic(), MessageField.VALUE)) | ||
if user is not None: | ||
print("User record {}: name: {}\n" | ||
"\tfavorite_number: {}\n" | ||
"\tfavorite_color: {}\n" | ||
.format(msg.key(), user.name, | ||
user.favorite_number, | ||
user.favorite_color)) | ||
except KeyboardInterrupt: | ||
break | ||
|
||
consumer.close() | ||
|
||
|
||
if __name__ == '__main__': | ||
parser = argparse.ArgumentParser(description="AvroDeserializer example") | ||
parser.add_argument('-b', dest="bootstrap_servers", required=True, | ||
help="Bootstrap broker(s) (host[:port])") | ||
parser.add_argument('-s', dest="schema_registry", required=True, | ||
help="Schema Registry (http(s)://host[:port]") | ||
parser.add_argument('-t', dest="topic", default="example_serde_avro", | ||
help="Topic name") | ||
parser.add_argument('-g', dest="group", default="example_serde_avro", | ||
help="Consumer group") | ||
|
||
main(parser.parse_args()) |
Oops, something went wrong.