Skip to content

xjune123/learnerdemo

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

4 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

创建生产者


  • yml配置如下
spring:
  application:
    name: kafkatest
  mvc:
    servlet:
      load-on-startup: 1
  kafka:
    producer:
    #110.211.99.254本机IP为了容器能访问
      bootstrap-servers: 10.211.99.254:9092
  • 发送消息代码
@Service
public class SpringKafkaTestServiceImpl implements SpringKafkaTestService {
    @Autowired
    private KafkaTemplate kafkaTemplate;

    @Override
    public void sendMessage(String message) {
        String key = UUID.randomUUID().toString();
        //key ="1";//测试如果key 一样,给的partition 均一致保证同一个key 都会到一个partition 上执行
        ListenableFuture future = kafkaTemplate.send("test_topic", key, message);
        future.addCallback(o -> System.out.println("send-消息发送成功:" + message),
                throwable -> System.out.println("消息发送失败:" + message));
    }
}

创建消费者


  • yml配置如下
spring:
  application:
    name: kafkatest
  mvc:
    servlet:
      load-on-startup: 1
  kafka:
    producer:
    #110.211.99.254本机IP为了容器能访问,如果不构建镜像可直接写localhost
      bootstrap-servers: 10.211.99.254:9092
    consumer:
      #指定消息被消费之后自动提交偏移量(即消息的编号,表示消费到了哪个位置,消费者每消费完一条消息就会向kafka服务器汇报自己消消费到的那个消息的下次继续消费编号,以便于
      enable-auto-commit: true
      #消费者组
      group-id: kafka-group
      #从最近的地方开始消费
      auto-offset-reset: latest
      #可设置多个
      bootstrap-servers: 10.211.99.254:9092
      max-poll-records: 10
  • 消费代码如下
@Component
public class Consumer {
    @KafkaListener(topics = {"test_topic"})
    public void receive(ConsumerRecord<?, ?> record) {
        System.out.println("消费消息:" + record.value().toString()+ " 时间:"+new Date());
        System.out.println("消费消息key:" + record.key() + "  partition:" + record.partition());

    }
}

启动kafka


  • mac server.properties路径 /usr/local/etc/kafka/,将partition 更改为3
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=3
  • 启动zookeeper和kafka
cd  /usr/local/
./bin/zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties &

kafka-server-start /usr/local/etc/kafka/server.properties &

构建docker镜像用于启动多个consumer


  • 切换到项目目录构建镜像
mvn clean package -DskipTests
docker build -t xjune/testkafka .
  • 启动项目,创建三个消费者
 docker run -d --name kafkanode1   xjune/testkafka:latest
 docker run -d --name kafkanode2   xjune/testkafka:latest
 docker run -d --name kafkanode3   xjune/testkafka:latest
  • 启动项目后,查看各个docker 启动情况
 docker logs  -f kafkanode1
 docker logs  -f kafkanode2
 docker logs  -f kafkanode3
  • 在启动后,如果partition 数量消费消费者,消费者启动后partition 会报
018-01-12 03:52:59.097  INFO 5 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[]
  • 若正常启动,日志如下:
2018-01-12 03:52:59.101  INFO 6 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[test_topic-0]
  • 查看kafka manager 执行情况

image

  • 发送测试消息观察发送情况:每个消费者只会消费自己partition 的消息
http://localhost:7002/sendMessage?message=你好4

总结


每个消费者只会消费一个partition ,若partition 设置为3,但是消费者数量为4,那么将会有1个消费者无法消费,浪费资源。 当然如果消费者数量为2,那么将有一个消费者消费两个partition。 github地址

https://github.com/xjune123/learnerdemo.git

About

learnerdemo

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published