Cara menggunakan Java untuk membangunkan aplikasi pemprosesan strim berdasarkan Apache Kafka Streams
Pengenalan:
Apache Kafka Streams ialah rangka kerja pemprosesan strim yang berkuasa yang boleh digunakan untuk membangunkan berprestasi tinggi, berskala, toleran kesalahan sebenar- aplikasi pemprosesan aliran masa. Ia dibina pada Apache Kafka dan menyediakan API yang ringkas dan berkuasa yang membolehkan kami memproses aliran data mentah dengan menyambungkan topik Kafka input dan output. Artikel ini akan memperkenalkan cara menggunakan Java untuk membangunkan aplikasi pemprosesan strim berdasarkan Apache Kafka Streams dan menyediakan beberapa contoh kod.
1. Kerja penyediaan:
Sebelum mula menggunakan Apache Kafka Streams, kita perlu menyiapkan beberapa kerja penyediaan. Mula-mula, pastikan anda memasang dan menjalankan Apache Kafka. Dalam kelompok Kafka, kita perlu mencipta dua topik: satu untuk data input dan satu untuk hasil output. Kita boleh menggunakan arahan berikut untuk mencipta topik ini:
bin/kafka-topics.sh --create --topic input-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1 bin/kafka-topics.sh --create --topic output-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
Pada masa yang sama, pastikan anda menambah kebergantungan berikut dalam projek Java anda:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>2.4.0</version> </dependency>
2. Tulis aplikasi pemprosesan strim:
Seterusnya, kami akan menulis ringkasan apl pemprosesan strim. Dalam contoh ini, kita akan membaca data daripada topik input, mengubah data, dan kemudian menulis keputusan kepada topik output. Berikut ialah contoh pelaksanaan mudah:
import org.apache.kafka.streams.*; import org.apache.kafka.streams.kstream.*; import java.util.Properties; public class StreamProcessingApp { public static void main(String[] args) { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-processing-app"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); StreamsBuilder builder = new StreamsBuilder(); KStream<String, String> inputStream = builder.stream("input-topic"); KStream<String, String> outputStream = inputStream .mapValues(value -> value.toUpperCase()); outputStream.to("output-topic", Produced.with(Serdes.String(), Serdes.String())); KafkaStreams streams = new KafkaStreams(builder.build(), props); streams.start(); } }
Dalam kod di atas, kami mula-mula menentukan beberapa sifat konfigurasi, seperti ID aplikasi dan pelayan bootstrap. Kemudian, kami mencipta contoh StreamsBuilder dan memperoleh aliran daripada topik input. Seterusnya, kami menghantar setiap nilai dalam strim ke huruf besar dan menulis hasilnya ke topik output. Akhirnya, kami mencipta contoh KafkaStreams dan memulakan aplikasi pemprosesan strim.
3. Jalankan aplikasi:
Selepas menulis aplikasi pemprosesan strim, kita boleh menggunakan arahan berikut untuk menjalankan aplikasi:
java -cp your-project.jar StreamProcessingApp
Sila pastikan untuk menggantikan your-project.jar dengan nama fail jar projek anda yang sebenar. Selepas menjalankan aplikasi, ia akan mula memproses data dalam topik input dan menulis hasil yang diubah kepada topik output.
Kesimpulan:
Membangunkan aplikasi pemprosesan strim berdasarkan Apache Kafka Streams menggunakan Java adalah sangat mudah. Dengan menyambungkan topik Kafka input dan output serta menggunakan API Kafka Streams yang berkuasa, kami boleh membina aplikasi pemprosesan strim masa nyata berprestasi tinggi, berskala, bertoleransi kesalahan. Saya harap artikel ini dapat membantu anda bermula dengan Kafka Streams dan menggunakannya dalam projek sebenar.
Atas ialah kandungan terperinci Cara menggunakan Java untuk membangunkan aplikasi pemprosesan strim berdasarkan Apache Kafka Streams. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!