spring-kafka는 Kafka 클라이언트의 Java 버전과 spring의 통합을 기반으로 하며 KafkaTemplate을 제공합니다. KafkaTemplate은 Apache의 Kafka 클라이언트를 캡슐화하고 클라이언트 종속성을 가져올 필요가 없습니다. YML 구성
<!-- kafka --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
kafka: #bootstrap-servers: server1:9092,server2:9093 #kafka开发地址, #生产者配置 producer: # Kafka提供的序列化和反序列化类 key-serializer: org.apache.kafka.common.serialization.StringSerializer #序列化 value-serializer: org.apache.kafka.common.serialization.StringSerializer retries: 1 # 消息发送重试次数 #acks = 0:设置成 表示 producer 完全不理睬 leader broker 端的处理结果。此时producer 发送消息后立即开启下 条消息的发送,根本不等待 leader broker 端返回结果 #acks= all 或者-1 :表示当发送消息时, leader broker 不仅会将消息写入本地日志,同时还会等待所有其他副本都成功写入它们各自的本地日志后,才发送响应结果给,消息安全但是吞吐量会比较低。 #acks = 1:默认的参数值。 producer 发送消息后 leader broker 仅将该消息写入本地日志,然后便发送响应结果给producer ,而无须等待其他副本写入该消息。折中方案,只要leader一直活着消息就不会丢失,同时也保证了吞吐量 acks: 1 #应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1) batch-size: 16384 #批量大小 properties: linger: ms: 0 #提交延迟 buffer-memory: 33554432 # 生产端缓冲区大小 # 消费者配置 consumer: key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 分组名称 group-id: web enable-auto-commit: false #提交offset延时(接收到消息后多久提交offset) # auto-commit-interval: 1000ms #当kafka中没有初始offset或offset超出范围时将自动重置offset # earliest:重置为分区中最小的offset; # latest:重置为分区中最新的offset(消费分区中新产生的数据); # none:只要有一个分区不存在已提交的offset,就抛出异常; auto-offset-reset: latest properties: #消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作) session.timeout.ms: 15000 #消费请求超时时间 request.timeout.ms: 18000 #批量消费每次最多消费多少条消息 #每次拉取一条,一条条消费,当然是具体业务状况设置 max-poll-records: 1 # 指定心跳包发送频率,即间隔多长时间发送一次心跳包,优化该值的设置可以减少Rebalance操作,默认时间为3秒; heartbeat-interval: 6000 # 发出请求时传递给服务器的 ID。用于服务器端日志记录 正常使用后解开注释,不然只有一个节点会报错 #client-id: mqtt listener: #消费端监听的topic不存在时,项目启动会报错(关掉) missing-topics-fatal: false #设置消费类型 批量消费 batch,单条消费:single type: single #指定容器的线程数,提高并发量 #concurrency: 3 #手动提交偏移量 manual达到一定数据后批量提交 #ack-mode: manual ack-mode: MANUAL_IMMEDIATE #手動確認消息 # 认证 #properties: #security: #protocol: SASL_PLAINTEXT #sasl: #mechanism: SCRAM-SHA-256 #jaas:config: 'org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";'
@Component @Slf4j public class KafkaUtils<K, V> { @Autowired private KafkaTemplate kafkaTemplate; @Value("${spring.kafka.bootstrap-servers}") String[] servers; /** * 获取连接 * @return */ private Admin getAdmin() { Properties properties = new Properties(); properties.put("bootstrap.servers", servers); // 正式环境需要添加账号密码 return Admin.create(properties); } /** * 增加topic * * @param name 主题名字 * @param partition 分区数量 * @param replica 副本数量 * @date 2022-06-23 chens */ public R addTopic(String name, Integer partition, Integer replica) { Admin admin = getAdmin(); if (replica > servers.length) { return R.error("副本数量不允许超过Broker数量"); } try { NewTopic topic = new NewTopic(name, partition, Short.parseShort(replica.toString())); admin.createTopics(Collections.singleton(topic)); } finally { admin.close(); } return R.ok(); } /** * 删除主题 * * @param names 主题名字集合 * @date 2022-06-23 chens */ public void deleteTopic(List<String> names) { Admin admin = getAdmin(); try { admin.deleteTopics(names); } finally { admin.close(); } } /** * 查询所有主题 * * @date 2022-06-24 chens */ public Set<String> queryTopic() { Admin admin = getAdmin(); try { ListTopicsResult topics = admin.listTopics(); Set<String> set = topics.names().get(); return set; } catch (Exception e) { log.error("查询主题错误!"); } finally { admin.close(); } return null; } // 向所有分区发送消息 public ListenableFuture<SendResult<K, V>> send(String topic, @Nullable V data) { return kafkaTemplate.send(topic, data); } // 指定key发送消息,相同key保证消息在同一个分区 public ListenableFuture<SendResult<K, V>> send(String topic, K key, @Nullable V data) { return kafkaTemplate.send(topic, key, data); } // 指定分区和key发送。 public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, @Nullable V data) { return kafkaTemplate.send(topic, partition, key, data); } }
@GetMapping("/{topic}") public String test(@PathVariable String topic, @PathVariable Long index) throws ExecutionException, InterruptedException { ListenableFuture future = null; Chenshuang user = new Chenshuang(i, "陈爽", "123456", new Date()); String s = JSON.toJSONString(user); KafkaUtils utils = new KafkaUtils(); future = kafkaUtils.send(topic, s); // 异步回调,同步get,会等待 不推荐同步! future.addCallback(new ListenableFutureCallback() { @Override public void onFailure(Throwable ex) { System.out.println("发送失败"); } @Override public void onSuccess(Object result) { System.out.println("发送成功:" + result); } }); return "发送成功"; }
같은 방식으로 다양한 메시지를 들을 수도 있습니다. 주제, 지정된 변위 모니터링
같은 그룹은 고르게 소비하고, 다른 그룹은 반복적으로 소비합니다. 1. 유니캐스트 모드에는 소비자 그룹이 하나만 있습니다 (1) 주제에는 하나의 파티션만 있습니다. 그룹에 여러 소비자가 있는 경우 동일한 파티션의 메시지는 그룹 중 하나만 보낼 수 있습니다. 소비자 소비. 소비자 수가 파티션 수를 초과하면 그림 1과 같이 초과 소비자는 유휴 상태가 됩니다. 주제와 테스트에는 단 하나의 파티션과 단 하나의 그룹인 G1만 있습니다. 이 그룹에는 여러 소비자가 있으며 그 중 하나만 사용할 수 있고 나머지는 유휴 상태입니다.(2) 주제에 여러 파티션이 있고 그룹에 여러 소비자가 있습니다. 예를 들어 테스트에는 3개의 파티션이 있고 그룹에 2명의 소비자가 있으면 해당 C0이 될 수 있습니다. p0, p1, c1의 데이터는 p2의 데이터를 소비하는 것에 해당하며, 소비자가 3명인 경우 한 명의 소비자가 하나의 파티션에서 데이터를 소비하는 것에 해당합니다. 다이어그램은 그림 2와 그림 3에 나와 있습니다. 이 모드는 클러스터 모드에서 매우 일반적입니다. 예를 들어, 3개의 서비스를 시작하고 해당 주제에 대해 3개의 파티션을 설정할 수 있으므로 병렬 소비가 달성되고 메시지 처리 효율성이 향상됩니다. 효율성을 크게 향상시킬 수 있습니다.
그림 2
그림 3
2. 브로드캐스트 모드, 다중 소비자 그룹
브로드캐스트 모드를 구현하려면 여러 소비자 그룹을 설정해야 하므로 하나의 소비자 그룹이 소비 이 메시지를 마친 후에는 다른 그룹의 소비자 소비에 전혀 영향을 미치지 않습니다. 이것이 방송의 개념입니다.
(1) 여러 소비자 그룹, 1개의 파티션이 주제의 데이터는 여러 소비자 그룹에서 동시에 소비됩니다. 소비자 그룹에 여러 소비자가 있으면 다음과 같이 한 명의 소비자만 사용할 수 있습니다. 그림 4:그림 4
(2) 다중 소비자 그룹, 다중 파티션
이 항목의 데이터는 한 소비자에서 여러 소비자 그룹에 의해 여러 번 소비될 수 있습니다. 그룹 내에서 각 소비자는 그림 5와 같이 주제 내의 하나 이상의 파티션에 해당하는 병렬:
注意: 消费者的数量并不能决定一个topic的并行度。它是由分区的数目决定的。
再多的消费者,分区数少,也是浪费!
一个组的最大并行度将等于该主题的分区数。
@Component @Slf4j public class Consumer { // 监听主题 分组a @KafkaListener(topics =("${spring.kafka.topic}") ,groupId = "a") public void getMessage(ConsumerRecord message, Acknowledgment ack) { //确认收到消息 ack.acknowledge(); } // 监听主题 分组a @KafkaListener(topics = ("${spring.kafka.topic}"),groupId = "a") public void getMessage2(ConsumerRecord message, Acknowledgment ack) { //确认收到消息 ack.acknowledge(); } // 监听主题 分组b @KafkaListener(topics = ("${spring.kafka.topic}"),groupId = "b") public void getMessage3(ConsumerRecord message, Acknowledgment ack) { //确认收到消息//确认收到消息 ack.acknowledge(); } // 监听主题 分组b @KafkaListener(topics = ("${spring.kafka.topic}"),groupId = "b") public void getMessage4(ConsumerRecord message, Acknowledgment ack) { //确认收到消息//确认收到消息 ack.acknowledge(); } // 指定监听分区1的消息 @KafkaListener(topicPartitions = {@TopicPartition(topic = ("${spring.kafka.topic}"),partitions = {"1"})}) public void getMessage5(ConsumerRecord message, Acknowledgment ack) { Long id = JSONObject.parseObject(message.value().toString()).getLong("id"); //确认收到消息//确认收到消息 ack.acknowledge(); } /** * @Title 指定topic、partition、offset消费 * @Description 同时监听topic1和topic2,监听topic1的0号分区、topic2的 "0号和1号" 分区,指向1号分区的offset初始值为8 * 注意:topics和topicPartitions不能同时使用; **/ @KafkaListener(id = "c1",groupId = "c",topicPartitions = { @TopicPartition(topic = "t1", partitions = { "0" }), @TopicPartition(topic = "t2", partitions = "0", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "8"))}) public void getMessage6(ConsumerRecord record,Acknowledgment ack) { //确认收到消息 ack.acknowledge(); } /** * 批量消费监听goods变更消息 * yml配置listener:type 要改为batch * ymk配置consumer:max-poll-records: ??(每次拉取多少条数据消费) * concurrency = "2" 启动多少线程执行,应小于等于broker数量,避免资源浪费 */ @KafkaListener(id="sync-modify-goods", topics = "${spring.kafka.topic}",concurrency = "4") public void getMessage7(List<ConsumerRecord<String, String>> records){ for (ConsumerRecord<String, String> msg:records) { GoodsChangeMsg changeMsg = null; try { changeMsg = JSONObject.parseObject(msg.value(), GoodsChangeMsg.class); syncGoodsProcessor.handle(changeMsg); }catch (Exception exception) { log.error("解析失败{}", msg, exception); } } } }
위 내용은 SpringBoot가 Kafka 구성 도구 클래스를 통합하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!