Ajoutez les dépendances suivantes dans pom.xml :
org.springframework.kafka spring-kafka 2.8.0
Ajoutez la configuration suivante dans le fichierapplication.yml
:application.yml
文件中添加以下配置:
sping: kafka: bootstrap-servers: localhost:9092 consumer: group-id: my-group auto-offset-reset: earliest producer: value-serializer: org.apache.kafka.common.serialization.StringSerializer key-serializer: org.apache.kafka.common.serialization.StringSerializer
这里我们配置了 Kafka 的服务地址为localhost:9092
,配置了一个消费者组 ID 为my-group
,并设置了一个最早的偏移量来读取消息。在生产者方面,我们配置了消息序列化程序为StringSerializer
。
我们现在要创建一个 Kafka 生产者,以便向 Kafka 服务器发送消息。我们将在此处创建一个 RESTful API 端点,以接收 POST 请求并将消息发送到 Kafka。
首先,我们将创建一个KafkaProducerConfig
类,用于配置 Kafka 生产者:
@Configuration public class KafkaProducerConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Bean public MapproducerConfigs() { Map props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } @Bean public ProducerFactory producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } @Bean public KafkaTemplate kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } }
在上面的代码中,我们使用@Configuration
注解将KafkaProducerConfig
类声明为配置类。然后,我们使用@Value
注解注入配置文件中的bootstrap-servers
属性。
接下来,我们创建了一个producerConfigs
方法,用于设置 Kafka 生产者的配置。在这里,我们设置了BOOTSTRAP_SERVERS_CONFIG
、KEY_SERIALIZER_CLASS_CONFIG
和VALUE_SERIALIZER_CLASS_CONFIG
三个属性。
然后,我们创建了一个producerFactory
方法,用于创建 Kafka 生产者工厂。在这里,我们使用了DefaultKafkaProducerFactory
类,并传递了我们的配置。
最后,我们创建了一个kafkaTemplate
方法,用于创建KafkaTemplate
实例。在这里,我们使用了刚刚创建的生产者工厂作为参数,然后返回KafkaTemplate
实例。
接下来,我们将创建一个 RESTful 端点,用于接收 POST 请求并将消息发送到 Kafka。在这里,我们将使用@RestController
注解创建一个 RESTful 控制器:
@RestController public class KafkaController { @Autowired private KafkaTemplatekafkaTemplate; @PostMapping("/send") public void sendMessage(@RequestBody String message) { kafkaTemplate.send("my-topic", message); } }
在上面的代码中,我们使用@Autowired
注解将KafkaTemplate
实例注入到KafkaController
类中。然后,我们创建了一个sendMessage
方法,用于发送消息到 Kafka。
在这里,我们使用kafkaTemplate.send
方法发送消息到my-topic
主题。send 方法返回一个ListenableFuture
对象,用于异步处理结果。
现在,我们将创建一个 Kafka 消费者,用于从 Kafka 服务器接收消息。在这里,我们将创建一个消费者组,并将其配置为从my-topic
主题读取消息。
首先,我们将创建一个KafkaConsumerConfig
类,用于配置 Kafka 消费者:
@Configuration @EnableKafka public class KafkaConsumerConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Value("${spring.kafka.consumer.group-id}") private String groupId; @Bean public MapconsumerConfigs() { Map props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return props; } @Bean public ConsumerFactory consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; } }
在上面的代码中,我们使用@Configuration
注解将KafkaConsumerConfig
类声明为配置类,并使用@EnableKafka
注解启用 Kafka。
然后,我们使用@Value
注解注入配置文件中的bootstrap-servers
和consumer.group-id
属性。
接下来,我们创建了一个consumerConfigs
方法,用于设置 Kafka 消费者的配置。在这里,我们设置了BOOTSTRAP_SERVERS_CONFIG、GROUP_ID_CONFIG
、AUTO_OFFSET_RESET_CONFIG
、KEY_DESERIALIZER_CLASS_CONFIG
和VALUE_DESERIALIZER_CLASS_CONFIG
五个属性。
然后,我们创建了一个consumerFactory
方法,用于创建 Kafka 消费者工厂。在这里,我们使用了DefaultKafkaConsumerFactory
类,并传递了我们的配置。
最后,我们创建了一个kafkaListenerContainerFactory
方法,用于创建一个ConcurrentKafkaListenerContainerFactory
实例。在这里,我们将消费者工厂注入到kafkaListenerContainerFactory
实例中。
接下来,我们将创建一个 Kafka 消费者类KafkaConsumer
,用于监听my-topic
主题并接收消息:
@Service public class KafkaConsumer { @KafkaListener(topics = "my-topic", groupId = "my-group-id") public void consume(String message) { System.out.println("Received message: " + message); } }
在上面的代码中,我们使用@KafkaListener
注解声明了一个消费者方法,用于接收从my-topic
主题中读取的消息。在这里,我们将消费者组 ID 设置为my-group-id
。
现在,我们已经完成了 Kafka 生产者和消费者的设置。我们可以使用mvn spring-boot:run
命令启动应用程序,并使用 curl 命令发送 POST 请求到http://localhost:8080/send
rrreee
localhost:9092
, configuré un ID de groupe de consommateurs comme
my-group
et défini un décalage le plus précoce pour lire les informations. Du côté du producteur, nous avons configuré le sérialiseur de messages comme
StringSerializer
. Étape 3 : Créer un producteurNous allons maintenant créer un producteur Kafka pour envoyer des messages au serveur Kafka. Ici, nous allons créer un point de terminaison d'API RESTful pour recevoir les requêtes POST et envoyer des messages à Kafka. Tout d'abord, nous allons créer une classe
KafkaProducerConfig
pour configurer le producteur Kafka : rrreeeDans le code ci-dessus, nous utilisons l'annotation
@Configuration
pour
Le KafkaProducerConfig La classe
est déclarée comme classe de configuration. Nous injectons ensuite l'attribut
bootstrap-servers
dans le fichier de configuration en utilisant l'annotation
@Value
. Ensuite, nous créons une méthode
producerConfigs
pour définir la configuration du producteur Kafka. Ici, nous définissons trois propriétés :
BOOTSTRAP_SERVERS_CONFIG
,
KEY_SERIALIZER_CLASS_CONFIG
et
VALUE_SERIALIZER_CLASS_CONFIG
. Ensuite, nous créons une méthode
producerFactory
pour créer une usine de producteurs Kafka. Ici, nous avons utilisé la classe
DefaultKafkaProducerFactory
et transmis notre configuration. Enfin, nous avons créé une méthode
kafkaTemplate
pour créer des instances
KafkaTemplate
. Ici, nous utilisons la fabrique de producteurs que nous venons de créer comme paramètre et renvoyons une instance de
KafkaTemplate
. Ensuite, nous allons créer un point de terminaison RESTful qui reçoit les requêtes POST et envoie des messages à Kafka. Ici, nous allons créer un contrôleur RESTful en utilisant l'annotation
@RestController
: rrreee Dans le code ci-dessus, nous utiliserons l'annotation
@Autowired
pour
KafkaTemplate code> est injectée dans la classe KafkaController
. Ensuite, nous avons créé une méthode
sendMessage
pour envoyer des messages à Kafka. Ici, nous utilisons la méthode
kafkaTemplate.send
pour envoyer des messages au sujet
my-topic
. La méthode send renvoie un objet
ListenableFuture
pour le traitement asynchrone des résultats. Étape 4 : Créer un consommateurMaintenant, nous allons créer un consommateur Kafka pour recevoir les messages du serveur Kafka. Ici, nous allons créer un groupe de consommateurs et le configurer pour lire les messages du sujet
my-topic
. Tout d'abord, nous allons créer une classe
KafkaConsumerConfig
pour configurer le consommateur Kafka : rrreeeDans le code ci-dessus, nous utilisons l'annotation
@Configuration
pour
Le KafkaConsumerConfig La classe
est déclarée comme classe de configuration et Kafka est activé à l'aide de l'annotation
@EnableKafka
. Ensuite, nous utilisons l'annotation
@Value
pour injecter les propriétés
bootstrap-servers
et
consumer.group-id
dans le fichier de configuration. Ensuite, nous créons une méthode
consumerConfigs
pour définir la configuration du consommateur Kafka. Ici, nous définissons cinq propriétés :
BOOTSTRAP_SERVERS_CONFIG, GROUP_ID_CONFIG
,
AUTO_OFFSET_RESET_CONFIG
,
KEY_DESERIALIZER_CLASS_CONFIG
et
VALUE_DESERIALIZER_CLASS_CONFIG
. Ensuite, nous créons une méthode
consumerFactory
pour créer une usine de consommation Kafka. Ici, nous avons utilisé la classe
DefaultKafkaConsumerFactory
et transmis notre configuration. Enfin, nous avons créé une méthode
kafkaListenerContainerFactory
pour créer une instance
ConcurrentKafkaListenerContainerFactory
. Ici, nous injectons la consumer factory dans l'instance
kafkaListenerContainerFactory
. Ensuite, nous allons créer une classe de consommateur Kafka
KafkaConsumer
pour écouter le sujet
my-topic
et recevoir des messages : rrreeeDans le code ci-dessus, nous utilisons le @KafkaListenerpour déclarer une méthode consommateur qui reçoit les messages lus à partir du sujet
my-topic
. Ici, nous définissons l'ID du groupe de consommateurs sur
my-group-id
. Maintenant, nous avons terminé la mise en place du producteur et du consommateur Kafka. Nous pouvons utiliser la commande
mvn spring-boot:run
pour démarrer l'application et utiliser la commande curl pour envoyer une requête POST au
http://localhost:8080/send
point de terminaison pour envoyer le message Envoyer à Kafka. On peut alors visualiser les messages reçus par le consommateur sur la console. Il s'agit de la configuration de base pour utiliser Spring Boot et Kafka. Nous pouvons changer et nous développer selon les besoins pour répondre à des besoins spécifiques.
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!