Maison > base de données > Oracle > La maîtrise la plus systématique de l'extraction en temps réel des données Oracle de la série Flink CDC (pratiques de minage et de réglage)

La maîtrise la plus systématique de l'extraction en temps réel des données Oracle de la série Flink CDC (pratiques de minage et de réglage)

WBOY
Libérer: 2022-01-18 17:59:09
avant
3848 Les gens l'ont consulté

Cet article vous apporte la capture de données en temps réel et le réglage des performances d'Oracle, et partage quelques détails clés pendant le processus d'essai. J'espère qu'il sera utile à tout le monde.

La maîtrise la plus systématique de l'extraction en temps réel des données Oracle de la série Flink CDC (pratiques de minage et de réglage)

Flink CDC a publié la dernière version 2.1 le 15 novembre 2021, qui ajoute la prise en charge d'Oracle en introduisant des composants Debezium intégrés. L'auteur a immédiatement téléchargé cette version pour une utilisation d'essai et a mis en œuvre avec succès la capture de données en temps réel et le réglage des performances d'Oracle. Je vais maintenant partager quelques détails clés pendant le processus d'essai.

Environnement d'essai :

Oracle : 11.2.0.4.0 (déploiement RAC)

Flink : 1.13.1

Hadoop : 3.2.1

Déployé et utilisé via Flink sur Yarn

1. Impossible de se connecter à la base de données

Selon la documentation officielle, entrez l'instruction suivante dans Flink SQL CLI :

create table TEST (A string)
WITH ('connector'='oracle-cdc',
    'hostname'='10.230.179.125',
    'port'='1521',
    'username'='myname',
    'password'='***',
    'database-name'='MY_SERVICE_NAME',
    'schema-name'='MY_SCHEMA',
    'table-name'='TEST' );
Copier après la connexion

Essayez ensuite d'observer via select * from TEST et constatez que vous ne pouvez pas vous connecter normalement à Oracle. L'erreur est la suivante :

[ERROR] Could not execute SQL statement. Reason:
oracle.net.ns.NetException: Listener refused the connection with the following error:
ORA-12505, TNS:listener does not currently know of SID given in connect descriptor
Copier après la connexion

À en juger par le message d'erreur, cela peut être dû au fait que Flink CDC a confondu par erreur le MY_SERVICE_NAME (le nom du service Oracle) fourni dans les informations de connexion avec le SID. J'ai donc essayé de lire le code source d'Oracle Connector lié à Flink CDC, et j'ai découvert que dans com.ververica.cdc.connectors.oracle.OracleValidator, le code de la connexion Oracle est le suivant :

public static Connection openConnection(Properties properties) throws SQLException {
    DriverManager.registerDriver(new oracle.jdbc.OracleDriver());
    String hostname = properties.getProperty("database.hostname");
    String port = properties.getProperty("database.port");
    String dbname = properties.getProperty("database.dbname");
    String userName = properties.getProperty("database.user");
    String userpwd = properties.getProperty("database.password");
    return DriverManager.getConnection(
            "jdbc:oracle:thin:@" + hostname + ":" + port + ":" + dbname, userName, userpwd);
}
Copier après la connexion

Comme le montre le ci-dessus, dans la version actuelle de Flink CDC , il n'y a pas de distinction entre les méthodes de connexion du SID et du nom du service, mais la méthode de connexion du SID est écrite directement dans le code (c'est-à-dire que le port et le nom de base de données sont séparés par ":") .

À partir d'Oracle 8i, Oracle a introduit le concept de nom de service pour prendre en charge le déploiement de bases de données en cluster (RAC). Un nom de service peut être utilisé comme concept logique d'une base de données pour unifier les connexions aux différentes instances SID de la base de données. Sur cette base, vous pouvez envisager les deux méthodes suivantes :

Dans l'instruction create table de Flink CDC, remplacez le nom de la base de données par le nom du service par l'un des SID. Cette méthode peut résoudre le problème de connexion, mais elle ne peut pas s'adapter au scénario réel de déploiement de cluster Oracle grand public ;

