How do you parse Apache Avro data? This article walks through serializing data to Avro, deserializing it back, and parsing Avro data with FlinkSQL. I hope you find it helpful!
With the rapid development of the Internet, technologies such as cloud computing, big data, artificial intelligence, and the Internet of Things have become mainstream. Behind applications like e-commerce, face recognition, autonomous driving, smart homes, and smart cities, large volumes of data are constantly being collected, cleaned, and analyzed by various platforms, and keeping that data pipeline low-latency, high-throughput, and secure is especially important. Apache Avro serializes data into a binary format according to a schema: this both enables high-speed transmission and helps protect the data. Avro is now used more and more widely across industries, so knowing how to process and parse Avro data matters. This article demonstrates how to generate Avro data through serialization and how to parse it with FlinkSQL.
This article is a demo of Avro parsing. At the moment, FlinkSQL is only suited to parsing simple Avro data; complex nested Avro data is not yet supported.
This article mainly introduces the following three key contents:
How to serialize and generate Avro data
How to deserialize and parse Avro data
How to use FlinkSQL to parse Avro data
To understand what Avro is, refer to the quick start guide on the official Apache Avro website
Understand avro application scenarios
1. Create a new Avro Maven project and configure the pom dependencies
The content of the pom file is as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.huawei.bigdata</groupId>
    <artifactId>avrodemo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.8.1</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>1.8.1</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
                            <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
Note: the pom above configures the output path for the automatically generated classes, i.e. ${project.basedir}/src/main/java/. With this configuration, when you run mvn, the avro-maven-plugin turns each avsc schema found under ${project.basedir}/src/main/avro/ into a class file and places it under that output path. If the avro directory does not exist, just create it manually.
2. Define schema
Avro schemas are defined in JSON. A schema is built from primitive types (null, boolean, int, long, float, double, bytes, and string) and complex types (record, enum, array, map, union, and fixed). For example, the following defines a user schema: create an avro directory under src/main, then create a new file user.avsc in that directory:
{"namespace": "lancoo.ecbdc.pre", "type": "record", "name": "User", "fields": [ {"name": "name", "type": "string"}, {"name": "favorite_number", "type": ["int", "null"]}, {"name": "favorite_color", "type": ["string", "null"]} ] }
3. Compile schema
Click compile in the Maven Projects panel to build the project; the namespace package path and the User class code are generated automatically.
4. Serialization
Create a TestUser class that serializes the data:
import java.io.File;

import lancoo.ecbdc.pre.User;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumWriter;

public class TestUser {
    public static void main(String[] args) throws Exception {
        User user1 = new User();
        user1.setName("Alyssa");
        user1.setFavoriteNumber(256);
        // Leave favorite color null

        // Alternate constructor
        User user2 = new User("Ben", 7, "red");

        // Construct via builder
        User user3 = User.newBuilder()
                .setName("Charlie")
                .setFavoriteColor("blue")
                .setFavoriteNumber(null)
                .build();

        // Serialize user1, user2 and user3 to disk
        DatumWriter<User> userDatumWriter = new SpecificDatumWriter<>(User.class);
        DataFileWriter<User> dataFileWriter = new DataFileWriter<>(userDatumWriter);
        dataFileWriter.create(user1.getSchema(), new File("user_generic.avro"));
        dataFileWriter.append(user1);
        dataFileWriter.append(user2);
        dataFileWriter.append(user3);
        dataFileWriter.close();
    }
}
After the serialization program runs, the avro data file is generated in the project's working directory.
The content of user_generic.avro is as follows:
Objavro.schema�{"type":"record","name":"User","namespace":"lancoo.ecbdc.pre","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":["int","null"]},{"name":"favorite_color","type":["string","null"]}]}
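The leading Obj bytes are the Avro object container magic; the writer schema is stored in the file header (the JSON visible above), followed by the records in binary form, which is why a stray non-text byte appears. To print just the embedded schema of any container file, a small sketch like the following should do (class name hypothetical):

import java.io.File;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

public class PrintEmbeddedSchema {
    public static void main(String[] args) throws Exception {
        try (DataFileReader<GenericRecord> reader = new DataFileReader<>(
                new File("user_generic.avro"), new GenericDatumReader<>())) {
            // Pretty-print the writer schema stored in the file header
            System.out.println(reader.getSchema().toString(true));
        }
    }
}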
At this point, the avro data has been generated.
5. Deserialization
Parse the avro data with the following deserialization code:
import java.io.File;

import lancoo.ecbdc.pre.User;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.io.DatumReader;
import org.apache.avro.specific.SpecificDatumReader;

public class TestUserRead { // class name is arbitrary
    public static void main(String[] args) throws Exception {
        // Deserialize Users from disk
        DatumReader<User> userDatumReader = new SpecificDatumReader<>(User.class);
        DataFileReader<User> dataFileReader =
                new DataFileReader<>(new File("user_generic.avro"), userDatumReader);
        User user = null;
        while (dataFileReader.hasNext()) {
            // Reuse user object by passing it to next(). This saves us from
            // allocating and garbage collecting many objects for files with
            // many items.
            user = dataFileReader.next(user);
            System.out.println(user);
        }
        dataFileReader.close();
    }
}
Run the deserialization code to parse user_generic.avro.
The avro data is parsed successfully.
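As an aside, if the generated User class is not available (say, when inspecting a file produced elsewhere), the same container file can be read with Avro's generic API instead of the specific one. A minimal sketch (class name hypothetical):

import java.io.File;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;

public class GenericRead {
    public static void main(String[] args) throws Exception {
        // No generated class needed: the reader takes the schema from the file header
        DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
        try (DataFileReader<GenericRecord> fileReader =
                 new DataFileReader<>(new File("user_generic.avro"), datumReader)) {
            GenericRecord record = null;
            while (fileReader.hasNext()) {
                record = fileReader.next(record); // reuse the record object, as above
                System.out.println(record.get("name") + ", "
                        + record.get("favorite_number") + ", "
                        + record.get("favorite_color"));
            }
        }
    }
}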
6. Upload user_generic.avro to an HDFS path
hdfs dfs -mkdir -p /tmp/lztest/
hdfs dfs -put user_generic.avro /tmp/lztest/
7. Configure FlinkServer
Place flink-sql-avro-*.jar and flink-sql-avro-confluent-registry-*.jar into the FlinkServer lib directory, then run the following commands on every FlinkServer node:
cp /opt/huawei/Bigdata/FusionInsight_Flink_8.1.2/install/FusionInsight-Flink-1.12.2/flink/opt/flink-sql-avro*.jar /opt/huawei/Bigdata/FusionInsight_Flink_8.1.3/install/FusionInsight-Flink-1.12.2/flink/lib
chmod 500 flink-sql-avro*.jar
chown omm:wheel flink-sql-avro*.jar
Then restart the FlinkServer instances. After the restart completes, check whether the avro jars have been uploaded:
hdfs dfs -ls /FusionInsight_FlinkServer/8.1.2-312005/lib
8. Write the FlinkSQL
CREATE TABLE testHdfs (
  name String,
  favorite_number int,
  favorite_color String
) WITH (
  'connector' = 'filesystem',
  'path' = 'hdfs:///tmp/lztest/user_generic.avro',
  'format' = 'avro'
);

CREATE TABLE KafkaTable (
  name String,
  favorite_number int,
  favorite_color String
) WITH (
  'connector' = 'kafka',
  'topic' = 'testavro',
  'properties.bootstrap.servers' = '96.10.2.1:21005',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'avro'
);

insert into KafkaTable select * from testHdfs;
Save and submit the job.
9. Check whether there is data in the corresponding topic
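Besides the usual console consumer, a small Java consumer can pull a few records for inspection. This is only a sketch: it assumes a plaintext Kafka listener (as the table definition above implies), kafka-clients 2.x on the classpath, and that the writer schema is known. Flink's avro format writes each record value as raw Avro binary using a schema derived from the table definition, which may not match the hand-written user.avsc exactly (for example, union ordering for nullable fields can differ), so verify the schema before trusting the decoded values. The group id and schema file path are made up for the example:

import java.io.File;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TopicCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "96.10.2.1:21005");
        props.put("group.id", "topicCheck"); // hypothetical group id
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        // Assumed writer schema -- replace with the schema your Flink job actually writes
        Schema schema = new Schema.Parser().parse(new File("user.avsc"));
        GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("testavro"));
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(10));
            for (ConsumerRecord<byte[], byte[]> record : records) {
                // Each value is a schemaless Avro binary payload; decode it directly
                GenericRecord row = reader.read(null,
                        DecoderFactory.get().binaryDecoder(record.value(), null));
                System.out.println(row);
            }
        }
    }
}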
FlinkSQL has parsed the avro data successfully.