2021-08-05

This commit is contained in:
2021-08-05 11:46:27 +09:00
parent 2a56dff104
commit 438bf4bd5c
58 changed files with 2418 additions and 33 deletions

62
rabbit-mq/README.md Normal file
View File

@@ -0,0 +1,62 @@
# RabbitMQ Client Example
## Docker에 설치
```bash
#!/bin/bash
docker run -d --restart=no --name=rabbitmq \
--hostname rabbit-mq \
-p 5672:5672 \
-p 15672:15672 \
-v /docker/rabbitmq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-v docker/rabbitmq/conf.d:/etc/rabbitmq/conf.d \
-e TZ=Asia/Seoul -e LANG=ko_KR.UTF-8 \
-e RABBITMQ_DEFAULT_USER=elex -e RABBITMQ_DEFAULT_PASS=test \
rabbitmq:3.8.11-management
```
## Gradle 디펜던시 추가
```kotlin
// https://mvnrepository.com/artifact/com.rabbitmq/amqp-client
implementation("com.rabbitmq:amqp-client:5.10.0")
```
## 다이렉트 익스체인지
* 익스체인지는 브로커가 메시지를 받는 곳이다. 즉, 메시지를 보낼 때는 익스체인지로 보내야 한다.
* 큐는 브로커가 메시지를 보내는 곳이다. 즉, 메시지를 받을 때는 큐로부터 받는다.
* 익스체인지와 큐는 라우팅-키로 서로 바인딩된다. 여러 개의 라우팅-키로 여러 번 바인딩 할 수도 있다.
* 익스체인지에 메시지가 도착하면 메시지의 라우팅-키와 일치하는 큐로 메시지를 보낸다.
## 팬아웃 익스체인지
* 라우팅-키 규칙이 무시된다.
## 토픽 익스체인지
* 라우팅-키를 패턴으로 사용한다.
## TLS
```bash
!/bin/bash
docker run -d --restart=always --name=rabbitmq \
--hostname rabbitmq \
-p 5671:5671 \
-p 5672:5672 \
-p 15671:15671 \
-p 15672:15672 \
-v /media/rabbitmq/certs:/etc/certs \
-e TZ=Asia/Seoul -e LANG=ko_KR.UTF-8 \
-e RABBITMQ_DEFAULT_USER=elex \
-e RABBITMQ_DEFAULT_PASS=test \
-e RABBITMQ_SSL_CACERTFILE=/etc/certs/ca.pem \
-e RABBITMQ_SSL_CERTFILE=/etc/certs/server.pem \
-e RABBITMQ_SSL_KEYFILE=/etc/certs/server.key.pem \
-e RABBITMQ_SSL_FAIL_IF_NO_PEER_CERT=false \
-e RABBITMQ_SSL_VERIFY=verify_none \
rabbitmq:3.8.11-management
```
-----
Copyright (c) 2021 Elex.
All Rights Reserved.
https://www.elex-project.com/

View File

@@ -1,7 +1,20 @@
/*
* Examples for Java
*
* Copyright (c) 2021. Elex. All Rights Reserved.
* https://www.elex-project.com/
*/
plugins {
id("elex-java")
}
dependencies {
// https://mvnrepository.com/artifact/com.rabbitmq/amqp-client
implementation("com.rabbitmq:amqp-client:5.13.0")
// https://mvnrepository.com/artifact/org.bouncycastle/bcprov-jdk15on
implementation("org.bouncycastle:bcprov-jdk15on:1.69")
// https://mvnrepository.com/artifact/org.bouncycastle/bcpkix-jdk15on
implementation("org.bouncycastle:bcpkix-jdk15on:1.69")
}

View File

