Skip to content
This repository was archived by the owner on Mar 31, 2023. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions rocketmq-test/client/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>org.rmq</groupId>
<artifactId>hello</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.0</version>
</dependency>
</dependencies>

</project>
77 changes: 77 additions & 0 deletions rocketmq-test/client/src/main/java/demo/ConsumerRunnable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package org.demo;

import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener. ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;



public class ConsumerRunnable implements Runnable{

//private static DefaultMQPushConsumer consumer = null;

private String url = null;
private String topicName = null;
private String tagName = null;

public ConsumerRunnable(String url, String topicName, String tagName,int size) {
this.url = url;
this.topicName = topicName;
this.tagName = tagName;
this.size = size;
}


private static String createSpecificSizeString(int size){
byte[] temp = new byte[size];
Arrays.fill(temp, (byte)0);
String temp_str = new String(temp);
return temp_str;
}
// private static void startConsumer(String topicName) throws Exception {
// System.out.println(topicName + " " + "Start consume");
// while (true) {
// // Wait for a message
// Message<byte[]> msg = consumer.receive();
// try {
// System.out.printf(topicName + " " + Thread.currentThread().getName() + " Message from: %s\n", new String(msg.getData()));
// consumer.acknowledge(msg);
// } catch (Exception e) {
// System.err.printf("Unable to consume message: %s", e.getMessage());
// consumer.negativeAcknowledge(msg);
// }
// }
// }

@Override
public void run(){
try {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup-"+topicName);
// 设置NameServer的地址
consumer.setNamesrvAddr(url);

// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
consumer.subscribe(topicName, tagName);
// 注册回调实现类来处理从broker拉取回来的消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
// 标记该消息已经被成功消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

consumer.start();
System.out.printf("%s Consumer Started.%n", Thread.currentThread().getName());
}catch (Exception e){
e.printStackTrace();
}
}
}
22 changes: 22 additions & 0 deletions rocketmq-test/client/src/main/java/demo/ConsumerThreadPool.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package org.demo;


import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ConsumerThreadPool {
public static void main(String[] args) {
int consumerThreadCount = 20;
int topicNumber = 20;

String url = "localhost:9876";
String topicName = "topic";
String tagName = "*";

ExecutorService pool = Executors.newFixedThreadPool(consumerThreadCount);
for (int topicIndex = 0; topicIndex < topicNumber; topicIndex++) {
pool.submit(new ConsumerRunnable(url, topicName + Integer.toString(topicIndex), tagName));
}
pool.shutdown();
}
}
61 changes: 61 additions & 0 deletions rocketmq-test/client/src/main/java/demo/ProducerRunnable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package org.demo;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

import javax.swing.*;

public class ProducerRunnable implements Runnable{

//private static DefaultMQProducer producer;

private String url = null;
private String topicName = null;
private int sleepTime = 0;
private int size = 0;

public ProducerRunnable(String url, String topicName, int sleepTime,int size) {
this.url = url;
this.topicName = topicName;
this.sleepTime = sleepTime;
this.size = size;
}

private static String createSpecificSizeString(int size){
byte[] temp = new byte[size];
Arrays.fill(temp, (byte)0);
String temp_str = new String(temp);
return temp_str;
}

private static void startProducer(String topicName,DefaultMQProducer producer) throws Exception {
System.out.println(topicName + " " + "Start produce");
while (true) {
Message message = new Message(topicName,"TagTest",createSpecificSizeString(size).getBytes());
long startTime = System.currentTimeMillis();
producer.send(message);
long endTime = System.currentTimeMillis();
System.out.println(topicName + " " + "send message in %d",endTime-startTime);
Thread.sleep(1000);
}
}

@Override
public void run(){
try {
//设置生产者组名
DefaultMQProducer producer = new DefaultMQProducer("producerGroup-"+ topicName);
//指定nameServer的地址, 多个地址用分号分隔
producer.setNamesrvAddr(url);

//启动实例
producer.start();



startProducer(topicName,producer);
}catch (Exception e){
e.printStackTrace();
}
}
}
21 changes: 21 additions & 0 deletions rocketmq-test/client/src/main/java/demo/ProducerThreadPool.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package org.demo;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ProducerThreadPool {
public static void main(String[] args) {
int producerThreadNumber = 20;
int topicNumber = 20;

String url = "localhost:9876";
String topicName = "topic";
int sleepTime = 1000;

ExecutorService pool = Executors.newFixedThreadPool(producerThreadNumber);
for (int topicIndex = 0; topicIndex < topicNumber; topicIndex++) {
pool.submit(new ProducerRunnable(url, topicName + Integer.toString(topicIndex), sleepTime));
}
pool.shutdown();
}
}
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
org/demo/ConsumerRunnable$1.class
hello/Hello.class
sample/RocketConsumer$1.class
org/demo/ConsumerRunnable.class
org/demo/ProducerRunnable.class
sample/RocketConsumer.class
sample/RocketProducer.class
org/demo/ConsumerThreadPool.class
org/demo/ProducerThreadPool.class
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
/home/sdn02/rocketmq/Client/src/main/java/demo/ProducerThreadPool.java
/home/sdn02/rocketmq/Client/src/main/java/demo/ConsumerRunnable.java
/home/sdn02/rocketmq/Client/src/main/java/demo/ConsumerThreadPool.java
/home/sdn02/rocketmq/Client/src/main/java/sample/RocketProducer.java
/home/sdn02/rocketmq/Client/src/main/java/hello/Hello.java
/home/sdn02/rocketmq/Client/src/main/java/demo/ProducerRunnable.java
/home/sdn02/rocketmq/Client/src/main/java/sample/RocketConsumer.java
99 changes: 99 additions & 0 deletions rocketmq-test/docker-rocketmq/base/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

