Spring Boot中使用RabbitMQ 生產(chǎn)消息和消費消息的實例代碼
引入RabbitMQ依賴
<!-- springboot集成rabbitMQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>增加RabbitMQ配置
#rabbitmq配置
spring:
rabbitmq:
host: ip地址
port: 5672
username: 賬號
password: 密碼
virtual-host: /配置RabbitMQ交換機以及隊列
package com.ckm.ball.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
//rabbitMQ綁定交換機 / 隊列
@Configuration
public class RabbitMQConfig {
//========================================================RabbitMQ Queue========================================================//
//創(chuàng)建fanout模式交換機
@Bean
public FanoutExchange fanoutExchangeProcess() {
return new FanoutExchange("process-data-change-exchange", true, false);
}
//創(chuàng)建隊列
@Bean
public Queue processDataChangeQueue() {
return new Queue("process-data-change-queue", true);
}
//將隊列綁定到交換機
@Bean
public Binding chatBindExchange() {
return BindingBuilder.bind(processDataChangeQueue()).to(fanoutExchangeProcess());
}
}編寫接口,模擬生產(chǎn)消息
@Resource
private RabbitTemplate rabbitTemplate;
@GetMapping("/produceMessage")
@ApiOperation(value = "生產(chǎn)消息", tags = "測試接口")
public void updateTokenTime() {
//生產(chǎn)消息,會到交換機,交換機下發(fā)給隊列,隊列監(jiān)聽到就會消費,執(zhí)行業(yè)務邏輯
rabbitTemplate.convertAndSend("process-data-change-exchange", "process-data-change-queue", "hhhhhhhhhhhhhh");
}編寫消息監(jiān)聽類,模擬消費消息
package com.ckm.ball.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.util.Date;
@Slf4j
@Component
public class RabbitMQDataSyncListenerProcess {
//監(jiān)聽process-data-change-queue隊列 -> 消費
@RabbitListener(queues = "process-data-change-queue")
public void orderDead(@Payload String productIdAndOrderId) {
log.info("當前時間:{},收到隊列信息:{}", new Date().toString(), productIdAndOrderId);
//執(zhí)行你的業(yè)務邏輯
for (int i = 0; i < 5; i++) {
System.out.println("循環(huán)次數(shù): " + (i + 1));
try {
// 暫停 2000 毫秒(2 秒)
Thread.sleep(2000);
} catch (InterruptedException e) {
// 處理異常
System.err.println("線程被中斷: " + e.getMessage());
}
}
}
}RabbitMQ 中的交換機的作用
RabbitMQ 中的交換機(Exchange)是消息路由的核心組件。它負責接收來自生產(chǎn)者發(fā)送的消息,并根據(jù)特定的路由規(guī)則將這些消息傳遞給一個或多個隊列(Queue)。交換機的主要功能和類型
1.消息路由:
- 交換機決定消息應該發(fā)送到哪些隊列,基于綁定(Binding)和路由鍵(Routing Key)。
2.類型:
- 直連交換機(Direct Exchange):消息直接發(fā)送到與路由鍵精確匹配的隊列。
- 主題交換機(Topic Exchange):消息根據(jù)路由鍵模式匹配一個或多個隊列,支持通配符。
- 扇出交換機(Fanout Exchange):將消息廣播到所有綁定的隊列,不考慮路由鍵。
- 頭交換機(Headers Exchange):通過消息的屬性(Headers)進行路由,而不是使用路由鍵。
工作流程
- 生產(chǎn)者發(fā)送消息到交換機。
- 交換機根據(jù)配置的路由規(guī)則和隊列的綁定關系,將消息路由到相應的隊列。
- 消費者從隊列中獲取消息進行處理。
在我的代碼中生產(chǎn)消息語句:
convertAndSend(交換機,路由鍵也就是隊列,你想傳遞的參數(shù))
在扇出交換機(Fanout Exchange)模式不需要指定路由鍵,因為指定了也沒用。
rabbitTemplate.convertAndSend("process-data-change-exchange", "process-data-change-queue", "hhhhhhhhhhhhhh");在扇出交換機(Fanout Exchange)模式,應改成:
rabbitTemplate.convertAndSend("process-data-change-exchange", "", "hhhhhhhhhhhhhh");在扇出交換機中,可以將路由鍵設置為空字符串 “”,因為扇出交換機會將消息發(fā)送到所有綁定的隊列,而不需要考慮路由鍵的具體值。
- 在扇出交換機中,路由鍵被忽略。
- 消息會被廣播到所有與交換機綁定的隊列中。
四種交換機模式
1. 直連交換機(Direct Exchange)
直連交換機:發(fā)送到匹配路由鍵的隊列。
// 創(chuàng)建直連交換機
@Bean
public DirectExchange directExchange() {
return new DirectExchange("direct-exchange", true, false);
}
// 創(chuàng)建隊列
@Bean
public Queue directQueue() {
return new Queue("direct-queue", true);
}
// 將隊列綁定到直連交換機,同時指定路由鍵
@Bean
public Binding directBinding() {
return BindingBuilder.bind(directQueue()).to(directExchange()).with("direct-routing-key");
}生產(chǎn)消息:
直連交換機生產(chǎn)消息:需要指定路由鍵。
// 發(fā)送消息到直連交換機
rabbitTemplate.convertAndSend("direct-exchange", "direct-routing-key", "Your message here");2. 主題交換機(Topic Exchange)
主題交換機:支持模糊匹配路由鍵。
// 創(chuàng)建主題交換機
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("topic-exchange", true, false);
}
// 創(chuàng)建隊列
@Bean
public Queue topicQueue() {
return new Queue("topic-queue", true);
}
// 將隊列綁定到主題交換機,同時指定路由鍵
@Bean
public Binding topicBinding() {
return BindingBuilder.bind(topicQueue()).to(topicExchange()).with("topic.#");
}生產(chǎn)消息:
主題交換機生產(chǎn)消息:需要指定符合主題模式的路由鍵。
// 發(fā)送消息到主題交換機
rabbitTemplate.convertAndSend("topic-exchange", "topic.routing.key", "Your message here");3. 扇出交換機(Fanout Exchange)
扇出交換機:將消息廣播到所有綁定的隊列。
// 創(chuàng)建扇出交換機
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanout-exchange", true, false);
}
// 創(chuàng)建隊列
@Bean
public Queue fanoutQueue1() {
return new Queue("fanout-queue-1", true);
}
@Bean
public Queue fanoutQueue2() {
return new Queue("fanout-queue-2", true);
}
// 將隊列綁定到扇出交換機
@Bean
public Binding fanoutBinding1() {
return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
}
@Bean
public Binding fanoutBinding2() {
return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
}生產(chǎn)消息:
扇出交換機生產(chǎn)消息:不需要路由鍵,使用空字符串即可。
// 發(fā)送消息到扇出交換機
rabbitTemplate.convertAndSend("fanout-exchange", "", "Your message here");4. 頭交換機(Headers Exchange)
頭交換機:根據(jù)消息頭中匹配的屬性進行路由。
// 創(chuàng)建頭交換機
@Bean
public HeadersExchange headersExchange() {
return new HeadersExchange("headers-exchange", true, false);
}
// 創(chuàng)建隊列
@Bean
public Queue headersQueue() {
return new Queue("headers-queue", true);
}
// 將隊列綁定到頭交換機,同時指定頭屬性
@Bean
public Binding headersBinding() {
Map<String, Object> headers = new HashMap<>();
headers.put("format", "pdf");
headers.put("type", "report");
return BindingBuilder.bind(headersQueue())
.to(headersExchange())
.whereAll(headers)
.match();
}生產(chǎn)消息:
頭交換機生產(chǎn)消息:需要構建一個帶有頭屬性的消息。
// 發(fā)送消息到頭交換機
MessageProperties messageProperties = new MessageProperties();
messageProperties.setHeader("format", "pdf");
messageProperties.setHeader("type", "report");
Message message = new Message("Your message here".getBytes(), messageProperties);
rabbitTemplate.send("headers-exchange", "", message);到此這篇關于Spring Boot中使用RabbitMQ 生產(chǎn)消息和消費消息的文章就介紹到這了,更多相關Spring Boot生產(chǎn)消息和消費消息內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
mybatis配置文件簡介_動力節(jié)點Java學院整理
這篇文章主要為大家詳細介紹了mybatis配置文件簡介的相關資料,具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-09-09
mybatis插件pageHelper實現(xiàn)分頁效果
這篇文章主要為大家詳細介紹了mybatis插件pageHelper實現(xiàn)分頁效果,具有一定的參考價值,感興趣的小伙伴們可以參考一下2018-12-12
Java SimpleDateFormat中英文時間格式化轉換詳解
這篇文章主要為大家詳細介紹了Java SimpleDateFormat中英文時間格式化轉換,具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-12-12
關于SpringBoot2.7.6連接nacos遇到的一些問題
這篇文章主要介紹了關于SpringBoot2.7.6連接nacos遇到的一些問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-06-06
Java synchronized重量級鎖實現(xiàn)過程淺析
這篇文章主要介紹了Java synchronized重量級鎖實現(xiàn)過程,synchronized是Java里的一個關鍵字,起到的一個效果是"監(jiān)視器鎖",它的功能就是保證操作的原子性,同時禁止指令重排序和保證內存的可見性2023-02-02

