Spring Boot集成Apache Kafka的實(shí)戰(zhàn)指南
Apache Kafka 是一個(gè)分布式流處理平臺(tái),廣泛用于構(gòu)建實(shí)時(shí)數(shù)據(jù)管道、日志聚合系統(tǒng)和事件溯源架構(gòu)。Spring Boot 提供了對(duì) Kafka 的良好集成支持,使得開(kāi)發(fā)者可以非常便捷地在項(xiàng)目中使用 Kafka。
本文將手把手教你如何在 Spring Boot 項(xiàng)目中集成 Kafka,包括生產(chǎn)者(Producer)和消費(fèi)者(Consumer)的實(shí)現(xiàn),并提供完整的代碼示例。
開(kāi)發(fā)環(huán)境準(zhǔn)備
Java 17+
Maven 或 Gradle
Spring Boot 3.x
Apache Kafka 3.0+(本地或遠(yuǎn)程)
IDE(如 IntelliJ IDEA、VS Code)
創(chuàng)建 Spring Boot 項(xiàng)目
你可以通過(guò) Spring Initializr 創(chuàng)建一個(gè)新的 Spring Boot 項(xiàng)目,選擇以下依賴(lài):
- Spring Web
- Spring for Apache Kafka
或者手動(dòng)添加 pom.xml 中的依賴(lài):
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
Spring Boot 會(huì)自動(dòng)管理版本兼容性,無(wú)需手動(dòng)指定版本號(hào)。
配置 Kafka 連接信息
在 application.yml 或 application.properties 文件中配置 Kafka 相關(guān)參數(shù):
application.yml 示例:
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: my-group
auto-offset-reset: earliest
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
編寫(xiě) Kafka 生產(chǎn)者(Producer)
創(chuàng)建一個(gè)服務(wù)類(lèi)用于發(fā)送消息到 Kafka 主題。
KafkaProducer.java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message);
System.out.println("Sent message: " + message);
}
}
編寫(xiě) Kafka 消費(fèi)者(Consumer)
使用 @KafkaListener 注解監(jiān)聽(tīng)特定主題的消息。
KafkaConsumer.java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumer {
@KafkaListener(topics = "test-topic", groupId = "my-group")
public void listen(ConsumerRecord<String, String> record) {
System.out.printf("Received message: topic - %s, partition - %d, offset - %d, key - %s, value - %s%n",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
}
添加 REST 接口用于測(cè)試發(fā)送消息
為了方便測(cè)試,我們可以創(chuàng)建一個(gè)簡(jiǎn)單的 REST 控制器來(lái)觸發(fā)消息發(fā)送。
KafkaController.java
import org.springframework.web.bind.annotation.*;
import org.springframework.beans.factory.annotation.Autowired;
@RestController
@RequestMapping("/kafka")
public class KafkaController {
@Autowired
private KafkaProducer kafkaProducer;
@PostMapping("/send")
public String sendMessage(@RequestParam String msg) {
kafkaProducer.sendMessage("test-topic", msg);
return "Message sent: " + msg;
}
}
啟動(dòng) Kafka 環(huán)境(可選)
如果你還沒(méi)有運(yùn)行 Kafka,可以按照以下步驟快速啟動(dòng):
啟動(dòng) Zookeeper(Kafka 依賴(lài))
bin/zookeeper-server-start.sh config/zookeeper.properties
啟動(dòng) Kafka 服務(wù)
bin/kafka-server-start.sh config/server.properties
創(chuàng)建測(cè)試 Topic
bin/kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
測(cè)試接口
啟動(dòng) Spring Boot 應(yīng)用后,訪(fǎng)問(wèn)如下接口發(fā)送消息:
POST http://localhost:8080/kafka/send?msg=HelloKafka
觀(guān)察控制臺(tái)輸出,確認(rèn)是否收到類(lèi)似以下內(nèi)容:
Received message: topic - test-topic, partition - 0, offset - 5, key - null, value - HelloKafka
擴(kuò)展功能建議
使用 JSON 格式傳輸對(duì)象(自定義序列化/反序列化)
多消費(fèi)者組配置與負(fù)載均衡
異常處理與重試機(jī)制(@DltHandler, SeekToCurrentErrorHandler)
Kafka Streams 實(shí)現(xiàn)實(shí)時(shí)流處理邏輯
配置 SSL、SASL 安全認(rèn)證
結(jié)合 Spring Cloud Stream 構(gòu)建云原生事件驅(qū)動(dòng)架構(gòu)
到此這篇關(guān)于Spring Boot集成Apache Kafka的實(shí)戰(zhàn)指南的文章就介紹到這了,更多相關(guān)SpringBoot集成Apache Kafka內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
一次由Lombok的@AllArgsConstructor注解引發(fā)的錯(cuò)誤及解決
這篇文章主要介紹了一次由Lombok的@AllArgsConstructor注解引發(fā)的錯(cuò)誤及解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-09-09
SpringBoot常見(jiàn)問(wèn)題小結(jié)
這篇文章主要介紹了SpringBoot常見(jiàn)問(wèn)題小結(jié),需要的朋友可以參考下2017-07-07
將Mybatis升級(jí)為Mybatis-Plus的詳細(xì)過(guò)程
本文詳細(xì)介紹了在若依管理系統(tǒng)(v3.8.8)中將MyBatis升級(jí)為MyBatis-Plus的過(guò)程,旨在提升開(kāi)發(fā)效率,通過(guò)本文,開(kāi)發(fā)者可實(shí)現(xiàn)系統(tǒng)功能無(wú)損升級(jí),同時(shí)享受MyBatis-Plus帶來(lái)的便捷特性,如代碼簡(jiǎn)化和性能優(yōu)化,需要的朋友可以參考下2025-04-04

