SpringBoot が Kafka 構成ツール クラスを統合する方法
spring-kafka は、java バージョンの kafka クライアントと spring の統合に基づいています。簡単に操作できるようにさまざまなメソッドをカプセル化する KafkaTemplate を提供します。Apache の kafka クライアントをカプセル化し、クライアントの依存関係をインポートする必要はありません。
<!-- kafka --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
YML 構成
kafka: #bootstrap-servers: server1:9092,server2:9093 #kafka开发地址, #生产者配置 producer: # Kafka提供的序列化和反序列化类 key-serializer: org.apache.kafka.common.serialization.StringSerializer #序列化 value-serializer: org.apache.kafka.common.serialization.StringSerializer retries: 1 # 消息发送重试次数 #acks = 0:设置成 表示 producer 完全不理睬 leader broker 端的处理结果。此时producer 发送消息后立即开启下 条消息的发送,根本不等待 leader broker 端返回结果 #acks= all 或者-1 :表示当发送消息时, leader broker 不仅会将消息写入本地日志,同时还会等待所有其他副本都成功写入它们各自的本地日志后,才发送响应结果给,消息安全但是吞吐量会比较低。 #acks = 1:默认的参数值。 producer 发送消息后 leader broker 仅将该消息写入本地日志,然后便发送响应结果给producer ,而无须等待其他副本写入该消息。折中方案,只要leader一直活着消息就不会丢失,同时也保证了吞吐量 acks: 1 #应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1) batch-size: 16384 #批量大小 properties: linger: ms: 0 #提交延迟 buffer-memory: 33554432 # 生产端缓冲区大小 # 消费者配置 consumer: key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 分组名称 group-id: web enable-auto-commit: false #提交offset延时(接收到消息后多久提交offset) # auto-commit-interval: 1000ms #当kafka中没有初始offset或offset超出范围时将自动重置offset # earliest:重置为分区中最小的offset; # latest:重置为分区中最新的offset(消费分区中新产生的数据); # none:只要有一个分区不存在已提交的offset,就抛出异常; auto-offset-reset: latest properties: #消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作) session.timeout.ms: 15000 #消费请求超时时间 request.timeout.ms: 18000 #批量消费每次最多消费多少条消息 #每次拉取一条,一条条消费,当然是具体业务状况设置 max-poll-records: 1 # 指定心跳包发送频率,即间隔多长时间发送一次心跳包,优化该值的设置可以减少Rebalance操作,默认时间为3秒; heartbeat-interval: 6000 # 发出请求时传递给服务器的 ID。用于服务器端日志记录 正常使用后解开注释,不然只有一个节点会报错 #client-id: mqtt listener: #消费端监听的topic不存在时,项目启动会报错(关掉) missing-topics-fatal: false #设置消费类型 批量消费 batch,单条消费:single type: single #指定容器的线程数,提高并发量 #concurrency: 3 #手动提交偏移量 manual达到一定数据后批量提交 #ack-mode: manual ack-mode: MANUAL_IMMEDIATE #手動確認消息 # 认证 #properties: #security: #protocol: SASL_PLAINTEXT #sasl: #mechanism: SCRAM-SHA-256 #jaas:config: 'org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";'
シンプルなツール クラス、通常の使用に対応、テーマは変更不可
@Component @Slf4j public class KafkaUtils<K, V> { @Autowired private KafkaTemplate kafkaTemplate; @Value("${spring.kafka.bootstrap-servers}") String[] servers; /** * 获取连接 * @return */ private Admin getAdmin() { Properties properties = new Properties(); properties.put("bootstrap.servers", servers); // 正式环境需要添加账号密码 return Admin.create(properties); } /** * 增加topic * * @param name 主题名字 * @param partition 分区数量 * @param replica 副本数量 * @date 2022-06-23 chens */ public R addTopic(String name, Integer partition, Integer replica) { Admin admin = getAdmin(); if (replica > servers.length) { return R.error("副本数量不允许超过Broker数量"); } try { NewTopic topic = new NewTopic(name, partition, Short.parseShort(replica.toString())); admin.createTopics(Collections.singleton(topic)); } finally { admin.close(); } return R.ok(); } /** * 删除主题 * * @param names 主题名字集合 * @date 2022-06-23 chens */ public void deleteTopic(List<String> names) { Admin admin = getAdmin(); try { admin.deleteTopics(names); } finally { admin.close(); } } /** * 查询所有主题 * * @date 2022-06-24 chens */ public Set<String> queryTopic() { Admin admin = getAdmin(); try { ListTopicsResult topics = admin.listTopics(); Set<String> set = topics.names().get(); return set; } catch (Exception e) { log.error("查询主题错误!"); } finally { admin.close(); } return null; } // 向所有分区发送消息 public ListenableFuture<SendResult<K, V>> send(String topic, @Nullable V data) { return kafkaTemplate.send(topic, data); } // 指定key发送消息,相同key保证消息在同一个分区 public ListenableFuture<SendResult<K, V>> send(String topic, K key, @Nullable V data) { return kafkaTemplate.send(topic, key, data); } // 指定分区和key发送。 public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, @Nullable V data) { return kafkaTemplate.send(topic, partition, key, data); } }
送信メッセージ 非同期の使用
@GetMapping("/{topic}") public String test(@PathVariable String topic, @PathVariable Long index) throws ExecutionException, InterruptedException { ListenableFuture future = null; Chenshuang user = new Chenshuang(i, "陈爽", "123456", new Date()); String s = JSON.toJSONString(user); KafkaUtils utils = new KafkaUtils(); future = kafkaUtils.send(topic, s); // 异步回调,同步get,会等待 不推荐同步! future.addCallback(new ListenableFutureCallback() { @Override public void onFailure(Throwable ex) { System.out.println("发送失败"); } @Override public void onSuccess(Object result) { System.out.println("发送成功:" + result); } }); return "发送成功"; }
トピックの作成
ブローカーが auto.create.topics.enable を true に設定した場合 (デフォルトは true)、メッセージを受信するときに使用されます。クライアントからのメタデータ要求 トピックを作成します。
存在しないトピックを送信して利用すると、新しいトピックが作成されます。多くの場合、予期しないトピックが作成されると、多くの予期せぬ問題が発生します。この機能をオフにすることをお勧めします。
トピック トピックは、さまざまなタイプのメッセージを区別するために使用されます。実際、これらはさまざまなビジネス シナリオに適しています。デフォルトでは、メッセージは 1 週間保存されます。
同じトピック トピックの下で、デフォルトはパーティションです。つまり、消費できるコンシューマは 1 つだけです。消費容量を向上させたい場合は、パーティションを追加する必要があります。
同じトピックの複数のパーティションには、メッセージ (キー、値) を別のパーティションに分散する 3 つの方法、パーティションの指定、ハッシュ ルーティング、デフォルト、同じパーティション内のメッセージ ID は一意であり、順序付けられています;
コンシューマーがパーティション パーティション内のメッセージを消費する場合、メッセージの場所を識別するためにオフセットを使用します;
GroupId は、同じトピックの下で繰り返し消費される問題を解決するために使用されます。たとえば、消費を複数のコンシューマーが受信する必要がある場合は、次のようにすることができます。異なる GroupId を設定することで実現されます。
実際のメッセージは 1 つのコピーに保存されます。識別子を論理的に設定することによってのみ区別されます。システムは、Topic トピック -> GroupId グループ - およびパーティション パーティションの下にオフセットを記録します。消費されたかどうかを特定します。
メッセージ送信の高可用性 - クラスター モード、マルチコピーの実装。メッセージの送信は、acks 識別子を設定することで異なる可用性を実現できます。=0 の場合、送信が成功すれば OK です。;= 1、マスターは OK になる前に正常に応答し、=all、応答の半分以上が OK (実際の高可用性)
コンシューマー メッセージの高可用性 -- 自動識別オフサート モードをオフにすることができます。消費が完了した後、最初にメッセージをプルし、消費の高可用性を解決するためにオフセット位置を設定します。
import org.apache.kafka.clients.admin.NewTopic; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class KafkaTopic { // yml自定义主题,项目启动就创建, @Value("${spring.kafka.topic}") String topic; @Value("${spring.kafka.bootstrap-servers}") String[] server; /** * 项目启动 初始化主题,如果存在不会覆盖主题的 */ @Bean public NewTopic batchTopic() { // 最大复制因子 <= 经纪人broker数量. return new NewTopic(topic, 10, (short) server.length); } }
リスニング クラス、メッセージ。メッセージが監視するパーティション 1 を指定すると、また消費されます。
同じ方法で異なるトピックを監視し、ディスプレイスメント監視を指定することもできます。同じグループは均等に消費し、異なるグループは繰り返し消費します。
1. ユニキャスト モード、コンシューマ グループは 1 つだけです
(1) トピックにはパーティションが 1 つだけあり、グループ内に複数のコンシューマがある場合、同じパーティション内のメッセージはグループ内の 1 人の消費者のみが消費できます。図 1 に示すように、コンシューマの数がパーティションの数を超えると、超過したコンシューマはアイドル状態になります。トピックとテストには、パーティションが 1 つとグループ G1 が 1 つだけあります。このグループには複数のコンシューマがあり、そのうちの 1 つだけが使用でき、他のコンシューマはアイドル状態です。
(2) トピックには複数のパーティションがあり、グループ内には複数のコンシューマがあります。たとえば、test には 3 つのパーティションがあります。グループ内に 2 つのコンシューマーがいる場合、C0 は p0 と p1 のデータの消費に対応し、c1 は p2 のデータの消費に対応します。コンシューマーが 3 人の場合、1 つのコンシューマーは 1 つのパーティション内のデータの消費に対応します。図を図 2 と図 3 に示します。このモードはクラスター モードで非常に一般的です。たとえば、3 つのサービスを開始し、対応するトピックに 3 つのパーティションを設定できるため、並列消費が達成され、メッセージ処理の効率が向上します。効率を大幅に向上させることができます。
#図 2
図 3
2. ブロードキャスト モード、複数のコンシューマー グループ
図 4
(2) 複数のコンシューマ グループ、複数のパーティション
注意: 消费者的数量并不能决定一个topic的并行度。它是由分区的数目决定的。
再多的消费者,分区数少,也是浪费!
一个组的最大并行度将等于该主题的分区数。
@Component @Slf4j public class Consumer { // 监听主题 分组a @KafkaListener(topics =("${spring.kafka.topic}") ,groupId = "a") public void getMessage(ConsumerRecord message, Acknowledgment ack) { //确认收到消息 ack.acknowledge(); } // 监听主题 分组a @KafkaListener(topics = ("${spring.kafka.topic}"),groupId = "a") public void getMessage2(ConsumerRecord message, Acknowledgment ack) { //确认收到消息 ack.acknowledge(); } // 监听主题 分组b @KafkaListener(topics = ("${spring.kafka.topic}"),groupId = "b") public void getMessage3(ConsumerRecord message, Acknowledgment ack) { //确认收到消息//确认收到消息 ack.acknowledge(); } // 监听主题 分组b @KafkaListener(topics = ("${spring.kafka.topic}"),groupId = "b") public void getMessage4(ConsumerRecord message, Acknowledgment ack) { //确认收到消息//确认收到消息 ack.acknowledge(); } // 指定监听分区1的消息 @KafkaListener(topicPartitions = {@TopicPartition(topic = ("${spring.kafka.topic}"),partitions = {"1"})}) public void getMessage5(ConsumerRecord message, Acknowledgment ack) { Long id = JSONObject.parseObject(message.value().toString()).getLong("id"); //确认收到消息//确认收到消息 ack.acknowledge(); } /** * @Title 指定topic、partition、offset消费 * @Description 同时监听topic1和topic2,监听topic1的0号分区、topic2的 "0号和1号" 分区,指向1号分区的offset初始值为8 * 注意:topics和topicPartitions不能同时使用; **/ @KafkaListener(id = "c1",groupId = "c",topicPartitions = { @TopicPartition(topic = "t1", partitions = { "0" }), @TopicPartition(topic = "t2", partitions = "0", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "8"))}) public void getMessage6(ConsumerRecord record,Acknowledgment ack) { //确认收到消息 ack.acknowledge(); } /** * 批量消费监听goods变更消息 * yml配置listener:type 要改为batch * ymk配置consumer:max-poll-records: ??(每次拉取多少条数据消费) * concurrency = "2" 启动多少线程执行,应小于等于broker数量,避免资源浪费 */ @KafkaListener(id="sync-modify-goods", topics = "${spring.kafka.topic}",concurrency = "4") public void getMessage7(List<ConsumerRecord<String, String>> records){ for (ConsumerRecord<String, String> msg:records) { GoodsChangeMsg changeMsg = null; try { changeMsg = JSONObject.parseObject(msg.value(), GoodsChangeMsg.class); syncGoodsProcessor.handle(changeMsg); }catch (Exception exception) { log.error("解析失败{}", msg, exception); } } } }
以上がSpringBoot が Kafka 構成ツール クラスを統合する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

ホットAIツール

Undresser.AI Undress
リアルなヌード写真を作成する AI 搭載アプリ

AI Clothes Remover
写真から衣服を削除するオンライン AI ツール。

Undress AI Tool
脱衣画像を無料で

Clothoff.io
AI衣類リムーバー

AI Hentai Generator
AIヘンタイを無料で生成します。

人気の記事

ホットツール

メモ帳++7.3.1
使いやすく無料のコードエディター

SublimeText3 中国語版
中国語版、とても使いやすい

ゼンドスタジオ 13.0.1
強力な PHP 統合開発環境

ドリームウィーバー CS6
ビジュアル Web 開発ツール

SublimeText3 Mac版
神レベルのコード編集ソフト(SublimeText3)

ホットトピック











インターネットとテクノロジーの発展に伴い、デジタル投資への関心が高まっています。多くの投資家は、より高い投資収益率を得ることを期待して、投資戦略を模索し、研究し続けています。株式取引では、リアルタイムの株式分析が意思決定に非常に重要であり、Kafka のリアルタイム メッセージ キューと PHP テクノロジの使用は効率的かつ実用的な手段です。 1. Kafka の概要 Kafka は、LinkedIn によって開発された高スループットの分散型パブリッシュおよびサブスクライブ メッセージング システムです。 Kafka の主な機能は次のとおりです。

SpringBoot と SpringMVC はどちらも Java 開発で一般的に使用されるフレームワークですが、それらの間には明らかな違いがいくつかあります。この記事では、これら 2 つのフレームワークの機能と使用法を調べ、その違いを比較します。まず、SpringBoot について学びましょう。 SpringBoot は、Spring フレームワークに基づいたアプリケーションの作成と展開を簡素化するために、Pivotal チームによって開発されました。スタンドアロンの実行可能ファイルを構築するための高速かつ軽量な方法を提供します。

Kafka 視覚化ツールの 5 つのオプション ApacheKafka は、大量のリアルタイム データを処理できる分散ストリーム処理プラットフォームです。これは、リアルタイム データ パイプライン、メッセージ キュー、イベント駆動型アプリケーションの構築に広く使用されています。 Kafka の視覚化ツールは、ユーザーが Kafka クラスターを監視および管理し、Kafka データ フローをより深く理解するのに役立ちます。以下は、5 つの人気のある Kafka 視覚化ツールの紹介です。 ConfluentControlCenterConfluent

適切な Kafka 視覚化ツールを選択するにはどうすればよいですか? 5 つのツールの比較分析 はじめに: Kafka は、ビッグ データの分野で広く使用されている、高性能、高スループットの分散メッセージ キュー システムです。 Kafka の人気に伴い、Kafka クラスターを簡単に監視および管理するためのビジュアル ツールを必要とする企業や開発者が増えています。この記事では、読者がニーズに合ったツールを選択できるように、一般的に使用される 5 つの Kafka 視覚化ツールを紹介し、その特徴と機能を比較します。 1.カフカマネージャー

この記事では、dubbo+nacos+Spring Boot の実際の開発について詳しく説明する例を書きます。この記事では理論的な知識はあまり取り上げませんが、dubbo を nacos と統合して開発環境を迅速に構築する方法を説明する最も簡単な例を書きます。

RockyLinux に ApacheKafka をインストールするには、次の手順に従います。 システムの更新: まず、RockyLinux システムが最新であることを確認し、次のコマンドを実行してシステム パッケージを更新します: sudoyumupdate Java のインストール: ApacheKafka は Java に依存しているため、最初に JavaDevelopmentKit (JDK) をインストールします)。 OpenJDK は、次のコマンドを使用してインストールできます。 sudoyuminstalljava-1.8.0-openjdk-devel ダウンロードして解凍します。 ApacheKafka 公式 Web サイト () にアクセスして、最新のバイナリ パッケージをダウンロードします。安定したバージョンを選択してください

SpringBootプロジェクトでよく使われるディレクトリ SpringBoot開発時のディレクトリ構成と命名仕様を基に、SpringBootプロジェクトのディレクトリ構成と命名仕様を紹介します 導入を通じて問題解決をお手伝いします 実際のディレクトリ構成の計画方法プロジェクト?ディレクトリにもっと標準化された名前を付けるにはどうすればよいでしょうか?各ディレクトリは何を意味しますか? 3 つの質問をお待ちください。ディレクトリの説明 servicex//プロジェクト名|-admin-ui//管理サービス フロントエンド コード (通常、管理を容易にするために、UI とサービスは 1 つのプロジェクトにまとめられます)|-servicex-auth//モジュール 1|-servicex-common//モジュール 2|-servicex-gateway//モジュール 3|

近年、ビッグ データと活発なオープン ソース コミュニティの台頭により、ますます多くの企業が増大するデータ ニーズを満たすために高性能の対話型データ処理システムを探し始めています。このテクノロジー アップグレードの波の中で、go-zero と Kafka+Avro はますます多くの企業に注目され、採用されています。 go-zero は、Golang 言語をベースに開発されたマイクロサービス フレームワークで、高いパフォーマンス、使いやすさ、拡張の容易さ、メンテナンスの容易さという特徴を備えており、企業が効率的なマイクロサービス アプリケーション システムを迅速に構築できるように設計されています。その急速な成長
