From dfe309acebb6131ebee04e28971b0c34abe48c2f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mirko=20K=C3=A4mpf?= Date: Mon, 2 Sep 2024 20:59:25 +0200 Subject: [PATCH] finished WordCount with KafkaSource and KafkaSink --- .../org/apache/wayang/api/DataQuanta.scala | 4 +- wayang-applications/bin/bootstrap.sh | 52 ++++++++ wayang-applications/bin/cleanup.sh | 25 ++++ wayang-applications/bin/run_wordcount.sh | 2 +- .../bin/run_wordcount_kafka.sh | 2 +- wayang-applications/pom.xml | 4 +- .../applications/WordCountOnKafkaTopic.java | 126 ++++++++++++++++++ .../basic/operators/KafkaTopicSink.java | 1 + .../operators/SparkKafkaTopicSource.java | 2 +- 9 files changed, 212 insertions(+), 6 deletions(-) create mode 100755 wayang-applications/bin/bootstrap.sh create mode 100755 wayang-applications/bin/cleanup.sh create mode 100644 wayang-applications/src/main/java/org/apache/wayang/applications/WordCountOnKafkaTopic.java diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuanta.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuanta.scala index 80ae6742e..0dd247a3b 100644 --- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuanta.scala +++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuanta.scala @@ -966,7 +966,8 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I topicName, new TransformationDescriptor(formatterUdf, basicDataUnitType[Out], basicDataUnitType[String], udfLoad) ) - sink.setName(s"Write to KafkaTopic $topicName") + sink.setName(s"*#-> Write to KafkaTopic $topicName") + println(s"*#-> Write to KafkaTopic $topicName") this.connectTo(sink, 0) // Do the execution. 
@@ -991,6 +992,7 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I new TransformationDescriptor(formatterUdf, basicDataUnitType[Out], basicDataUnitType[String], udfLoad) ) sink.setName(s"Write to $url") + this.connectTo(sink, 0) // Do the execution. diff --git a/wayang-applications/bin/bootstrap.sh b/wayang-applications/bin/bootstrap.sh new file mode 100755 index 000000000..cdf2a4d79 --- /dev/null +++ b/wayang-applications/bin/bootstrap.sh @@ -0,0 +1,52 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +#brew install confluentinc/tap/cli +#brew install jq +#brew install git-lfs + +export topic_l1_a=region_emea_counts +export topic_l1_b=region_apac_counts +export topic_l1_c=region_uswest_counts +export topic_l2_a=global_contribution +export topic_l2_b=global_averages + +# Presumably needs an authenticated CLI session first (confluent login) - confirm
+ + +confluent kafka topic delete topic_l1_a --cluster lkc-m2kpj2 +confluent kafka topic delete topic_l1_b --cluster lkc-m2kpj2 +confluent kafka topic delete topic_l1_c --cluster lkc-m2kpj2 +confluent kafka topic delete topic_l2_a --cluster lkc-m2kpj2 +confluent kafka topic delete topic_l2_b --cluster lkc-m2kpj2 + +confluent kafka topic create topic_l1_a --cluster lkc-m2kpj2 +confluent kafka topic create topic_l1_b --cluster lkc-m2kpj2 +confluent kafka topic create topic_l1_c --cluster lkc-m2kpj2 +confluent kafka topic create topic_l2_a --cluster lkc-m2kpj2 +confluent kafka topic create topic_l2_b --cluster lkc-m2kpj2 + +###################################################################################################### +# https://docs.confluent.io/cloud/current/sr/schema_registry_ccloud_tutorial.html +export SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO="${SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO:?set SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO to API_KEY:API_SECRET in the environment - never commit credentials}" +export SCHEMA_REGISTRY_URL="https://psrc-lo5k9.eu-central-1.aws.confluent.cloud" + +confluent kafka topic list --cluster lkc-m2kpj2 + +### +# List the subjects registered in the Schema Registry (credentials are taken from the environment) +curl --silent -X GET -u "$SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO" "$SCHEMA_REGISTRY_URL/subjects" | jq . diff --git a/wayang-applications/bin/cleanup.sh b/wayang-applications/bin/cleanup.sh new file mode 100755 index 000000000..a81a432c1 --- /dev/null +++ b/wayang-applications/bin/cleanup.sh @@ -0,0 +1,25 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +confluent kafka topic delete topic_l1_a --cluster lkc-m2kpj2 +confluent kafka topic delete topic_l1_b --cluster lkc-m2kpj2 +confluent kafka topic delete topic_l1_c --cluster lkc-m2kpj2 +confluent kafka topic delete topic_l2_a --cluster lkc-m2kpj2 +confluent kafka topic delete topic_l2_b --cluster lkc-m2kpj2 + +confluent kafka topic list --cluster lkc-m2kpj2 \ No newline at end of file diff --git a/wayang-applications/bin/run_wordcount.sh b/wayang-applications/bin/run_wordcount.sh index 87b026a9c..9f1a26e89 100755 --- a/wayang-applications/bin/run_wordcount.sh +++ b/wayang-applications/bin/run_wordcount.sh @@ -21,7 +21,7 @@ export JAVA_HOME=/Users/kamir/.sdkman/candidates/java/current cd .. cd .. -#mvn clean compile package install -pl :wayang-assembly -Pdistribution -DskipTests +mvn clean compile package install -pl :wayang-assembly -Pdistribution -DskipTests cd wayang-applications mvn compile package install -DskipTests diff --git a/wayang-applications/bin/run_wordcount_kafka.sh b/wayang-applications/bin/run_wordcount_kafka.sh index 87b026a9c..89b2ff1b0 100755 --- a/wayang-applications/bin/run_wordcount_kafka.sh +++ b/wayang-applications/bin/run_wordcount_kafka.sh @@ -28,4 +28,4 @@ mvn compile package install -DskipTests cd .. 
-source ./.env.sh; bin/wayang-submit org.apache.wayang.applications.WordCount java file://$(pwd)/wayang-applications/data/case-study/DATA_REPO_001/README.md +source ./.env.sh; bin/wayang-submit org.apache.wayang.applications.WordCountOnKafkaTopic \ No newline at end of file diff --git a/wayang-applications/pom.xml b/wayang-applications/pom.xml index 756b6b203..3dab581ea 100644 --- a/wayang-applications/pom.xml +++ b/wayang-applications/pom.xml @@ -83,8 +83,8 @@ org.apache.wayang wayang-api-scala-java_2.12 ${WAYANG_VERSION} - - + system + /Users/kamir/GITHUB.merge/incubator-wayang/wayang-assembly/target/apache-wayang-assembly-0.7.1-incubating-dist/wayang-0.7.1/jars/wayang-api-scala-java-0.7.1.jar