diff --git a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/k8s/script/e2e-http-sync.sh b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/k8s/script/e2e-http-sync.sh
index 25f37f01d03f..aec3a97d7fb7 100644
--- a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/k8s/script/e2e-http-sync.sh
+++ b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/k8s/script/e2e-http-sync.sh
@@ -30,14 +30,17 @@ kubectl apply -f "${SHENYU_TESTCASE_DIR}"/k8s/sync/shenyu-cm.yml
# init shenyu sync
SYNC_ARRAY=("websocket" "http" "zookeeper" "etcd")
#SYNC_ARRAY=("websocket" "nacos")
-MIDDLEWARE_SYNC_ARRAY=("zookeeper" "etcd" "nacos")
+MIDDLEWARE_SYNC_ARRAY=("etcd" "nacos")
for sync in ${SYNC_ARRAY[@]}; do
echo -e "------------------\n"
kubectl apply -f "$SHENYU_TESTCASE_DIR"/k8s/shenyu-mysql.yml
-
+ kubectl apply -f "$SHENYU_TESTCASE_DIR"/k8s/shenyu-zookeeper.yml
kubectl apply -f "${PRGDIR}"/shenyu-rocketmq.yml
-
sleep 30s
+ # rely on zookeeper
+ kubectl apply -f "${PRGDIR}"/shenyu-kafka.yml
+ sleep 20s
+
echo "[Start ${sync} synchronous] create shenyu-admin-${sync}.yml shenyu-bootstrap-${sync}.yml shenyu-examples-springcloud.yml"
# shellcheck disable=SC2199
# shellcheck disable=SC2076
@@ -71,10 +74,12 @@ for sync in ${SYNC_ARRAY[@]}; do
exit 1
fi
kubectl delete -f "${SHENYU_TESTCASE_DIR}"/k8s/shenyu-mysql.yml
+ kubectl delete -f "${SHENYU_TESTCASE_DIR}"/k8s/shenyu-zookeeper.yml
kubectl delete -f "${SHENYU_TESTCASE_DIR}"/k8s/sync/shenyu-admin-"${sync}".yml
kubectl delete -f "${SHENYU_TESTCASE_DIR}"/k8s/sync/shenyu-bootstrap-"${sync}".yml
kubectl delete -f "${PRGDIR}"/shenyu-examples-http.yml
kubectl delete -f "${PRGDIR}"/shenyu-rocketmq.yml
+ kubectl delete -f "${PRGDIR}"/shenyu-kafka.yml
# shellcheck disable=SC2199
# shellcheck disable=SC2076
if [[ "${MIDDLEWARE_SYNC_ARRAY[@]}" =~ "${sync}" ]]; then
diff --git a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/k8s/shenyu-kafka.yml b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/k8s/shenyu-kafka.yml
new file mode 100644
index 000000000000..a324c1b66797
--- /dev/null
+++ b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/k8s/shenyu-kafka.yml
@@ -0,0 +1,66 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+ name: shenyu-kafka
+ labels:
+ app: shenyu-kafka
+spec:
+ replicas: 1
+ selector:
+ matchLabels:
+ app: shenyu-kafka
+ strategy: {}
+ template:
+ metadata:
+ labels:
+ app: shenyu-kafka
+ spec:
+ containers:
+ - image: apache/kafka:latest
+ name: shenyu-kafka
+ ports:
+ - containerPort: 9092
+ name: TCP
+ env:
+ - name: KAFKA_BROKER_ID
+ value: "0"
+ - name: KAFKA_ZOOKEEPER_CONNECT
+ value: "shenyu-zookeeper:2181/kafka"
+ - name: KAFKA_LISTENERS
+ value: "PLAINTEXT://:9092"
+ - name: KAFKA_ADVERTISED_LISTENERS
+ value: "PLAINTEXT://shenyu-kafka:9092"
+ restartPolicy: Always
+status: {}
+
+---
+apiVersion: v1
+kind: Service
+metadata:
+ name: shenyu-kafka
+ labels:
+ app: shenyu-kafka
+spec:
+ type: NodePort
+ selector:
+ app: shenyu-kafka
+ ports:
+ - port: 9092
+ targetPort: 9092
+ name: TCP
\ No newline at end of file
diff --git a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/pom.xml b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/pom.xml
index 2a3141908e2b..6c7a5baba517 100644
--- a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/pom.xml
+++ b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/pom.xml
@@ -32,5 +32,10 @@
rocketmq-client
4.9.3
+
+ org.apache.kafka
+ kafka-clients
+ 3.4.0
+
diff --git a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginCases.java b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginCases.java
index d42bfc3aab10..2971f96131ed 100644
--- a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginCases.java
+++ b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginCases.java
@@ -20,6 +20,10 @@
import com.google.common.collect.Lists;
import io.restassured.http.Method;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
@@ -35,7 +39,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.time.Duration;
+import java.util.Collections;
import java.util.List;
+import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.shenyu.e2e.engine.scenario.function.HttpCheckers.exists;
@@ -45,10 +52,14 @@
public class DividePluginCases implements ShenYuScenarioProvider {
- private static final String NAMESERVER = "http://localhost:31876";
+ private static final String ROCKETMQ_NAMESERVER = "http://localhost:31876";
+
+ private static final String KAFKA_BROKER = "localhost:9092";
private static final String CONSUMERGROUP = "shenyu-plugin-logging-rocketmq";
+ private static final String KAFKA_CONSUMER = "shenyu-plugin-logging-kafka";
+
private static final String TOPIC = "shenyu-access-logging";
private static final String TEST = "/http/order/findById?id=123";
@@ -59,7 +70,8 @@ public class DividePluginCases implements ShenYuScenarioProvider {
public List get() {
return Lists.newArrayList(
testDivideHello(),
- testRocketMQHello()
+ testRocketMQHello(),
+ testKafkaHello()
);
}
@@ -103,7 +115,7 @@ private ShenYuScenarioSpec testRocketMQHello() {
Thread.sleep(1000 * 30);
request.request(Method.GET, "/http/order/findById?id=23");
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMERGROUP);
- consumer.setNamesrvAddr(NAMESERVER);
+ consumer.setNamesrvAddr(ROCKETMQ_NAMESERVER);
consumer.subscribe(TOPIC, "*");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, consumeConcurrentlyContext) -> {
LOG.info("Msg:{}", msgs);
@@ -132,4 +144,65 @@ private ShenYuScenarioSpec testRocketMQHello() {
// .deleteWaiting(notExists(TEST)).build())
.build();
}
+
+ private ShenYuScenarioSpec testKafkaHello() {
+ return ShenYuScenarioSpec.builder()
+ .name("testKafkaHello")
+ .beforeEachSpec(
+ ShenYuBeforeEachSpec.builder()
+ .addSelectorAndRule(
+ newSelectorBuilder("selector", Plugin.LOGGING_ROCKETMQ)
+ .name("1")
+ .matchMode(MatchMode.OR)
+ .conditionList(newConditions(Condition.ParamType.URI, Condition.Operator.STARTS_WITH, "/http"))
+ .build(),
+ newRuleBuilder("rule")
+ .name("1")
+ .matchMode(MatchMode.OR)
+ .conditionList(newConditions(Condition.ParamType.URI, Condition.Operator.STARTS_WITH, "/http"))
+ .build()
+ )
+ .checker(exists(TEST))
+ .build()
+ )
+ .caseSpec(
+ ShenYuCaseSpec.builder()
+ .add(request -> {
+ AtomicBoolean isLog = new AtomicBoolean(false);
+ try {
+ Thread.sleep(1000 * 30);
+ request.request(Method.GET, "/http/order/findById?id=23");
+ KafkaConsumer consumer = defaultKafkaConsumer();
+ consumer.subscribe(Collections.singletonList(TOPIC));
+ LOG.info("kafka consumer start, isLog: isLog.get():{}", isLog.get());
+ ConsumerRecords records = consumer.poll(Duration.ofSeconds(10));
+ LOG.info("kafka consumer fetch count: {}", records.count());
+ records.forEach(record -> {
+ String value = record.value();
+ LOG.info("kafka msg:{}", value);
+ if (value.contains("/http/order/findById?id=23")) {
+ isLog.set(true);
+ }
+ });
+ consumer.commitSync();
+ LOG.info("isLog.get():{}", isLog.get());
+ Assertions.assertTrue(isLog.get());
+ } catch (Exception e) {
+ LOG.error("error", e);
+ Assertions.assertTrue(isLog.get());
+ }
+ })
+ .build()
+ )
+ .build();
+ }
+
+ private KafkaConsumer defaultKafkaConsumer() {
+ Properties consumerProperties = new Properties();
+ consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKER);
+ consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, KAFKA_CONSUMER);
+ return new KafkaConsumer<>(consumerProperties);
+ }
}
diff --git a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginTest.java b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginTest.java
index 309ff66bc58e..b6104e5d35eb 100644
--- a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginTest.java
+++ b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginTest.java
@@ -101,6 +101,11 @@ void setup(final AdminClient adminClient, final GatewayClient gatewayClient) thr
WaitDataSync.waitAdmin2GatewayDataSyncEquals(adminClient::listAllMetaData, gatewayClient::getMetaDataCache, adminClient);
WaitDataSync.waitAdmin2GatewayDataSyncEquals(adminClient::listAllRules, gatewayClient::getRuleCache, adminClient);
LOG.info("start loggingRocketMQ plugin");
+ enableLoggingRocketMQPlugin(adminClient, gatewayClient);
+ enableLoggingKafkaPlugin(adminClient, gatewayClient);
+ }
+
+ private void enableLoggingRocketMQPlugin(AdminClient adminClient, GatewayClient gatewayClient) throws Exception {
MultiValueMap formData = new LinkedMultiValueMap<>();
formData.add("id", "29");
formData.add("name", "loggingRocketMQ");
@@ -112,6 +117,18 @@ void setup(final AdminClient adminClient, final GatewayClient gatewayClient) thr
WaitDataSync.waitGatewayPluginUse(gatewayClient, "org.apache.shenyu.plugin.logging.rocketmq");
}
+ private void enableLoggingKafkaPlugin(AdminClient adminClient, GatewayClient gatewayClient) throws Exception {
+ MultiValueMap formData = new LinkedMultiValueMap<>();
+ formData.add("id", "33");
+ formData.add("name", "loggingKafka");
+ formData.add("enabled", "true");
+ formData.add("role", "Logging");
+ formData.add("sort", "180");
+ formData.add("config", "{\"topic\":\"shenyu-access-logging\", \"namesrvAddr\": \"localhost:9092\"}");
+ adminClient.changePluginStatus("29", formData);
+ WaitDataSync.waitGatewayPluginUse(gatewayClient, "org.apache.shenyu.plugin.logging.kafka");
+ }
+
@ShenYuScenario(provider = DividePluginCases.class)
void testDivide(final GatewayClient gateway, final CaseSpec spec) {
spec.getVerifiers().forEach(verifier -> verifier.verify(gateway.getHttpRequesterSupplier().get()));