ホームページ Java &#&チュートリアル SpringBoot が Kafka 構成ツール クラスを統合する方法

SpringBoot が Kafka 構成ツール クラスを統合する方法

May 12, 2023 pm 09:58 PM
springboot kafka

spring-kafka は、java バージョンの kafka クライアントと spring の統合に基づいています。簡単に操作できるようにさまざまなメソッドをカプセル化する KafkaTemplate を提供します。Apache の kafka クライアントをカプセル化し、クライアントの依存関係をインポートする必要はありません。

<!-- kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
ログイン後にコピー

YML 構成

kafka:
    #bootstrap-servers: server1:9092,server2:9093 #kafka开发地址,
    #生产者配置
    producer:
      # Kafka提供的序列化和反序列化类
      key-serializer: org.apache.kafka.common.serialization.StringSerializer #序列化
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      retries: 1 # 消息发送重试次数
      #acks = 0:设置成 表示 producer 完全不理睬 leader broker 端的处理结果。此时producer 发送消息后立即开启下 条消息的发送,根本不等待 leader broker 端返回结果
      #acks= all 或者-1 :表示当发送消息时, leader broker 不仅会将消息写入本地日志,同时还会等待所有其他副本都成功写入它们各自的本地日志后,才发送响应结果给,消息安全但是吞吐量会比较低。
      #acks = 1:默认的参数值。 producer 发送消息后 leader broker 仅将该消息写入本地日志,然后便发送响应结果给producer ,而无须等待其他副本写入该消息。折中方案,只要leader一直活着消息就不会丢失,同时也保证了吞吐量
      acks: 1 #应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
      batch-size: 16384 #批量大小
      properties:
        linger:
          ms: 0 #提交延迟
      buffer-memory: 33554432 # 生产端缓冲区大小
    # 消费者配置
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 分组名称
      group-id: web
      enable-auto-commit: false
      #提交offset延时(接收到消息后多久提交offset)
      # auto-commit-interval: 1000ms
      #当kafka中没有初始offset或offset超出范围时将自动重置offset
      # earliest:重置为分区中最小的offset;
      # latest:重置为分区中最新的offset(消费分区中新产生的数据);
      # none:只要有一个分区不存在已提交的offset,就抛出异常;
      auto-offset-reset: latest
      properties:
        #消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
        session.timeout.ms: 15000
        #消费请求超时时间
        request.timeout.ms: 18000
      #批量消费每次最多消费多少条消息
      #每次拉取一条,一条条消费,当然是具体业务状况设置
      max-poll-records: 1
      # 指定心跳包发送频率,即间隔多长时间发送一次心跳包,优化该值的设置可以减少Rebalance操作,默认时间为3秒;
      heartbeat-interval: 6000
      # 发出请求时传递给服务器的 ID。用于服务器端日志记录 正常使用后解开注释,不然只有一个节点会报错
      #client-id: mqtt
    listener:
      #消费端监听的topic不存在时,项目启动会报错(关掉)
      missing-topics-fatal: false
      #设置消费类型 批量消费 batch,单条消费:single
      type: single
      #指定容器的线程数,提高并发量
      #concurrency: 3
      #手动提交偏移量 manual达到一定数据后批量提交
      #ack-mode: manual
      ack-mode: MANUAL_IMMEDIATE #手動確認消息
        # 认证
    #properties:
      #security:
        #protocol: SASL_PLAINTEXT
      #sasl:
        #mechanism: SCRAM-SHA-256
        #jaas:config: &#39;org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";&#39;
ログイン後にコピー

シンプルなツール クラス、通常の使用に対応、テーマは変更不可

@Component
@Slf4j
public class KafkaUtils<K, V> {
    @Autowired
    private KafkaTemplate kafkaTemplate;
    @Value("${spring.kafka.bootstrap-servers}")
    String[] servers;

