SpringCloud?Stream?快速入門實(shí)例教程
1.SCS 組件的出現(xiàn)的背景和作用
在分布式系統(tǒng)中,可能使用到的消息隊(duì)列讓人眼花繚亂,可能有使用(RabbitMq RroketMQ Kafka....),他們提供的客戶端各不相同,使用的方式也讓人眼花繚亂,此時(shí)就需要一個(gè)能夠統(tǒng)一消息隊(duì)列的客戶端,通過更高級(jí)的抽象來實(shí)現(xiàn)更通用和更簡(jiǎn)單的集成不同的消息隊(duì)列中間件,此時(shí)也就誕生了這個(gè)SCS 組件
2.SCS 集成srping Boot項(xiàng)目
我們?cè)谶@個(gè)演示項(xiàng)目中所使用的Spring Boot版本為 2.7.18、SpringCloud Alibaba版本為 2021.0.6.0
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>2021.0.6.0</version>
<type>pom</type>
<scope> import</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>2.7.18</version>
<type>pom</type>
<scope> import</scope>
</dependency>
</dependencies>
</dependencyManagement>使用的SCS 組件版本為 3.2.10
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<version>3.2.10</version>
</dependency>3.Yml 配置
scs 的使用難點(diǎn)主要就是在yml 的配置上,配置完成使用很方便
spring:
cloud:
function:
definition: myTaskConsumer;ackConsumer #你注冊(cè)的 Consumer 方法名 或者 Function 方法名 中間使用 ;分割 (生產(chǎn)者一般是動(dòng)態(tài)發(fā)送消息 不需要注冊(cè))
stream:
binders:
kafka-binder-1: # 綁定器名稱
type: kafka # 消息隊(duì)列的類型類型
environment: #綁定器環(huán)境配置
spring:
kafka:
bootstrap-servers: 172.22.134.135:9092 # kafka地址 可以設(shè)置多個(gè)
properties:
security.protocol: PLAINTEXT # kafka協(xié)議
#rabbit-binde-1r:
# type: rabbit
# ... rabbitmq配置
# 全局生產(chǎn)者可靠性配置(推薦)
binder:
producer-properties:
acks: all # ?? 生產(chǎn)者 ACK = all 所有副本同步完成才ack;ACK=1寫入leader副本返回ack;ACK=0 生產(chǎn)者發(fā)送消息立馬ack
retries: 100 # 最大重試 當(dāng)發(fā)送失敗時(shí)(如網(wǎng)絡(luò)抖動(dòng)、Leader 切換),Producer 自動(dòng)重試的最大次數(shù)。
retry.backoff.ms: 1000 #每次重試之間的等待時(shí)間(毫秒)。
enable-idempotence: true # 冪等生產(chǎn)者(防重復(fù))
bindings:
myTaskConsumer-in-0: #命名規(guī)則 ${方法名}-${消費(fèi)者:in/生產(chǎn)者:out}-${數(shù)字:不能與其他相同}
destination: test-kraft # topic
group: my-consumer-group #消費(fèi)者組
binder: kafka-binder-1 # 綁定器 <--上面配置的綁定名稱
consumer: # 消費(fèi)者配置
autoStartup: true # 是否自動(dòng)啟動(dòng)
concurrency: 1 #啟動(dòng)消費(fèi)者實(shí)例數(shù) (同屬于一個(gè)消費(fèi)者組)
myTaskProducer-out-0:
destination: test-kraft # topic
binder: kafka-binder-1 # 綁定器 <--上面配置的綁定名稱
producer: # 生產(chǎn)者配置
partitionCount: 1 # 應(yīng)與目標(biāo) Topic 的實(shí)際分區(qū)數(shù)一致。
# - 若小于實(shí)際分區(qū)數(shù):僅使用部分分區(qū),浪費(fèi)并行能力;
# - 若大于實(shí)際分區(qū)數(shù):發(fā)送時(shí)會(huì)因訪問不存在的分區(qū)而失??!
#使用消息頭中的 headers的 partitionKey 作為 key進(jìn)行分區(qū)
partition-key-expression: headers.partitionKey # 分區(qū)鍵(分區(qū)規(guī)則根據(jù)key進(jìn)行hash落到分區(qū) 有助于落到指定分區(qū)順序消費(fèi))4.SpringCloud Stream 3.X新特性函數(shù)編程
4.1.編寫 消費(fèi)者
Mesage 的包別導(dǎo)錯(cuò)
import org.springframework.messaging.Message;
@Configuration
public class kafkaConsumer
{
@Bean
public Consumer<Message<String>> myTaskConsumer ()
{
System.out.println ("[初始化] myTaskConsumer Bean 已創(chuàng)建");
return message -> System.out.println ("[myTaskConsumer] 收到消息: " + message.getPayload ());
}
}4.2.編寫動(dòng)態(tài)生產(chǎn)者
@RestController
public class SendController
{
@Autowired
StreamBridge streamBridge;
@GetMapping ("/sendMyTaskProducer/{msg}")
public String send (@PathVariable ("msg") String msg)
{
//構(gòu)建消息
Message<String> message = MessageBuilder.withPayload (msg)
.setHeader ("partitionKey", msg) // 添加分區(qū)鍵partitionKey 作為分區(qū)鍵
.build ();
//參數(shù)1為發(fā)送的通道名稱(在yml中配置),參數(shù)2為消息
boolean myTaskProducer = streamBridge.send ("myTaskProducer-out-0", message);
System.out.println ("發(fā)送結(jié)果:" + myTaskProducer);
return "發(fā)送結(jié)果:" + myTaskProducer;
}
}5.進(jìn)行測(cè)試
訪問發(fā)送消息的接口,發(fā)送成功,并且消費(fèi)者進(jìn)行了消費(fèi)