@@ -0,0 +1,118 @@
/*
* Copyright (c) 2021. Elex. All Rights Reserved.
* https://www.elex-project.com/
*/
package kr.pe.elex.examples.fanout;
import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
* 팬 아웃 익스체인지는 라우팅-키 규칙에 무관하게 메시지를 전달한다.
* @author Elex
* @see "https://www.rabbitmq.com/tutorials/tutorial-two-java.html"
*/
@Slf4j
public class RabbitClient {
private static final String EXCHANGE = "elex.fanout.exchange";
private Connection connection;
private Channel channel;
private String queue;
RabbitClient() throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("elex");
connectionFactory.setPassword("test");
connectionFactory.setVirtualHost("/");
connection = connectionFactory.newConnection();
channel = connection.createChannel();
// fanout은 routing-key 규칙을 무시하고, 모든 큐에 메시지를 전달합니다.
channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.FANOUT, false);
// 큐 이름을 랜덤으로 생성합니다.
queue = channel.queueDeclare().getQueue();
// 익스체인지와 큐를 묶습니다.
channel.queueBind(queue, EXCHANGE, "");
}
public void consume(String consumerTag) throws IOException {
// 큐로부터 메시지를 받습니다.
channel.basicConsume(queue, true, consumerTag, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
log.info("Rx: [{}] {}", consumerTag, new String(body, StandardCharsets.UTF_8));
}
});
}
public void publish(String routingKey, String message) throws IOException {
// 익스체인지에 메시지를 보냅니다.
channel.basicPublish(EXCHANGE, routingKey, null, message.getBytes(StandardCharsets.UTF_8));
log.info("Tx: [{}] {}", routingKey, message);
}
public void close() throws IOException, TimeoutException {
channel.close();
connection.close();
}
public static void main(String... args) throws IOException, TimeoutException {
RabbitClient producer = new RabbitClient();
RabbitClient consumer1 = new RabbitClient();
consumer1.consume("");
for (int i = 0; i < 10; i++) {
producer.publish(String.valueOf(i), "Hello, " + i);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
log.error("Interrupted..", e);
}
}
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
log.error("Interrupted..", e);
}
producer.close();
consumer1.close();
/*
11:12:40.969 [main] INFO kr.pe.elex.rabbitmq.fanout.RabbitClient - Tx: [0] Hello, 0
11:12:40.969 [pool-3-thread-4] INFO kr.pe.elex.rabbitmq.fanout.RabbitClient - Rx: [amq.ctag-NJH49xhJFHvgAo90c1qEiA] Hello, 0
11:12:41.074 [main] INFO kr.pe.elex.rabbitmq.fanout.RabbitClient - Tx: [1] Hello, 1
11:12:41.077 [pool-3-thread-5] INFO kr.pe.elex.rabbitmq.fanout.RabbitClient - Rx: [amq.ctag-NJH49xhJFHvgAo90c1qEiA] Hello, 1
11:12:41.175 [main] INFO kr.pe.elex.rabbitmq.fanout.RabbitClient - Tx: [2] Hello, 2
11:12:41.178 [pool-3-thread-6] INFO kr.pe.elex.rabbitmq.fanout.RabbitClient - Rx: [amq.ctag-NJH49xhJFHvgAo90c1qEiA] Hello, 2
11:12:41.276 [main] INFO kr.pe.elex.rabbitmq.fanout.RabbitClient - Tx: [3] Hello, 3
11:12:41.279 [pool-3-thread-7] INFO kr.pe.elex.rabbitmq.fanout.RabbitClient - Rx: [amq.ctag-NJH49xhJFHvgAo90c1qEiA] Hello, 3
11:12:41.377 [main] INFO kr.pe.elex.rabbitmq.fanout.RabbitClient - Tx: [4] Hello, 4
11:12:41.380 [pool-3-thread-8] INFO kr.pe.elex.rabbitmq.fanout.RabbitClient - Rx: [amq.ctag-NJH49xhJFHvgAo90c1qEiA] Hello, 4
11:12:41.478 [main] INFO kr.pe.elex.rabbitmq.fanout.RabbitClient - Tx: [5] Hello, 5
11:12:41.481 [pool-3-thread-9] INFO kr.pe.elex.rabbitmq.fanout.RabbitClient - Rx: [amq.ctag-NJH49xhJFHvgAo90c1qEiA] Hello, 5
11:12:41.579 [main] INFO kr.pe.elex.rabbitmq.fanout.RabbitClient - Tx: [6] Hello, 6
11:12:41.582 [pool-3-thread-10] INFO kr.pe.elex.rabbitmq.fanout.RabbitClient - Rx: [amq.ctag-NJH49xhJFHvgAo90c1qEiA] Hello, 6
11:12:41.681 [main] INFO kr.pe.elex.rabbitmq.fanout.RabbitClient - Tx: [7] Hello, 7
11:12:41.684 [pool-3-thread-11] INFO kr.pe.elex.rabbitmq.fanout.RabbitClient - Rx: [amq.ctag-NJH49xhJFHvgAo90c1qEiA] Hello, 7
11:12:41.782 [main] INFO kr.pe.elex.rabbitmq.fanout.RabbitClient - Tx: [8] Hello, 8
11:12:41.785 [pool-3-thread-12] INFO kr.pe.elex.rabbitmq.fanout.RabbitClient - Rx: [amq.ctag-NJH49xhJFHvgAo90c1qEiA] Hello, 8
11:12:41.883 [main] INFO kr.pe.elex.rabbitmq.fanout.RabbitClient - Tx: [9] Hello, 9
11:12:41.886 [pool-3-thread-13] INFO kr.pe.elex.rabbitmq.fanout.RabbitClient - Rx: [amq.ctag-NJH49xhJFHvgAo90c1qEiA] Hello, 9
*/
}
}

View File

