Maison> interface Web> js tutoriel> le corps du texte

Exploration de l'intercepteur de consommateurs Spring Cloud Stream Kafka Binder

WBOY
Libérer: 2024-08-06 19:20:50
original
883 Les gens l'ont consulté

Exploring Spring Cloud Stream Kafka Binder Consumer Interceptor

Introduction

Spring Cloud Stream est un framework qui simplifie le développement de microservices basés sur des messages en faisant abstraction des courtiers de messages tels qu'Apache Kafka et RabbitMQ. L'une des fonctionnalités puissantes de Spring Cloud Stream est sa capacité à s'intégrer de manière transparente à Kafka, permettant aux développeurs de créer des applications événementielles robustes et évolutives. Le classeur Kafka de Spring Cloud Stream permet de se connecter facilement aux sujets Kafka.

Dans ce blog, nous verrons comment utiliser un intercepteur consommateur avec Spring Cloud Stream Kafka Binder. Les intercepteurs de Kafka fournissent un mécanisme pour intercepter et modifier les enregistrements avant qu'ils ne soient consommés par l'application, offrant ainsi des opportunités de journalisation, de collecte de métriques et de manipulation de données.

Conditions préalables

Avant de plonger dans les détails, assurez-vous d'avoir les prérequis suivants :

  • Java Development Kit (JDK) 8 ou version ultérieure
  • Apache Kafka
  • Spring Boot 2.x ou version ultérieure
  • Maven ou Gradle

Configuration de l'application Spring Boot

Tout d'abord, mettons en place un projet Spring Boot simple avec les dépendances nécessaires pour Spring Cloud Stream et Kafka.

Maven pom.xml

  org.springframework.boot spring-boot-starter   org.springframework.cloud spring-cloud-starter-stream-kafka   org.springframework.boot spring-boot-starter-test test      org.springframework.cloud spring-cloud-dependencies Hoxton.SR10 pom import   
Copier après la connexion

Gradle build.gradle

dependencies { implementation 'org.springframework.boot:spring-boot-starter' implementation 'org.springframework.cloud:spring-cloud-starter-stream-kafka' testImplementation 'org.springframework.boot:spring-boot-starter-test' } dependencyManagement { imports { mavenBom "org.springframework.cloud:spring-cloud-dependencies:Hoxton.SR10" } }
Copier après la connexion

Configuration de Kafka Binder

Ensuite, configurez le classeur Kafka dans le fichier application.yml.

spring: cloud: stream: bindings: input: destination: my-topic group: my-group consumer: interceptor-classes: com.example.MyConsumerInterceptor kafka: binder: brokers: localhost:9092
Copier après la connexion

Création d'un intercepteur de consommateur Kafka

Pour créer un intercepteur consommateur, implémentez l'interface ConsumerInterceptor fournie par Kafka. Cette interface vous permet de définir une logique personnalisée pour intercepter et traiter les enregistrements avant qu'ils n'atteignent l'application.

package com.example; import org.apache.kafka.clients.consumer.ConsumerInterceptor; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.Configurable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Map; public class MyConsumerInterceptor implements ConsumerInterceptor, Configurable { private static final Logger logger = LoggerFactory.getLogger(MyConsumerInterceptor.class); @Override public ConsumerRecords onConsume(ConsumerRecords records) { records.forEach(record -> { logger.info("Intercepted record: key = {}, value = {}", record.key(), record.value()); // Add your custom logic here }); return records; } @Override public void onCommit(Map offsets) { // Custom logic on commit } @Override public void close() { // Cleanup resources if necessary } @Override public void configure(Map configs) { // Configuration logic } }
Copier après la connexion

Création de l'application consommateur

Créez une application grand public simple qui écoute les messages d'un sujet Kafka.

package com.example; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.messaging.Message; @SpringBootApplication @EnableBinding(KafkaProcessor.class) public class KafkaConsumerApplication { public static void main(String[] args) { SpringApplication.run(KafkaConsumerApplication.class, args); } @StreamListener("input") public void handle(Message message) { System.out.println("Received message: " + message.getPayload()); } }
Copier après la connexion

Interface pour la liaison

Définissez une interface pour lier le canal d'entrée au sujet Kafka.

package com.example; import org.springframework.cloud.stream.annotation.Input; import org.springframework.messaging.SubscribableChannel; public interface KafkaProcessor { String INPUT = "input"; @Input(INPUT) SubscribableChannel input(); }
Copier après la connexion

Exécution de l'application

  1. Démarrez le courtier Kafka et créez le sujet requis (my-topic).
  2. Exécutez l'application Spring Boot.

Lorsque des messages sont envoyés au sujet Kafka, MyConsumerInterceptor interceptera les enregistrements et vous devriez voir les messages de journal interceptés.

Conclusion

Dans ce blog, nous avons exploré comment utiliser un intercepteur consommateur avec Spring Cloud Stream Kafka Binder. Les intercepteurs constituent un moyen puissant de traiter, enregistrer et manipuler les enregistrements avant qu'ils ne soient consommés par l'application. En intégrant des intercepteurs personnalisés, vous pouvez améliorer les fonctionnalités de vos consommateurs Kafka, en ajoutant des fonctionnalités précieuses telles que la journalisation, la collecte de métriques et la transformation des données.

En suivant les étapes décrites dans ce guide, vous devriez être en mesure d'implémenter et de configurer de manière transparente des intercepteurs grand public dans vos applications Spring Cloud Stream. Bon codage !

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

source:dev.to
Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal
À propos de nous Clause de non-responsabilité Sitemap
Site Web PHP chinois:Formation PHP en ligne sur le bien-être public,Aidez les apprenants PHP à grandir rapidement!