    /**
     * 获取连接
     * @return
     */
    private Admin getAdmin() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", servers);
        // 正式环境需要添加账号密码
        return Admin.create(properties);
    }

    /**
     * 增加topic
     *
     * @param name      主题名字
     * @param partition 分区数量
     * @param replica   副本数量
     * @date 2022-06-23 chens
     */
    public R addTopic(String name, Integer partition, Integer replica) {
        Admin admin = getAdmin();
        if (replica > servers.length) {
            return R.error("副本数量不允许超过Broker数量");
        }
        try {
            NewTopic topic = new NewTopic(name, partition, Short.parseShort(replica.toString()));
            admin.createTopics(Collections.singleton(topic));
        } finally {
            admin.close();
        }
        return R.ok();
    }

    /**
     * 删除主题
     *
     * @param names 主题名字集合
     * @date 2022-06-23 chens
     */
    public void deleteTopic(List<String> names) {
        Admin admin = getAdmin();
        try {
            admin.deleteTopics(names);
        } finally {
            admin.close();
        }
    }

    /**
     * 查询所有主题
     *
     * @date 2022-06-24 chens
     */
    public Set<String> queryTopic() {
        Admin admin = getAdmin();
        try {
            ListTopicsResult topics = admin.listTopics();
            Set<String> set = topics.names().get();
            return set;
        } catch (Exception e) {
            log.error("查询主题错误!");
        } finally {
            admin.close();
        }
        return null;
    }

    // 向所有分区发送消息
    public ListenableFuture<SendResult<K, V>> send(String topic, @Nullable V data) {
        return kafkaTemplate.send(topic, data);
    }
    
    // 指定key发送消息,相同key保证消息在同一个分区
    public ListenableFuture<SendResult<K, V>> send(String topic, K key, @Nullable V data) {
        return kafkaTemplate.send(topic, key, data);
    }

    // 指定分区和key发送。
    public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, @Nullable V data) {
        return kafkaTemplate.send(topic, partition, key, data);
    }
}
ログイン後にコピー

送信メッセージ 非同期の使用

@GetMapping("/{topic}")
    public String test(@PathVariable String topic, @PathVariable Long index) throws ExecutionException, InterruptedException {

        ListenableFuture future = null;
        Chenshuang user = new Chenshuang(i, "陈爽", "123456", new Date());
        String s = JSON.toJSONString(user);
        KafkaUtils utils = new KafkaUtils();
        future = kafkaUtils.send(topic, s);
        // 异步回调,同步get,会等待 不推荐同步!
        future.addCallback(new ListenableFutureCallback() {
            @Override
            public void onFailure(Throwable ex) {
                System.out.println("发送失败");
            }
            @Override
            public void onSuccess(Object result) {
                System.out.println("发送成功:" + result);
            }
        });
        return "发送成功";
    }
ログイン後にコピー

トピックの作成

ブローカーが auto.create.topics.enable を true に設定した場合 (デフォルトは true)、メッセージを受信するときに使用されます。クライアントからのメタデータ要求 トピックを作成します。

存在しないトピックを送信して利用すると、新しいトピックが作成されます。多くの場合、予期しないトピックが作成されると、多くの予期せぬ問題が発生します。この機能をオフにすることをお勧めします。

トピック トピックは、さまざまなタイプのメッセージを区別するために使用されます。実際、これらはさまざまなビジネス シナリオに適しています。デフォルトでは、メッセージは 1 週間保存されます。

同じトピック トピックの下で、デフォルトはパーティションです。つまり、消費できるコンシューマは 1 つだけです。消費容量を向上させたい場合は、パーティションを追加する必要があります。

同じトピックの複数のパーティションには、メッセージ (キー、値) を別のパーティションに分散する 3 つの方法、パーティションの指定、ハッシュ ルーティング、デフォルト、同じパーティション内のメッセージ ID は一意であり、順序付けられています;

コンシューマーがパーティション パーティション内のメッセージを消費する場合、メッセージの場所を識別するためにオフセットを使用します;

GroupId は、同じトピックの下で繰り返し消費される問題を解決するために使用されます。たとえば、消費を複数のコンシューマーが受信する必要がある場合は、次のようにすることができます。異なる GroupId を設定することで実現されます。

実際のメッセージは 1 つのコピーに保存されます。識別子を論理的に設定することによってのみ区別されます。システムは、Topic トピック -> GroupId グループ - およびパーティション パーティションの下にオフセットを記録します。消費されたかどうかを特定します。

メッセージ送信の高可用性 - クラスター モード、マルチコピーの実装。メッセージの送信は、acks 識別子を設定することで異なる可用性を実現できます。=0 の場合、送信が成功すれば OK です。;= 1、マスターは OK になる前に正常に応答し、=all、応答の半分以上が OK (実際の高可用性)

コンシューマー メッセージの高可用性 -- 自動識別オフサート モードをオフにすることができます。消費が完了した後、最初にメッセージをプルし、消費の高可用性を解決するためにオフセット位置を設定します。

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaTopic {
    // yml自定义主题,项目启动就创建,
    @Value("${spring.kafka.topic}")
    String topic;
    @Value("${spring.kafka.bootstrap-servers}")
    String[] server;
    /**
     * 项目启动 初始化主题,如果存在不会覆盖主题的
     */
    @Bean
    public NewTopic batchTopic() {
        // 最大复制因子 <= 经纪人broker数量.
        return new NewTopic(topic, 10, (short) server.length);
    }
}
ログイン後にコピー