6.進(jìn)行消費(fèi)者手動(dòng)ACK
消費(fèi)者手動(dòng)ACK 比自動(dòng)ACK 要安全得多,默認(rèn)scs 是實(shí)行自動(dòng)ack,自動(dòng)ack只要消息被投遞到消費(fèi)者,不論是否消費(fèi)成功或者失敗,都會(huì)被視為消費(fèi)成功
6.1yml 配置
#========================================消費(fèi)者ACK Kafka 專屬配置========================================
#演示消費(fèi)者ACK機(jī)制
ackConsumer-in-0: #命名規(guī)則 ${方法名}-${消費(fèi)者:in/生產(chǎn)者:out}-${數(shù)字:不能與其他相同}
destination: topicOne # topic
group: ack-consumer-group #消費(fèi)者組 (修改為獨(dú)立的消費(fèi)者組,避免與myTaskConsumer沖突)
binder: kafka-binder-1 # 綁定器 <--上面配置的綁定名稱
consumer: # 消費(fèi)者配置
autoStartup: true # 是否自動(dòng)啟動(dòng)
concurrency: 1 #啟動(dòng)消費(fèi)者實(shí)例數(shù) (同屬于一個(gè)消費(fèi)者組)
ackProducer-out-0:
destination: topicOne # topic
binder: kafka-binder-1 # 綁定器 <--上面配置的綁定名稱
producer: # 生產(chǎn)者配置
partitionCount: 1 # 應(yīng)與目標(biāo) Topic 的實(shí)際分區(qū)數(shù)一致。
# - 若小于實(shí)際分區(qū)數(shù):僅使用部分分區(qū),浪費(fèi)并行能力;
# - 若大于實(shí)際分區(qū)數(shù):發(fā)送時(shí)會(huì)因訪問不存在的分區(qū)而失?。?
#使用消息頭中的 headers的 partitionKey 作為 key進(jìn)行分區(qū)
partition-key-expression: headers.partitionKey # 分區(qū)鍵(分區(qū)規(guī)則根據(jù)key進(jìn)行hash落到分區(qū) 有助于落到指定分區(qū)順序消費(fèi))
# Kafka 專屬配置
kafka:
bindings:
ackConsumer-in-0: # ??指定哪個(gè)消費(fèi)者使用ACK
consumer:
ack-mode: MANUAL # ?? 關(guān)鍵!手動(dòng) ACK 模式
#RECORD 每條消息處理完自動(dòng)提交 offset(默認(rèn)) 簡(jiǎn)單場(chǎng)景
#BATCH 批量提交(一批 poll 的消息處理完后提交) 默認(rèn)行為(等價(jià)于 auto-commit=true)
#TIME 定時(shí)提交 較少用
#COUNT 每 N 條提交一次 較少用
#MANUAL 手動(dòng)調(diào)用 acknowledge() 才提交 ? 需要精確控制(推薦)
#MANUAL_IMMEDIATE 手動(dòng)調(diào)用立即提交(不等批次) 高可靠性要求6.2編寫消費(fèi)者
@Bean
public Consumer<Message<String>> ackConsumer(){
System.out.println ("[初始化] ackConsumer Bean 已創(chuàng)建");
return message -> {
System.out.println ("[ackConsumer] ========== 開始處理消息 ==========");
System.out.println ("[ackConsumer] 消息內(nèi)容: " + message.getPayload());
System.out.println ("[ackConsumer] 消息Headers: " + message.getHeaders());
//獲取Acknowledgment
Acknowledgment ack = message.getHeaders ()
.get (KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
if (ack != null) {
//進(jìn)行手動(dòng)ack
ack.acknowledge ();
System.out.println ("[ackConsumer] ? 已手動(dòng)確認(rèn)消息");
} else {
System.out.println ("[ackConsumer] ?? 警告: Acknowledgment為null,無法手動(dòng)確認(rèn)");
}
System.out.println ("[ackConsumer] ========== 消息處理完成 ==========\n");
};
}6.3編寫生產(chǎn)者
@GetMapping ("/sendAckProducer/{msg}")
public String send2 (@PathVariable ("msg") String msg)
{
//構(gòu)建消息
Message<String> message = MessageBuilder.withPayload (msg)
.setHeader ("partitionKey", msg) // 添加分區(qū)鍵partitionKey 作為分區(qū)鍵
.build ();
//參數(shù)1為發(fā)送的通道名稱(在yml中配置),參數(shù)2為消息
boolean ackProducer = streamBridge.send ("ackProducer-out-0", message);
System.out.println ("發(fā)送結(jié)果:" + ackProducer);
return "發(fā)送結(jié)果:" + ackProducer;
}6.4測(cè)試結(jié)果

到此這篇關(guān)于SpringCloud Stream 快速入門的文章就介紹到這了,更多相關(guān)SpringCloud Stream 入門內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringBoot模塊多項(xiàng)目解耦的最佳實(shí)踐
為了提高代碼質(zhì)量和靈活性,在Spring Boot項(xiàng)目中采用策略模式是一個(gè)有效的方法,該模式允許定義一系列算法并將每一個(gè)封裝起來,使它們可以互相替換,本文給大家介紹了SpringBoot模塊多項(xiàng)目解耦的最佳實(shí)踐,需要的朋友可以參考下2025-02-02
Mybatis plus結(jié)合springboot使用
本文主要介紹了MyBatisPlus使用SpringBoot數(shù)據(jù)庫(kù)操作,從添加依賴到測(cè)試,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2024-11-11
使用IntelliJ IDEA 進(jìn)行代碼對(duì)比的方法(兩種方法)
這篇文章給大家?guī)砹藘煞NIntelliJ IDEA 進(jìn)行代碼對(duì)比的方法,非常不錯(cuò),具有參考借鑒價(jià)值,需要的朋友可以參考下2018-01-01
IDEA修改idea.vmoptions后,IDEA無法打開的解決方案
文章介紹了在IDEA中因錯(cuò)誤修改啟動(dòng)參數(shù)導(dǎo)致無法啟動(dòng)的問題,指出正確的修改文件位置應(yīng)在破解插件目錄下的idea.vmoptions,并分享了個(gè)人經(jīng)驗(yàn)供參考2025-10-10
springboot+EHcache 實(shí)現(xiàn)文章瀏覽量的緩存和超時(shí)更新
這篇文章主要介紹了springboot+EHcache 實(shí)現(xiàn)文章瀏覽量的緩存和超時(shí)更新,問題描述和解決思路給大家介紹的非常詳細(xì),需要的朋友可以參考下2017-04-04
java.lang.OutOfMemoryError: Java heap space錯(cuò)誤
本文主要介紹了java.lang.OutOfMemoryError: Java heap space錯(cuò)誤的問題解決,包括內(nèi)存泄漏、數(shù)據(jù)過大和JVM堆大小配置不足,提供了解決方法,具有一定的參考價(jià)值,感興趣的可以了解一下2025-03-03