FROM centos:7

RUN yum install -y java-1.8.0-openjdk-devel.x86_64 unzip gettext nmap-ncat openssl, which gnupg, telnet \
&& yum clean all -y

# FROM openjdk:8-jdk
# RUN apt-get update && apt-get install -y --no-install-recommends \
# bash libapr1 unzip telnet wget gnupg ca-certificates \
# && rm -rf /var/lib/apt/lists/*

ARG user=rocketmq
ARG group=rocketmq
ARG uid=3000
ARG gid=3000

# RocketMQ is run with user `rocketmq`, uid = 3000
# If you bind mount a volume from the host or a data container,
# ensure you use the same uid
RUN groupadd -g ${gid} ${group} \
&& useradd -u ${uid} -g ${gid} -m -s /bin/bash ${user}

ARG version=4.7.0

# Rocketmq version
ENV ROCKETMQ_VERSION ${version}

# Rocketmq home
ENV ROCKETMQ_HOME /home/rocketmq/rocketmq-${ROCKETMQ_VERSION}

WORKDIR ${ROCKETMQ_HOME}

RUN set -eux; \
curl https://archive.apache.org/dist/rocketmq/${ROCKETMQ_VERSION}/rocketmq-all-${ROCKETMQ_VERSION}-bin-release.zip -o rocketmq.zip; \
curl https://archive.apache.org/dist/rocketmq/${ROCKETMQ_VERSION}/rocketmq-all-${ROCKETMQ_VERSION}-bin-release.zip.asc -o rocketmq.zip.asc; \
#https://www.apache.org/dist/rocketmq/KEYS
#curl https://www.apache.org/dist/rocketmq/KEYS -o KEYS; \
\
#gpg --import KEYS; \
#gpg --batch --verify rocketmq.zip.asc rocketmq.zip ; \
unzip rocketmq.zip ; \
mv rocketmq-all*/* . ; \
rmdir rocketmq-all* ; \
rm -rf rocketmq.zip rocketmq.zip.asc KEYS

# add scripts
COPY scripts/ ${ROCKETMQ_HOME}/bin/

RUN chown -R ${uid}:${gid} ${ROCKETMQ_HOME}

# expose namesrv port
EXPOSE 9876

# add customized scripts for namesrv
RUN mv -bf ${ROCKETMQ_HOME}/bin/runserver-customize.sh ${ROCKETMQ_HOME}/bin/runserver.sh \
&& chmod a+x ${ROCKETMQ_HOME}/bin/runserver.sh \
&& chmod a+x ${ROCKETMQ_HOME}/bin/mqnamesrv

# expose broker ports
EXPOSE 10909 10911 10912

# add customized scripts for broker
RUN mv -bf ${ROCKETMQ_HOME}/bin/runbroker-customize.sh ${ROCKETMQ_HOME}/bin/runbroker.sh \
&& chmod a+x ${ROCKETMQ_HOME}/bin/runbroker.sh \
&& chmod a+x ${ROCKETMQ_HOME}/bin/mqbroker \
&& ln -s ${ROCKETMQ_HOME}/bin/mqadmin /usr/local/bin/mqadmin \
&& ln -s ${ROCKETMQ_HOME}/bin/runbroker /usr/local/bin/runbroker \
&& ln -s ${ROCKETMQ_HOME}/bin/mqnamesrv /usr/local/bin/mqnamesrv \
&& ln -s ${ROCKETMQ_HOME}/bin/mqbroker /usr/local/bin/mqbroker \
&& ln -s ${ROCKETMQ_HOME}/bin/runbroker.sh /usr/local/bin/runbroker.sh \
&& ln -s ${ROCKETMQ_HOME}/bin/runserver.sh /usr/local/bin/runserver.sh \
&& ln -s ${ROCKETMQ_HOME}/bin/runbroker.sh /usr/local/bin/runbroker-customize.sh \
&& ln -s ${ROCKETMQ_HOME}/bin/runserver.sh /usr/local/bin/runserver-customize.sh

# export Java options
RUN export JAVA_OPT=" -Duser.home=/opt"

# Add ${JAVA_HOME}/lib/ext as java.ext.dirs
RUN sed -i 's/${JAVA_HOME}\/jre\/lib\/ext/${JAVA_HOME}\/jre\/lib\/ext:${JAVA_HOME}\/lib\/ext/' ${ROCKETMQ_HOME}/bin/tools.sh

USER ${user}

WORKDIR ${ROCKETMQ_HOME}/bin
4 changes: 4 additions & 0 deletions rocketmq-test/docker-rocketmq/base/readme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@



https://github.com/apache/rocketmq-docker
Loading