目錄
介紹
Kafka核心相關名稱
kafka叢集安裝
首頁 Java java教程 Java分散式之Kafka訊息佇列實例分析

Java分散式之Kafka訊息佇列實例分析

Apr 19, 2023 pm 04:10 PM
java kafka

介紹

Apache Kafka 是分散式發布-訂閱訊息系統,在 kafka官網上對 kafka 的定義:一個分散式發布-訂閱訊息傳遞系統。它最初由LinkedIn公司開發,Linkedin於2010年貢獻給了Apache基金會並成為頂級開源專案。 Kafka是一種快速、可擴展的、設計內在就是分散式的,分區的和可複製的提交日誌服務。

注意:Kafka並沒有遵循JMS規範(),它只提供了發布和訂閱通訊方式。

Kafka核心相關名稱

  1. Broker:Kafka節點,一個Kafka節點就是一個broker,多個broker可以組成一個Kafka叢集

  2. #Topic:一類訊息,訊息存放的目錄即主題,例如page view日誌、click日誌等都可以以topic的形式存在,Kafka叢集能夠同時負責多個topic的分發

  3. #massage: Kafka中最基本的傳遞物件。

  4. Partition:topic物理上的分組,一個topic可以分成多個partition,每個partition是一個有序的佇列。 Kafka裡面實作分區,一個broker就是表示一個區域。

  5. Segment:partition物理上由多個segment組成,每個Segment存message資訊

  6. Producer : 生產者,生產message發送到topic

  7. Consumer : 消費者,訂閱topic並消費message, consumer作為一個執行緒來消費

  8. Consumer Group:消費者群組,一個Consumer Group包含多個consumer

  9. Offset:偏移量,理解為訊息partition 中訊息的索引位置

主題和佇列的區別:

佇列是一個資料結構,遵循先進先出原則

kafka叢集安裝

  • 每台伺服器上安裝jdk1.8環境

  • 安裝Zookeeper叢集環境

  • 安裝kafka叢集環境

  • 執行環境測試

Java分散式之Kafka訊息佇列實例分析

安裝jdk環境和zookeeper這裡不詳述了。

kafka為什麼依賴zookeeper:kafka會將mq資訊存放在zookeeper上,為了讓整個叢集能夠方便擴展,採用zookeeper的事件通知相互感知。

kafka叢集安裝步驟:

1、下載kafka的壓縮套件

2、解壓縮安裝套件

##tar -zxvf kafka_2.11 -1.0.0.tgz

3、修改kafka的設定檔config/server.properties

設定檔修改內容:

  • zookeeper連線位址:

    zookeeper.connect=192.168.1.19:2181

  • 監聽的ip,修改為本機的ip

    listeners=PLAINTEXT:// 192.168.1.19:9092

  • kafka的brokerid,每台broker的id都不一樣

    broker.id=0

#4、依序啟動kafka

./kafka-server-start.sh -daemon config/server.properties

kafka使用

kafka檔案儲存

topic是邏輯上的概念,而partition是物理上的概念,每個partition對應於一個log文件,該log檔案中儲存的就是Producer產生的資料。 Producer產生的資料會被不斷追加到該log檔案末端,為防止log檔案過大導致資料定位效率低下,Kafka採取了分片和索引機制,將每個partition分為多個segment,每個segment包含: “.index”文件、“.log”文件和.timeindex等文件。這些檔案位於一個資料夾下,該資料夾的命名規則為:topic名稱 分割區序號。

例如:執行指令新建一個主題,分三個區存放放在三個broker中:

#./kafka-topics.sh --create --zookeeper localhost: 2181 --replication-factor 1 --partitions 3 --topic kaico

Java分散式之Kafka訊息佇列實例分析

Java分散式之Kafka訊息佇列實例分析

  • 一個partition分為多個segment

  • .log 日誌檔

  • #.index 偏移量索引檔

  • .timeindex 時間戳索引檔

  • 其他檔案(partition.metadata,leader-epoch-checkpoint)

Springboot整合kafka

maven依賴

 <dependencies>
        <!-- springBoot集成kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <!-- SpringBoot整合Web组件 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    </dependencies>

yml設定

# kafka
spring:
  kafka:
    # kafka服务器地址(可以多个)
#    bootstrap-servers: 192.168.212.164:9092,192.168.212.167:9092,192.168.212.168:9092
    bootstrap-servers: www.kaicostudy.com:9092,www.kaicostudy.com:9093,www.kaicostudy.com:9094
    consumer:
      # 指定一个默认的组名
      group-id: kafkaGroup1
      # earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
      # latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
      # none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
      auto-offset-reset: earliest
      # key/value的反序列化
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      # key/value的序列化
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 批量抓取
      batch-size: 65536
      # 缓存容量
      buffer-memory: 524288
      # 服务器地址
      bootstrap-servers: www.kaicostudy.com:9092,www.kaicostudy.com:9093,www.kaicostudy.com:9094

生產者

@RestController
public class KafkaController {
	/**
	 * 注入kafkaTemplate
	 */
	@Autowired
	private KafkaTemplate<String, String> kafkaTemplate;
	/**
	 * 发送消息的方法
	 *
	 * @param key
	 *            推送数据的key
	 * @param data
	 *            推送数据的data
	 */
	private void send(String key, String data) {
		// topic 名称 key   data 消息数据
		kafkaTemplate.send("kaico", key, data);
	}
	// test 主题 1 my_test 3
	@RequestMapping("/kafka")
	public String testKafka() {
		int iMax = 6;
		for (int i = 1; i < iMax; i++) {
			send("key" + i, "data" + i);
		}
		return "success";
	}
}