@@ -0,0 +1,115 @@
/*
* Copyright (c) 2021. Elex. All Rights Reserved.
* https://www.elex-project.com/
*/
package kr.pe.elex.examples.hello;
import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
* 다이렉트 익스체인지 샘플
* <p>
* 익스체인지는 브로커가 메시지를 받는 곳이다.
* 큐는 브로커가 메시지를 보내는 곳이다.
* 익스체인지와 큐는 라우팅-키로 서로 바인딩된다.
* <p>
* 익스체인지에 메시지가 도착하면 메시지의 라우팅-키와 일치하는 큐로 메시지를 보낸다.
*
* @author Elex
* @see "https://www.rabbitmq.com/tutorials/tutorial-one-java.html"
*/
@Slf4j
public class HelloRabbit {
private static final String EXCHANGE = "elex.direct.exchange";
private static final String QUEUE = "elex.queue.01";
private static final String ROUTING_KEY = "elex-routing-key";
private Connection connection;
private Channel channel;
HelloRabbit() throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("elex");
connectionFactory.setPassword("test");
connectionFactory.setVirtualHost("/");
connection = connectionFactory.newConnection();
channel = connection.createChannel();
// 익스체인지는 브로커가 메시지를 받는 곳입니다.
channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT, true);
// 큐는 브로커가 메시지를 보내는 곳입니다.
channel.queueDeclare(QUEUE, false, false, false, null);
// 익스체인지와 큐를 묶습니다.
channel.queueBind(QUEUE, EXCHANGE, ROUTING_KEY);
// 큐로부터 메시지를 받습니다.
channel.basicConsume(QUEUE, true, ROUTING_KEY, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
log.info("Rx: {}", new String(body, StandardCharsets.UTF_8));
}
});
}
public void publish(String message) throws IOException {
// 익스체인지에 메시지를 보냅니다.
channel.basicPublish(EXCHANGE, ROUTING_KEY, null, message.getBytes(StandardCharsets.UTF_8));
log.info("Tx: {}", message);
}
public void close() throws IOException, TimeoutException {
channel.close();
connection.close();
}
public static void main(String... args) throws IOException, TimeoutException {
HelloRabbit helloRabbit = new HelloRabbit();
for (int i = 0; i < 10; i++) {
helloRabbit.publish("Hello, " + i);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
log.error("Interrupted..", e);
}
}
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
log.error("Interrupted..", e);
}
helloRabbit.close();
/*
11:01:31.502 [main] INFO kr.pe.elex.rabbitmq.hello.HelloRabbit - Tx: Hello, 0
11:01:31.502 [pool-1-thread-4] INFO kr.pe.elex.rabbitmq.hello.HelloRabbit - Rx: Hello, 0
11:01:32.505 [main] INFO kr.pe.elex.rabbitmq.hello.HelloRabbit - Tx: Hello, 1
11:01:32.509 [pool-1-thread-5] INFO kr.pe.elex.rabbitmq.hello.HelloRabbit - Rx: Hello, 1
11:01:33.507 [main] INFO kr.pe.elex.rabbitmq.hello.HelloRabbit - Tx: Hello, 2
11:01:33.510 [pool-1-thread-6] INFO kr.pe.elex.rabbitmq.hello.HelloRabbit - Rx: Hello, 2
11:01:34.508 [main] INFO kr.pe.elex.rabbitmq.hello.HelloRabbit - Tx: Hello, 3
11:01:34.511 [pool-1-thread-7] INFO kr.pe.elex.rabbitmq.hello.HelloRabbit - Rx: Hello, 3
11:01:35.509 [main] INFO kr.pe.elex.rabbitmq.hello.HelloRabbit - Tx: Hello, 4
11:01:35.512 [pool-1-thread-8] INFO kr.pe.elex.rabbitmq.hello.HelloRabbit - Rx: Hello, 4
11:01:36.511 [main] INFO kr.pe.elex.rabbitmq.hello.HelloRabbit - Tx: Hello, 5
11:01:36.514 [pool-1-thread-9] INFO kr.pe.elex.rabbitmq.hello.HelloRabbit - Rx: Hello, 5
11:01:37.512 [main] INFO kr.pe.elex.rabbitmq.hello.HelloRabbit - Tx: Hello, 6
11:01:37.515 [pool-1-thread-10] INFO kr.pe.elex.rabbitmq.hello.HelloRabbit - Rx: Hello, 6
11:01:38.513 [main] INFO kr.pe.elex.rabbitmq.hello.HelloRabbit - Tx: Hello, 7
11:01:38.516 [pool-1-thread-11] INFO kr.pe.elex.rabbitmq.hello.HelloRabbit - Rx: Hello, 7
11:01:39.514 [main] INFO kr.pe.elex.rabbitmq.hello.HelloRabbit - Tx: Hello, 8
11:01:39.517 [pool-1-thread-12] INFO kr.pe.elex.rabbitmq.hello.HelloRabbit - Rx: Hello, 8
11:01:40.515 [main] INFO kr.pe.elex.rabbitmq.hello.HelloRabbit - Tx: Hello, 9
11:01:40.518 [pool-1-thread-13] INFO kr.pe.elex.rabbitmq.hello.HelloRabbit - Rx: Hello, 9
*/
}
}

View File

@@ -0,0 +1,105 @@
/*
* Copyright (c) 2021. Elex. All Rights Reserved.
* https://www.elex-project.com/
*/
package kr.pe.elex.examples.hello;
import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
* 다이렉트 익스체인지 샘플
* <p>
* 메시지 소비자는 메시지 처리 후 ack를 보내야 한다.
* QoS를 지정해서 소비자에 전달할 메시지의 최대 개수를 지정할 수 있다.
*
* @author Elex
* @see "https://www.rabbitmq.com/tutorials/tutorial-two-java.html"
*/
@Slf4j
public class HelloRabbit2 {
private static final String EXCHANGE = "elex.direct.exchange";
private static final String QUEUE = "elex.queue";
private static final String ROUTING_KEY = "elex-routing-key";
private Connection connection;
private Channel channel;
HelloRabbit2() throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("elex");
connectionFactory.setPassword("test");
connectionFactory.setVirtualHost("/");
connectionFactory.setAutomaticRecoveryEnabled(true);
connection = connectionFactory.newConnection();
channel = connection.createChannel();
// 익스체인지는 브로커가 메시지를 받는 곳입니다.
channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT, true);
// 큐는 브로커가 메시지를 보내는 곳입니다.
channel.queueDeclare(QUEUE, false, false, false, null);
// 익스체인지와 큐를 묶습니다.
channel.queueBind(QUEUE, EXCHANGE, ROUTING_KEY);
// 메시지 소비자에 전달할 메시지의 최대 개수입니다. ack를 받을 때까지 메시지 전송을 미룰 수 있습니다.
channel.basicQos(1);
// 큐로부터 메시지를 받습니다.
channel.basicConsume(QUEUE, false, ROUTING_KEY, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
log.info("Rx: {}", new String(body, StandardCharsets.UTF_8));
// 만일, 수신 확인을 하지 않으면, 브로커는 다시 전송을 시도할겁니다.
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
public void publish(String message) throws IOException, TimeoutException, InterruptedException {
// 익스체인지에 메시지를 보냅니다.
channel.basicPublish(EXCHANGE, ROUTING_KEY,
// 브로커가 메시지를 디스크에 저장해둠으로써, 오류 등으로 브로커가 종료되었을 경우에
// 미처 전달되지 못한 메시지가 사라지는 것을 예방합니다.
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes(StandardCharsets.UTF_8));
// 메시지가 전달되지 않으면 예외가 발생합니다.
channel.waitForConfirmsOrDie(1000);
log.info("Tx: {}", message);
}
public void close() throws IOException, TimeoutException {
channel.close();
connection.close();
}
public static void main(String... args) throws IOException, TimeoutException {
HelloRabbit2 helloRabbit = new HelloRabbit2();
for (int i = 0; i < 10; i++) {
try {
helloRabbit.publish("Hello, " + i);
} catch (TimeoutException | InterruptedException e) {
log.error("Publish fail..", e);
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
log.error("Interrupted..", e);
}
}
helloRabbit.close();
}
}