Modifiez le code source ; Plus précisément, vous pouvez réécrire la méthode com.ververica.cdc.connectors.oracle.OracleValidator dans le nouveau projet et la remplacer par la méthode de connexion du nom du service (c'est-à-dire utiliser "/" pour séparer le port et le nom de base de données), c'est-à-dire :

"jdbc :oracle:thin:@" + nom d'hôte + ":" + port + "/" + dbname, userName, userpwd);

L'auteur utilise la deuxième méthode pour obtenir une connexion normale à la base de données tout en conservant l'accès à l'utilisation du service Oracle de l'attribut Nom.

2. La table Oracle est introuvable

Suivez les étapes ci-dessus et observez à nouveau en sélectionnant * dans TEST. Il s'avère que les données ne peuvent toujours pas être obtenues normalement. L'erreur est signalée comme suit :

[ERROR] Could not execute SQL statement. Reason:
io.debezium.DebeziumException: Supplemental logging not configured for table MY_SERVICE_NAME.MY_SCHEMA.test.  Use command: ALTER TABLE MY_SCHEMA.test ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS
Copier après la connexion

J'ai observé que la table mentionnée dans le journal des erreurs est MY_SERVICE_NAME .MY_SCHEMA.test, pourquoi le nom de la base de données et le nom du schéma sont-ils en majuscules, mais le nom de la table est en minuscules ?

Remarquez que cette erreur est signalée par le package io.debezium. En analysant le code source du package (on peut le voir dans le fichier pom.xml de Flink CDC, la version debezium 1.5.4 est actuellement utilisée). On peut voir que dans io.debezium.relational Il y a le code suivant dans .Tables :

private TableId toLowerCaseIfNeeded(TableId tableId) {
    return tableIdCaseInsensitive ? tableId.toLowercase() : tableId;
}
Copier après la connexion

On peut voir que les développeurs de Debezium définissent uniformément « l'insensibilité à la casse » comme « le nom de la table doit être converti en minuscules ». Cela est vrai pour PostgreSQL, Mysql, etc. pris en charge par Debezium. Cependant, pour la base de données Oracle, « insensibilité à la casse » signifie que le nom de la table doit être converti en majuscules lors du stockage des métainformations internes.

Par conséquent, après que Debezium ait lu la configuration « insensibilité à la casse », il suit la logique de code ci-dessus et ne rapportera que une erreur lors de la tentative de lecture d'un nom de table en minuscules.

Étant donné que Debezium n'a résolu ce problème qu'avec la dernière version stable 1.7.1 et la dernière version de développement 1.8.0, nous pouvons contourner ce problème grâce aux deux méthodes suivantes :

Si vous devez utiliser Oracle "insensible à la casse Fonctionnalité " Sensible », vous pouvez modifier directement le code source et changer ce qui précède en minuscule en majuscule (c'est aussi la méthode choisie par l'auteur)

Si vous ne souhaitez pas modifier le code source et n'avez pas besoin d'utiliser " d'Oracle ); ", vous pouvez utiliser l'instruction create dans l'ajout 'debezium.database.tablename.case.insensitive'='false', comme dans l'exemple suivant :

create table TEST (A string)
WITH ('connector'='oracle-cdc',
    'hostname'='10.230.179.125',
    'port'='1521',
    'username'='myname',
    'password'='***',
    'database-name'='MY_SERVICE_NAME',
'schema-name'='MY_SCHEMA',
'table-name'='TEST',
'debezium.database.tablename.case.insensitive'='false' );
Copier après la connexion

L'inconvénient de cette méthode est qu'elle perd le " " d'Oracle fonctionnalité "insensible à la casse", qui doit être utilisée dans 'nom-table'. Spécifie explicitement les noms de table en majuscules.

Il convient de noter que pour le paramètre database.tablename.case.insensitive, Debezium ne le définit actuellement que sur true par défaut pour Oracle 11g, et il est défini sur false par défaut pour les autres versions d'Oracle. Par conséquent, si le lecteur n'utilise pas la version Oracle 11g, il n'est pas nécessaire de modifier ce paramètre, mais le nom de la table en majuscule doit quand même être explicitement spécifié.

3. Le retard des données est important

Le retard des données est important, il faut parfois 3 à 5 minutes pour capturer les modifications des données. Pour ce problème, une solution claire a été donnée dans la FAQ Flink CDC : ajoutez les deux éléments de configuration suivants à l'instruction create :

'debezium.log.mining.strategy'='online_catalog',
'debezium.log.mining.continuous.mine'='true'
Copier après la connexion

那么为什么要这样做呢?我们依然可以通过分析源码和日志,结合 Oracle Logminer 的工作原理来加深对工具的理解。

对 Logminer 的抽取工作,主要在 Debezium 的 io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource 中 execute 方法进行。为节约篇幅,本文不列出实际的源码,仅提炼出关键过程绘于下面的流程图,有兴趣的读者可以对照该流程图,结合实际源码进行分析:

La maîtrise la plus systématique de lextraction en temps réel des données Oracle de la série Flink CDC (pratiques de minage et de réglage)

采用 redo_log_catalog 的方式,可以监控数据表的 DDL 信息,且由于 archive logs 被永久保存到磁盘上,可以在数据库宕机后依然正常获取到宕机前的所有 DDL 和 DML 操作。但由于涉及到比 online catalog 更多的信息监控,以及由此带来的频繁的日志切换和日志转储操作,其代价也是惊人的。

根据笔者实际测试情况,如果 debezium.log.mining.strategy 为默认配置 redo_log_catalog,则不仅需要多执行第 ① 步操作 (该操作耗时约为半分钟到 1 分钟之间),在第 ④ 步,根据 archived logs 的数据量,耗时也会在 1 分钟到 4 分钟之间浮动;在第 ⑤ 步,实际查询 V$LOGMNR_CONTENTS 视图也常常需要十几秒才能完成。

此外,由于 archive logs 在实际系统中增长速度较快,因此在实际使用中,常会配合进行定期删除或转储过期日志的操作。由于上述第 ④ 步的耗时较长,笔者观察到在第 ④ 步执行的过程中,在一定概率下会发生第 ② 步加入的a rchive logs 已过期而被删除转储的情况,于是在第 ⑤ 步查询的时候,会由于找不到第 ② 步加入的日志,而报下面的错误:

ORA-00308: cannot open archive log '/path/to/archive/log/...'
ORA-27037: unable to obtain file status
Copier après la connexion

一般来说,Flink CDC 所需要监控的表,特别是对于业务系统有重大意义的表,一般不会进行 DDL 操作,仅需要捕捉 DML 操作即可,且对于数据库宕机等极特殊情况,也可使用在数据库恢复后进行全量数据更新的方式保障数据的一致性。因而,online_catalog 的方式足以满足我们的需要。

另外,无论使用 online_catalog,还是默认的 redo_log_catalog,都会存在第 ② 步找到的日志和第 ⑤ 步实际需要的日志不同步的问题,因此,加入 'debezium.log.mining.continuous.mine'='true' 参数,将实时搜集日志的工作交给 Oracle 自动完成,即可规避这一问题。

笔者按照这两个参数配置后,数据延迟一般可以从数分钟降至 5 秒钟左右。

四、调节参数继续降低数据延迟

上述流程图的第 ③ 步和第 ⑦ 步,提到了根据配置项来确定 LogMiner 监控时序范围,以及确定休眠时间。下面对该过程进行进一步分析,并对单个表的进一步调优给出一般性的方法论。

通过观察 io.debezium.connector.oracle.logminer.LogMinerHelper 类中的 getEndScn 方法,可了解到 debezium 对监控时序范围和休眠时间的调节原理。为便于读者理解,将该方法用流程图说明如下:

La maîtrise la plus systématique de lextraction en temps réel des données Oracle de la série Flink CDC (pratiques de minage et de réglage)

从上述的流程图中可以看出,debezium 给出 log.mining.batch.size.* 和 log.mining.sleep.time.* 两组参数,就是为了让每一次 logMiner 运行的步长能够尽可能和数据库自身 SCN 增加的步长一致。由此可见:

log.mining.batch.size.* 和 log.mining.sleep.time.* 参数的设定,和数据库整体的表现有关,和单个表的数据变化情况无关;

log.mining.batch.size.default 不仅仅是监控时序范围的起始值,还是监控时序范围变化的阈值。所以如果要实现更灵活的监控时序范围调整,可考虑适当减小该参数;

由于每一次确定监控时序范围时,都会根据 topScn 和 currentScn 的大小来调整 sleepTime,所以为了实现休眠时间更灵活的调整,可考虑适当增大 log.mining.sleep.time.increment.ms;

log.mining.batch.size.max 不能过小,否则会有监控时序范围永远无法追上数据库当前 SCN 的风险。为此,debezium 在 io.debezium.connector.oracle.OracleStreamingChangeEventSourceMetrics 中存在以下逻辑:

if (currentBatchSize == batchSizeMax) {
    LOGGER.info("LogMiner is now using the maximum batch size {}. This could be indicative of large SCN gaps", currentBatchSize);
}
Copier après la connexion

如果当前的监控时序范围达到了 log.mining.batch.size.max,那么 debezium 会在日志中给出如上提示。在实际应用中,观察 Flink CDC 产生的 log 是否包含该提示,便可得知 log.mining.batch.size.max 的值是否合理。

五、Debezium Oracle Connector 的隐藏参数

En fait, de ce qui précède, nous avons découvert deux paramètres cachés : debezium.database.tablename.case.insensitive (voir la deuxième section) et debezium.log.mining.continuous.mine (voir la troisième section). ne sont pas réellement décrits dans la documentation officielle de Debezium, mais ils peuvent réellement être utilisés. En analysant le code source, tous les paramètres cachés du Connecteur Debezium Oracle sont désormais donnés, et leurs descriptions sont les suivantes :

La maîtrise la plus systématique de lextraction en temps réel des données Oracle de la série Flink CDC (pratiques de minage et de réglage)

L'auteur estime qu'en plus des deux paramètres que nous avons utilisés ci-dessus, log.mining.history vaut également la peine de se concentrer sur le paramètre .recorder.class. Étant donné que ce paramètre est actuellement par défaut io.debezium.connector.oracle.logminer.NeverHistoryRecorder, qui est une classe vide, lors de l'analyse du comportement de Flink CDC, nous personnalisons une classe qui implémente l'interface io.debezium.connector.oracle.logminer.HistoryRecorder. , qui peut réaliser une surveillance personnalisée du comportement de Flink CDC sans modifier le code source.

Tutoriel recommandé : "Tutoriel Oracle"

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Étiquettes associées:
source:juejin.im
Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Tutoriels populaires
Plus>
Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal