Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions week14/rabbit-mq/hello_rabbitmq_java/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/

### IntelliJ IDEA ###
.idea/modules.xml
.idea/jarRepositories.xml
.idea/compiler.xml
.idea/libraries/
*.iws
*.iml
*.ipr

### Eclipse ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache

### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/

### VS Code ###
.vscode/

### Mac OS ###
.DS_Store
57 changes: 57 additions & 0 deletions week14/rabbit-mq/hello_rabbitmq_java/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>dev.mq</groupId>
<artifactId>hello_rabbitmq_java</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>

<name>hello_rabbitmq_java</name>
<url>http://maven.apache.org</url>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.18.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-simple -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>2.0.16</version>
<scope>test</scope>
</dependency>

<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.16</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
13 changes: 13 additions & 0 deletions week14/rabbit-mq/hello_rabbitmq_java/src/main/java/dev/mq/App.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package dev.mq;

/**
* Hello world!
*
*/
public class App
{
public static void main( String[] args )
{
System.out.println( "Hello World!" );
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package dev.mq.steo01;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

/**
* Receiver - λ©”μ‹œμ§€ μ†ŒλΉ„μž (Consumer)ν–‰
*
*/
public class Receiver {
private static final String QUEUE_NAME = "hello";

public static void main(String[] args) throws IOException, TimeoutException {

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");

// RabbitMQ μ„œλ²„(5672)와 컀λ„₯μ…˜ μ—°κ²° μˆ˜ν–‰
Connection connection = factory.newConnection();
// μ±„λ…ˆ 생성
Channel channel = connection.createChannel();

// 큐 생성
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [Receiver] Waiting ... ");

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println("[Receiver] λ©”μ‹œμ§€λ₯Ό λ°›μŒ: " + message);
};

channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package dev.mq.steo01;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
* Sender - λ©”μ‹œμ§€ λ°œν–‰μž (Publisher)
*
* νΌλΈ”λ¦¬μ…”λŠ” RabbitMq와 μ—°κ²°, λ©”μ‹œμ§€λ₯Ό μ „μ†‘ν•˜λŠ” 처리 μˆ˜ν–‰
* μ»¨μŠˆλ¨ΈλŠ” λ©”μ‹œμ§€ νλ‘œλΆ€ν„° 퍼블리셔가 μ μž¬ν•œ λ©”μ‹œμ§€λ₯Ό μ†ŒλΉ„ν•˜λŠ” 처리 μˆ˜ν–‰
*/
public class Sender {
private static final String QUEUE_NAME = "hello";

public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");

// RabbitMQ μ„œλ²„(5672)와 컀λ„₯μ…˜ μ—°κ²° μˆ˜ν–‰
Connection connection = factory.newConnection();
// μ±„λ…ˆ 생성
Channel channel = connection.createChannel();

// 큐 생성
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

String messgae = "Hell World!";

// Publish
channel
.basicPublish("", QUEUE_NAME, null, messgae.getBytes(StandardCharsets.UTF_8));
System.out.println(" [Publisher] Sent = " + messgae);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package dev.mq.step02;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/*
μž‘μ—…(Task)을 μž‘μ—… 큐(Work queue)에 μŠ€μΌ€μ€„λ§ν•˜λŠ” ν”„λ‘œκ·Έλž¨
λ¬Έμžμ—΄κ°’μ΄ μ–΄λ–€ λ³΅μž‘ν•œ μž‘μ—…μ΄λΌκ³  κ°€μ •ν•  λ•Œ,
ex. Hello...이라고 ν•˜λ©΄
. ν•˜λ‚˜λŠ” 1μ΄ˆκ°€ μ†Œμš”λ˜λŠ” μž‘μ—…μ΄λΌκ³  κ°€μ •

λ”°λΌμ„œ Hello...λŠ” 3μ΄ˆκ°€ κ±Έλ¦¬λŠ” μž‘μ—…μ΄λΌκ³  κ°€μ •

μ‹€ν–‰ μΈμžκ°’(args) ν™œμš©ν•΄μ„œ Hello...κ³Ό 같은 κ°’ μž…λ ₯
*/
public class NewTask {

private static final String TASK_QUEUE_NAME = "task_queue";

public static void main(String[] args) throws IOException, TimeoutException {
// μ„œλ²„ μ—°κ²°
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {

// 큐 생성
boolean durable = true;
channel.queueDeclare(TASK_QUEUE_NAME, durable, false, false, null);
// μ‹€ν–‰ μ˜΅μ…˜μœΌλ‘œ λ©”μ‹œμ§€λ₯Ό 생성, ex. Hello...
String message = String.join(" ", args);

// λ©”μ‹œμ§€ 적재
channel.basicPublish("", TASK_QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes("UTF-8"));
System.out.println(" [Publisher] Sent " + message);
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package dev.mq.step02;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/*
μž‘μ—… νμ—μ„œ κΊΌλ‚Έ μž‘μ—…μ˜ λ©”μ‹œμ§€λ₯Ό μ²˜λ¦¬ν•˜λŠ” μ—­ν• 
*/
public class Worker {

private static final String TASK_QUEUE_NAME = "task_queue";

public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();

boolean durable = true;
channel.queueDeclare(TASK_QUEUE_NAME, durable, false, false, null);
System.out.println("[Consumer(Worker)] λ©”μ‹œμ§€λ₯Ό κΈ°λ‹€λ¦¬λŠ” 쀑..");

channel.basicQos(1);

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
// λ©”μ‹œμ§€ μΆ”μΆœ
String message = new String(delivery.getBody(), "UTF-8");

System.out.println(" [Consumer(Worker)] Received '" + message + "'");
try {
doWork(message); // 받은 λ©”μ‹œμ§€ 처리 μž‘μ—… μˆ˜ν–‰
} finally {
System.out.println(" [Consumer(Worker)] Done");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};

boolean autoAcknowledgement = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAcknowledgement, deliverCallback, consumerTag -> {
});
}

// Task 처리 λ©”μ„œλ“œ, Hello...일 경우 .ν•˜λ‚˜λ‹Ή 1μ΄ˆμ”© μ§€μ—°ν•΄μ„œ μ²˜λ¦¬ν•΄μ•Όν•¨
private static void doWork(String task) {
int cnt = 0;
for (char ch : task.toCharArray()) {
if (ch == '.') { // .(dot) ν•œ κ°œλ‹Ή 1μ΄ˆμ”© μ§€μ—°
try {
System.out.println(++cnt + "초 λ™μ•ˆ μž‘μ—… μˆ˜ν–‰μ€‘");
Thread.sleep(1000);
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package dev.mq.step03;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

/*
μž‘μ—…(Task)을 μž‘μ—… 큐(Work queue)에 μŠ€μΌ€μ€„λ§ν•˜λŠ” ν”„λ‘œκ·Έλž¨
λ¬Έμžμ—΄κ°’μ΄ μ–΄λ–€ λ³΅μž‘ν•œ μž‘μ—…μ΄λΌκ³  κ°€μ •ν•  λ•Œ,
ex. Hello...이라고 ν•˜λ©΄
. ν•˜λ‚˜λŠ” 1μ΄ˆκ°€ μ†Œμš”λ˜λŠ” μž‘μ—…μ΄λΌκ³  κ°€μ •

λ”°λΌμ„œ Hello...λŠ” 3μ΄ˆκ°€ κ±Έλ¦¬λŠ” μž‘μ—…μ΄λΌκ³  κ°€μ •

μ‹€ν–‰ μΈμžκ°’(args) ν™œμš©ν•΄μ„œ Hello...κ³Ό 같은 κ°’ μž…λ ₯
*/
public class EmitLogs {


private static final String EXCHANGE_NAME = "logs";

public static void main(String[] args) throws IOException, TimeoutException {
// μ„œλ²„ μ—°κ²°
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {

String message = args.length < 1 ? "info: Hello World!" :
String.join(" ", args);


// Exchange 생성
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package dev.mq.step03;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.MessageProperties;

public class ReceiveLogs {

private static final String EXCHANGE_NAME = "logs";

public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");

System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
Loading
Loading