diff --git a/.github/workflows/e2e-k8s.yml b/.github/workflows/e2e-k8s.yml
index a9577415aa65..62739cb4b364 100644
--- a/.github/workflows/e2e-k8s.yml
+++ b/.github/workflows/e2e-k8s.yml
@@ -218,6 +218,10 @@ jobs:
script: e2e-cluster-jdbc-compose
- case: shenyu-e2e-case-cluster
script: e2e-cluster-zookeeper-compose
+ - case: shenyu-e2e-case-logging-rocketmq
+ script: e2e-logging-rocketmq-compose
+ - case: shenyu-e2e-case-logging-kafka
+ script: e2e-logging-kafka-compose
steps:
- uses: actions/checkout@v2
diff --git a/db/init/mysql/schema.sql b/db/init/mysql/schema.sql
index d744628f1123..04f2fa3be88c 100644
--- a/db/init/mysql/schema.sql
+++ b/db/init/mysql/schema.sql
@@ -912,7 +912,7 @@ INSERT INTO `plugin` VALUES ('6', 'dubbo', '{\"register\":\"zookeeper://localhos
INSERT INTO `plugin` VALUES ('8', 'springCloud', NULL, 'Proxy', 200, 0, '2022-05-25 18:02:53', '2022-05-25 18:02:53',null);
INSERT INTO `plugin` VALUES ('9', 'hystrix', NULL, 'FaultTolerance', 130, 0, '2022-05-25 18:02:53', '2022-05-25 18:02:53',null);
INSERT INTO `plugin` VALUES ('32', 'loggingElasticSearch','{\"host\":\"localhost\", \"port\": \"9200\"}', 'Logging', 190, 0, '2022-06-19 22:00:00', '2022-06-19 22:00:00',null);
-INSERT INTO `plugin` VALUES ('33', 'loggingKafka','{\"host\":\"localhost\", \"port\": \"9092\"}', 'Logging', 180, 0, '2022-07-04 22:00:00', '2022-07-02 22:00:00',null);
+INSERT INTO `plugin` VALUES ('33', 'loggingKafka','{\"topic\":\"shenyu-access-logging\",\"namesrvAddr\":\"http://localhost:9092\",\"sampleRate\":\"1\",\"maxResponseBody\":524288,\"maxRequestBody\":524288,\"compressAlg\":\"none\"}', 'Logging', 180, 0, '2022-07-04 22:00:00', '2022-07-02 22:00:00',null);
INSERT INTO `plugin` VALUES ('34', 'loggingAliyunSls','{\"projectName\": \"shenyu\", \"logStoreName\": \"shenyu-logstore\", \"topic\": \"shenyu-topic\"}', 'Logging', 175, 0, '2022-06-30 21:00:00', '2022-06-30 21:00:00',null);
INSERT INTO `plugin` VALUES ('35', 'loggingPulsar', '{\"topic":\"shenyu-access-logging\", \"serviceUrl\": \"pulsar://localhost:6650\"}', 'Logging', 185, 0, '2022-06-30 21:00:00', '2022-06-30 21:00:00',null);
INSERT INTO `plugin` VALUES ('36', 'loggingTencentCls','{\"endpoint\": \"ap-guangzhou.cls.tencentcs.com\", \"topic\": \"shenyu-topic\"}', 'Logging', 176, 0, '2022-06-30 21:00:00', '2022-06-30 21:00:00',null);
diff --git a/shenyu-admin/src/main/java/org/apache/shenyu/admin/ShenyuAdminBootstrap.java b/shenyu-admin/src/main/java/org/apache/shenyu/admin/ShenyuAdminBootstrap.java
index 9bd5026202c7..796164adcf02 100644
--- a/shenyu-admin/src/main/java/org/apache/shenyu/admin/ShenyuAdminBootstrap.java
+++ b/shenyu-admin/src/main/java/org/apache/shenyu/admin/ShenyuAdminBootstrap.java
@@ -22,7 +22,7 @@
import org.springframework.boot.autoconfigure.ldap.LdapAutoConfiguration;
/**
- * shenyu admin start.
+ * shenyu admin startShenyuAdminBootstrap.
*/
@SpringBootApplication(exclude = {LdapAutoConfiguration.class})
public class ShenyuAdminBootstrap {
@@ -30,7 +30,7 @@ public class ShenyuAdminBootstrap {
/**
* Main entrance.
*
- * @param args startup arguments
+ * @param args startup arguments.
*/
public static void main(final String[] args) {
SpringApplication.run(ShenyuAdminBootstrap.class, args);
diff --git a/shenyu-e2e/pom.xml b/shenyu-e2e/pom.xml
index 98fcecf3a073..e524324212c7 100644
--- a/shenyu-e2e/pom.xml
+++ b/shenyu-e2e/pom.xml
@@ -58,6 +58,7 @@
32.0.0-jre
4.4
1.5.1
+ 3.7.1
diff --git a/shenyu-e2e/shenyu-e2e-case/pom.xml b/shenyu-e2e/shenyu-e2e-case/pom.xml
index 901a5c593615..4ca3cc3d3127 100644
--- a/shenyu-e2e/shenyu-e2e-case/pom.xml
+++ b/shenyu-e2e/shenyu-e2e-case/pom.xml
@@ -32,6 +32,8 @@
shenyu-e2e-case-cluster
shenyu-e2e-case-storage
shenyu-e2e-case-http
+ shenyu-e2e-case-logging-kafka
+ shenyu-e2e-case-logging-rocketmq
shenyu-e2e-case-spring-cloud
shenyu-e2e-case-apache-dubbo
shenyu-e2e-case-sofa
diff --git a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/compose/script/e2e-http-sync-compose.sh b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/compose/script/e2e-http-sync-compose.sh
index a0b102413c3d..446dae7dfb93 100644
--- a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/compose/script/e2e-http-sync-compose.sh
+++ b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/compose/script/e2e-http-sync-compose.sh
@@ -37,7 +37,6 @@ for sync in "${SYNC_ARRAY[@]}"; do
sleep 30s
sh "$SHENYU_TESTCASE_DIR"/k8s/script/healthcheck.sh http://localhost:31095/actuator/health
sh "$SHENYU_TESTCASE_DIR"/k8s/script/healthcheck.sh http://localhost:31195/actuator/health
- docker compose -f "${PRGDIR}"/shenyu-rocketmq-compose.yml up -d --quiet-pull
docker compose -f "${PRGDIR}"/shenyu-examples-http-compose.yml up -d --quiet-pull
sleep 30s
sh "$SHENYU_TESTCASE_DIR"/k8s/script/healthcheck.sh http://localhost:31189/actuator/health
@@ -55,9 +54,6 @@ for sync in "${SYNC_ARRAY[@]}"; do
echo "shenyu-bootstrap log:"
echo "------------------"
docker compose -f "$SHENYU_TESTCASE_DIR"/compose/sync/shenyu-sync-"${sync}".yml logs shenyu-bootstrap
- echo "shenyu-rocketmq log:"
- echo "------------------"
- docker compose -f "${PRGDIR}"/shenyu-rocketmq-compose.yml logs
echo "shenyu-examples-http log:"
echo "------------------"
docker compose -f "${PRGDIR}"/shenyu-examples-http-compose.yml logs shenyu-examples-http
diff --git a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/k8s/script/e2e-http-sync.sh b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/k8s/script/e2e-http-sync.sh
index 25f37f01d03f..95d7931c0be5 100644
--- a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/k8s/script/e2e-http-sync.sh
+++ b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/k8s/script/e2e-http-sync.sh
@@ -37,6 +37,8 @@ for sync in ${SYNC_ARRAY[@]}; do
kubectl apply -f "${PRGDIR}"/shenyu-rocketmq.yml
+ sh "$SHENYU_TESTCASE_DIR"/k8s/script/healthcheck.sh http://localhost:31877/actuator/health
+
sleep 30s
echo "[Start ${sync} synchronous] create shenyu-admin-${sync}.yml shenyu-bootstrap-${sync}.yml shenyu-examples-springcloud.yml"
# shellcheck disable=SC2199
@@ -75,6 +77,7 @@ for sync in ${SYNC_ARRAY[@]}; do
kubectl delete -f "${SHENYU_TESTCASE_DIR}"/k8s/sync/shenyu-bootstrap-"${sync}".yml
kubectl delete -f "${PRGDIR}"/shenyu-examples-http.yml
kubectl delete -f "${PRGDIR}"/shenyu-rocketmq.yml
+
# shellcheck disable=SC2199
# shellcheck disable=SC2076
if [[ "${MIDDLEWARE_SYNC_ARRAY[@]}" =~ "${sync}" ]]; then
diff --git a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/pom.xml b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/pom.xml
index 2a3141908e2b..f326c8600958 100644
--- a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/pom.xml
+++ b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/pom.xml
@@ -25,12 +25,4 @@
4.0.0
shenyu-e2e-case-http
-
-
-
- org.apache.rocketmq
- rocketmq-client
- 4.9.3
-
-
diff --git a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginCases.java b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginCases.java
index d42bfc3aab10..b6e3f2aab051 100644
--- a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginCases.java
+++ b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginCases.java
@@ -18,48 +18,22 @@
package org.apache.shenyu.e2e.testcase.http;
import com.google.common.collect.Lists;
-import io.restassured.http.Method;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
-import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.shenyu.e2e.engine.scenario.ShenYuScenarioProvider;
import org.apache.shenyu.e2e.engine.scenario.specification.ScenarioSpec;
import org.apache.shenyu.e2e.engine.scenario.specification.ShenYuBeforeEachSpec;
import org.apache.shenyu.e2e.engine.scenario.specification.ShenYuCaseSpec;
import org.apache.shenyu.e2e.engine.scenario.specification.ShenYuScenarioSpec;
-import org.apache.shenyu.e2e.model.MatchMode;
-import org.apache.shenyu.e2e.model.Plugin;
-import org.apache.shenyu.e2e.model.data.Condition;
-import org.junit.jupiter.api.Assertions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.shenyu.e2e.engine.scenario.function.HttpCheckers.exists;
-import static org.apache.shenyu.e2e.template.ResourceDataTemplate.newConditions;
-import static org.apache.shenyu.e2e.template.ResourceDataTemplate.newRuleBuilder;
-import static org.apache.shenyu.e2e.template.ResourceDataTemplate.newSelectorBuilder;
public class DividePluginCases implements ShenYuScenarioProvider {
- private static final String NAMESERVER = "http://localhost:31876";
-
- private static final String CONSUMERGROUP = "shenyu-plugin-logging-rocketmq";
-
- private static final String TOPIC = "shenyu-access-logging";
-
- private static final String TEST = "/http/order/findById?id=123";
-
- private static final Logger LOG = LoggerFactory.getLogger(DividePluginCases.class);
-
@Override
public List get() {
return Lists.newArrayList(
- testDivideHello(),
- testRocketMQHello()
+ testDivideHello()
);
}
@@ -74,62 +48,4 @@ private ShenYuScenarioSpec testDivideHello() {
.build())
.build();
}
-
- private ShenYuScenarioSpec testRocketMQHello() {
- return ShenYuScenarioSpec.builder()
- .name("testRocketMQHello")
- .beforeEachSpec(
- ShenYuBeforeEachSpec.builder()
- .addSelectorAndRule(
- newSelectorBuilder("selector", Plugin.LOGGING_ROCKETMQ)
- .name("1")
- .matchMode(MatchMode.OR)
- .conditionList(newConditions(Condition.ParamType.URI, Condition.Operator.STARTS_WITH, "/http"))
- .build(),
- newRuleBuilder("rule")
- .name("1")
- .matchMode(MatchMode.OR)
- .conditionList(newConditions(Condition.ParamType.URI, Condition.Operator.STARTS_WITH, "/http"))
- .build()
- )
- .checker(exists(TEST))
- .build()
- )
- .caseSpec(
- ShenYuCaseSpec.builder()
- .add(request -> {
- AtomicBoolean isLog = new AtomicBoolean(false);
- try {
- Thread.sleep(1000 * 30);
- request.request(Method.GET, "/http/order/findById?id=23");
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMERGROUP);
- consumer.setNamesrvAddr(NAMESERVER);
- consumer.subscribe(TOPIC, "*");
- consumer.registerMessageListener((MessageListenerConcurrently) (msgs, consumeConcurrentlyContext) -> {
- LOG.info("Msg:{}", msgs);
- if (CollectionUtils.isNotEmpty(msgs)) {
- msgs.forEach(e -> {
- if (new String(e.getBody()).contains("/http/order/findById?id=23")) {
- isLog.set(true);
- }
- });
- }
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- });
- LOG.info("consumer.start ; isLog.get():{}", isLog.get());
- consumer.start();
- Thread.sleep(1000 * 30);
- LOG.info("isLog.get():{}", isLog.get());
- Assertions.assertTrue(isLog.get());
- } catch (Exception e) {
- LOG.error("error", e);
- Assertions.assertTrue(isLog.get());
- }
- })
- .build()
- )
-// .afterEachSpec(ShenYuAfterEachSpec.builder()
-// .deleteWaiting(notExists(TEST)).build())
- .build();
- }
}
diff --git a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginTest.java b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginTest.java
index 1b18091e7c47..b66fb619b3f4 100644
--- a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginTest.java
+++ b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginTest.java
@@ -21,7 +21,6 @@
import org.apache.shenyu.e2e.client.WaitDataSync;
import org.apache.shenyu.e2e.client.admin.AdminClient;
import org.apache.shenyu.e2e.client.gateway.GatewayClient;
-import org.apache.shenyu.e2e.constant.Constants;
import org.apache.shenyu.e2e.engine.annotation.ShenYuScenario;
import org.apache.shenyu.e2e.engine.annotation.ShenYuTest;
import org.apache.shenyu.e2e.engine.scenario.specification.BeforeEachSpec;
@@ -35,9 +34,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.Objects;
import static org.apache.shenyu.e2e.constant.Constants.SYS_DEFAULT_NAMESPACE_NAMESPACE_ID;
@@ -98,23 +95,13 @@ void before(final AdminClient client, final GatewayClient gateway, final BeforeE
// selectorIds = Lists.newArrayList();
// }
+
@BeforeAll
void setup(final AdminClient adminClient, final GatewayClient gatewayClient) throws Exception {
adminClient.login();
WaitDataSync.waitAdmin2GatewayDataSyncEquals(adminClient::listAllSelectors, gatewayClient::getSelectorCache, adminClient);
WaitDataSync.waitAdmin2GatewayDataSyncEquals(adminClient::listAllMetaData, gatewayClient::getMetaDataCache, adminClient);
WaitDataSync.waitAdmin2GatewayDataSyncEquals(adminClient::listAllRules, gatewayClient::getRuleCache, adminClient);
- LOG.info("start loggingRocketMQ plugin");
- Map reqBody = new HashMap<>();
- reqBody.put("pluginId", "29");
- reqBody.put("name", "loggingRocketMQ");
- reqBody.put("enabled", "true");
- reqBody.put("role", "Logging");
- reqBody.put("sort", "170");
- reqBody.put("namespaceId", Constants.SYS_DEFAULT_NAMESPACE_NAMESPACE_ID);
- reqBody.put("config", "{\"topic\":\"shenyu-access-logging\", \"namesrvAddr\": \"rocketmq-dialevoneid:9876\",\"producerGroup\":\"shenyu-plugin-logging-rocketmq\"}");
- adminClient.changePluginStatus("1801816010882822166", reqBody);
- WaitDataSync.waitGatewayPluginUse(gatewayClient, "org.apache.shenyu.plugin.logging.rocketmq");
}
@ShenYuScenario(provider = DividePluginCases.class)
diff --git a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/compose/script/e2e-logging-kafka-compose.sh b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/compose/script/e2e-logging-kafka-compose.sh
new file mode 100644
index 000000000000..bef0307c66cd
--- /dev/null
+++ b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/compose/script/e2e-logging-kafka-compose.sh
@@ -0,0 +1,71 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# init kubernetes for mysql
+SHENYU_TESTCASE_DIR=$(dirname "$(dirname "$(dirname "$(dirname "$0")")")")
+bash "${SHENYU_TESTCASE_DIR}"/k8s/script/storage/storage_init_mysql.sh
+
+# init ip
+export HOST_IP=$(hostname -I | awk '{print $1}')
+
+# init register center
+CUR_PATH=$(readlink -f "$(dirname "$0")")
+PRGDIR=$(dirname "$CUR_PATH")
+# init shenyu sync
+SYNC_ARRAY=("websocket" "http" "zookeeper" "etcd")
+#SYNC_ARRAY=("websocket" "nacos")
+#MIDDLEWARE_SYNC_ARRAY=("zookeeper" "etcd" "nacos")
+
+docker network create -d bridge shenyu
+
+for sync in "${SYNC_ARRAY[@]}"; do
+ echo -e "------------------\n"
+ docker compose -f "${PRGDIR}"/shenyu-kafka-compose.yml up -d --quiet-pull
+ sleep 30s
+ echo "[Start ${sync} synchronous] create shenyu-admin-${sync}.yml shenyu-bootstrap-${sync}.yml "
+ docker compose -f "$SHENYU_TESTCASE_DIR"/compose/sync/shenyu-sync-"${sync}".yml up -d --quiet-pull
+ sleep 30s
+ sh "$SHENYU_TESTCASE_DIR"/k8s/script/healthcheck.sh http://localhost:31195/actuator/health
+ docker compose -f "${PRGDIR}"/shenyu-examples-http-compose.yml up -d --quiet-pull
+ sleep 30s
+ sh "$SHENYU_TESTCASE_DIR"/k8s/script/healthcheck.sh http://localhost:31095/actuator/health
+ sh "$SHENYU_TESTCASE_DIR"/k8s/script/healthcheck.sh http://localhost:31189/actuator/health
+ sleep 10s
+ docker ps -a
+ ## run e2e-test
+ ./mvnw -B -f ./shenyu-e2e/pom.xml -pl shenyu-e2e-case/shenyu-e2e-case-logging-kafka -am test
+ # shellcheck disable=SC2181
+ if (($?)); then
+ echo "${sync}-sync-e2e-test failed"
+ echo "------------------"
+ echo "shenyu-admin log:"
+ echo "------------------"
+ docker compose -f "$SHENYU_TESTCASE_DIR"/compose/sync/shenyu-sync-"${sync}".yml logs shenyu-admin
+ echo "shenyu-bootstrap log:"
+ echo "------------------"
+ docker compose -f "$SHENYU_TESTCASE_DIR"/compose/sync/shenyu-sync-"${sync}".yml logs shenyu-bootstrap
+ echo "shenyu-kafka log:"
+ echo "------------------"
+ docker compose -f "${PRGDIR}"/shenyu-kafka-compose.yml logs
+ exit 1
+ fi
+ docker compose -f "$SHENYU_TESTCASE_DIR"/compose/sync/shenyu-sync-"${sync}".yml down
+ docker compose -f "${PRGDIR}"/shenyu-kafka-compose.yml down
+ docker compose -f "${PRGDIR}"/shenyu-examples-http-compose.yml down
+ echo "[Remove ${sync} synchronous] delete shenyu-admin-${sync}.yml shenyu-bootstrap-${sync}.yml "
+done
diff --git a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/compose/shenyu-examples-http-compose.yml b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/compose/shenyu-examples-http-compose.yml
new file mode 100644
index 000000000000..ce8c8a7e7b22
--- /dev/null
+++ b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/compose/shenyu-examples-http-compose.yml
@@ -0,0 +1,41 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: '3.9'
+
+services:
+ shenyu-examples-http:
+ image: shenyu-examples-http:latest
+ container_name: shenyu-examples-http
+ environment:
+ - shenyu.register.serverLists=http://shenyu-admin:9095
+ ports:
+ - "31189:8189"
+ healthcheck:
+ test: [ "CMD-SHELL", "wget -q -O - http://localhost:8189/actuator/health | grep UP || exit 1" ]
+ interval: 10s
+ timeout: 2s
+ retries: 3
+ start_period: 10s
+ restart: always
+ networks:
+ - shenyu
+
+networks:
+ shenyu:
+ name: shenyu
+ driver: bridge
+ external: true
diff --git a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/compose/shenyu-kafka-compose.yml b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/compose/shenyu-kafka-compose.yml
new file mode 100644
index 000000000000..a29451395b6f
--- /dev/null
+++ b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/compose/shenyu-kafka-compose.yml
@@ -0,0 +1,71 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+services:
+ shenyu-zk:
+ container_name: shenyu-zk
+ image: zookeeper:latest
+ restart: always
+ ports:
+ - "2181:2181"
+ networks:
+ - shenyu
+
+ shenyu-kafka:
+ container_name: shenyu-kafka
+ image: confluentinc/cp-kafka:latest
+ restart: always
+ depends_on:
+ - shenyu-zk
+ networks:
+ - shenyu
+ ports:
+ - "9092:9092"
+ environment:
+ KAFKA_BROKER_ID: 1
+ KAFKA_ZOOKEEPER_CONNECT: "shenyu-zk:2181"
+ listeners: PLAINTEXT://0:0:0:0:9092
+ KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
+ KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
+ KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
+ KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
+ KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+
+ shenyu-examples-http:
+ deploy:
+ resources:
+ limits:
+ memory: 2048M
+ container_name: shenyu-examples-http
+ image: shenyu-examples-http:latest
+ restart: always
+ environment:
+ - shenyu.register.serverLists=http://shenyu-admin:9095
+ healthcheck:
+ test: [ "CMD", "wget", "http://shenyu-examples-http:8189/test/path/123?name=tom" ]
+ timeout: 2s
+ retries: 30
+ ports:
+ - "8189:8189"
+ networks:
+ - shenyu
+
+networks:
+ shenyu:
+ name: shenyu
+ driver: host
+ external: true
\ No newline at end of file
diff --git a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/k8s/script/e2e-http-sync.sh b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/k8s/script/e2e-http-sync.sh
new file mode 100644
index 000000000000..2ca65f63ddaf
--- /dev/null
+++ b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/k8s/script/e2e-http-sync.sh
@@ -0,0 +1,81 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+docker save shenyu-examples-http:latest | sudo k3s ctr images import -
+
+# init kubernetes for mysql
+SHENYU_TESTCASE_DIR=$(dirname "$(dirname "$(dirname "$(dirname "$0")")")")
+bash "${SHENYU_TESTCASE_DIR}"/k8s/script/storage/storage_init_mysql.sh
+
+# init register center
+CUR_PATH=$(readlink -f "$(dirname "$0")")
+PRGDIR=$(dirname "$CUR_PATH")
+kubectl apply -f "${SHENYU_TESTCASE_DIR}"/k8s/sync/shenyu-cm.yml
+
+# init shenyu sync
+SYNC_ARRAY=("websocket" "http" "zookeeper" "etcd")
+#SYNC_ARRAY=("websocket" "nacos")
+MIDDLEWARE_SYNC_ARRAY=("zookeeper" "etcd" "nacos")
+for sync in ${SYNC_ARRAY[@]}; do
+ echo -e "------------------\n"
+ kubectl apply -f "${PRGDIR}"/shenyu-kafka.yml
+ sh "$SHENYU_TESTCASE_DIR"/k8s/script/healthcheck.sh http://localhost:9092/actuator/health
+
+ sleep 30s
+ echo "[Start ${sync} synchronous] create shenyu-admin-${sync}.yml shenyu-bootstrap-${sync}.yml shenyu-examples-springcloud.yml"
+ # shellcheck disable=SC2199
+ # shellcheck disable=SC2076
+ # shellcheck disable=SC2154
+ if [[ "${MIDDLEWARE_SYNC_ARRAY[@]}" =~ "${sync}" ]]; then
+ kubectl apply -f "${SHENYU_TESTCASE_DIR}"/k8s/shenyu-"${sync}".yml
+ sleep 10s
+ fi
+ kubectl apply -f "${SHENYU_TESTCASE_DIR}"/k8s/sync/shenyu-admin-"${sync}".yml
+ sh "$SHENYU_TESTCASE_DIR"/k8s/script/healthcheck.sh http://localhost:31095/actuator/health
+ kubectl apply -f "${SHENYU_TESTCASE_DIR}"/k8s/sync/shenyu-bootstrap-"${sync}".yml
+ sh "$SHENYU_TESTCASE_DIR"/k8s/script/healthcheck.sh http://localhost:31195/actuator/health
+ sleep 10s
+ kubectl get pod -o wide
+
+ kubectl logs "$(kubectl get pod -o wide | grep shenyu-admin | awk '{print $1}')"
+
+ ## run e2e-test
+ ./mvnw -B -f ./shenyu-e2e/pom.xml -pl shenyu-e2e-case/shenyu-e2e-case-logging-kafka -am test
+ # shellcheck disable=SC2181
+ if (($?)); then
+ echo "${sync}-sync-e2e-test failed"
+ echo "shenyu-admin log:"
+ echo "------------------"
+ kubectl logs "$(kubectl get pod -o wide | grep shenyu-admin | awk '{print $1}')"
+ echo "shenyu-bootstrap log:"
+ echo "------------------"
+ kubectl logs "$(kubectl get pod -o wide | grep shenyu-bootstrap | awk '{print $1}')"
+ exit 1
+ fi
+ kubectl delete -f "${SHENYU_TESTCASE_DIR}"/k8s/shenyu-mysql.yml
+ kubectl delete -f "${SHENYU_TESTCASE_DIR}"/k8s/sync/shenyu-admin-"${sync}".yml
+ kubectl delete -f "${SHENYU_TESTCASE_DIR}"/k8s/sync/shenyu-bootstrap-"${sync}".yml
+ kubectl delete -f "${PRGDIR}"/shenyu-kafka.yml
+
+ # shellcheck disable=SC2199
+ # shellcheck disable=SC2076
+ if [[ "${MIDDLEWARE_SYNC_ARRAY[@]}" =~ "${sync}" ]]; then
+ kubectl delete -f "${SHENYU_TESTCASE_DIR}"/k8s/shenyu-"${sync}".yml
+ fi
+ echo "[Remove ${sync} synchronous] delete shenyu-admin-${sync}.yml shenyu-bootstrap-${sync}.yml shenyu-examples-springcloud.yml"
+done
diff --git a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/k8s/shenyu-kafka.yml b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/k8s/shenyu-kafka.yml
new file mode 100644
index 000000000000..9c5c05fad911
--- /dev/null
+++ b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/k8s/shenyu-kafka.yml
@@ -0,0 +1,101 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+ name: zookeeper
+ namespace: default
+ labels:
+ app: zookeeper
+spec:
+ replicas: 1
+ selector:
+ matchLabels:
+ app: zookeeper
+ template:
+ metadata:
+ labels:
+ app: zookeeper
+ spec:
+ containers:
+ - name: zookeeper
+ image: zookeeper:3.7
+ ports:
+ - containerPort: 2181
+
+---
+apiVersion: v1
+kind: Service
+metadata:
+ name: zookeeper
+ namespace: default
+ labels:
+ app: zookeeper
+spec:
+ ports:
+ - port: 2181
+ name: client
+ selector:
+ app: zookeeper
+
+---
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+ name: kafka
+ namespace: default
+ labels:
+ app: kafka
+spec:
+ replicas: 1
+ selector:
+ matchLabels:
+ app: kafka
+ template:
+ metadata:
+ labels:
+ app: kafka
+ spec:
+ containers:
+ - name: kafka
+ image: bitnami/kafka:3.6.2
+ env:
+ - name: KAFKA_ADVERTISED_LISTENERS
+ value: PLAINTEXT://kafka:9092
+ - name: KAFKA_ZOOKEEPER_CONNECT
+ value: zookeeper:2181
+ ports:
+ - containerPort: 9092
+
+---
+apiVersion: v1
+kind: Service
+metadata:
+ name: kafka
+ namespace: default
+ labels:
+ app: kafka
+spec:
+ type: NodePort
+ ports:
+ - port: 9092
+ name: client
+ protocol: TCP
+ targetPort: 9092
+ nodePort: 9092
+ selector:
+ app: kafka
\ No newline at end of file
diff --git a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/pom.xml b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/pom.xml
new file mode 100644
index 000000000000..90ca689ce6fb
--- /dev/null
+++ b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/pom.xml
@@ -0,0 +1,36 @@
+
+
+
+
+ org.apache.shenyu
+ shenyu-e2e-case
+ 0.0.1-SNAPSHOT
+
+ 4.0.0
+ shenyu-e2e-case-logging-kafka
+
+
+
+ org.apache.kafka
+ kafka-clients
+ ${kafka-clients.version}
+
+
+
diff --git a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/src/test/java/org/apache/shenyu/e2e/testcase/logging/kafka/DividePluginCases.java b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/src/test/java/org/apache/shenyu/e2e/testcase/logging/kafka/DividePluginCases.java
new file mode 100644
index 000000000000..72a5db670882
--- /dev/null
+++ b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/src/test/java/org/apache/shenyu/e2e/testcase/logging/kafka/DividePluginCases.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.e2e.testcase.logging.kafka;
+
+import com.google.common.collect.Lists;
+import io.restassured.http.Method;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.shenyu.e2e.engine.scenario.ShenYuScenarioProvider;
+import org.apache.shenyu.e2e.engine.scenario.specification.ScenarioSpec;
+import org.apache.shenyu.e2e.engine.scenario.specification.ShenYuBeforeEachSpec;
+import org.apache.shenyu.e2e.engine.scenario.specification.ShenYuCaseSpec;
+import org.apache.shenyu.e2e.engine.scenario.specification.ShenYuScenarioSpec;
+import org.apache.shenyu.e2e.model.MatchMode;
+import org.apache.shenyu.e2e.model.Plugin;
+import org.apache.shenyu.e2e.model.data.Condition;
+import org.junit.jupiter.api.Assertions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import static org.apache.shenyu.e2e.engine.scenario.function.HttpCheckers.exists;
+import static org.apache.shenyu.e2e.template.ResourceDataTemplate.newConditions;
+import static org.apache.shenyu.e2e.template.ResourceDataTemplate.newRuleBuilder;
+import static org.apache.shenyu.e2e.template.ResourceDataTemplate.newSelectorBuilder;
+
+public class DividePluginCases implements ShenYuScenarioProvider {
+
+ private static final String TOPIC = "shenyu-access-logging";
+
+ private static final String TEST = "/http/order/findById?id=123";
+
+ @Value("${HOST_IP}")
+ private static String kafkaBroker;
+
+ private static final Logger LOG = LoggerFactory.getLogger(DividePluginCases.class);
+
+ @Override
+ public List get() {
+ return Lists.newArrayList(
+ testDivideHello(),
+ testKafkaHello()
+ );
+ }
+
+ private ShenYuScenarioSpec testDivideHello() {
+ return ShenYuScenarioSpec.builder()
+ .name("http client hello1")
+ .beforeEachSpec(ShenYuBeforeEachSpec.builder()
+ .checker(exists(TEST))
+ .build())
+ .caseSpec(ShenYuCaseSpec.builder()
+ .addExists(TEST)
+ .build())
+ .build();
+ }
+
+ private ShenYuScenarioSpec testKafkaHello() {
+ return ShenYuScenarioSpec.builder()
+ .name("testKafkaHello")
+ .beforeEachSpec(
+ ShenYuBeforeEachSpec.builder()
+ .addSelectorAndRule(
+ newSelectorBuilder("selector", Plugin.LOGGING_KAFKA)
+ .name("2")
+ .matchMode(MatchMode.OR)
+ .conditionList(newConditions(Condition.ParamType.URI, Condition.Operator.STARTS_WITH, "/http"))
+ .build(),
+ newRuleBuilder("rule")
+ .name("2")
+ .matchMode(MatchMode.OR)
+ .conditionList(newConditions(Condition.ParamType.URI, Condition.Operator.STARTS_WITH, "/http"))
+ .build()
+ )
+ .checker(exists(TEST))
+ .build()
+ )
+ .caseSpec(
+ ShenYuCaseSpec.builder()
+ .add(request -> {
+ AtomicBoolean isLog = new AtomicBoolean(false);
+ try {
+ Thread.sleep(1000 * 30);
+// kafkaBroker = kafkaBroker + ":9092";
+// LOG.info("kafkaBroker = " + kafkaBroker);
+ request.request(Method.GET, "/http/order/findById?id=23");
+ Properties properties = new Properties();
+ properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
+ properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ KafkaConsumer consumer = new KafkaConsumer<>(properties);
+ consumer.subscribe(Arrays.asList(TOPIC));
+ Thread.sleep(1000 * 30);
+ AtomicReference keepCosuming = new AtomicReference<>(true);
+ Instant start = Instant.now();
+ while (keepCosuming.get()) {
+ if (Duration.between(start, Instant.now()).toMillis() > 300000) {
+ keepCosuming.set(false);
+ }
+ ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));
+ records.forEach(record -> {
+ String message = record.value();
+ LOG.info("kafka message:{}", message);
+ if (message.contains("/http/order/findById")) {
+ isLog.set(true);
+ keepCosuming.set(false);
+ }
+ });
+ }
+ Assertions.assertTrue(isLog.get());
+ } catch (InterruptedException e) {
+ LOG.info("isLog.get():{}", isLog.get());
+ LOG.error("error", e);
+ throw new RuntimeException(e);
+ }
+ }).build()
+ ).build();
+ }
+}
diff --git a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/src/test/java/org/apache/shenyu/e2e/testcase/logging/kafka/DividePluginTest.java b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/src/test/java/org/apache/shenyu/e2e/testcase/logging/kafka/DividePluginTest.java
new file mode 100644
index 000000000000..49cfae4ea14c
--- /dev/null
+++ b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/src/test/java/org/apache/shenyu/e2e/testcase/logging/kafka/DividePluginTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.e2e.testcase.logging.kafka;
+
+import com.google.common.collect.Lists;
+import org.apache.shenyu.e2e.client.WaitDataSync;
+import org.apache.shenyu.e2e.client.admin.AdminClient;
+import org.apache.shenyu.e2e.client.gateway.GatewayClient;
+import org.apache.shenyu.e2e.constant.Constants;
+import org.apache.shenyu.e2e.engine.annotation.ShenYuScenario;
+import org.apache.shenyu.e2e.engine.annotation.ShenYuTest;
+import org.apache.shenyu.e2e.engine.scenario.specification.AfterEachSpec;
+import org.apache.shenyu.e2e.engine.scenario.specification.BeforeEachSpec;
+import org.apache.shenyu.e2e.engine.scenario.specification.CaseSpec;
+import org.apache.shenyu.e2e.enums.ServiceTypeEnum;
+import org.apache.shenyu.e2e.model.ResourcesData;
+import org.apache.shenyu.e2e.model.data.BindingData;
+import org.apache.shenyu.e2e.model.response.SelectorDTO;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.shenyu.e2e.constant.Constants.SYS_DEFAULT_NAMESPACE_NAMESPACE_ID;
+
+@ShenYuTest(environments = {
+ @ShenYuTest.Environment(
+ serviceName = "shenyu-e2e-admin",
+ service = @ShenYuTest.ServiceConfigure(moduleName = "shenyu-e2e",
+ baseUrl = "http://localhost:31095",
+ type = ServiceTypeEnum.SHENYU_ADMIN,
+ parameters = {
+ @ShenYuTest.Parameter(key = "username", value = "admin"),
+ @ShenYuTest.Parameter(key = "password", value = "123456")
+ }
+ )
+ ),
+ @ShenYuTest.Environment(
+ serviceName = "shenyu-e2e-gateway",
+ service = @ShenYuTest.ServiceConfigure(moduleName = "shenyu-e2e",
+ baseUrl = "http://localhost:31195",
+ type = ServiceTypeEnum.SHENYU_GATEWAY
+ )
+ )
+})
+public class DividePluginTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DividePluginTest.class);
+
+ private List selectorIds = Lists.newArrayList();
+
+ @BeforeEach
+ void before(final AdminClient client, final GatewayClient gateway, final BeforeEachSpec spec) {
+ spec.getChecker().check(gateway);
+
+ ResourcesData resources = spec.getResources();
+ for (ResourcesData.Resource res : resources.getResources()) {
+ SelectorDTO dto = client.create(res.getSelector());
+ selectorIds.add(dto.getId());
+ res.getRules().forEach(rule -> {
+ rule.setSelectorId(dto.getId());
+ client.create(rule);
+ });
+ BindingData bindingData = res.getBindingData();
+ if (Objects.nonNull(bindingData)) {
+ bindingData.setSelectorId(dto.getId());
+ bindingData.setNamespaceId(SYS_DEFAULT_NAMESPACE_NAMESPACE_ID);
+ client.bindingData(bindingData);
+ }
+ }
+
+ spec.getWaiting().waitFor(gateway);
+ }
+
+ @AfterEach
+ void after(final AdminClient client, final GatewayClient gateway, final AfterEachSpec spec) {
+ spec.getDeleter().delete(client, selectorIds);
+ spec.deleteWaiting().waitFor(gateway);
+ selectorIds = Lists.newArrayList();
+ }
+
+ @BeforeAll
+ void setup(final AdminClient adminClient, final GatewayClient gatewayClient) throws Exception {
+ adminClient.login();
+
+ WaitDataSync.waitAdmin2GatewayDataSyncEquals(adminClient::listAllSelectors, gatewayClient::getSelectorCache, adminClient);
+ WaitDataSync.waitAdmin2GatewayDataSyncEquals(adminClient::listAllMetaData, gatewayClient::getMetaDataCache, adminClient);
+ WaitDataSync.waitAdmin2GatewayDataSyncEquals(adminClient::listAllRules, gatewayClient::getRuleCache, adminClient);
+
+ Map reqBody = new HashMap<>();
+ LOG.info("start loggingKafka plugin");
+ reqBody.put("pluginId", "33");
+ reqBody.put("name", "loggingKafka");
+ reqBody.put("enabled", "true");
+ reqBody.put("role", "Logging");
+ reqBody.put("sort", "180");
+ reqBody.put("namespaceId", Constants.SYS_DEFAULT_NAMESPACE_NAMESPACE_ID);
+ reqBody.put("config",
+ "{\"topic\":\"shenyu-access-logging\",\"namesrvAddr\":\"shenyu-kafka:9092\",\"sampleRate\":\"1\",\"maxResponseBody\":524288,\"maxRequestBody\":524288,\"compressAlg\":\"none\"}");
+ adminClient.changePluginStatus("1801816010882822171", reqBody);
+ Map plugins = gatewayClient.getPlugins();
+ LOG.info("shenyu e2e plugin list ={}", plugins);
+ WaitDataSync.waitGatewayPluginUse(gatewayClient, "org.apache.shenyu.plugin.logging.kafka.LoggingKafkaPlugin");
+ }
+
+ @ShenYuScenario(provider = DividePluginCases.class)
+ void testDivide(final GatewayClient gateway, final CaseSpec spec) {
+ spec.getVerifiers().forEach(verifier -> verifier.verify(gateway.getHttpRequesterSupplier().get()));
+ }
+}
diff --git a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/compose/script/e2e-logging-rocketmq-compose.sh b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/compose/script/e2e-logging-rocketmq-compose.sh
new file mode 100644
index 000000000000..f62d9b33d182
--- /dev/null
+++ b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/compose/script/e2e-logging-rocketmq-compose.sh
@@ -0,0 +1,67 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# init kubernetes for mysql
+SHENYU_TESTCASE_DIR=$(dirname "$(dirname "$(dirname "$(dirname "$0")")")")
+bash "${SHENYU_TESTCASE_DIR}"/k8s/script/storage/storage_init_mysql.sh
+
+# init register center
+CUR_PATH=$(readlink -f "$(dirname "$0")")
+PRGDIR=$(dirname "$CUR_PATH")
+# init shenyu sync
+SYNC_ARRAY=("websocket" "http" "zookeeper" "etcd")
+#SYNC_ARRAY=("websocket" "nacos")
+#MIDDLEWARE_SYNC_ARRAY=("zookeeper" "etcd" "nacos")
+
+docker network create -d bridge shenyu
+
+for sync in "${SYNC_ARRAY[@]}"; do
+ echo -e "------------------\n"
+ echo "[Start ${sync} synchronous] create shenyu-admin-${sync}.yml shenyu-bootstrap-${sync}.yml "
+ docker compose -f "$SHENYU_TESTCASE_DIR"/compose/sync/shenyu-sync-"${sync}".yml up -d --quiet-pull
+ sleep 30s
+ sh "$SHENYU_TESTCASE_DIR"/k8s/script/healthcheck.sh http://localhost:31095/actuator/health
+ sh "$SHENYU_TESTCASE_DIR"/k8s/script/healthcheck.sh http://localhost:31195/actuator/health
+ docker compose -f "${PRGDIR}"/shenyu-rocketmq-compose.yml up -d --quiet-pull
+ docker compose -f "${PRGDIR}"/shenyu-examples-http-compose.yml up -d --quiet-pull
+ sleep 30s
+ sh "$SHENYU_TESTCASE_DIR"/k8s/script/healthcheck.sh http://localhost:31189/actuator/health
+ sleep 10s
+ docker ps -a
+ ## run e2e-test
+ ./mvnw -B -f ./shenyu-e2e/pom.xml -pl shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq -am test
+ # shellcheck disable=SC2181
+ if (($?)); then
+ echo "${sync}-sync-e2e-test failed"
+ echo "------------------"
+ echo "shenyu-admin log:"
+ echo "------------------"
+ docker compose -f "$SHENYU_TESTCASE_DIR"/compose/sync/shenyu-sync-"${sync}".yml logs shenyu-admin
+ echo "shenyu-bootstrap log:"
+ echo "------------------"
+ docker compose -f "$SHENYU_TESTCASE_DIR"/compose/sync/shenyu-sync-"${sync}".yml logs shenyu-bootstrap
+ echo "shenyu-rocketmq log:"
+ echo "------------------"
+ docker compose -f "${PRGDIR}"/shenyu-rocketmq-compose.yml logs
+ exit 1
+ fi
+ docker compose -f "$SHENYU_TESTCASE_DIR"/compose/sync/shenyu-sync-"${sync}".yml down
+ docker compose -f "${PRGDIR}"/shenyu-rocketmq-compose.yml down
+ docker compose -f "${PRGDIR}"/shenyu-examples-http-compose.yml down
+ echo "[Remove ${sync} synchronous] delete shenyu-admin-${sync}.yml shenyu-bootstrap-${sync}.yml "
+done
diff --git a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/compose/shenyu-examples-http-compose.yml b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/compose/shenyu-examples-http-compose.yml
new file mode 100644
index 000000000000..ce8c8a7e7b22
--- /dev/null
+++ b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/compose/shenyu-examples-http-compose.yml
@@ -0,0 +1,41 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: '3.9'
+
+services:
+ shenyu-examples-http:
+ image: shenyu-examples-http:latest
+ container_name: shenyu-examples-http
+ environment:
+ - shenyu.register.serverLists=http://shenyu-admin:9095
+ ports:
+ - "31189:8189"
+ healthcheck:
+ test: [ "CMD-SHELL", "wget -q -O - http://localhost:8189/actuator/health | grep UP || exit 1" ]
+ interval: 10s
+ timeout: 2s
+ retries: 3
+ start_period: 10s
+ restart: always
+ networks:
+ - shenyu
+
+networks:
+ shenyu:
+ name: shenyu
+ driver: bridge
+ external: true
diff --git a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/compose/shenyu-rocketmq-compose.yml b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/compose/shenyu-rocketmq-compose.yml
new file mode 100644
index 000000000000..5f34a739ca68
--- /dev/null
+++ b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/compose/shenyu-rocketmq-compose.yml
@@ -0,0 +1,51 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: '3.9'
+
+services:
+ rocketmq-dialevoneid:
+ image: rocketmqinc/rocketmq:4.4.0
+ container_name: rocketmq-dialevoneid
+ command: [ "/bin/sh", "mqnamesrv" ]
+ ports:
+ - "31876:9876"
+ environment:
+ - TZ=Asia/Shanghai
+ restart: always
+ networks:
+ - shenyu
+
+ rocketmq-broker:
+ image: rocketmqinc/rocketmq:4.4.0
+ container_name: rocketmq-broker
+ command: [ "/bin/sh", "mqbroker" ]
+ ports:
+ - "10909:10909"
+ - "10911:10911"
+ - "10912:10912"
+ environment:
+ - NAMESRV_ADDR=rocketmq-dialevoneid:9876
+ - TZ=Asia/Shanghai
+ restart: always
+ networks:
+ - shenyu
+
+networks:
+ shenyu:
+ name: shenyu
+ driver: bridge
+ external: true
\ No newline at end of file
diff --git a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/k8s/script/e2e-http-sync.sh b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/k8s/script/e2e-http-sync.sh
new file mode 100644
index 000000000000..2ca65f63ddaf
--- /dev/null
+++ b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/k8s/script/e2e-http-sync.sh
@@ -0,0 +1,81 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+docker save shenyu-examples-http:latest | sudo k3s ctr images import -
+
+# init kubernetes for mysql
+SHENYU_TESTCASE_DIR=$(dirname "$(dirname "$(dirname "$(dirname "$0")")")")
+bash "${SHENYU_TESTCASE_DIR}"/k8s/script/storage/storage_init_mysql.sh
+
+# init register center
+CUR_PATH=$(readlink -f "$(dirname "$0")")
+PRGDIR=$(dirname "$CUR_PATH")
+kubectl apply -f "${SHENYU_TESTCASE_DIR}"/k8s/sync/shenyu-cm.yml
+
+# init shenyu sync
+SYNC_ARRAY=("websocket" "http" "zookeeper" "etcd")
+#SYNC_ARRAY=("websocket" "nacos")
+MIDDLEWARE_SYNC_ARRAY=("zookeeper" "etcd" "nacos")
+for sync in ${SYNC_ARRAY[@]}; do
+ echo -e "------------------\n"
+ kubectl apply -f "${PRGDIR}"/shenyu-kafka.yml
+ sh "$SHENYU_TESTCASE_DIR"/k8s/script/healthcheck.sh http://localhost:9092/actuator/health
+
+ sleep 30s
+ echo "[Start ${sync} synchronous] create shenyu-admin-${sync}.yml shenyu-bootstrap-${sync}.yml shenyu-examples-springcloud.yml"
+ # shellcheck disable=SC2199
+ # shellcheck disable=SC2076
+ # shellcheck disable=SC2154
+ if [[ "${MIDDLEWARE_SYNC_ARRAY[@]}" =~ "${sync}" ]]; then
+ kubectl apply -f "${SHENYU_TESTCASE_DIR}"/k8s/shenyu-"${sync}".yml
+ sleep 10s
+ fi
+ kubectl apply -f "${SHENYU_TESTCASE_DIR}"/k8s/sync/shenyu-admin-"${sync}".yml
+ sh "$SHENYU_TESTCASE_DIR"/k8s/script/healthcheck.sh http://localhost:31095/actuator/health
+ kubectl apply -f "${SHENYU_TESTCASE_DIR}"/k8s/sync/shenyu-bootstrap-"${sync}".yml
+ sh "$SHENYU_TESTCASE_DIR"/k8s/script/healthcheck.sh http://localhost:31195/actuator/health
+ sleep 10s
+ kubectl get pod -o wide
+
+ kubectl logs "$(kubectl get pod -o wide | grep shenyu-admin | awk '{print $1}')"
+
+ ## run e2e-test
+ ./mvnw -B -f ./shenyu-e2e/pom.xml -pl shenyu-e2e-case/shenyu-e2e-case-logging-kafka -am test
+ # shellcheck disable=SC2181
+ if (($?)); then
+ echo "${sync}-sync-e2e-test failed"
+ echo "shenyu-admin log:"
+ echo "------------------"
+ kubectl logs "$(kubectl get pod -o wide | grep shenyu-admin | awk '{print $1}')"
+ echo "shenyu-bootstrap log:"
+ echo "------------------"
+ kubectl logs "$(kubectl get pod -o wide | grep shenyu-bootstrap | awk '{print $1}')"
+ exit 1
+ fi
+ kubectl delete -f "${SHENYU_TESTCASE_DIR}"/k8s/shenyu-mysql.yml
+ kubectl delete -f "${SHENYU_TESTCASE_DIR}"/k8s/sync/shenyu-admin-"${sync}".yml
+ kubectl delete -f "${SHENYU_TESTCASE_DIR}"/k8s/sync/shenyu-bootstrap-"${sync}".yml
+ kubectl delete -f "${PRGDIR}"/shenyu-kafka.yml
+
+ # shellcheck disable=SC2199
+ # shellcheck disable=SC2076
+ if [[ "${MIDDLEWARE_SYNC_ARRAY[@]}" =~ "${sync}" ]]; then
+ kubectl delete -f "${SHENYU_TESTCASE_DIR}"/k8s/shenyu-"${sync}".yml
+ fi
+ echo "[Remove ${sync} synchronous] delete shenyu-admin-${sync}.yml shenyu-bootstrap-${sync}.yml shenyu-examples-springcloud.yml"
+done
diff --git a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/k8s/shenyu-kafka.yml b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/k8s/shenyu-kafka.yml
new file mode 100644
index 000000000000..94a63c58ae47
--- /dev/null
+++ b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/k8s/shenyu-kafka.yml
@@ -0,0 +1,101 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+ name: zookeeper
+ namespace: default
+ labels:
+ app: zookeeper
+spec:
+ replicas: 1
+ selector:
+ matchLabels:
+ app: zookeeper
+ template:
+ metadata:
+ labels:
+ app: zookeeper
+ spec:
+ containers:
+ - name: zookeeper
+ image: zookeeper:3.7
+ ports:
+ - containerPort: 2181
+
+---
+apiVersion: v1
+kind: Service
+metadata:
+ name: zookeeper
+ namespace: default
+ labels:
+ app: zookeeper
+spec:
+ ports:
+ - port: 2181
+ name: client
+ selector:
+ app: zookeeper
+
+---
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+ name: kafka
+ namespace: default
+ labels:
+ app: kafka
+spec:
+ replicas: 1
+ selector:
+ matchLabels:
+ app: kafka
+ template:
+ metadata:
+ labels:
+ app: kafka
+ spec:
+ containers:
+ - name: kafka
+ image: bitnami/kafka:3.6.2
+ env:
+ - name: KAFKA_ADVERTISED_LISTENERS
+ value: PLAINTEXT://kafka:9092
+ - name: KAFKA_ZOOKEEPER_CONNECT
+ value: zookeeper:2181
+ ports:
+ - containerPort: 31877
+
+---
+apiVersion: v1
+kind: Service
+metadata:
+ name: kafka
+ namespace: default
+ labels:
+ app: kafka
+spec:
+ type: NodePort
+ ports:
+ - port: 31877
+ name: client
+ protocol: TCP
+ targetPort: 31877
+ nodePort: 31877
+ selector:
+ app: kafka
\ No newline at end of file
diff --git a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/pom.xml b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/pom.xml
new file mode 100644
index 000000000000..f9e8eef96397
--- /dev/null
+++ b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/pom.xml
@@ -0,0 +1,36 @@
+
+
+
+
+ org.apache.shenyu
+ shenyu-e2e-case
+ 0.0.1-SNAPSHOT
+
+ 4.0.0
+ shenyu-e2e-case-logging-rocketmq
+
+
+
+ org.apache.rocketmq
+ rocketmq-client
+ 4.9.3
+
+
+
diff --git a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/src/test/java/org/apache/shenyu/e2e/testcase/logging/rocketmq/DividePluginCases.java b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/src/test/java/org/apache/shenyu/e2e/testcase/logging/rocketmq/DividePluginCases.java
new file mode 100644
index 000000000000..bc24271d34da
--- /dev/null
+++ b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/src/test/java/org/apache/shenyu/e2e/testcase/logging/rocketmq/DividePluginCases.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.e2e.testcase.logging.rocketmq;
+
+import com.google.common.collect.Lists;
+import io.restassured.http.Method;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.shenyu.e2e.engine.scenario.ShenYuScenarioProvider;
+import org.apache.shenyu.e2e.engine.scenario.specification.ScenarioSpec;
+import org.apache.shenyu.e2e.engine.scenario.specification.ShenYuBeforeEachSpec;
+import org.apache.shenyu.e2e.engine.scenario.specification.ShenYuCaseSpec;
+import org.apache.shenyu.e2e.engine.scenario.specification.ShenYuScenarioSpec;
+import org.apache.shenyu.e2e.model.MatchMode;
+import org.apache.shenyu.e2e.model.Plugin;
+import org.apache.shenyu.e2e.model.data.Condition;
+import org.junit.jupiter.api.Assertions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.shenyu.e2e.engine.scenario.function.HttpCheckers.exists;
+import static org.apache.shenyu.e2e.template.ResourceDataTemplate.newConditions;
+import static org.apache.shenyu.e2e.template.ResourceDataTemplate.newRuleBuilder;
+import static org.apache.shenyu.e2e.template.ResourceDataTemplate.newSelectorBuilder;
+
+public class DividePluginCases implements ShenYuScenarioProvider {
+
+ private static final String NAMESERVER = "http://localhost:31876";
+
+ private static final String CONSUMERGROUP = "shenyu-plugin-logging-rocketmq";
+
+ private static final String TOPIC = "shenyu-access-logging";
+
+ private static final String TEST = "/http/order/findById?id=123";
+
+ private static final Logger LOG = LoggerFactory.getLogger(DividePluginCases.class);
+
+ @Override
+ public List get() {
+ return Lists.newArrayList(
+ testDivideHello(),
+ testRocketMQHello()
+ );
+ }
+
+ private ShenYuScenarioSpec testDivideHello() {
+ return ShenYuScenarioSpec.builder()
+ .name("http client hello1")
+ .beforeEachSpec(ShenYuBeforeEachSpec.builder()
+ .checker(exists("/http/order/findById?id=123"))
+ .build())
+ .caseSpec(ShenYuCaseSpec.builder()
+ .addExists("/http/order/findById?id=123")
+ .build())
+ .build();
+ }
+
+ private ShenYuScenarioSpec testRocketMQHello() {
+ return ShenYuScenarioSpec.builder()
+ .name("testRocketMQHello")
+ .beforeEachSpec(
+ ShenYuBeforeEachSpec.builder()
+ .addSelectorAndRule(
+ newSelectorBuilder("selector", Plugin.LOGGING_ROCKETMQ)
+ .name("1")
+ .matchMode(MatchMode.OR)
+ .conditionList(newConditions(Condition.ParamType.URI, Condition.Operator.STARTS_WITH, "/http"))
+ .build(),
+ newRuleBuilder("rule")
+ .name("1")
+ .matchMode(MatchMode.OR)
+ .conditionList(newConditions(Condition.ParamType.URI, Condition.Operator.STARTS_WITH, "/http"))
+ .build()
+ )
+ .checker(exists(TEST))
+ .build()
+ )
+ .caseSpec(
+ ShenYuCaseSpec.builder()
+ .add(request -> {
+ AtomicBoolean isLog = new AtomicBoolean(false);
+ try {
+ Thread.sleep(1000 * 30);
+ request.request(Method.GET, "/http/order/findById?id=23");
+ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMERGROUP);
+ consumer.setNamesrvAddr(NAMESERVER);
+ consumer.subscribe(TOPIC, "*");
+ consumer.registerMessageListener((MessageListenerConcurrently) (msgs, consumeConcurrentlyContext) -> {
+ LOG.info("Msg:{}", msgs);
+ if (CollectionUtils.isNotEmpty(msgs)) {
+ msgs.forEach(e -> {
+ if (new String(e.getBody()).contains("/http/order/findById?id=23")) {
+ isLog.set(true);
+ }
+ });
+ }
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ });
+ LOG.info("consumer.start ; isLog.get():{}", isLog.get());
+ consumer.start();
+ Thread.sleep(1000 * 30);
+ LOG.info("isLog.get():{}", isLog.get());
+ Assertions.assertTrue(isLog.get());
+ } catch (Exception e) {
+ LOG.error("error", e);
+ Assertions.assertTrue(isLog.get());
+ }
+ })
+ .build()
+ )
+// .afterEachSpec(ShenYuAfterEachSpec.builder()
+// .deleteWaiting(notExists(TEST)).build())
+ .build();
+ }
+}
diff --git a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/src/test/java/org/apache/shenyu/e2e/testcase/logging/rocketmq/DividePluginTest.java b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/src/test/java/org/apache/shenyu/e2e/testcase/logging/rocketmq/DividePluginTest.java
new file mode 100644
index 000000000000..66f6ee5bbe27
--- /dev/null
+++ b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/src/test/java/org/apache/shenyu/e2e/testcase/logging/rocketmq/DividePluginTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.e2e.testcase.logging.rocketmq;
+
+import com.google.common.collect.Lists;
+import org.apache.shenyu.e2e.client.WaitDataSync;
+import org.apache.shenyu.e2e.client.admin.AdminClient;
+import org.apache.shenyu.e2e.client.gateway.GatewayClient;
+import org.apache.shenyu.e2e.constant.Constants;
+import org.apache.shenyu.e2e.engine.annotation.ShenYuScenario;
+import org.apache.shenyu.e2e.engine.annotation.ShenYuTest;
+import org.apache.shenyu.e2e.engine.scenario.specification.AfterEachSpec;
+import org.apache.shenyu.e2e.engine.scenario.specification.BeforeEachSpec;
+import org.apache.shenyu.e2e.engine.scenario.specification.CaseSpec;
+import org.apache.shenyu.e2e.enums.ServiceTypeEnum;
+import org.apache.shenyu.e2e.model.ResourcesData;
+import org.apache.shenyu.e2e.model.data.BindingData;
+import org.apache.shenyu.e2e.model.response.SelectorDTO;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.shenyu.e2e.constant.Constants.SYS_DEFAULT_NAMESPACE_NAMESPACE_ID;
+
+@ShenYuTest(environments = {
+ @ShenYuTest.Environment(
+ serviceName = "shenyu-e2e-admin",
+ service = @ShenYuTest.ServiceConfigure(moduleName = "shenyu-e2e",
+ baseUrl = "http://localhost:31095",
+ type = ServiceTypeEnum.SHENYU_ADMIN,
+ parameters = {
+ @ShenYuTest.Parameter(key = "username", value = "admin"),
+ @ShenYuTest.Parameter(key = "password", value = "123456")
+ }
+ )
+ ),
+ @ShenYuTest.Environment(
+ serviceName = "shenyu-e2e-gateway",
+ service = @ShenYuTest.ServiceConfigure(moduleName = "shenyu-e2e",
+ baseUrl = "http://localhost:31195",
+ type = ServiceTypeEnum.SHENYU_GATEWAY
+ )
+ )
+})
+public class DividePluginTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DividePluginTest.class);
+
+ private List selectorIds = Lists.newArrayList();
+
+ @BeforeEach
+ void before(final AdminClient client, final GatewayClient gateway, final BeforeEachSpec spec) {
+ spec.getChecker().check(gateway);
+
+ ResourcesData resources = spec.getResources();
+ for (ResourcesData.Resource res : resources.getResources()) {
+ SelectorDTO dto = client.create(res.getSelector());
+ selectorIds.add(dto.getId());
+ res.getRules().forEach(rule -> {
+ rule.setSelectorId(dto.getId());
+ client.create(rule);
+ });
+ BindingData bindingData = res.getBindingData();
+ if (Objects.nonNull(bindingData)) {
+ bindingData.setSelectorId(dto.getId());
+ bindingData.setNamespaceId(SYS_DEFAULT_NAMESPACE_NAMESPACE_ID);
+ client.bindingData(bindingData);
+ }
+ }
+
+ spec.getWaiting().waitFor(gateway);
+ }
+
+ @AfterEach
+ void after(final AdminClient client, final GatewayClient gateway, final AfterEachSpec spec) {
+ spec.getDeleter().delete(client, selectorIds);
+ spec.deleteWaiting().waitFor(gateway);
+ selectorIds = Lists.newArrayList();
+ }
+
+ @BeforeAll
+ void setup(final AdminClient adminClient, final GatewayClient gatewayClient) throws Exception {
+ adminClient.login();
+
+ WaitDataSync.waitAdmin2GatewayDataSyncEquals(adminClient::listAllSelectors, gatewayClient::getSelectorCache, adminClient);
+ WaitDataSync.waitAdmin2GatewayDataSyncEquals(adminClient::listAllMetaData, gatewayClient::getMetaDataCache, adminClient);
+ WaitDataSync.waitAdmin2GatewayDataSyncEquals(adminClient::listAllRules, gatewayClient::getRuleCache, adminClient);
+
+ LOG.info("start loggingRocketMQ plugin");
+ Map reqBody = new HashMap<>();
+ reqBody.put("pluginId", "29");
+ reqBody.put("name", "loggingRocketMQ");
+ reqBody.put("enabled", "true");
+ reqBody.put("role", "Logging");
+ reqBody.put("sort", "170");
+ reqBody.put("namespaceId", Constants.SYS_DEFAULT_NAMESPACE_NAMESPACE_ID);
+ reqBody.put("config", "{\"topic\":\"shenyu-access-logging\", \"namesrvAddr\": \"rocketmq-dialevoneid:9876\",\"producerGroup\":\"shenyu-plugin-logging-rocketmq\"}");
+ adminClient.changePluginStatus("1801816010882822166", reqBody);
+ Map plugins = gatewayClient.getPlugins();
+ LOG.info("shenyu e2e plugin list ={}", plugins);
+ WaitDataSync.waitGatewayPluginUse(gatewayClient, "org.apache.shenyu.plugin.logging.rocketmq.LoggingRocketMQPlugin");
+
+ }
+
+ @ShenYuScenario(provider = DividePluginCases.class)
+ void testDivide(final GatewayClient gateway, final CaseSpec spec) {
+ spec.getVerifiers().forEach(verifier -> verifier.verify(gateway.getHttpRequesterSupplier().get()));
+ }
+}
diff --git a/shenyu-e2e/shenyu-e2e-client/src/main/java/org/apache/shenyu/e2e/client/WaitDataSync.java b/shenyu-e2e/shenyu-e2e-client/src/main/java/org/apache/shenyu/e2e/client/WaitDataSync.java
index ce3efdd91c9a..95f5ecf79716 100644
--- a/shenyu-e2e/shenyu-e2e-client/src/main/java/org/apache/shenyu/e2e/client/WaitDataSync.java
+++ b/shenyu-e2e/shenyu-e2e-client/src/main/java/org/apache/shenyu/e2e/client/WaitDataSync.java
@@ -17,6 +17,7 @@
package org.apache.shenyu.e2e.client;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.shenyu.e2e.client.admin.AdminClient;
import org.apache.shenyu.e2e.client.gateway.GatewayClient;
import org.junit.jupiter.api.Assertions;
@@ -34,6 +35,8 @@
public class WaitDataSync {
private static final Logger LOGGER = LoggerFactory.getLogger(WaitDataSync.class);
+
+ private static final ObjectMapper MAPPER = new ObjectMapper();
/**
* waitAdmin2GatewayDataSync.
@@ -81,9 +84,10 @@ public static , U extends List>> void waitAdmin2GatewayDataS
*/
public static void waitGatewayPluginUse(final GatewayClient gatewayClient, final String pluginClass) throws Exception {
Map pluginMap = gatewayClient.getPlugins();
+ LOGGER.info("pluginMap:{}", MAPPER.writeValueAsString(pluginMap));
int retryNum = 0;
boolean existPlugin = false;
- while (!existPlugin && retryNum < 5) {
+ while (!existPlugin && retryNum < 10) {
for (String plugin : pluginMap.keySet()) {
if (plugin.startsWith(pluginClass)) {
existPlugin = true;
@@ -93,6 +97,7 @@ public static void waitGatewayPluginUse(final GatewayClient gatewayClient, final
Thread.sleep(10000);
retryNum++;
pluginMap = gatewayClient.getPlugins();
+ LOGGER.info("pluginMap:{}", MAPPER.writeValueAsString(pluginMap));
}
if (!existPlugin) {
throw new AssertionFailedError(pluginClass + " plugin not found");
diff --git a/shenyu-e2e/shenyu-e2e-client/src/main/java/org/apache/shenyu/e2e/client/gateway/GatewayClient.java b/shenyu-e2e/shenyu-e2e-client/src/main/java/org/apache/shenyu/e2e/client/gateway/GatewayClient.java
index 5ffd1a61eb3a..50afe58dd725 100644
--- a/shenyu-e2e/shenyu-e2e-client/src/main/java/org/apache/shenyu/e2e/client/gateway/GatewayClient.java
+++ b/shenyu-e2e/shenyu-e2e-client/src/main/java/org/apache/shenyu/e2e/client/gateway/GatewayClient.java
@@ -21,6 +21,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import io.restassured.response.Response;
import io.restassured.specification.RequestSpecification;
+import org.apache.commons.collections4.CollectionUtils;
import org.apache.shenyu.e2e.annotation.ShenYuGatewayClient;
import org.apache.shenyu.e2e.client.BaseClient;
import org.apache.shenyu.e2e.common.RequestLogConsumer;
@@ -178,6 +179,9 @@ public List getSelectorCache() throws JsonProcessingException
List selectorDataList = new ArrayList<>();
for (Map.Entry entry : s.entrySet()) {
List list = (List) entry.getValue();
+ if (CollectionUtils.isEmpty(list)) {
+ continue;
+ }
String json = MAPPER.writeValueAsString(list.get(0));
SelectorCacheData selectorData = MAPPER.readValue(json, SelectorCacheData.class);
selectorDataList.add(selectorData);