リスニング クラス、メッセージ。メッセージが監視するパーティション 1 を指定すると、また消費されます。

同じ方法で異なるトピックを監視し、ディスプレイスメント監視を指定することもできます。

同じグループは均等に消費し、異なるグループは繰り返し消費します。

1. ユニキャスト モード、コンシューマ グループは 1 つだけです

(1) トピックにはパーティションが 1 つだけあり、グループ内に複数のコンシューマがある場合、同じパーティション内のメッセージはグループ内の 1 人の消費者のみが消費できます。図 1 に示すように、コンシューマの数がパーティションの数を超えると、超過したコンシューマはアイドル状態になります。トピックとテストには、パーティションが 1 つとグループ G1 が 1 つだけあります。このグループには複数のコンシューマがあり、そのうちの 1 つだけが使用でき、他のコンシューマはアイドル状態です。

図 1

SpringBoot が Kafka 構成ツール クラスを統合する方法 (2) トピックには複数のパーティションがあり、グループ内には複数のコンシューマがあります。たとえば、test には 3 つのパーティションがあります。グループ内に 2 つのコンシューマーがいる場合、C0 は p0 と p1 のデータの消費に対応し、c1 は p2 のデータの消費に対応します。コンシューマーが 3 人の場合、1 つのコンシューマーは 1 つのパーティション内のデータの消費に対応します。図を図 2 と図 3 に示します。このモードはクラスター モードで非常に一般的です。たとえば、3 つのサービスを開始し、対応するトピックに 3 つのパーティションを設定できるため、並列消費が達成され、メッセージ処理の効率が向上します。効率を大幅に向上させることができます。

#図 2

SpringBoot が Kafka 構成ツール クラスを統合する方法

図 3

2. ブロードキャスト モード、複数のコンシューマー グループ SpringBoot が Kafka 構成ツール クラスを統合する方法

ブロードキャスト モードを実装したい場合は、1 つのコンシューマ グループがメッセージを消費した後、他のグループのコンシューマの消費にまったく影響を及ぼさないように、複数のコンシューマ グループを設定する必要があります。これがコンセプトです。放送の

(1) 複数のコンシューマ グループ、1 つのパーティション

このトピックのデータは、複数のコンシューマ グループによって同時に消費されます。コンシューマ グループに複数のコンシューマがある場合、それを消費できるのは複数のコンシューマ グループのみです。図 4 に示すように、1 つのコンシューマ:

図 4

(2) 複数のコンシューマ グループ、複数のパーティション SpringBoot が Kafka 構成ツール クラスを統合する方法

このトピックは、複数のコンシューマ グループによって複数回使用できます。図 5 に示すように、コンシューマ グループ内では、各コンシューマはトピック内の 1 つ以上のパーティションに対応して並行して使用できます。 ##

注意: 消费者的数量并不能决定一个topic的并行度。它是由分区的数目决定的。
再多的消费者,分区数少,也是浪费!
一个组的最大并行度将等于该主题的分区数。

@Component
@Slf4j
public class Consumer {
    // 监听主题 分组a
    @KafkaListener(topics =("${spring.kafka.topic}") ,groupId = "a")
    public  void  getMessage(ConsumerRecord message, Acknowledgment ack) {
        //确认收到消息
        ack.acknowledge();
    }
    // 监听主题 分组a
    @KafkaListener(topics = ("${spring.kafka.topic}"),groupId = "a")
    public  void getMessage2(ConsumerRecord message, Acknowledgment ack) {
        //确认收到消息
        ack.acknowledge();
    }
    // 监听主题 分组b
    @KafkaListener(topics = ("${spring.kafka.topic}"),groupId = "b")
    public  void getMessage3(ConsumerRecord message, Acknowledgment ack) {
        //确认收到消息//确认收到消息
        ack.acknowledge();
    }
    // 监听主题 分组b
    @KafkaListener(topics = ("${spring.kafka.topic}"),groupId = "b")
    public  void getMessage4(ConsumerRecord message, Acknowledgment ack) {
        //确认收到消息//确认收到消息
        ack.acknowledge();
    }

