Pembangunan Java: Cara menggunakan Apache Kafka Streams untuk pemprosesan dan pengkomputeran strim masa nyata
Pengenalan:
Dengan peningkatan data besar dan pengkomputeran masa nyata, Apache Kafka Streams, sebagai enjin pemprosesan strim, sedang digunakan semakin banyak Digunakan oleh pembangun. Ia menyediakan cara yang mudah tetapi berkuasa untuk mengendalikan data penstriman masa nyata dan melaksanakan pemprosesan dan pengiraan strim yang kompleks. Artikel ini akan memperkenalkan cara menggunakan Apache Kafka Streams untuk pemprosesan dan pengkomputeran strim masa nyata, termasuk mengkonfigurasi persekitaran, menulis kod dan demonstrasi sampel.
1. Penyediaan:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>2.8.1</version> </dependency>
import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import java.util.Properties; public class KafkaStreamsApp { public static void main(String[] args) { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); StreamsBuilder builder = new StreamsBuilder(); // 在这里添加流处理和计算逻辑 Topology topology = builder.build(); KafkaStreams streams = new KafkaStreams(topology, props); streams.start(); // 添加Shutdown Hook,确保应用程序在关闭时能够优雅地停止 Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); } }
Laksanakan arahan berikut dalam baris arahan untuk mencipta topik Kafka bernama "topik-input" dan "topik-output":
import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import java.util.Arrays; public class KafkaStreamsApp { // 省略其他代码... public static void main(String[] args) { // 省略其他代码... KStream<String, String> inputStream = builder.stream("input-topic"); KTable<String, Long> wordCounts = inputStream .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\W+"))) .groupBy((key, word) -> word) .count(); wordCounts.toStream().to("output-topic"); // 省略其他代码... } }
bin/kafka-topics.sh --create --topic input-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1 bin/kafka-topics.sh --create --topic output-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
bin/kafka-console-producer.sh --topic input-topic --bootstrap-server localhost:9092 >hello world >apache kafka streams >real-time processing >``` 3. 查看结果: 在命令行中执行以下命令,从"output-topic"中消费结果消息:
hello: 2
dunia: 1rreee
Atas ialah kandungan terperinci Pembangunan Java: Cara menggunakan Apache Kafka Streams untuk pemprosesan dan pengkomputeran strim masa nyata. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!