Comment analyser les données Apache Avro ? Cet article vous présentera les méthodes de sérialisation pour générer des données Avro, de désérialisation pour analyser les données Avro et d'utilisation de FlinkSQL pour analyser les données Avro. J'espère que cela vous sera utile !
Avec le développement rapide d'Internet, les technologies de pointe telles que le cloud computing, le big data, l'intelligence artificielle et l'Internet des objets sont devenues des technologies de haute technologie courantes à l'ère d'aujourd'hui, comme les sites Web de commerce électronique. , la reconnaissance faciale, les voitures sans conducteur et les téléphones intelligents. Les maisons, les villes intelligentes, etc. facilitent non seulement l'alimentation, l'habillement, le logement et le transport des gens, mais derrière eux se trouve une grande quantité de données qui sont collectées, effacées et analysées par diverses plates-formes système à tout moment, garantissant une faible latence et une qualité de données élevée. Le débit et la sécurité sont particulièrement importants. Apache Avro lui-même est sérialisé via Schema pour la transmission binaire, d'une part, et d'autre part. d'autre part, il garantit la sécurité des données. Avro est actuellement de plus en plus utilisé dans diverses industries, plus il est étendu, plus il est important de traiter et d'analyser les données avro. Cet article montrera comment générer des données avro via la sérialisation et. utilisez FlinkSQL pour l'analyse.
Cet article est une démonstration de l'analyse avro. Actuellement, FlinkSQL ne convient qu'à l'analyse de données avro simples. Les données avro imbriquées complexes ne sont pas prises en charge pour le moment.
Introduction à la scène
Cet article présente principalement les trois contenus clés suivants :
Comment sérialiser et générer des données Avro
Comment désérialiser et analyser les données Avro
Comment utiliser FlinkSQL pour analyser les données Avro
Prérequis
Pour comprendre ce qu'est avro, vous pouvez vous référer au guide de démarrage rapide du site officiel d'Apache avro
Comprendre les scénarios d'application avro
Étapes de fonctionnement
1. Créer un nouveau maven avro. projetez et configurez les dépendances pom
Le contenu du fichier pom est le suivant :
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.huawei.bigdata</groupId>
<artifactId>avrodemo</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.8.1</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>1.8.1</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
<outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.6</source>
<target>1.6</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
Copier après la connexion
Remarque : Le fichier pom ci-dessus est configuré avec le chemin d'accès à la classe générée automatiquement, c'est-à-dire basedi r/s et {project.basedir }/src/main/ java/. Après cette configuration, lors de l'exécution de la commande mvn, ce plug-in générera automatiquement un fichier de classe à partir du schéma avsc dans ce répertoire et le placera dans ce dernier répertoire. Si le répertoire avro n'est pas généré, créez-le simplement manuellement. 2. Définir le schémaUtilisez JSON pour définir le schéma pour Avro. Le schéma se compose de types de base (null, boolean, int, long, float, double, bytes et string) et de types complexes (record, enum, array, map, union et fixed). Par exemple, ce qui suit définit le schéma d'un utilisateur, crée un répertoire avro dans le répertoire principal, puis crée un nouveau fichier user.avsc dans le répertoire avro : {"namespace": "lancoo.ecbdc.pre",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}
Copier après la connexion
3. Compilez le schéma.点击maven projects项目的compile进行编译,会自动在创建namespace路径和User类代码
4、序列化
创建TestUser类,用于序列化生成数据
User user1 = new User();
user1.setName("Alyssa");
user1.setFavoriteNumber(256);
// Leave favorite col or null
// Alternate constructor
User user2 = new User("Ben", 7, "red");
// Construct via builder
User user3 = User.newBuilder()
.setName("Charlie")
.setFavoriteColor("blue")
.setFavoriteNumber(null)
.build();
// Serialize user1, user2 and user3 to disk
DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class);
DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter);
dataFileWriter.create(user1.getSchema(), new File("user_generic.avro"));
dataFileWriter.append(user1);
dataFileWriter.append(user2);
dataFileWriter.append(user3);
dataFileWriter.close();
Copier après la connexion
执行序列化程序后,会在项目的同级目录下生成avro数据
user_generic.avro内容如下:
Objavro.schema�{"type":"record","name":"User","namespace":"lancoo.ecbdc.pre","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":["int","null"]},{"name":"favorite_color","type":["string","null"]}]}
Copier après la connexion
至此avro数据已经生成。
5、反序列化
通过反序列化代码解析avro数据
// Deserialize Users from disk
DatumReader<User> userDatumReader = new SpecificDatumReader<User>(User.class);
DataFileReader<User> dataFileReader = new DataFileReader<User>(new File("user_generic.avro"), userDatumReader);
User user = null;
while (dataFileReader.hasNext()) {
// Reuse user object by passing it to next(). This saves us from
// allocating and garbage collecting many objects for files with
// many items.
user = dataFileReader.next(user);
System.out.println(user);
}
Copier après la connexion
执行反序列化代码解析user_generic.avro
avro数据解析成功。
6、将user_generic.avro上传至hdfs路径
hdfs dfs -mkdir -p /tmp/lztest/
hdfs dfs -put user_generic.avro /tmp/lztest/
Copier après la connexion
7、配置flinkserver
将flink-sql-avro-*.jar、flink-sql-avro-confluent-registry-*.jar放入flinkserver lib,将下面的命令在所有flinkserver节点执行
cp /opt/huawei/Bigdata/FusionInsight_Flink_8.1.2/install/FusionInsight-Flink-1.12.2/flink/opt/flink-sql-avro*.jar /opt/huawei/Bigdata/FusionInsight_Flink_8.1.3/install/FusionInsight-Flink-1.12.2/flink/lib
chmod 500 flink-sql-avro*.jar
chown omm:wheel flink-sql-avro*.jar
Copier après la connexion
8、编写FlinkSQL
CREATE TABLE testHdfs(
name String,
favorite_number int,
favorite_color String
) WITH(
'connector' = 'filesystem',
'path' = 'hdfs:///tmp/lztest/user_generic.avro',
'format' = 'avro'
);CREATE TABLE KafkaTable (
name String,
favorite_number int,
favorite_color String
) WITH (
'connector' = 'kafka',
'topic' = 'testavro',
'properties.bootstrap.servers' = '96.10.2.1:21005',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'latest-offset',
'format' = 'avro'
);
insert into
KafkaTable
select
*
from
testHdfs;
Copier après la connexion
保存提交任务
9、查看对应topic中是否有数据
FlinkSQL解析avro数据成功。
【推荐:Apache使用教程】
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!