Skip to content

Commit

Permalink
Update the nameaddress and topic configuration in logging-kafka (apac…
Browse files Browse the repository at this point in the history
…he#3850)

* update docker compose and port from 8082 to 9092

* Integration test update docker compose

* update the nameaddress and topic configuration in logging-kafka

* reformat logging-kafka
  • Loading branch information
qifanyyy authored Aug 29, 2022
1 parent 4ba581d commit 053c459
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public class GenericLoggingConstant {
/**
* aliyun sls topic.
*/
public static final String TOPIC = "Topic";
public static final String TOPIC = "topic";

/**
* send thread config.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@

package org.apache.shenyu.plugin.logging.kafka.client;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import net.jpountz.lz4.LZ4Compressor;
import net.jpountz.lz4.LZ4Factory;
import org.apache.commons.collections4.CollectionUtils;
Expand All @@ -28,24 +35,16 @@
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.shenyu.common.utils.JsonUtils;
import org.apache.shenyu.plugin.logging.common.client.LogConsumeClient;
import org.apache.shenyu.plugin.logging.common.constant.GenericLoggingConstant;
import org.apache.shenyu.plugin.logging.common.entity.LZ4CompressData;
import org.apache.shenyu.plugin.logging.common.entity.ShenyuRequestLog;
import org.apache.shenyu.plugin.logging.common.utils.LogCollectConfigUtils;
import org.apache.shenyu.plugin.logging.kafka.config.KafkaLogCollectConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* queue-based logging collector.
*/
Expand Down Expand Up @@ -74,15 +73,15 @@ public void initProducer(final Properties props) {
if (isStarted.get()) {
close();
}
String topic = props.getProperty(GenericLoggingConstant.TOPIC);
String topic = "shenyu-access-logging";
String nameserverAddress = props.getProperty("bootstrap.servers");
if (StringUtils.isBlank(topic) || StringUtils.isBlank(nameserverAddress)) {
LOG.error("init kafkaLogCollectClient error, please check topic or nameserverAddress");
return;
}
this.topic = topic;
producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>(this.topic, "shenyu-access-logging");
ProducerRecord<String, String> record = new ProducerRecord<>("shenyu-access-logging", StringSerializer.class.getName(), StringSerializer.class.getName());
try {
producer.send(record);
LOG.info("init kafkaLogCollectClient success");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@

package org.apache.shenyu.plugin.logging.kafka.handler;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.ProducerConfig;
Expand All @@ -28,19 +33,12 @@
import org.apache.shenyu.common.enums.SelectorTypeEnum;
import org.apache.shenyu.common.utils.GsonUtils;
import org.apache.shenyu.plugin.base.handler.PluginDataHandler;
import org.apache.shenyu.plugin.logging.common.constant.GenericLoggingConstant;
import org.apache.shenyu.plugin.logging.kafka.client.KafkaLogCollectClient;
import org.apache.shenyu.plugin.logging.kafka.collector.KafkaLogCollector;
import org.apache.shenyu.plugin.logging.kafka.config.KafkaLogCollectConfig;
import org.apache.shenyu.plugin.logging.kafka.client.KafkaLogCollectClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

/**
* The type logging kafka plugin data handler.
*/
Expand Down Expand Up @@ -99,8 +97,6 @@ public void handlerPlugin(final PluginData pluginData) {
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put("bootstrap.servers", globalLogConfig.getNamesrvAddr());
properties.put(GenericLoggingConstant.TOPIC, globalLogConfig.getTopic());
properties.put(GenericLoggingConstant.NAMESERVER_ADDRESS, globalLogConfig.getTopic());
KAFKA_LOG_COLLECT_CLIENT.initProducer(properties);
KafkaLogCollector.getInstance().start();
} else {
Expand Down

0 comments on commit 053c459

Please sign in to comment.