Spring Cloud Stream est un framework qui simplifie le développement de microservices basés sur des messages en faisant abstraction des courtiers de messages tels qu'Apache Kafka et RabbitMQ. L'une des fonctionnalités puissantes de Spring Cloud Stream est sa capacité à s'intégrer de manière transparente à Kafka, permettant aux développeurs de créer des applications événementielles robustes et évolutives. Le classeur Kafka de Spring Cloud Stream permet de se connecter facilement aux sujets Kafka.
Dans ce blog, nous verrons comment utiliser un intercepteur consommateur avec Spring Cloud Stream Kafka Binder. Les intercepteurs de Kafka fournissent un mécanisme pour intercepter et modifier les enregistrements avant qu'ils ne soient consommés par l'application, offrant ainsi des opportunités de journalisation, de collecte de métriques et de manipulation de données.
Avant de plonger dans les détails, assurez-vous d'avoir les prérequis suivants :
Tout d'abord, mettons en place un projet Spring Boot simple avec les dépendances nécessaires pour Spring Cloud Stream et Kafka.
org.springframework.boot spring-boot-starter org.springframework.cloud spring-cloud-starter-stream-kafka org.springframework.boot spring-boot-starter-test test org.springframework.cloud spring-cloud-dependencies Hoxton.SR10 pom import
dependencies { implementation 'org.springframework.boot:spring-boot-starter' implementation 'org.springframework.cloud:spring-cloud-starter-stream-kafka' testImplementation 'org.springframework.boot:spring-boot-starter-test' } dependencyManagement { imports { mavenBom "org.springframework.cloud:spring-cloud-dependencies:Hoxton.SR10" } }
Ensuite, configurez le classeur Kafka dans le fichier application.yml.
spring: cloud: stream: bindings: input: destination: my-topic group: my-group consumer: interceptor-classes: com.example.MyConsumerInterceptor kafka: binder: brokers: localhost:9092
Pour créer un intercepteur consommateur, implémentez l'interface ConsumerInterceptor fournie par Kafka. Cette interface vous permet de définir une logique personnalisée pour intercepter et traiter les enregistrements avant qu'ils n'atteignent l'application.
package com.example; import org.apache.kafka.clients.consumer.ConsumerInterceptor; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.Configurable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Map; public class MyConsumerInterceptor implements ConsumerInterceptor, Configurable { private static final Logger logger = LoggerFactory.getLogger(MyConsumerInterceptor.class); @Override public ConsumerRecords onConsume(ConsumerRecords records) { records.forEach(record -> { logger.info("Intercepted record: key = {}, value = {}", record.key(), record.value()); // Add your custom logic here }); return records; } @Override public void onCommit(Map offsets) { // Custom logic on commit } @Override public void close() { // Cleanup resources if necessary } @Override public void configure(Map configs) { // Configuration logic } }
Créez une application grand public simple qui écoute les messages d'un sujet Kafka.
package com.example; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.messaging.Message; @SpringBootApplication @EnableBinding(KafkaProcessor.class) public class KafkaConsumerApplication { public static void main(String[] args) { SpringApplication.run(KafkaConsumerApplication.class, args); } @StreamListener("input") public void handle(Messagemessage) { System.out.println("Received message: " + message.getPayload()); } }
Définissez une interface pour lier le canal d'entrée au sujet Kafka.
package com.example; import org.springframework.cloud.stream.annotation.Input; import org.springframework.messaging.SubscribableChannel; public interface KafkaProcessor { String INPUT = "input"; @Input(INPUT) SubscribableChannel input(); }
Lorsque des messages sont envoyés au sujet Kafka, MyConsumerInterceptor interceptera les enregistrements et vous devriez voir les messages de journal interceptés.
Dans ce blog, nous avons exploré comment utiliser un intercepteur consommateur avec Spring Cloud Stream Kafka Binder. Les intercepteurs constituent un moyen puissant de traiter, enregistrer et manipuler les enregistrements avant qu'ils ne soient consommés par l'application. En intégrant des intercepteurs personnalisés, vous pouvez améliorer les fonctionnalités de vos consommateurs Kafka, en ajoutant des fonctionnalités précieuses telles que la journalisation, la collecte de métriques et la transformation des données.
En suivant les étapes décrites dans ce guide, vous devriez être en mesure d'implémenter et de configurer de manière transparente des intercepteurs grand public dans vos applications Spring Cloud Stream. Bon codage !
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!