View File

@@ -0,0 +1,136 @@
/*
* Copyright (c) 2021. Elex. All Rights Reserved.
* https://www.elex-project.com/
*/
package kr.pe.elex.examples.loadbalance;
import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
* 하나의 큐에 여러 개의 컨슈머를 할당합니다.
*
* @author Elex
* @see "https://www.rabbitmq.com/tutorials/tutorial-two-java.html"
*/
@Slf4j
public class RabbitClient {
private static final String EXCHANGE = "elex.direct.exchange";
private static final String QUEUE = "elex.queue";
private static final String ROUTING_KEY = "elex-routing-key";
private String name;
private Connection connection;
private Channel channel;
RabbitClient(String name) throws IOException, TimeoutException {
this.name = name;
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("elex");
connectionFactory.setPassword("test");
connectionFactory.setVirtualHost("/");
connection = connectionFactory.newConnection();
channel = connection.createChannel();
// 익스체인지는 브로커가 메시지를 받는 곳입니다.
channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT, true);
// 큐는 브로커가 메시지를 보내는 곳입니다.
channel.queueDeclare(QUEUE, false, false, false, null);
// 익스체인지와 큐를 묶습니다.
channel.queueBind(QUEUE, EXCHANGE, ROUTING_KEY);
// 메시지 소비자에 전달할 메시지의 최대 개수입니다. ack를 받을 때까지 메시지 전송을 미룰 수 있습니다.
channel.basicQos(1);
}
public void consume(String consumerTag) throws IOException {
// 큐로부터 메시지를 받습니다.
channel.basicConsume(QUEUE, false, consumerTag, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
log.info("Rx: [{}] {}", name, new String(body, StandardCharsets.UTF_8));
try {
// 메시지를 처리하는데 시간이 좀 걸린다고 가정합니다.
Thread.sleep(1000);
} catch (InterruptedException e) {
log.error("Interrupted..", e);
}
// 메시지 처리 후 ack를 보냅니다.
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
public void publish(String message) throws IOException {
// 익스체인지에 메시지를 보냅니다.
channel.basicPublish(EXCHANGE, ROUTING_KEY, null, message.getBytes(StandardCharsets.UTF_8));
log.info("Tx: [{}] {}", name, message);
}
public void close() throws IOException, TimeoutException {
channel.close();
connection.close();
}
public static void main(String... args) throws IOException, TimeoutException {
RabbitClient producer = new RabbitClient("Producer");
RabbitClient consumer1 = new RabbitClient("Consumer1");
RabbitClient consumer2 = new RabbitClient("Consumer2");
RabbitClient consumer3 = new RabbitClient("Consumer3");
consumer1.consume(ROUTING_KEY);
consumer2.consume(ROUTING_KEY);
consumer3.consume(ROUTING_KEY);
for (int i = 0; i < 10; i++) {
producer.publish("Hello, " + i);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
log.error("Interrupted..", e);
}
}
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
log.error("Interrupted..", e);
}
producer.close();
consumer1.close();
consumer2.close();
consumer3.close();
/*
10:59:25.659 [main] INFO kr.pe.elex.rabbitmq.loadbalance.RabbitClient - Tx: [Producer] Hello, 0
10:59:25.659 [pool-3-thread-4] INFO kr.pe.elex.rabbitmq.loadbalance.RabbitClient - Rx: [Consumer1] Hello, 0
10:59:25.762 [main] INFO kr.pe.elex.rabbitmq.loadbalance.RabbitClient - Tx: [Producer] Hello, 1
10:59:25.765 [pool-5-thread-4] INFO kr.pe.elex.rabbitmq.loadbalance.RabbitClient - Rx: [Consumer2] Hello, 1
10:59:25.863 [main] INFO kr.pe.elex.rabbitmq.loadbalance.RabbitClient - Tx: [Producer] Hello, 2
10:59:25.866 [pool-7-thread-4] INFO kr.pe.elex.rabbitmq.loadbalance.RabbitClient - Rx: [Consumer3] Hello, 2
10:59:25.965 [main] INFO kr.pe.elex.rabbitmq.loadbalance.RabbitClient - Tx: [Producer] Hello, 3
10:59:26.066 [main] INFO kr.pe.elex.rabbitmq.loadbalance.RabbitClient - Tx: [Producer] Hello, 4
10:59:26.167 [main] INFO kr.pe.elex.rabbitmq.loadbalance.RabbitClient - Tx: [Producer] Hello, 5
10:59:26.268 [main] INFO kr.pe.elex.rabbitmq.loadbalance.RabbitClient - Tx: [Producer] Hello, 6
10:59:26.369 [main] INFO kr.pe.elex.rabbitmq.loadbalance.RabbitClient - Tx: [Producer] Hello, 7
10:59:26.470 [main] INFO kr.pe.elex.rabbitmq.loadbalance.RabbitClient - Tx: [Producer] Hello, 8
10:59:26.571 [main] INFO kr.pe.elex.rabbitmq.loadbalance.RabbitClient - Tx: [Producer] Hello, 9
10:59:26.664 [pool-3-thread-5] INFO kr.pe.elex.rabbitmq.loadbalance.RabbitClient - Rx: [Consumer1] Hello, 3
10:59:26.767 [pool-5-thread-5] INFO kr.pe.elex.rabbitmq.loadbalance.RabbitClient - Rx: [Consumer2] Hello, 4
10:59:26.869 [pool-7-thread-5] INFO kr.pe.elex.rabbitmq.loadbalance.RabbitClient - Rx: [Consumer3] Hello, 5
10:59:27.665 [pool-3-thread-5] INFO kr.pe.elex.rabbitmq.loadbalance.RabbitClient - Rx: [Consumer1] Hello, 6
10:59:27.768 [pool-5-thread-5] INFO kr.pe.elex.rabbitmq.loadbalance.RabbitClient - Rx: [Consumer2] Hello, 7
10:59:27.870 [pool-7-thread-5] INFO kr.pe.elex.rabbitmq.loadbalance.RabbitClient - Rx: [Consumer3] Hello, 8
10:59:28.666 [pool-3-thread-5] INFO kr.pe.elex.rabbitmq.loadbalance.RabbitClient - Rx: [Consumer1] Hello, 9
*/
}
}