消費者

@Component
public class TopicKaicoConsumer {
    /**
     * 消费者使用日志打印消息
     */
    @KafkaListener(topics = "kaico") //监听的主题
    public void receive(ConsumerRecord<?, ?> consumer) {
        System.out.println("topic名称:" + consumer.topic() + ",key:" +
                consumer.key() + "," +
                "分区位置:" + consumer.partition()
                + ", 下标" + consumer.offset());
        //输出key对应的value的值
        System.out.println(consumer.value());
    }
}

以上是Java分散式之Kafka訊息佇列實例分析的詳細內容。更多資訊請關注PHP中文網其他相關文章!

本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn

熱AI工具

Undress AI Tool

Undress AI Tool

免費脫衣圖片

Undresser.AI Undress

Undresser.AI Undress

人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover

AI Clothes Remover

用於從照片中去除衣服的線上人工智慧工具。

Clothoff.io

Clothoff.io

AI脫衣器

Video Face Swap

Video Face Swap

使用我們完全免費的人工智慧換臉工具,輕鬆在任何影片中換臉!

熱工具

記事本++7.3.1

記事本++7.3.1

好用且免費的程式碼編輯器

SublimeText3漢化版

SublimeText3漢化版

中文版,非常好用

禪工作室 13.0.1

禪工作室 13.0.1

強大的PHP整合開發環境

Dreamweaver CS6

Dreamweaver CS6

視覺化網頁開發工具

SublimeText3 Mac版

SublimeText3 Mac版

神級程式碼編輯軟體(SublimeText3)

熱門話題

Laravel 教程
1605
29
PHP教程
1510
276
什麼是Java的哈希圖? 什麼是Java的哈希圖? Aug 11, 2025 pm 07:24 PM

ahashmapinjavaiSadattrastureturethatStoreskey-valuepairsforefficeFitedReval,插入和deletion.itusesthekey’shashcode()methodtodeTermInestorageLageLageAgeLageAgeAgeAgeAgeAneStorageAgeAndAllowSavereo(1)timecomplexityforget()

如何在Java中創建和使用數組 如何在Java中創建和使用數組 Aug 11, 2025 pm 04:00 PM

toCreateAnduseanArrayInjava,第一declethearraywithththetatepeandsquarebarackets,thanStantiateItWithTheneWkeyWordeRinitialIseIsizitDirectlywithvalues; 1.DecleAteAteAndeAnArrayUsishArayusisherusingDataType [] ArraynAmeDatepe [] arraynAmename = newDatatepe [size]

如何在Java中創建線程? 如何在Java中創建線程? Aug 11, 2025 pm 01:34 PM

YouCancReateathReadInjavaByExtDingTheThEthEthEthReadClassOrimplementingTherunnablefface.2.ExtDendingThreadThreadInvolvesCreatingingAclassThatoverRidestherun()MethodAndCallingStart()onaninstance.3.implementingrementingRunnnablerequirequirequirequirequiresdefinterun()

如何在Java中使用套件 如何在Java中使用套件 Aug 11, 2025 am 11:57 AM

選擇:linkedhashsetForinsertionorder,andreesetForsortedOrder.2.addelementswithadd()andremoveWithRemove()

python argparse需要參數示例 python argparse需要參數示例 Aug 11, 2025 pm 09:42 PM

在使用argparse模塊時,必須提供的參數可通過設置required=True來實現,1.使用required=True可將可選參數(如--input)設為必填,運行腳本時若未提供會報錯;2.位置參數默認必填,無需設置required=True;3.建議必要參數使用位置參數,偶爾必須的配置再使用required=True的可選參數,以保持靈活性;4.required=True是控制參數必填最直接的方式,使用後用戶調用腳本時必須提供對應參數,否則程序將提示錯誤並退出。

如何使用Spring Boot在Java中使用請求參數 如何使用Spring Boot在Java中使用請求參數 Aug 11, 2025 pm 07:51 PM

在SpringBoot中,處理請求參數的方法包括:1.使用@RequestParam獲取查詢參數,支持必填、可选和默認值;2.通過List或Map類型接收多個同名參數;3.結合@ModelAttribute將多個參數綁定到對象;4.使用@PathVariable提取URL路徑中的變量;5.在POST請求中用@RequestParam處理表單數據;6.用Map接收所有請求參數。正確選擇註解可高效解析請求數據,提升開發效率。

Java的評論是什麼? Java的評論是什麼? Aug 12, 2025 am 08:20 AM

評論Incominjavaareignoredbythecompilereranded forexplanation,notes,OrdisablingCode.thereareThreetypes:1)單位linecommentsStartWith // andlastuntiltheEndoftheline; 2)Multi-lineCommentsBebeNWITH/ANDENCOMMENTBEMEMENT/ANDENDWITH/ANDENDWITH/ANDENDWITH/ANDENDWITH/ANDENDWITH/ANDENDWITH/ANDENDWITH/ANDCANSPANMELTIPLICEMENTS; 3)文檔

Java開發的最佳IDE:比較評論 Java開發的最佳IDE:比較評論 Aug 12, 2025 pm 02:55 PM

ThebestJavaIDEin2024dependsonyourneeds:1.ChooseIntelliJIDEAforprofessional,enterprise,orfull-stackdevelopmentduetoitssuperiorcodeintelligence,frameworkintegration,andtooling.2.UseEclipseforhighextensibility,legacyprojects,orwhenopen-sourcecustomizati

See all articles