文章目錄
一、基礎(chǔ)集成
1. 技術(shù)選型
軟件/框架
| 版本
|
jdk
| 1.8.0_202
|
springboot
| 2.5.4
|
kafka server
| kafka_2.12-2.8.0
|
kafka client
| 2.7.1
|
zookeeper
| 3.7.0
|
2. 導(dǎo)入依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
3. kafka配置
properties版本
spring.application.name=springboot-kafka
server.port=8080
# kafka 配置
spring.kafka.bootstrap-servers=node1:9092
# producer 配置
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 生產(chǎn)者每個(gè)批次最多方多少條記錄
spring.kafka.producer.batch-size=16384
# 生產(chǎn)者一端總的可用緩沖區(qū)大小,此處設(shè)置為32M * 1024 * 1024
spring.kafka.producer.buffer-memory=33544432
# consumer 配置
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.group-id=springboot-consumer-02
# earliest - 如果找不到當(dāng)前消費(fèi)者的有效偏移量,則自動(dòng)重置向到最開始
spring.kafka.consumer.auto-offset-reset=earliest
# 消費(fèi)者的偏移量是自動(dòng)提交還是手動(dòng)提交,此處自動(dòng)提交偏移量
spring.kafka.consumer.enable-auto-commit=true
# 消費(fèi)者偏移量自動(dòng)提交時(shí)間間隔
spring.kafka.consumer.auto-commit-interval=1000
yml版本項(xiàng)目內(nèi)部配置
server:
port: 8002
spring:
application:
# 應(yīng)用名稱
name: ly-kafka
profiles:
# 環(huán)境配置
active: dev
cloud:
nacos:
discovery:
# 服務(wù)注冊地址
server-addr: nacos.server.com:8848
config:
# 配置中心地址
server-addr: nacos.server.com:8848
# 配置文件格式
file-extension: yml
# 共享配置
shared-configs:
- application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
nacos-config 服務(wù)端配置
4. auto-offset-reset 簡述
關(guān)于
auto.offset.reset 配置有3個(gè)值可以設(shè)置,分別如下:
earliest:當(dāng)各分區(qū)下有已提交的 offset 時(shí),從提交的 offset 開始消費(fèi);無提交的 offset時(shí),從頭開始消費(fèi);
latest:當(dāng)各分區(qū)下有已提交的 offset 時(shí),從提交的 offset 開始消費(fèi);無提交的 offset 時(shí),消費(fèi)新產(chǎn)生的該分區(qū)下的數(shù)據(jù);
none: topic 各分區(qū)都存在已提交的 offset 時(shí),從 offset 后開始消費(fèi);只要有一個(gè)分區(qū)不存在已提交的 offset,則拋出異常;
默認(rèn)建議用 earliest, 設(shè)置該參數(shù)后 kafka出錯(cuò)后重啟,找到未消費(fèi)的offset可以繼續(xù)消費(fèi)。
而 latest 這個(gè)設(shè)置容易丟失消息,假如 kafka 出現(xiàn)問題,還有數(shù)據(jù)往topic中寫,這個(gè)時(shí)候重啟kafka,這個(gè)設(shè)置會(huì)從最新的offset開始消費(fèi), 中間出問題的哪些就不管了。
none 這個(gè)設(shè)置沒有用過,兼容性太差,經(jīng)常出問題。
5. 新增一個(gè)訂單類
模擬業(yè)務(wù)系統(tǒng)中,用戶每下一筆訂單,就發(fā)送一個(gè)消息,供其他服務(wù)消費(fèi)
package com.gblfy.kafka.entity;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class Order {
/**
* 訂單id
*/
private long orderId;
/**
* 訂單號
*/
private String orderNum;
/**
* 訂單創(chuàng)建時(shí)間
*/
private LocalDateTime createTime;
}
6. 生產(chǎn)者(異步)
package com.gblfy.lykafka.provider;
import com.alibaba.fastjson.JSONObject;
import com.gblfy.common.constant.KafkaTopicConstants;
import com.gblfy.common.entity.Order;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import java.time.LocalDateTime;
/**
* Kafka生產(chǎn)者
*
* @author gblfy
* @date 2021-09-28
*/
@Service
public class KafkaProvider {
private final static Logger log = LoggerFactory.getLogger(KafkaProvider.class);
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(long orderId, String orderNum, LocalDateTime createTime) {
// 構(gòu)建一個(gè)訂單類
Order order = Order.builder()
.orderId(orderId)
.orderNum(orderNum)
.createTime(createTime)
.build();
// 發(fā)送消息,訂單類的 json 作為消息體
ListenableFuture<SendResult<String, String>> future =
kafkaTemplate.send(KafkaTopicConstants.KAFKA_MSG_TOPIC, JSONObject.toJSONString(order));
// 監(jiān)聽回調(diào)
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onFailure(Throwable e) {
log.info("發(fā)送消息失敗: {}", e.getMessage());
}
@Override
public void onSuccess(SendResult<String, String> result) {
RecordMetadata metadata = result.getRecordMetadata();
log.info("發(fā)送的主題:{} ,發(fā)送的分區(qū):{} ,發(fā)送的偏移量:{} ",
metadata.topic(), metadata.partition(), metadata.offset());
}
});
}
}
7. 消費(fèi)者
package com.gblfy.lykafka.controller;
import com.gblfy.lykafka.provider.KafkaProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
@RestController
@RequestMapping("/kafka")
public class KafkaProviderController {
@Autowired
private KafkaProvider kafkaProvider;
@GetMapping("/sendMQ")
public String sendMQContent() {
kafkaProvider.sendMessage(0001, "10", LocalDateTime.now());
return "OK";
}
}
通過 @KafkaListener注解,我們可以指定需要監(jiān)聽的 topic 以及 groupId, 注意,這里的 topics 是個(gè)數(shù)組,意味著我們可以指定多個(gè) topic,如:@KafkaListener(topics = {“topic-springboot-01”, “topic-springboot-02”}, groupId = “group_id”)。
注意:消息發(fā)布者的 TOPIC 需要保持與消費(fèi)者監(jiān)聽的 TOPIC 一致,否者消費(fèi)不到消息。
8. kafka配置類
package com.gblfy.common.constant;
public class KafkaTopicConstants {
//kafka發(fā)送消息主題
public static final String KAFKA_MSG_TOPIC = "topic-springboot-01";
// kafka消費(fèi)者組需要和yml文件中的 kafka.consumer.group-id的值保持一致
public static final String KAFKA_MSG_TOPIC_GROUP = "springboot-consumer-02";
}
9.單元測試
新建單元測試,功能測試消息發(fā)布,以及消費(fèi)。
package com.gblfy.kafka;
import com.gblfy.kafka.controller.KafkaProvider;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.time.LocalDateTime;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@SpringBootTest
class KafkaSpringbootApplicationTests {
@Autowired
private KafkaProvider kafkaProvider;
@Test
public void sendMessage() throws InterruptedException {
// 發(fā)送 1000 個(gè)消息
for (int i = 0; i < 1000; i++) {
long orderId = i+1;
String orderNum = UUID.randomUUID().toString();
kafkaProvider.sendMessage(orderId, orderNum, LocalDateTime.now());
}
TimeUnit.MINUTES.sleep(1);
}
}
9. 效果圖
10. 源碼地址
??https://gitee.com/gb_90/kafka-parent??
11.微服務(wù)專欄
??https://gitee.com/gb_90/micro-service-parent??