View File

@@ -0,0 +1,6 @@
/*
* Copyright (c) 2021. Elex. All Rights Reserved.
* https://www.elex-project.com/
*/
package kr.pe.elex.examples;

View File

@@ -0,0 +1,120 @@
/*
* Copyright (c) 2021. Elex. All Rights Reserved.
* https://www.elex-project.com/
*/
package kr.pe.elex.examples.rpc;
import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import static kr.pe.elex.examples.rpc.HelloRabbitServer.EXCHANGE;
import static kr.pe.elex.examples.rpc.HelloRabbitServer.ROUTING_KEY;
/**
* {@link AMQP.BasicProperties.Builder#replyTo(String)}를 사용해서 응답받을 라우팅-키를 전달할 수 있다.
*
* @author Elex
* @see "https://www.rabbitmq.com/tutorials/tutorial-six-java.html"
*/
@Slf4j
public class HelloRabbit {
private static final String CLIENT_ROUTING_KEY = "client-routing-key";
private Connection connection;
private Channel channel;
private String queue;
private Map<UUID, Handler> handlers = new HashMap<>();
HelloRabbit() throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("elex");
connectionFactory.setPassword("test");
connectionFactory.setVirtualHost("/");
connection = connectionFactory.newConnection();
channel = connection.createChannel();
// 익스체인지는 브로커가 메시지를 받는 곳입니다.
channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT, true);
// 큐는 브로커가 메시지를 보내는 곳입니다.
queue = channel.queueDeclare().getQueue();
// 익스체인지와 큐를 묶습니다.
channel.queueBind(queue, EXCHANGE, CLIENT_ROUTING_KEY);
// 큐로부터 메시지를 받습니다.
channel.basicConsume(queue, false, queue, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 메시지 아이디로 핸들러를 가져옵니다.
Handler handler = handlers.remove(UUID.fromString(properties.getCorrelationId()));
if (null != handler) {
handler.onResponse(new String(body, StandardCharsets.UTF_8));
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
});
}
public void publish(String message, Handler handler) throws IOException {
UUID uuid = UUID.randomUUID(); // 메시지 아이디로 사용됩니다.
handlers.put(uuid, handler); // 응답 처리를 위해 핸들러를 저장해둡니다.
// 익스체인지에 메시지를 보냅니다.
channel.basicPublish(EXCHANGE, ROUTING_KEY,
new AMQP.BasicProperties.Builder()
.replyTo(CLIENT_ROUTING_KEY)
.contentEncoding(StandardCharsets.UTF_8.name())
.contentType("text/plain")
.correlationId(uuid.toString())
.deliveryMode(MessageProperties.PERSISTENT_BASIC.getDeliveryMode())
.build(),
message.getBytes(StandardCharsets.UTF_8));
log.info("Tx: {}", message);
}
public void close() throws IOException, TimeoutException {
channel.close();
connection.close();
handlers.clear();
}
interface Handler {
void onResponse(String message);
}
public static void main(String... args) throws IOException, TimeoutException {
HelloRabbitServer server = new HelloRabbitServer();
HelloRabbit client = new HelloRabbit();
client.publish("Hello, there", new Handler() {
@Override
public void onResponse(String message) {
log.info("Rx: {}", message);
}
});
client.publish("Lorem ipsum ...", new Handler() {
@Override
public void onResponse(String message) {
log.info("Rx: {}", message);
}
});
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
log.error("Interrupted..", e);
}
client.close();
server.close();
}
}

View File

