我很高興向大家介紹一系列有關 Apache Camel 的文章。在第一篇文章中,我將介紹一個實際用例來展示其功能,而不是深入研究 Apache Camel 的複雜性。具體來說,您將學習如何使用 Apache Camel 在兩個資料庫之間建立簡單的提取、轉換和載入 (ETL) 應用程式。
在深入實際用例之前,我們先簡單介紹一下 Apache Camel。 Apache Camel 是一個開源整合框架,它利用企業整合模式(EIP)來促進各種系統的整合。
當今世界,眾多不同類型的系統並存。有些可能是遺留系統,而有些則是新系統。這些系統通常需要相互交互和集成,由於不同的實現和訊息格式,這可能具有挑戰性。一種解決方案是編寫自訂程式碼來彌合這些差異,但這可能會導致緊密耦合和維護困難。
相反,Apache Camel 提供了一個額外的層來調解系統之間的差異,從而實現鬆散耦合並更易於維護。 Camel 使用 API(或宣告式 Java 域特定語言)來設定基於 EIP 的路由和中介規則。
要了解 Apache Camel,掌握「企業整合模式」(EIP) 很重要。 《企業整合模式》一書描述了一組用於設計大型基於組件的系統的模式,其中組件可以在同一進程或不同機器上運行。關鍵思想是系統應該是面向訊息的,元件透過訊息進行通訊。這些模式提供了用於實現這些通訊的工具包(圖 1)。
圖 1 – 整合解決方案的基本元素 (enterpriseintegrationpatterns.com)
端點:端點是發送和接收訊息的通道。它充當組件與外界之間的介面。
訊息:訊息是用於系統之間通訊的資料結構,由訊息頭和訊息體組成。標頭包含元數據,正文包含實際數據。
通道:通道連接兩個端點,方便訊息的發送和接收。
路由器:路由器將訊息從一個端點定向到另一個端點,確定訊息路徑。
翻譯器:翻譯器將訊息從一種格式轉換為另一種格式。
關於 Apache Camel 的介紹我考慮到此為止。現在讓我們向您展示如何使用 Apache Camel 在兩個資料庫之間建立簡單的 ETL 應用程式。
假設我們有一個高負載的系統,其中一個關鍵元件是資料庫。在某些時候,我們需要在通常的操作案例之外處理這些資料 - 訓練 ML 模型、生成導出、圖表,或我們只需要資料的某些部分。當然,這會給我們的營運資料庫帶來更大的負擔,為此,最好有一種機制,透過它我們可以提取必要的數據,將其轉換為我們需要的形式,並將其儲存在另一個資料庫中- 其他比操作性的。透過這項策略,我們解決了營運基地潛在超載的問題。而且,透過這樣的機制,我們可以在系統負載不太大的時候(例如夜間)執行此操作。
解如下圖(圖2)。我們將使用 Apache Camel 在兩個資料庫之間建立一個簡單的 ETL 應用程式。該應用程式將從來源資料庫中提取數據,對其進行轉換,然後將其載入到目標資料庫中。我們可以引入不同的策略來實現這個解決方案,重點是如何從來源資料庫中提取資料。我假設選擇資料的標準將基於記錄的修改日期。此策略也提供了提取已修改資料的機會。
圖 2 – 使用 Apache Camel 在兩個資料庫之間同步資料
來源資料庫和目標資料庫將具有以下表格結構:
CREATE TABLE IF NOT EXISTS user ( id serial PRIMARY KEY, username VARCHAR(50) NOT NULL, password VARCHAR(50) NOT NULL, email VARCHAR(255) NOT NULL, created_at timestamp default now()::timestamp without time zone, last_modified TIMESTAMP DEFAULT now()::timestamp without time zone );
在目標資料庫中,我們將在插入之前將使用者名稱轉換為大寫。
我們將使用 Camel Quarkus 擴充功能來實現各種 Camel 元件。具體來說,我們將使用 Camel SQL 元件與資料庫進行互動。 SQL元件支援執行SQL查詢、插入、更新和刪除。
首先,建立一個擴展 RouteBuilder 的類別並重寫配置方法:
@ApplicationScoped public class UserRoute extends RouteBuilder { @Override public void configure() throws Exception { // your code here } }
這裡不強制使用 @ApplicationScoped 註解,但我更願意表明該類別是一個 CDI bean,並且應該由 CDI 容器管理。
如我上面所提到的,我們將使用 Camel SQL 元件與資料庫互動。我們需要配置Camel SQL元件來連接來源資料庫和目標資料庫。我們將使用 Quarkus Agroal 擴充功能來配置資料來源。 Agroal 擴充為資料來源提供了一個連線池。我們將在 application.properties 檔案中配置資料來源。
# # Source Database Configuration quarkus.datasource.source_db.db-kind=postgresql quarkus.datasource.source_db.jdbc.url=jdbc:postgresql://localhost:5001/demo quarkus.datasource.source_db.username=test quarkus.datasource.source_db.password=password1 # # # Target Database Configuration quarkus.datasource.target_db.db-kind=postgresql quarkus.datasource.target_db.jdbc.url=jdbc:postgresql://localhost:6001/demo quarkus.datasource.target_db.username=test quarkus.datasource.target_db.password=password1 #
現在我們可以設定Camel SQL元件來連接來源資料庫和目標資料庫。我們將使用 sql 元件為來源資料庫和目標資料庫建立 SQL 端點。
SQL 元件使用下列端點 URI 表示法:
sql:select * from table where id=# order by name[?options]
但是我們需要機制來自動運行該操作。我們將使用計時器組件每秒觸發一次 ETL 過程。計時器組件用於在計時器觸發時產生訊息交換。計時器組件使用以下端點 URI 表示法:
timer:name[?options]
在我們的路線中,我們使用以下配置:
from("timer://userSync?delay={{etl.timer.delay}}&period={{etl.timer.period}}")
{{etl.timer.delay}} 和 {{etl.timer.period}} 是我們將在 application.properties 檔案中定義的設定值。
etl.timer.period=10000 etl.timer.delay=1000
為了在將資料插入目標資料庫之前轉換數據,我們需要提供翻譯器:
.process(exchange -> { final Map<String, Object> rows = exchange.getIn().getBody(Map.class); final String userName = (String) rows.get("username"); final String userNameToUpperCase = userName.toUpperCase(); log.info("User name: {} converted to upper case: {}", userName, userNameToUpperCase); rows.put("username", userNameToUpperCase); })
處理器介面用於實現訊息交換的使用者或實作訊息轉換器和其他用例。
瞧,我們使用 Apache Camel 在兩個資料庫之間建立了一個簡單的 ETL 應用程式。
執行應用程式時,您應該在日誌中看到以下輸出:
2024-06-09 13:15:49,257 INFO [route1] (Camel (camel-1) thread #1 - timer://userSync) Extracting Max last_modified value from source database 2024-06-09 13:15:49,258 INFO [route1] (Camel (camel-1) thread #1 - timer://userSync) No record found in target database 2024-06-09 13:15:49,258 INFO [route2] (Camel (camel-1) thread #1 - timer://userSync) The last_modified from source DB: 2024-06-09 13:15:49,274 INFO [route2] (Camel (camel-1) thread #1 - timer://userSync) Extracting records from source database 2024-06-09 13:15:49,277 INFO [org.iqn.cam.rou.UserRoute] (Camel (camel-1) thread #1 - timer://userSync) User name: john_doe converted to upper case: JOHN_DOE 2024-06-09 13:15:49,282 INFO [org.iqn.cam.rou.UserRoute] (Camel (camel-1) thread #1 - timer://userSync) User name: jane_smith converted to upper case: JANE_SMITH 2024-06-09 13:15:49,283 INFO [org.iqn.cam.rou.UserRoute] (Camel (camel-1) thread #1 - timer://userSync) User name: alice_miller converted to upper case: ALICE_MILLER
您可以在 GitHub 儲存庫中找到該應用程式的完整原始程式碼。
透過此設置,我們使用 Apache Camel 創建了一個簡單的 ETL 應用程序,該應用程式從來源資料庫中提取數據,對其進行轉換,然後將其加載到目標資料庫中。這種方法有助於減少操作資料庫的負載,並允許我們在非高峰時間執行資料擷取。
以上是Apache Camel 與 Quarkus 實用指南:建立 ETL 應用程式的詳細內容。更多資訊請關注PHP中文網其他相關文章!