SpringBoot整合Kafka简单示例 简介 Spring for Apache Kafka (Spring-Kafka)项目将Spring的核心概念应用到基于Kafka的消息解决方案的开发中。它提供了一个“模板”作为发送消息的高级抽象,即KafkaTemplate。它还通过@KafkaListener注解和一个“监听器容器”为消息驱动的POJO提供支持。这些库促进了依赖注入和声明式的使用。在所有这些方面,您将看到它与Spring框架中对JMS的支持、以及Spring AMQP中对RabbitMQ的支持的相似之处。
其实整个Spring-Kafka的使用挺简单的,接下来我会给出一个简单的示例。需要注意的是Kafka是支持事务的,Spring-Kafka提供了KafkaTransactionManager来支持Spring的声明式事务:当你配置了spring.kafka.producer.transaction-id-prefix时,SpringBoot会把KafkaTransactionManager自动注入到容器中,此时使用Spring的声明式事务注解@Transactional将会同时涉及Database和Kafka的事务。这部分在本文中暂不涉及(感觉用的不多?),如果将来工作中有接触到,会专门写一篇博客进行讲解。
pom文件

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
application.yaml

spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      acks: all
      batch-size: 16384
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      retries: 100
    consumer:
      auto-offset-reset: earliest
      enable-auto-commit: false
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 50
      properties:
        session.timeout.ms: 120000
        max.poll.interval.ms: 300000
        request.timeout.ms: 60000
        allow.auto.create.topics: true
        heartbeat.interval.ms: 40000
    listener:
      ack-mode: manual_immediate
      missing-topics-fatal: true
      type: single
      concurrency: 2
创建主题 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Configuration public class KafkaConfig { @Bean public NewTopic testTopic () { return TopicBuilder.name("test" ) .replicas(1 ) .partitions(3 ) .build(); } }
生产者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 @RestController public class TestProvider { @Resource KafkaTemplate<String, String> kafkaTemplate; @GetMapping("/test1") public String test1 (@RequestParam("msg") String msg) { CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send("test" , msg); try { SendResult<String, String> sendResult = future.get(); System.out.println("发送成功" ); } catch (InterruptedException | ExecutionException e) { System.out.println("发送失败" ); throw new RuntimeException (e); } return "success" ; } @GetMapping("/test2") public String test2 (@RequestParam("msg") String msg) { CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send("test" , msg); future.whenComplete((result, ex)->{ if (ex == null ) { System.out.println("发送成功" ); } else { System.out.println("发送失败" ); } }); return "success" ; } }
消费者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @Component public class TestConsumer { @KafkaListener(topics = {"test"}, groupId = "my-group") public void topic_test (String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, @Header(KafkaHeaders.RECEIVED_PARTITION) String partition, @Header(KafkaHeaders.OFFSET) String offset, @Header(KafkaHeaders.RECEIVED_KEY) String key, Acknowledgment ack) { System.out.println("message = " + message + ", topic = " + topic + ", partition = " + partition + ", offset = " + offset + ", thread = " + Thread.currentThread().getName()); ack.acknowledge(); } }