@@ -0,0 +1,76 @@
/*
* Copyright (c) 2021. Elex. All Rights Reserved.
* https://www.elex-project.com/
*/
package kr.pe.elex.examples.rpc;
import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
* @author Elex
* @see "https://www.rabbitmq.com/tutorials/tutorial-six-java.html"
*/
@Slf4j
public class HelloRabbitServer {
static final String EXCHANGE = "elex.rpc.exchange";
static final String QUEUE = "elex.rpc.queue";
static final String ROUTING_KEY = "elex-routing-key";
private Connection connection;
private Channel channel;
HelloRabbitServer() throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("elex");
connectionFactory.setPassword("test");
connectionFactory.setVirtualHost("/");
connection = connectionFactory.newConnection();
channel = connection.createChannel();
// 익스체인지는 브로커가 메시지를 받는 곳입니다.
channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT, true);
// 큐는 브로커가 메시지를 보내는 곳입니다.
channel.queueDeclare(QUEUE, false, false, false, null);
// 익스체인지와 큐를 묶습니다.
channel.queueBind(QUEUE, EXCHANGE, ROUTING_KEY);
// 큐로부터 메시지를 받습니다.
channel.basicConsume(QUEUE, true, ROUTING_KEY, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
log.info("Server Rx: {}", new String(body, StandardCharsets.UTF_8));
String queue = properties.getReplyTo();
String messageId = properties.getCorrelationId();
String message = new String(body, StandardCharsets.UTF_8).toUpperCase();
channel.basicPublish(EXCHANGE, queue,
new AMQP.BasicProperties.Builder()
.correlationId(messageId)
.build(),
message.getBytes(StandardCharsets.UTF_8));
log.info("Server Tx: {}", message);
}
});
}
public void close() throws IOException, TimeoutException {
channel.close();
connection.close();
}
interface Handler {
void onResponse(String message);
}
}

View File

@@ -0,0 +1,106 @@
/*
* Copyright (c) 2021. Elex. All Rights Reserved.
* https://www.elex-project.com/
*/
package kr.pe.elex.examples.tls;
import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
import java.util.concurrent.TimeoutException;
/**
* TLS 샘플
*
* @author Elex
* @see "https://www.rabbitmq.com/tutorials/tutorial-one-java.html"
*/
@Slf4j
public class HelloRabbit {
private static final String EXCHANGE = "elex.direct.exchange";
private static final String QUEUE = "elex.queue.01";
private static final String ROUTING_KEY = "elex-routing-key";
private Connection connection;
private Channel channel;
private SSLContext sslContext()
throws NoSuchAlgorithmException, KeyStoreException, UnrecoverableKeyException, KeyManagementException, IOException, CertificateException {
return TlsHelper.context(getClass().getResourceAsStream("/clientstore.p12"),
"test".toCharArray(),
"test1".toCharArray(),
getClass().getResourceAsStream("/truststore.p12"),
"test".toCharArray());
}
HelloRabbit() throws IOException, TimeoutException, KeyManagementException, NoSuchAlgorithmException, UnrecoverableKeyException, KeyStoreException, CertificateException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5671); // 포트 번호가 다릅니다.
connectionFactory.setUsername("elex");
connectionFactory.setPassword("test");
connectionFactory.setVirtualHost("/");
//connectionFactory.useSslProtocol(); // 테스트 환경에서 사용됩니다.
connectionFactory.useSslProtocol(sslContext());
//connectionFactory.enableHostnameVerification(); // 인증서 내용과 호스트네임을 검증합니다.
connection = connectionFactory.newConnection();
channel = connection.createChannel();
// 익스체인지는 브로커가 메시지를 받는 곳입니다.
channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT, true);
// 큐는 브로커가 메시지를 보내는 곳입니다.
channel.queueDeclare(QUEUE, false, false, false, null);
// 익스체인지와 큐를 묶습니다.
channel.queueBind(QUEUE, EXCHANGE, ROUTING_KEY);
// 큐로부터 메시지를 받습니다.
channel.basicConsume(QUEUE, true, ROUTING_KEY, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
log.info("Rx: {}", new String(body, StandardCharsets.UTF_8));
}
});
}
public void publish(String message) throws IOException {
// 익스체인지에 메시지를 보냅니다.
channel.basicPublish(EXCHANGE, ROUTING_KEY, null, message.getBytes(StandardCharsets.UTF_8));
log.info("Tx: {}", message);
}
public void close() throws IOException, TimeoutException {
channel.close();
connection.close();
}
public static void main(String... args) throws IOException, TimeoutException, UnrecoverableKeyException, CertificateException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
HelloRabbit helloRabbit = new HelloRabbit();
for (int i = 0; i < 10; i++) {
helloRabbit.publish("Hello, " + i);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
log.error("Interrupted..", e);
}
}
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
log.error("Interrupted..", e);
}
helloRabbit.close();
}
}

View File

