我是在 windows 中安装的 Kafka,用于在本地测试用的
1、添加项目依赖
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
2、添加配置文件 application.properties
配置中用了批量消费
# 指定kafka server的地址,集群配多个,中间,逗号隔开 spring.kafka.bootstrap-servers=127.0.0.1:9092 #重试次数 spring.kafka.producer.retries=3 #批量发送的消息数量 spring.kafka.producer.batch-size=1000 #32MB的批处理缓冲区 spring.kafka.producer.buffer-memory=33554432 #默认消费者组 spring.kafka.consumer.group-id=crm-microservice-newperformance #最早未被消费的offset spring.kafka.consumer.auto-offset-reset=earliest #批量一次最大拉取数据量 spring.kafka.consumer.max-poll-records=4000 #是否自动提交 spring.kafka.consumer.enable-auto-commit=false #自动提交时间间隔,单位ms spring.kafka.consumer.auto-commit-interval=1000 #批消费并发量,小于或等于Topic的分区数 spring.kafka.consumer.batch.concurrency = 3
3、创建一个 KafkaConfiguration 配置类
package com.example.kafkademo.config; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.*; import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler; import org.springframework.kafka.listener.ContainerProperties; import java.util.HashMap; import java.util.Map; /** * @author Frederic.Hu * @date 2022/05/25 18:00 */ @Configuration public class KafkaConfiguration { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Value("${spring.kafka.producer.retries}") private Integer retries; @Value("${spring.kafka.producer.batch-size}") private Integer batchSize; @Value("${spring.kafka.producer.buffer-memory}") private Integer bufferMemory; @Value("${spring.kafka.consumer.group-id}") private String groupId; @Value("${spring.kafka.consumer.auto-offset-reset}") private String autoOffsetReset; @Value("${spring.kafka.consumer.max-poll-records}") private Integer maxPollRecords; @Value("${spring.kafka.consumer.batch.concurrency}") private Integer batchConcurrency; @Value("${spring.kafka.consumer.enable-auto-commit}") private Boolean autoCommit; @Value("${spring.kafka.consumer.auto-commit-interval}") private Integer autoCommitInterval; /** * 生产者配置信息 */ @Bean public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.ACKS_CONFIG, "0"); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.RETRIES_CONFIG, retries); props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize); props.put(ProducerConfig.LINGER_MS_CONFIG, 1); props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } /** * 生产者工厂 */ @Bean public ProducerFactory<String, String> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } /** * 生产者模板 */ @Bean public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } /** * 消费者配置信息 */ @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000); props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return props; } /** * 消费者批量工厂 */ @Bean public KafkaListenerContainerFactory<?> batchFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs())); //设置并发量,小于或等于Topic的分区数 factory.setConcurrency(batchConcurrency); factory.getContainerProperties().setPollTimeout(1500); //配置监听手动提交 ack,消费一条数据完后,立即提交 factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); //设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG factory.setBatchListener(true); return factory; } /** * 异常处理器 */ @Bean public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler(){ return (message,exception,consumer)->{ System.out.println("消费异常:"+message.getPayload()); return null; }; } }
4、写一个向 Kafka 推送消费的测试类(生产者 producer)
package com.example.kafkademo; import com.alibaba.fastjson.JSONObject; import org.junit.Test; import org.junit.runner.RunWith; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.test.context.junit4.SpringRunner; import java.util.*; /** * @author Frederic.Hu * @Description * @date 2022/05/25 17:46 */ @RunWith(SpringRunner.class) @SpringBootTest public class KafkaProducerTest { private final Logger logger = LoggerFactory.getLogger(getClass()); @Autowired private KafkaTemplate<String, String> kafkaTemplate; @Test public void testSend(){ Map<String, Object> map = new LinkedHashMap<>(); map.put("username", "小明"); map.put("userid", 1); map.put("age", 12); kafkaTemplate.send("test4", JSONObject.toJSONString(map)).addCallback(success -> { // 消息在分区内的offset long offset = success.getRecordMetadata().offset(); logger.info("产线发送消息到kafka队列成功:{}, offset为:{}", JSONObject.toJSONString(map), offset); }, failure -> { logger.error("产线发送消息到kafka队列失败:{}, 报错信息为:{}", JSONObject.toJSONString(map), failure.getMessage()); }); } }
5、创建一个消费者(消费者 consumer)
package com.example.kafkademo.listener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Component; import java.util.List; /** * @author Frederic.Hu * @Description * @date 2022/05/25 17:43 */ @Component public class BigDataTopicListener { private final Logger logger = LoggerFactory.getLogger(getClass()); /** * 监听kafka数据(批量消费) * @param consumerRecords * @param ack */ @KafkaListener(id = "operation", topics = {"test4"}, containerFactory = "batchFactory", errorHandler="consumerAwareErrorHandler") public void batchConsumer(List<ConsumerRecord<?, ?>> consumerRecords, Acknowledgment ack) { long start = System.currentTimeMillis(); //... //db.batchSave(consumerRecords);//批量插入或者批量更新数据 for (ConsumerRecord<?, ?> consumerRecord : consumerRecords) { logger.info("消费的每条数据为:{}", consumerRecord.value()); } //手动提交 ack.acknowledge(); logger.info("收到bigData推送的数据,拉取数据量:{},消费时间:{}ms", consumerRecords.size(), (System.currentTimeMillis() - start)); } }
6、启动测试类,查看控制台
1、Kafka 中 topic 不存在的话,启动项目会报错
解决办法:启动项目之前,先在 Kafka 中创建好自己定义的 topic 名称,也可以在配置类中写一个自动创建 topic,但是出现一个问题,项目上线每个 Kafka 的集群数都不一样,自动创建 topic 时,分区数和副本数不好设置,设置不合理,启动项目是会报错的。
2、生产者生产消息是否成功怎么看?
解决办法:kafkaTemplate 提供了一个回调方法 addCallback,我们可以在回调方法中监控消息是否发送成功或失败时做补偿处理。
3、消费者消费消息报错了怎么办?
解决办法:新建一个 ConsumerAwareListenerErrorHandler 类型的异常处理方法,用 @Bean 注入,BeanName 默认就是方法名,然后我们将这个异常处理器的 BeanName 放到 @KafkaListener 注解的 errorHandler 属性里面,当监听抛出异常的时候,则会自动调用异常处理器。
4、消费不同的 topic 中的数据,消费者组(group id)如果用的是同一个,消费时会报错的
解决办法:@KafkaListener 中的 id 监听器使用不同的名称,如果配置文属性配置了默认消费组(group id),注解中的 监听器 id 会覆盖默认的消费组(group id)。
5、重复消费了数据,怎么办?
原因:消费者宕机、重启或者被强行 kill 进程,导致消费者消费的 offset 没有提交。或者消费后的数据,当 offset 还没有提交时,Partition 就断开连接。
解决办法:我目前项目中,是消费的数据插入到 MySQL 中的,如果重复消费了,插入到数据库中的时候,会查询该主键已经在数据库存在,则更新该条数据。
热门文章
- 宠物粮食原料是什么(宠物粮原材料是什么)
- 「11月15日」最高速度18.9M/S,2024年Clash Nyanpasu每天更新免费节点订阅链接
- 动物医院管理办法规定由谁制定(动物医院规章制度范本)
- 「2月11日」最高速度22.7M/S,2025年Clash Nyanpasu每天更新免费节点订阅链接
- 动物疫苗证明怎么写(动物疫苗证明怎么写的)
- 「12月2日」最高速度21.4M/S,2024年Clash Nyanpasu每天更新免费节点订阅链接
- 宠物粮食代工一吨2万一贵吗知乎 宠物粮食代工一吨2万一贵吗知乎
- 「1月3日」最高速度21.2M/S,2025年Clash Nyanpasu每天更新免费节点订阅链接
- 「1月7日」最高速度18.6M/S,2025年Clash Nyanpasu每天更新免费节点订阅链接
- Hibernate框架的使用(二)