SpringBoot 整合 RabbitMQ 的使用方式(代碼示例)
一、RabbitTemplate 的使用
1.【導(dǎo)入依賴(lài)】
<!-- rabbitMQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.6.1</version>
</dependency>2.【添加配置】
rabbitmq:
host: #ip地址
port: 5672 #端口
username: guest
password: guest
virtual-host: /
listener:
simple:
prefetch: 1 # 默認(rèn)每次取出一條消息消費(fèi), 消費(fèi)完成取下一條
acknowledge-mode: manual # 設(shè)置消費(fèi)端手動(dòng)ack確認(rèn)
retry:
enabled: true # 是否支持重試
publisher-confirm-type: correlated #確認(rèn)消息已發(fā)送到交換機(jī)(Exchange)
publisher-returns: true #確認(rèn)消息已發(fā)送到隊(duì)列(Queue)3.【點(diǎn)對(duì)點(diǎn)通信(隊(duì)列模式)(Point-to-Point Messaging)】
使用方式:
這種方式也被稱(chēng)為隊(duì)列(Queue)模型。消息發(fā)送者(Producer)發(fā)送消息到隊(duì)列,然后消息接收者(Consumer)從隊(duì)列中獲取消息進(jìn)行處理。這種模型下,每個(gè)消息只有一個(gè)消費(fèi)者可以接收,確保消息的可靠傳遞和順序處理。
代碼示例: 生產(chǎn)者
/**
* 第一種模型: 簡(jiǎn)單模型
* 一個(gè)消息生產(chǎn)者 一個(gè)隊(duì)列 一個(gè)消費(fèi)者
* @return
*/
@GetMapping("hello/world")
public void helloWorld() {
SysUser sysUser = new SysUser();
// 發(fā)送消息
// 第一個(gè)參數(shù): String routingKey 路由規(guī)則 【交換機(jī) 和隊(duì)列的綁定規(guī)則 】 隊(duì)列名稱(chēng)
// 第二個(gè)參數(shù): object message 消息的內(nèi)容
// rabbitTemplate.convertAndSend("hello_world_queue", "hello world rabbit!");
/// MessagePostProcessor 消息包裝器 如果需要對(duì)消息進(jìn)行包裝
rabbitTemplate.convertAndSend("hello_world_queue", "hello world rabbit!", message -> {
// 設(shè)置唯一的標(biāo)識(shí)
message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
return message;
});消費(fèi)者
import com.rabbitmq.client.Channel;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
@Component
@Log4j2
public class HelloWorldConsumer {
@Autowired
private StringRedisTemplate redisTemplate;
/**
* 監(jiān)聽(tīng) hello_world_queue 隊(duì)列消費(fèi)消息
* queues 監(jiān)聽(tīng)隊(duì)列的名稱(chēng) 要求這個(gè)隊(duì)列必須是已經(jīng)存在的隊(duì)列
* queuesToDeclare 監(jiān)聽(tīng)隊(duì)列 如果這個(gè)隊(duì)列不存在 則 rabbitMQ 中 RabbitAdmin 會(huì)幫助去構(gòu)建這個(gè)隊(duì)列
*/
@RabbitListener(queuesToDeclare = @Queue("hello_world_queue"))
public void helloWorldConsumer(String msg, Message message, Channel channel) {
// 獲取消息的唯一標(biāo)識(shí)
String messageId = message.getMessageProperties().getMessageId();
// 將消息添加到 Redis的set集合中 set 不能重復(fù)的 方法的返回值 添加成功的數(shù)量
Long count = redisTemplate.opsForSet().add("hello_world_queue", messageId);
if (count != null && count == 1) {
// 沒(méi)有消費(fèi)過(guò) 正常消費(fèi)
log.info("hello_world_queue隊(duì)列消費(fèi)者接收到了消息,消息內(nèi)容:{}", message);
}
}
}4.【發(fā)布/訂閱模式(Publish/Subscribe Messaging)】
使用方式:
在發(fā)布/訂閱模式中,消息發(fā)送者將消息發(fā)布到交換機(jī)(Exchange),而不是直接發(fā)送到隊(duì)列。交換機(jī)負(fù)責(zé)將消息路由到一個(gè)或多個(gè)綁定的隊(duì)列中。每個(gè)訂閱者(Subscriber)可以選擇訂閱它感興趣的消息隊(duì)列,從而接收消息。
代碼示例: 生產(chǎn)者
/**
* 工作隊(duì)列
* 一個(gè)生產(chǎn)者 一個(gè)隊(duì)列 多個(gè)消費(fèi)者
*/
@GetMapping("work/queue")
public void workQueue() {
for (int i = 1; i <= 10; i++) {
rabbitTemplate.convertAndSend("work_queue", i + "hello work queue!");
}
}消費(fèi)者
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@Log4j2
public class WorkQueueConsumer {
/***
* 消費(fèi)者1
* @param message
*/
@RabbitListener(queuesToDeclare = @Queue("work_queue"))
public void workQueueConsumer(String message) throws InterruptedException {
Thread.sleep(200);
log.info("work_queue隊(duì)列消費(fèi)者1接收到了消息,消息內(nèi)容:{}", message);
}
/***
* 消費(fèi)者2
* @param message
*/
@RabbitListener(queuesToDeclare = @Queue("work_queue"))
public void workQueueConsumer2(String message) throws InterruptedException {
Thread.sleep(400);
log.info("work_queue隊(duì)列消費(fèi)者2接收到了消息,消息內(nèi)容:{}", message);
}
}5.【工作隊(duì)列模式(Work Queues)】
使用方式:
工作隊(duì)列模式也稱(chēng)為任務(wù)隊(duì)列(Task Queues),它可以用來(lái)實(shí)現(xiàn)任務(wù)的異步處理。多個(gè)工作者(Worker)同時(shí)監(jiān)聽(tīng)同一個(gè)隊(duì)列,當(dāng)有新的任務(wù)消息被發(fā)送到隊(duì)列中時(shí),空閑的工作者會(huì)獲取并處理這些任務(wù),確保任務(wù)能夠并行處理而不會(huì)重復(fù)執(zhí)行。
代碼示例: 生產(chǎn)者
/**
* 發(fā)布訂閱
* 一個(gè)生產(chǎn)者 多個(gè)隊(duì)列 多個(gè)消費(fèi)者 涉及 到交換機(jī) fanout
*/
@GetMapping("publish/subscribe")
public void publishSubscribe() {
// 第一個(gè)參數(shù): 交換機(jī)的名稱(chēng) 沒(méi)有要求
// 第二個(gè)參數(shù): 交換機(jī)和隊(duì)列的綁定規(guī)則 如果是發(fā)布訂閱模式 那么這個(gè)規(guī)則默認(rèn)不寫(xiě) 只需要交換機(jī)和隊(duì)列綁定即可不需要規(guī)則
// 第三個(gè)參數(shù): 消息內(nèi)容
rabbitTemplate.convertAndSend("publish_subscribe_exchange", "",
"hello publisher subscribe!!");
}消費(fèi)者
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class PublisherSubscribeConsumer {
private static final Logger log = LoggerFactory.getLogger(PublisherSubscribeConsumer.class);
/**
* 發(fā)布訂閱模型消費(fèi)者
*
* @param message
*/
@RabbitListener(bindings = @QueueBinding(value = @Queue("pb_sb_queue_01"),
exchange = @Exchange(name = "publish_subscribe_exchange",
type = ExchangeTypes.FANOUT)))
public void publisherSubscribe(String message) {
log.info("發(fā)布訂閱模型消費(fèi)者1接收到了消息,消息內(nèi)容:{}", message);
}
/**
* 發(fā)布訂閱模型消費(fèi)者
*
* @param message
*/
@RabbitListener(bindings = @QueueBinding(value = @Queue("pb_sb_queue_02"),
exchange = @Exchange(name = "publish_subscribe_exchange", type = ExchangeTypes.FANOUT)))
public void publisherSubscribe2(String message) {
log.info("發(fā)布訂閱模型消費(fèi)者2接收到了消息,消息內(nèi)容:{}", message);
}
}6.【路由模式(Routing)】
使用方式:
路由模式允許發(fā)送者根據(jù)消息的路由鍵(Routing Key)將消息路由到特定的隊(duì)列。發(fā)送者將消息發(fā)送到交換機(jī),并且通過(guò)設(shè)置不同的路由鍵,使消息能夠被交換機(jī)路由到不同的隊(duì)列。消費(fèi)者可以根據(jù)需要選擇監(jiān)聽(tīng)哪些隊(duì)列來(lái)接收消息。
代碼示例: 生產(chǎn)者
/**
* 路由模型
* 一個(gè)生產(chǎn)者 多個(gè)隊(duì)列 多個(gè)消費(fèi)者 涉及 到交換機(jī) direct
*/
@GetMapping("routing")
public void routing() {
// 第一個(gè)參數(shù): 交換機(jī)的名稱(chēng) 沒(méi)有要求
// 第二個(gè)參數(shù): 交換機(jī)和隊(duì)列的綁定規(guī)則 字符串 隨意
// 第三個(gè)參數(shù): 消息內(nèi)容
rabbitTemplate.convertAndSend("routing_exchange", "aaa",
"hello routing!!");
}消費(fèi)者
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@Log4j2
public class RoutingConsumer {
/**
* 路由模型消費(fèi)者
* @param message
*/
@RabbitListener(bindings = @QueueBinding(value = @Queue("routing_queue_01"),
exchange = @Exchange(name = "routing_exchange", type = ExchangeTypes.DIRECT),
key = { "abc", "error", "info" }))
public void routingConsumer(String message) {
log.info("路由模型消費(fèi)者1接收到了消息,消息內(nèi)容:{}", message);
}
/**
* 路由模型消費(fèi)者
* @param message
*/
@RabbitListener(bindings = @QueueBinding(value = @Queue("routing_queue_02"),
exchange = @Exchange(name = "routing_exchange", type = ExchangeTypes.DIRECT),
key = { "aaa", "ccc", "waadaffas" }))
public void routingConsumer2(String message) {
log.info("路由模型消費(fèi)者2接收到了消息,消息內(nèi)容:{}", message);
}
/**
* 路由模型消費(fèi)者
* @param message
*/
@RabbitListener(bindings = @QueueBinding(value = @Queue("routing_queue_03"),
exchange = @Exchange(name = "routing_exchange", type = ExchangeTypes.DIRECT),
key = { "bbbb", "asdfasd", "asdfasdf" }))
public void routingConsumer3(String message) {
log.info("路由模型消費(fèi)者3接收到了消息,消息內(nèi)容:{}", message);
}
}7.【主題模式(Topics)】
使用方式:
主題模式是路由模式的一種擴(kuò)展,它允許發(fā)送者根據(jù)消息的多個(gè)屬性(如主題)將消息路由到一個(gè)或多個(gè)隊(duì)列。主題交換機(jī)(Topic Exchange)使用通配符匹配路由鍵與隊(duì)列綁定鍵的模式,從而實(shí)現(xiàn)更靈活的消息路由和過(guò)濾。
代碼示例: 生產(chǎn)者
/**
* 主題模型
* 一個(gè)生產(chǎn)者 多個(gè)隊(duì)列 多個(gè)消費(fèi)者 涉及 到交換機(jī) topic
*/
@GetMapping("topic")
public void topic() {
// 第一個(gè)參數(shù): 交換機(jī)的名稱(chēng) 沒(méi)有要求
// 第二個(gè)參數(shù): 交換機(jī)和隊(duì)列的綁定規(guī)則 多個(gè)單詞 以 “.” 拼起來(lái)
// 第三個(gè)參數(shù): 消息內(nèi)容
rabbitTemplate.convertAndSend("topic_exchange", "bwie.age.name",
"hello topic!!");
}消費(fèi)者
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@Log4j2
public class TopicConsumer {
/**
* * 表示任意一個(gè)單詞
* # 表示任意一個(gè)單詞 或 多個(gè)
*/
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic_queue_01"),
exchange = @Exchange(name = "topic_exchange", type = ExchangeTypes.TOPIC),
key = { "abc.*", "error.*.info", "#.name" }))
public void topicConsumer(String message) {
log.info("xxxxxxxxx1");
}
/**
* * 表示任意一個(gè)單詞
* # 表示任意一個(gè)單詞 或 多個(gè)
*/
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic_queue_02"),
exchange = @Exchange(name = "topic_exchange", type = ExchangeTypes.TOPIC),
key = { "abc.*", "username" }))
public void topicConsumer2(String message) {
log.info("xxxxxxxxx2");
}
/**
* * 表示任意一個(gè)單詞
* # 表示任意一個(gè)單詞 或 多個(gè)
*/
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic_queue_03"),
exchange = @Exchange(name = "topic_exchange", type = ExchangeTypes.TOPIC),
key = { "bwie.*", "error.*.info" }))
public void topicConsumer3(String message) {
log.info("xxxxxxxxx3");
}
}到此這篇關(guān)于SpringBoot 整合 RabbitMQ 的使用的文章就介紹到這了,更多相關(guān)SpringBoot 整合 RabbitMQ內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- SpringBoot實(shí)現(xiàn)RabbitMQ監(jiān)聽(tīng)消息的四種方式
- SpringBoot使用RabbitMQ延時(shí)隊(duì)列(小白必備)
- SpringBoot中RabbitMQ集群的搭建詳解
- SpringBoot集成RabbitMQ的方法(死信隊(duì)列)
- 一文掌握Springboot集成RabbitMQ的方法
- Springboot 配置RabbitMQ文檔的方法步驟
- springboot整合rabbitmq的示例代碼
- SpringBoot中連接多個(gè)RabbitMQ的方法詳解
- springboot3.0整合rabbitmq3.13的實(shí)現(xiàn)示例
相關(guān)文章
java的Map集合中按value值進(jìn)行排序輸出的實(shí)例代碼
下面小編就為大家?guī)?lái)一篇java的Map集合中按value值進(jìn)行排序輸出的實(shí)例代碼。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2016-08-08
解析Java中的定時(shí)器及使用定時(shí)器制作彈彈球游戲的示例
這篇文章主要介紹了Java中的定時(shí)器及使用定時(shí)器制作彈彈球游戲的示例,文中同時(shí)也分析了定時(shí)器timer的缺點(diǎn)及相關(guān)替代方案,需要的朋友可以參考下2016-02-02
Java基礎(chǔ)之Math和Random類(lèi)知識(shí)總結(jié)
今天帶大家來(lái)學(xué)習(xí)java的Math和Random類(lèi),文中有非常詳細(xì)的代碼示例及介紹,對(duì)正在學(xué)習(xí)java基礎(chǔ)的小伙伴們很有幫助喲,需要的朋友可以參考下2021-05-05
java中的方法重載知識(shí)點(diǎn)總結(jié)
在本篇文章里小編給大家整理了關(guān)于java中的方法重載知識(shí)點(diǎn)總結(jié),有興趣的朋友們可以跟著學(xué)習(xí)參考下。2020-02-02
使用java實(shí)現(xiàn)“釘釘微應(yīng)用免登進(jìn)入某H5系統(tǒng)首頁(yè)“功能”
這篇文章主要介紹了用java實(shí)現(xiàn)“釘釘微應(yīng)用,免登進(jìn)入某H5系統(tǒng)首頁(yè)“功能”,本文給大家介紹的非常詳細(xì),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2019-10-10
基于Java實(shí)現(xiàn)簡(jiǎn)單的身材計(jì)算程序
這篇文章主要為大家詳細(xì)介紹了如何利用Java實(shí)現(xiàn)簡(jiǎn)單的身材計(jì)算程序,可以計(jì)算身體的體脂率以及BMI數(shù)值等,感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2022-12-12
springboot + rabbitmq 如何實(shí)現(xiàn)消息確認(rèn)機(jī)制(踩坑經(jīng)驗(yàn))
這篇文章主要介紹了springboot + rabbitmq 如何實(shí)現(xiàn)消息確認(rèn)機(jī)制,本文給大家分享小編實(shí)際開(kāi)發(fā)中的一點(diǎn)踩坑經(jīng)驗(yàn),內(nèi)容簡(jiǎn)單易懂,需要的朋友可以參考下2020-07-07