@@ -0,0 +1,93 @@
/*
* Copyright (c) 2021. Elex. All Rights Reserved.
* https://www.elex-project.com/
*/
package kr.pe.elex.examples.tls;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.security.*;
import java.security.cert.CertificateException;
public final class TlsHelper {
private TlsHelper(){}
/**
* SSL Context를 만든 다음, 소켓 팩토리를 가져옵니다.
*
* @param keyStoreInputStream 키스토어
* @param keyStorePassword 키스토어 비번
* @param keyPassword 키 비번
* @param trustStoreInputStream CA 인증서 키 스토어
* @param trustStorePassword 트러스트 스토어 비번
* @return
* @throws KeyStoreException
* @throws CertificateException
* @throws NoSuchAlgorithmException
* @throws IOException
* @throws UnrecoverableKeyException
* @throws KeyManagementException
*/
public static SSLContext context(InputStream keyStoreInputStream, char[] keyStorePassword, char[] keyPassword,
InputStream trustStoreInputStream, char[] trustStorePassword)
throws KeyStoreException, CertificateException, NoSuchAlgorithmException, IOException, UnrecoverableKeyException,
KeyManagementException {
// OpenSSL로 키와 인증서를 만들고, PKCS12 키 저장소에 넣었습니다.
KeyStore keyStore = KeyStore.getInstance("PKCS12");
// 클라이언트 키와 인증서가 들어있습니다.
keyStore.load(keyStoreInputStream,
keyStorePassword); // 키스토어 비번
KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
kmf.init(keyStore, keyPassword); // 키 비번
KeyStore trustKeyStore = KeyStore.getInstance("PKCS12");
// CA 인증서가 들어있습니다.
trustKeyStore.load(trustStoreInputStream,
trustStorePassword); // 키 스토어 비번
TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
tmf.init(trustKeyStore);
SSLContext context = SSLContext.getInstance("TLSv1.3");
context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
return context;
}
/**
* @param trustStoreInputStream CA 인증서 키 스토어
* @param trustStorePassword 트러스트 스토어 비번
* @return
* @throws KeyStoreException
* @throws CertificateException
* @throws NoSuchAlgorithmException
* @throws IOException
* @throws UnrecoverableKeyException
* @throws KeyManagementException
*/
public static SSLContext context(InputStream trustStoreInputStream, char[] trustStorePassword)
throws KeyStoreException, CertificateException, NoSuchAlgorithmException, IOException, UnrecoverableKeyException,
KeyManagementException {
// OpenSSL로 키와 인증서를 만들고, PKCS12 키 저장소에 넣었습니다.
KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
// 클라이언트 키와 인증서가 들어있습니다.
keyStore.load(null, null); // 키스토어 비번
KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
kmf.init(keyStore, null); // 키 비번
KeyStore trustKeyStore = KeyStore.getInstance("PKCS12");
// CA 인증서가 들어있습니다.
trustKeyStore.load(trustStoreInputStream,
trustStorePassword); // 키 스토어 비번
TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
tmf.init(trustKeyStore);
SSLContext context = SSLContext.getInstance("TLSv1.3");
context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
return context;
}
}

View File

@@ -0,0 +1,129 @@
/*
* Copyright (c) 2021. Elex. All Rights Reserved.
* https://www.elex-project.com/
*/
package kr.pe.elex.examples.tls;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.bouncycastle.openssl.PEMDecryptorProvider;
import org.bouncycastle.openssl.PEMEncryptedKeyPair;
import org.bouncycastle.openssl.PEMKeyPair;
import org.bouncycastle.openssl.PEMParser;
import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter;
import org.bouncycastle.openssl.jcajce.JcePEMDecryptorProviderBuilder;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.security.*;
import java.security.cert.CertificateException;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;
public final class TlsHelperWithBouncyCastle {
private TlsHelperWithBouncyCastle(){}
public static SSLContext context(InputStream clientCrtInputStream, InputStream clientKeyInputStream, char[] clientPassword,
InputStream caCrtInputStream)
throws KeyStoreException, IOException, CertificateException, UnrecoverableKeyException, KeyManagementException, NoSuchAlgorithmException {
Security.addProvider(new BouncyCastleProvider());
// CA 인증서 불러오기
CertificateFactory caCertFactory = CertificateFactory.getInstance("X.509");
PEMParser parser = new PEMParser(new InputStreamReader(caCrtInputStream));
X509Certificate caCert = (X509Certificate) caCertFactory
.generateCertificate(new ByteArrayInputStream(parser.readPemObject().getContent()));
parser.close();
// 클라이언트 인증서 불러오기
CertificateFactory clientCertFactory = CertificateFactory.getInstance("X.509");
parser = new PEMParser(new InputStreamReader(clientCrtInputStream));
X509Certificate clientCert = (X509Certificate) clientCertFactory
.generateCertificate(new ByteArrayInputStream(parser.readPemObject().getContent()));
parser.close();
// 클라이언트 비밀키 불러오기
parser = new PEMParser(new InputStreamReader(clientKeyInputStream));
Object object = parser.readObject();
JcaPEMKeyConverter converter = new JcaPEMKeyConverter().setProvider("BC");
KeyPair keyPair;
if (object instanceof PEMEncryptedKeyPair) {
// 암호화된 키라면 패스워드가 필요하다.
PEMEncryptedKeyPair pemKeyPair = (PEMEncryptedKeyPair) object;
PEMDecryptorProvider decProv = new JcePEMDecryptorProviderBuilder().build(clientPassword);
keyPair = converter.getKeyPair(pemKeyPair.decryptKeyPair(decProv));
} else {
// 암호화되지 않은 키라면 비밀번호가 필요없다.
PEMKeyPair pemKeyPair = (PEMKeyPair) object;
keyPair = converter.getKeyPair(pemKeyPair);
}
parser.close();
PrivateKey clientKey = keyPair.getPrivate();
// CA 인증서를 사용해서 트러스트 스토어를 만든다.
KeyStore caKeyStore = KeyStore.getInstance(KeyStore.getDefaultType());
caKeyStore.load(null, null);
caKeyStore.setCertificateEntry("ca-certificate", caCert);
TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
trustManagerFactory.init(caKeyStore);
// 클라이언트 인증서와 비밀키를 사용해서 키 스토어를 만든다.
KeyStore clientKeyStore = KeyStore.getInstance(KeyStore.getDefaultType());
clientKeyStore.load(null, null);
clientKeyStore.setCertificateEntry("certificate", clientCert);
clientKeyStore.setKeyEntry("private-key", clientKey, clientPassword, new java.security.cert.Certificate[]{clientCert});
KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
keyManagerFactory.init(clientKeyStore, clientPassword);
SSLContext context = SSLContext.getInstance("TLSv1.3");
context.init(keyManagerFactory.getKeyManagers(), trustManagerFactory.getTrustManagers(), null);
return context;
}
/**
* BouncyCastle을 이용.
*
* @param caCrtInputStream ca cert pem
* @return
* @throws KeyStoreException
* @throws IOException
* @throws CertificateException
* @throws UnrecoverableKeyException
* @throws KeyManagementException
* @throws NoSuchAlgorithmException
*/
public static SSLContext context(InputStream caCrtInputStream)
throws KeyStoreException, IOException, CertificateException, UnrecoverableKeyException, KeyManagementException, NoSuchAlgorithmException {
Security.addProvider(new BouncyCastleProvider());
// CA 인증서 불러오기
CertificateFactory caCertFactory = CertificateFactory.getInstance("X.509");
PEMParser parser = new PEMParser(new InputStreamReader(caCrtInputStream));
X509Certificate caCert = (X509Certificate) caCertFactory
.generateCertificate(new ByteArrayInputStream(parser.readPemObject().getContent()));
parser.close();
// CA 인증서를 사용해서 트러스트 스토어를 만든다.
KeyStore caKeyStore = KeyStore.getInstance(KeyStore.getDefaultType());
caKeyStore.load(null, null);
caKeyStore.setCertificateEntry("ca-certificate", caCert);
TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
trustManagerFactory.init(caKeyStore);
// 클라이언트 인증서와 비밀키는 없으므로, 그냥 만든다.
KeyStore clientKeyStore = KeyStore.getInstance(KeyStore.getDefaultType());
clientKeyStore.load(null, null);
KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
keyManagerFactory.init(clientKeyStore, null);
SSLContext context = SSLContext.getInstance("TLSv1.3");
context.init(keyManagerFactory.getKeyManagers(), trustManagerFactory.getTrustManagers(), null);
return context;
}
}

View File

@@ -0,0 +1,104 @@
/*
* Copyright (c) 2021. Elex. All Rights Reserved.
* https://www.elex-project.com/
*/
package kr.pe.elex.examples.topic;
import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
* 토픽 익스체인지는 라우팅-키를 패턴으로 사용한다.
*
* @author Elex
* @see "https://www.rabbitmq.com/tutorials/tutorial-five-java.html"
*/
@Slf4j
public class RabbitClient {
private static final String EXCHANGE = "elex.topic.exchange";
private String name;
private Connection connection;
private Channel channel;
private String queue;
RabbitClient(String name) throws IOException, TimeoutException {
this.name = name;
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("elex");
connectionFactory.setPassword("test");
connectionFactory.setVirtualHost("/");
connection = connectionFactory.newConnection();
channel = connection.createChannel();
// topic은 routing-key를 패턴으로 사용합니다.
channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.TOPIC, false);
// 큐 이름을 랜덤으로 생성합니다.
queue = channel.queueDeclare().getQueue();
}
public void consume(String topic) throws IOException {
// 익스체인지와 큐를 묶습니다.
channel.queueBind(queue, EXCHANGE, topic);
// 큐로부터 메시지를 받습니다.
channel.basicConsume(queue, true, queue, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
log.info("Rx: [{}] {} : {}", name, envelope.getRoutingKey(), new String(body, StandardCharsets.UTF_8));
}
});
}
public void publish(String topic, String message) throws IOException {
// 익스체인지에 메시지를 보냅니다.
channel.basicPublish(EXCHANGE, topic, null, message.getBytes(StandardCharsets.UTF_8));
log.info("Tx: [{}] {} : {}", name, topic, message);
}
public void close() throws IOException, TimeoutException {
channel.close();
connection.close();
}
public static void main(String... args) throws IOException, TimeoutException {
RabbitClient producer = new RabbitClient("Producer");
RabbitClient consumer1 = new RabbitClient("Consumer1");
RabbitClient consumer2 = new RabbitClient("Consumer2");
consumer1.consume("message.apple.#");
consumer2.consume("message.#");
producer.publish("message.hello", "Hello, there.");
producer.publish("message.apple", "Hello, apple.");
producer.publish("message.banana", "Hello, banana.");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
log.error("Interrupted..", e);
}
producer.close();
consumer1.close();
consumer2.close();
/*
11:29:47.699 [main] INFO kr.pe.elex.rabbitmq.topic.RabbitClient - Tx: [Producer] message.hello : Hello, there.
11:29:47.700 [pool-5-thread-4] INFO kr.pe.elex.rabbitmq.topic.RabbitClient - Rx: [Consumer2] message.hello : Hello, there.
11:29:47.703 [main] INFO kr.pe.elex.rabbitmq.topic.RabbitClient - Tx: [Producer] message.apple : Hello, apple.
11:29:47.703 [main] INFO kr.pe.elex.rabbitmq.topic.RabbitClient - Tx: [Producer] message.banana : Hello, banana.
11:29:47.704 [pool-5-thread-5] INFO kr.pe.elex.rabbitmq.topic.RabbitClient - Rx: [Consumer2] message.apple : Hello, apple.
11:29:47.704 [pool-5-thread-5] INFO kr.pe.elex.rabbitmq.topic.RabbitClient - Rx: [Consumer2] message.banana : Hello, banana.
11:29:47.704 [pool-3-thread-4] INFO kr.pe.elex.rabbitmq.topic.RabbitClient - Rx: [Consumer1] message.apple : Hello, apple.
*/
}
}