SpringBoot中的多RabbitMQ數據源配置實現(xiàn)
簡介
在構建復雜的應用程序時,經常需要與多個數據源進行交互。這可能包括連接多個數據庫、消息隊列或其他數據存儲系統(tǒng)。RabbitMQ 是一個流行的消息隊列系統(tǒng),它通過消息隊列實現(xiàn)了應用程序之間的松耦合,適用于異步任務處理、解耦、削峰填谷等場景。本篇博客將介紹如何在 Spring Boot 中配置和管理多個 RabbitMQ 數據源,以滿足不同的應用需求,并提供示例代碼使用
1. 依賴引入
首先,在 pom.xml 文件中添加 RabbitMQ 的 Spring Boot Starter 依賴,以便引入 RabbitMQ 相關的庫和功能。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>2. 抽象類
創(chuàng)建一個抽象類 AbstractRabbitConfiguration,其中包含了RabbitMQ的基本配置信息。這些信息包括主機、端口、用戶名、密碼、虛擬主機、隊列名、交換機名、確認機制和消費條數等。這個抽象類的目的是為了讓子類繼承這些基本配置信息,并根據不同的數據源創(chuàng)建相應的RabbitMQ連接和管理器。
@Data
public abstract class AbstractRabbitConfiguration {
? ? ?protected String host;
? ? protected Integer port;
? ? protected String userName;
? ? protected String password;
? ? protected String virtualHost;
? ? protected String queueName;
? ? protected String exchangeName;
? ? protected String routingKey;
? ? protected String acknowledge = "manual";
? ? protected Integer prefetch = 1;
? ? public ConnectionFactory connectionFactory() {
? ? ? ? CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
? ? ? ? connectionFactory.setHost(host);
? ? ? ? connectionFactory.setPort(port);
? ? ? ? connectionFactory.setVirtualHost(virtualHost);
? ? ? ? connectionFactory.setUsername(userName);
? ? ? ? connectionFactory.setPassword(password);
? ? ? ? connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
? ? ? ? connectionFactory.setPublisherReturns(Boolean.TRUE);
? ? ? ? return connectionFactory;
? ? }
}3. 子類
在抽象類的基礎上,我們可以創(chuàng)建多個子類,每個子類對應一個不同的RabbitMQ數據源配置。以一個名為 RabbitConfig 的子類為例,假設它是用于主數據源的配置。
@Configuration
@ConfigurationProperties(prefix = "kxj.rabbit")
public class RabbitConfig extends AbstractRabbitConfiguration {
? ? @Bean("primaryConnectionFactory")
? ? @Primary
? ? public ConnectionFactory primaryConnectionFactory() {
? ? ? ? return super.connectionFactory();
? ? }
? ? @Bean
? ? @Primary
? ? public RabbitTemplate rabbitTemplate(@Qualifier("primaryConnectionFactory") ConnectionFactory connectionFactory,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?@Qualifier("confirmCallback") ConfirmCallback confirmCallback,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?@Qualifier("returnCallback") ReturnCallback returnCallback) {
? ? ? ? RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
? ? ? ? rabbitTemplate.setMandatory(true);
? ? ? ? rabbitTemplate.setConfirmCallback(confirmCallback);
? ? ? ? rabbitTemplate.setReturnCallback(returnCallback);
? ? ? ? rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
? ? ? ? return rabbitTemplate;
? ? }
? ? @Bean(name = "primaryContainerFactory")
? ? public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(
? ? ? ? ? ? SimpleRabbitListenerContainerFactoryConfigurer configurer,
? ? ? ? ? ? @Qualifier("primaryConnectionFactory") ConnectionFactory connectionFactory) {
? ? ? ? SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
? ? ? ? factory.setConnectionFactory(connectionFactory);
? ? ? ? // 設置ACK確認機制
? ? ? ? factory.setAcknowledgeMode(AcknowledgeMode.valueOf(acknowledge.toUpperCase()));
? ? ? ? // 設置消費者消費條數
? ? ? ? factory.setPrefetchCount(prefetch);
? ? ? ? configurer.configure(factory, connectionFactory);
? ? ? ? return factory;
? ? }
? ? @Bean(name = "primaryRabbitAdmin")
? ? public RabbitAdmin rabbitAdmin(@Qualifier("primaryConnectionFactory") ConnectionFactory connectionFactory) {
? ? ? ? RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
? ? ? ? rabbitAdmin.setAutoStartup(true);
? ? ? ? // 聲明交換機,隊列及對應綁定關系
? ? ? ? Queue queue = RabbitmqUtil.createQueue(queueName);
? ? ? ? FanoutExchange exchange = RabbitmqUtil.createFanoutExchange(exchangeName);
? ? ? ? Binding binding = RabbitmqUtil.createBinding(queue, exchange, "");
? ? ? ? RabbitmqUtil.createRabbitAdmin(queue, exchange, binding, rabbitAdmin);
? ? ? ? return rabbitAdmin;
? ? }
}在子類中,我們使用 @Configuration 注解將它標記為Spring的配置類,并使用 @ConfigurationProperties 注解將以 kxj.rabbit 為前綴的配置屬性注入到類中。這使得我們可以在配置文件中為不同的數據源配置不同的RabbitMQ屬性。
在子類中,我們定義多個Bean來配置RabbitMQ的連接、管理和消息處理等,以滿足不同數據源的需求。在這里創(chuàng)建主數據源的連接工廠,并使用 @Primary 注解將其標記為默認的連接工廠。
除了連接工廠之外,我們還可以配置其他與RabbitMQ相關的Bean,如 RabbitTemplate、RabbitAdmin 以及回調類等。這些Bean可以根據不同數據源的需求進行配置,例如設置消息確認機制、消息返回機制和消息轉換器等。
另外,我們在 rabbitTemplate 方法中也進行了一些配置,如設置 mandatory 為 true,設置消息轉換器為 Jackson2JsonMessageConverter 等。
4. 配置回調類
在處理消息時,我們通常需要設置確認回調(ConfirmCallback)和返回回調(ReturnCallback)。這些回調類可以用于處理消息的確認和返回情況。
@Slf4j
@Component
public class ConfirmCallback implements RabbitTemplate.ConfirmCallback {
? ? @Override
? ? public void confirm(CorrelationData correlationData, boolean ack, String cause) {
? ? ? ? if (ack) {
? ? ? ? ? ? log.info("傳遞消息到交換機成功,correlationData:{}, cause:{}", JSON.toJSONString(correlationData), cause);
? ? ? ? } else {
? ? ? ? ? ? log.error("傳遞消息到交換機失敗,correlationData:{}, cause:{}", JSON.toJSONString(correlationData), cause);
? ? ? ? }
? ? }
}
@Slf4j
@Component
public class ReturnCallback implements RabbitTemplate.ReturnCallback {
? ? @Override
? ? public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
? ? ? ? String msg = new String(message.getBody());
? ? ? ? log.error(String.format("消息{%s}不能被正確路由,routingKey為{%s}", msg, routingKey));
? ? }
}5. 配置文件
server.port=8895 kxj.rabbit.host=MQ地址 kxj.rabbit.port=MQ端口 kxj.rabbit.virtualHost=/ kxj.rabbit.userName=guest kxj.rabbit.password=guest kxj.rabbit.queueName=test.queue kxj.rabbit.exchangeName=test.exchange kxj.rabbit.routingKey=test-routing-key
6. 工具類
在 RabbitMQ 的配置過程中,我們需要聲明交換機、隊列和綁定關系等,這些操作可以通過一個工具類 RabbitmqUtil 來實現(xiàn)。
public class RabbitmqUtil {
? ? public static DirectExchange createDirectExchange(String exchangeName) {
? ? ? ? if (StringUtils.isNotBlank(exchangeName)) {
? ? ? ? ? ? return new DirectExchange(exchangeName, true, false);
? ? ? ? }
? ? ? ? return null;
? ? }
? ? public static TopicExchange createTopicExchange(String exchangeName) {
? ? ? ? if (StringUtils.isNotBlank(exchangeName)) {
? ? ? ? ? ? return new TopicExchange(exchangeName, true, false);
? ? ? ? }
? ? ? ? return null;
? ? }
? ? public static FanoutExchange createFanoutExchange(String exchangeName) {
? ? ? ? if (StringUtils.isNotBlank(exchangeName)) {
? ? ? ? ? ? return new FanoutExchange(exchangeName, true, false);
? ? ? ? }
? ? ? ? return null;
? ? }
? ? public static Queue createQueue(String queueName) {
? ? ? ? if (StringUtils.isNotBlank(queueName)) {
? ? ? ? ? ? return new Queue(queueName, true);
? ? ? ? }
? ? ? ? return null;
? ? }
? ? public static Binding createBinding(Queue queueName, Exchange exchangeName, String routingKeyName) {
? ? ? ? if (Objects.nonNull(queueName) && Objects.nonNull(exchangeName)) {
? ? ? ? ? ? return BindingBuilder.bind(queueName).to(exchangeName).with(routingKeyName).noargs();
? ? ? ? }
? ? ? ? return null;
? ? }
// ? ?public static void createRabbitAdmin(Queue queue, DirectExchange exchange, Binding binding, RabbitAdmin rabbitAdmin) {
// ? ? ? ?rabbitAdmin.declareQueue(queue);
// ? ? ? ?rabbitAdmin.declareExchange(exchange);
// ? ? ? ?rabbitAdmin.declareBinding(binding);
// ? ?}
? ? public static void createRabbitAdmin(Queue queue, Exchange exchange, Binding binding, RabbitAdmin rabbitAdmin) {
? ? ? ? if (queue != null) {
? ? ? ? ? ? rabbitAdmin.declareQueue(queue);
? ? ? ? }
? ? ? ? if (exchange != null) {
? ? ? ? ? ? rabbitAdmin.declareExchange(exchange);
? ? ? ? }
? ? ? ? if (binding != null) {
? ? ? ? ? ? rabbitAdmin.declareBinding(binding);
? ? ? ? }
? ? }
}7. 測試用例
我們可以編寫一些測試用例來驗證以上配置是否正確。下面是一個發(fā)送消息到主數據源的示例:
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqTest {
? ? @Autowired
? ? @Qualifier("primaryRabbitAdmin")
? ? private RabbitAdmin primaryRabbitAdmin;
? ? @Autowired
? ? @Qualifier("primaryContainerFactory")
? ? private SimpleRabbitListenerContainerFactory primaryContainerFactory;
? ? @Autowired
? ? @Qualifier("primaryConnectionFactory")
? ? private ConnectionFactory primaryConnectionFactory;
? ? @Autowired
? ? private RabbitTemplate primaryRabbitTemplate;
? ? @Test
? ? public void testSend() {
? ? ? ? String message = "Hello, World!";
? ? ? ? primaryRabbitTemplate.convertAndSend("test.exchange", "test.routingKey", message);
? ? ? ? String receivedMessage = (String) primaryRabbitTemplate.receiveAndConvert("test.queue");
? ? ? ? assertEquals(message, receivedMessage);
? ? }
}在上面的測試用例中,我們使用了 @Qualifier 注解來指定主數據源的 Bean,然后通過 RabbitTemplate 發(fā)送消息到 test.exchange,并在隊列 test.queue 中接收到消息。我們可以通過斷言來判斷發(fā)送和接收的消息是否一致,以此驗證配置是否正確。
總結
通過使用抽象類和子類的方式,我們可以輕松地配置和管理多個RabbitMQ數據源,每個數據源可以有不同的屬性配置。這種方法使得我們的應用程序更具靈活性,能夠與多個RabbitMQ實例交互,滿足不同數據源的需求。同時,回調類的使用也可以幫助我們處理消息的確認和返回情況,確保消息的可靠性傳遞。
到此這篇關于SpringBoot中的多RabbitMQ數據源配置實現(xiàn)的文章就介紹到這了,更多相關SpringBoot 多RabbitMQ數據源配置內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
spring cloud Hystrix斷路器的使用(熔斷器)
這篇文章主要介紹了spring cloud Hystrix斷路器的使用(熔斷器),小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2018-08-08
SpringBoot2.0集成Swagger2訪問404的解決操作
這篇文章主要介紹了SpringBoot2.0集成Swagger2訪問404的解決操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-09-09
SpringSecurity頁面授權與登錄驗證實現(xiàn)(內存取值與數據庫取值)
Spring Security是一個能夠為基于Spring的企業(yè)應用系統(tǒng)提供聲明式的安全訪問控制解決方案的安全框架,本文主要介紹了SpringSecurity頁面授權與登錄驗證實現(xiàn),文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2022-06-06
Java線程池ThreadPoolExecutor源碼深入分析
ThreadPoolExecutor作為java.util.concurrent包對外提供基礎實現(xiàn),以內部線程池的形式對外提供管理任務執(zhí)行,線程調度,線程池管理等等服務2022-08-08