    // 指定监听分区1的消息
    @KafkaListener(topicPartitions = {@TopicPartition(topic = ("${spring.kafka.topic}"),partitions = {"1"})})
    public void getMessage5(ConsumerRecord message, Acknowledgment ack) {
        Long id = JSONObject.parseObject(message.value().toString()).getLong("id");
        //确认收到消息//确认收到消息
        ack.acknowledge();
    }
    
    /**
     * @Title 指定topic、partition、offset消费
     * @Description 同时监听topic1和topic2,监听topic1的0号分区、topic2的 "0号和1号" 分区,指向1号分区的offset初始值为8
     * 注意:topics和topicPartitions不能同时使用;
     **/
    @KafkaListener(id = "c1",groupId = "c",topicPartitions = {
            @TopicPartition(topic = "t1", partitions = { "0" }),
            @TopicPartition(topic = "t2", partitions = "0", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "8"))})
    public void getMessage6(ConsumerRecord record,Acknowledgment ack) {
        //确认收到消息
        ack.acknowledge();
    }
    
    /**        
     * 批量消费监听goods变更消息
     * yml配置listener:type 要改为batch
     * ymk配置consumer:max-poll-records: ??(每次拉取多少条数据消费)
     * concurrency = "2" 启动多少线程执行,应小于等于broker数量,避免资源浪费
     */
    @KafkaListener(id="sync-modify-goods", topics = "${spring.kafka.topic}",concurrency = "4")
    public void getMessage7(List<ConsumerRecord<String, String>> records){
        for (ConsumerRecord<String, String> msg:records) {
            GoodsChangeMsg changeMsg = null;
            try {
                changeMsg = JSONObject.parseObject(msg.value(), GoodsChangeMsg.class);
                syncGoodsProcessor.handle(changeMsg);
            }catch (Exception exception) {
                log.error("解析失败{}", msg, exception);
            }
        }
    }
}
ログイン後にコピー

以上がSpringBoot が Kafka 構成ツール クラスを統合する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。

ホットAIツール

Undresser.AI Undress

Undresser.AI Undress

リアルなヌード写真を作成する AI 搭載アプリ

AI Clothes Remover

AI Clothes Remover

写真から衣服を削除するオンライン AI ツール。

Undress AI Tool

Undress AI Tool

脱衣画像を無料で

Clothoff.io

Clothoff.io

AI衣類リムーバー

AI Hentai Generator

AI Hentai Generator

AIヘンタイを無料で生成します。

ホットツール

メモ帳++7.3.1

メモ帳++7.3.1

使いやすく無料のコードエディター

SublimeText3 中国語版

SublimeText3 中国語版

中国語版、とても使いやすい

ゼンドスタジオ 13.0.1

ゼンドスタジオ 13.0.1

強力な PHP 統合開発環境

ドリームウィーバー CS6

ドリームウィーバー CS6

ビジュアル Web 開発ツール

SublimeText3 Mac版

SublimeText3 Mac版

神レベルのコード編集ソフト(SublimeText3)

PHP と Kafka を使用してリアルタイム株価分析を実装する方法 PHP と Kafka を使用してリアルタイム株価分析を実装する方法 Jun 28, 2023 am 10:04 AM

インターネットとテクノロジーの発展に伴い、デジタル投資への関心が高まっています。多くの投資家は、より高い投資収益率を得ることを期待して、投資戦略を模索し、研究し続けています。株式取引では、リアルタイムの株式分析が意思決定に非常に重要であり、Kafka のリアルタイム メッセージ キューと PHP テクノロジの使用は効率的かつ実用的な手段です。 1. Kafka の概要 Kafka は、LinkedIn によって開発された高スループットの分散型パブリッシュおよびサブスクライブ メッセージング システムです。 Kafka の主な機能は次のとおりです。

SpringBootとSpringMVCの比較と差異分析 SpringBootとSpringMVCの比較と差異分析 Dec 29, 2023 am 11:02 AM

SpringBoot と SpringMVC はどちらも Java 開発で一般的に使用されるフレームワークですが、それらの間には明らかな違いがいくつかあります。この記事では、これら 2 つのフレームワークの機能と使用法を調べ、その違いを比較します。まず、SpringBoot について学びましょう。 SpringBoot は、Spring フレームワークに基づいたアプリケーションの作成と展開を簡素化するために、Pivo​​tal チームによって開発されました。スタンドアロンの実行可能ファイルを構築するための高速かつ軽量な方法を提供します。

Kafkaを探索するための可視化ツール5選 Kafkaを探索するための可視化ツール5選 Feb 01, 2024 am 08:03 AM

