SpringBoot整合Kafka简单示例

SpringBoot整合Kafka简单示例

简介

Spring for Apache Kafka (Spring - Kafka)项目将Spring的核心概念应用到基于Kafka的消息解决方案的开发中。它提供了一个“模板”作为发送消息的高级抽象,即KafkaTemplate。它还通过@KafkaListener注解和一个“监听器容器”为消息驱动的pojo提供支持。这些库促进了依赖注入和声明式的使用。在所有这些情况下,您将看到Spring框架中对JMS的支持和Spring AMQP中对RabbitMQ的支持的相似之处。

image-20241223164016690

​ 其实整个Spring - Kafka的使用挺简单的,接下来我会给出一个简单的示例,需要注意的是Kafka是支持事务的,SpringKafka提供了KafkaTransactionManager来支持Spring的声明式事务,当你配置了spring.kafka.producer.transaction-id-prefix时SpringBoot将会把KafkaTransactionManager自动注入到容器中,此时使用Spring的声明式事务注解@Transactional将会涉及到Database和Kafka的事务,这部分在本文中暂不涉及(感觉用的不多?),如果将来工作中有接触到,会专门写一篇博客进行讲解。

image-20241223164542749

pom文件

1
2
3
4
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

application.yaml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
acks: all
batch-size: 16384
buffer-memory: 33554432
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
retries: 100
consumer:
auto-offset-reset: earliest
#是否自动提交偏移量offset
enable-auto-commit: false
#自动提交的频率,前提是 enable-auto-commit=true
# auto-commit-interval: 1s
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 批量拉取的消息条数
max-poll-records: 50
properties:
#如果在这个时间内没有收到心跳,该消费者会被踢出组并触发 组再平衡rebalance
session.timeout.ms: 120000
#最大消费时间。此决定了获取消息后提交偏移量的最大时间,超过设定的时间(默认5分钟),服务端也会认为该消费者失效。踢出并再平衡
max.poll.interval.ms: 300000
#配置控制客户端等待请求响应的最长时间。
#如果在超时之前没有收到响应,客户端将在必要时重新发送请求,
#或者如果重试次数用尽,则请求失败。
request.timeout.ms: 60000
#订阅或分配主题时,允许自动创建主题
allow.auto.create.topics: true
#poll方法向协调器发送心跳的频率,为session.timeout.ms的三分之一
heartbeat.interval.ms: 40000
#每个分区里返回的记录最多不超max.partitions.fetch.bytes 指定的字节
#max.partition.fetch.bytes=1048576 #1Mb
listener:
#当enable.auto.commit的值设置为false时,该值会生效;为true时不会生效
#manual_immediate:需要手动调用Acknowledgment.acknowledge()后立即提交
ack-mode: manual_immediate
#如果至少有一个topic不存在,true启动失败。false忽略
missing-topics-fatal: true
#type: single单条消费 batch批量消费 批量消费需要配合consumer.max-poll-records
type: single
#配置多少,就为为每个消费者实例创建多少个线程。多出分区的线程空闲
concurrency: 2

创建主题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Configuration
public class KafkaConfig {

//创建主题
@Bean
public NewTopic testTopic() {
// 主题名
return TopicBuilder.name("test")
// 副本数
.replicas(1)
// 分区数
.partitions(3)
.build();
}
}

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@RestController
public class TestProvider {

@Resource
KafkaTemplate<String, String> kafkaTemplate;

@GetMapping("/test1")
public String test1(@RequestParam("msg") String msg) {
CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send("test", msg);

//同步发送
try {
SendResult<String, String> sendResult = future.get();
System.out.println("发送成功");
} catch (InterruptedException | ExecutionException e) {
System.out.println("发送失败");
throw new RuntimeException(e);
}
return "success";
}

@GetMapping("/test2")
public String test2(@RequestParam("msg") String msg) {
CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send("test", msg);

//异步回调
future.whenComplete((result, ex)->{
if (ex == null) {
System.out.println("发送成功");
} else {
System.out.println("发送失败");
}
});

return "success";
}

}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Component
public class TestConsumer {

@KafkaListener(topics = {"test"}, groupId = "my-group")
public void topic_test(String message,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_PARTITION) String partition,
@Header(KafkaHeaders.OFFSET) String offset,
@Header(KafkaHeaders.RECEIVED_KEY) String key,
Acknowledgment ack) {
System.out.println("message = " + message +
", topic = " + topic +
", partition = " + partition +
", offset = " + offset +
", thread = " + Thread.currentThread().getName());
//手动提交
ack.acknowledge();
}
}