Kafka 視覚化ツールの 5 つのオプション ApacheKafka は、大量のリアルタイム データを処理できる分散ストリーム処理プラットフォームです。これは、リアルタイム データ パイプライン、メッセージ キュー、イベント駆動型アプリケーションの構築に広く使用されています。 Kafka の視覚化ツールは、ユーザーが Kafka クラスターを監視および管理し、Kafka データ フローをより深く理解するのに役立ちます。以下は、5 つの人気のある Kafka 視覚化ツールの紹介です。 ConfluentControlCenterConfluent

Kafka 視覚化ツールの比較分析: 最適なツールを選択するには? Kafka 視覚化ツールの比較分析: 最適なツールを選択するには? Jan 05, 2024 pm 12:15 PM

適切な Kafka 視覚化ツールを選択するにはどうすればよいですか? 5 つのツールの比較分析 はじめに: Kafka は、ビッグ データの分野で広く使用されている、高性能、高スループットの分散メッセージ キュー システムです。 Kafka の人気に伴い、Kafka クラスターを簡単に監視および管理するためのビジュアル ツールを必要とする企業や開発者が増えています。この記事では、読者がニーズに合ったツールを選択できるように、一般的に使用される 5 つの Kafka 視覚化ツールを紹介し、その特徴と機能を比較します。 1.カフカマネージャー

SpringBoot+Dubbo+Nacos開発実践チュートリアル SpringBoot+Dubbo+Nacos開発実践チュートリアル Aug 15, 2023 pm 04:49 PM

この記事では、dubbo+nacos+Spring Boot の実際の開発について詳しく説明する例を書きます。この記事では理論的な知識はあまり取り上げませんが、dubbo を nacos と統合して開発環境を迅速に構築する方法を説明する最も簡単な例を書きます。

Rocky Linux に Apache Kafka をインストールするにはどうすればよいですか? Rocky Linux に Apache Kafka をインストールするにはどうすればよいですか? Mar 01, 2024 pm 10:37 PM

RockyLinux に ApacheKafka をインストールするには、次の手順に従います。 システムの更新: まず、RockyLinux システムが最新であることを確認し、次のコマンドを実行してシステム パッケージを更新します: sudoyumupdate Java のインストール: ApacheKafka は Java に依存しているため、最初に JavaDevelopmentKit (JDK) をインストールします)。 OpenJDK は、次のコマンドを使用してインストールできます。 sudoyuminstalljava-1.8.0-openjdk-devel ダウンロードして解凍します。 ApacheKafka 公式 Web サイト () にアクセスして、最新のバイナリ パッケージをダウンロードします。安定したバージョンを選択してください

springBoot プロジェクトで一般的に使用されるディレクトリは何ですか? springBoot プロジェクトで一般的に使用されるディレクトリは何ですか? Jun 27, 2023 pm 01:42 PM

SpringBootプロジェクトでよく使われるディレクトリ SpringBoot開発時のディレクトリ構成と命名仕様を基に、SpringBootプロジェクトのディレクトリ構成と命名仕様を紹介します 導入を通じて問題解決をお手伝いします 実際のディレクトリ構成の計画方法プロジェクト?ディレクトリにもっと標準化された名前を付けるにはどうすればよいでしょうか?各ディレクトリは何を意味しますか? 3 つの質問をお待ちください。ディレクトリの説明 servicex//プロジェクト名|-admin-ui//管理サービス フロントエンド コード (通常、管理を容易にするために、UI とサービスは 1 つのプロジェクトにまとめられます)|-servicex-auth//モジュール 1|-servicex-common//モジュール 2|-servicex-gateway//モジュール 3|

go-zero と Kafka+Avro の実践: 高性能対話型データ処理システムの構築 go-zero と Kafka+Avro の実践: 高性能対話型データ処理システムの構築 Jun 23, 2023 am 09:04 AM

近年、ビッグ データと活発なオープン ソース コミュニティの台頭により、ますます多くの企業が増大するデータ ニーズを満たすために高性能の対話型データ処理システムを探し始めています。このテクノロジー アップグレードの波の中で、go-zero と Kafka+Avro はますます多くの企業に注目され、採用されています。 go-zero は、Golang 言語をベースに開発されたマイクロサービス フレームワークで、高いパフォーマンス、使いやすさ、拡張の容易さ、メンテナンスの容易さという特徴を備えており、企業が効率的なマイクロサービス アプリケーション システムを迅速に構築できるように設計されています。その急速な成長

See all articles