博主信息
小爱豆
博文
258
粉丝
0
评论
0
访问量
4929
积分:0
P豆:630

Flink 数据持久化踩过的坑

2021年10月24日 20:34:38阅读数:11博客 / 小爱豆

前言

Flink用来消费消息队列中的数据,在消费之后一定会需要用某种方式存起来,这里我简述一下在数据持久化中可能会遇到的坑和解决方案。 这篇文章中的代码,都经过本公司业务系统上调试过,是我们在使用Flink开发入库服务的时候踩过的一个个小坑,将它们总结起来,希望减少各位踩坑的数量。

本文会以消费Kafka为例,展示持久化到MySQL,MongoDB和HBase等数据库的思路。语言如果没有特殊的标记,一般都是Scala。 本文假定你已经搞定了Flink集群的搭建、程序的提交,同时你的数据库应该也能够支持这样规模的数据写入。

由于公司代码涉及业务,暂时不能开源,但是文中的源代码已经足够使用了。 为了避免文章过于臃肿,代码放在了

Flink持久化踩坑笔记代码gist.github.com

Github Gist 需要翻墙,而知乎的文章长度限制也不允许我放入太多代码,请借个梯子看吧。

读的时候可能会有点儿繁琐,不过请耐心读完,一定会有收获。(大牛就绕道吧)

业务定义

假设我们现在有一个DataStream[T],这个Stream以一个稳定的速度吐着消息,这个时候你需要做的很简单,就是把数据:

如果持久化到HBase:转换为Put,然后创建连接、写入如果持久化到SQL:创建SQL语句,创建连接、写入持久化到MongoDB:创建BSON对象,创建连接、写入

坑1:过于频繁的连接创建

任你是神仙一般的系统,也受不了频繁的打开和关闭连接这种操蛋的操作,可偏偏有人这么写:每次收到数据的时候打开连接,写完关闭,每当看到这种博客我都无言以对。 正常情况下,应该是创建一个单例,如果写入要并发的话应该创建一个连接池。

下面以写入HBase为例,展示一下大致的思路是怎么样的。以下是需要注意的几点:

这里继承了RichOutputFormat<T>,但其实使用RichSinkFunction<T>也是一样的,但是记得覆盖其Open方法来初始化连接。Flink官方的测试代码用的也是RichOutputFormat<T>来做。PutCollection是我公司根据业务封装的Put类,方便聚合、序列化和调试,你可以理解为Map<String,List<Put>>,其中String是TableName。之所以使用的都是Rich,因为我们把配置放在了Consul上,而Consul的地址在启动的时候指定,并且放在GlobalJobParameters里面,Rich中可以通过getRuntimeContext().getExecutionConfig().getGlobalJobParameters()获取。

代码参见:

HBaseRichOutputFormat.javagist.github.com

坑1.1:数据库配置何去何从

正常来说,我们都会把配置放在文件里面,但是这样的话修改配置就要重新打包,麻烦。 我们使用了Consul(其实用ZK、Redis之类的都是一样的)作为配置中心,主要是因为Consul有UI,手动操作比较方便。

/**  * 查询Consul中的配置信息,并且按照properties的文件格式解码  *  * @param key  * @return  */private def queryConsulProperties(key: String): Properties = {
   val prop = new Properties()
   prop.load(new StringReader(queryConsulString(key)))
   prop}

坑2:尽可能使用ProcessingTime减少资源消耗

诚然,Flink相当牛逼的一点就是可以处理乱序数据,有EventTime这个大杀器,但是我建议能不用就不用,使用ProcessingTime将更加的高效。

你可以在处理流数据的时候使用EventTime,但是在最后的写入阶段,建议使用ProcessingTime,这样写入也会更加及时。

坑3:使用TimeWindow+CountTrigger避免峰值出现异常

写入的时候往往是批量写入,而不是单条写入,这个时候如果使用CountWindow不能保证时效性,使用TimeWindow容易造成瞬时峰值数据量过大搞崩某些程序,我们想要一个TimeCountWindow,当超时或者队列过长任意条件满足的时候触发写入。然而我没在Flink里面找到这个东西,QQ号码出售平台可以使用TimeWindow+Trigger完成这个功能。

首先,对于某个流:

someStream.timeWindow(
   Time.milliseconds(1000)//这里设置你要的时间).trigger(
   new TimeCountTrigger(
       1000,//这里设置你要的数量        timeCharacteristic = TimeCharacteristic.ProcessingTime //用什么时间触发    ))

其中,TimeCountTrigger类的源码如下所示:

TimeCountTrigger.scalagist.github.com

这里有个小坑需要注意,Trigger的clear方法有的时候不会被自动调用,需要手动调用。

坑4:动态插入的生成和写入

你,怎么做插入? 是手撸Insert语句?是强行Parse出一个Put,还是继续用原生API的InsertMany? 如果能用一个Sink解决写入问题,你能在写入之前对语句做一些合并,这个是坠吼的。

MongoDB插入的合并

最简单了,只需要用Map<String,List<Document>>去描述插入的数据集并且做合并即可。

SQL的合并

SQL的合并比较简单,只需将同一张表的插入聚合到一起就可以了,但是如果要将不同的表的插入合并到一起,那就需要一个通用的数据格式。 这个数据结构可以被描述为: Map<String,List<Map<String,Object>>> 第一个String代表表名,第二个代表字段名,第三个代表数值。

如果你的数据插入是有主键的,那么List<Map<String,Object>>可以被: Map<KVPair,List<Map<String,Object>>>来描述。KVPair自己定义。 这样的话可以合并主键相同的数据。

需要注意的是,由于每一条插入记录里面,使用Map描述以后,有些字段为空,取不到的时候需要处理返回NULL。

Object 的处理方式如下所示:

val valueToString : Any => String = {
  case numberValue: Number => numberValue.toString
  case intValue: Int => intValue.toString
  case doubleValue: Double => "%f".format(doubleValue)
  case longValue: Long => longValue.toString
  case dateValue: Date => "'%s'".format(DATE_FORMAT.format(dateValue))
  case strValue: String => "'%s'".format(escapeSql(strValue))
  case anyType: Any => "'%s'".format(escapeSql(GSON.toJson(anyType)))}// DATE_FORMAT is SimpleDateFormat

HBase Put的合并

对于HBase来说,应该对两类东西做合并,第一个是针对同一个Table的Put操作可以合并,第二个是针对同一个Row的操作可以合并。 并且,对于同一行来说,同一个列族下,相同的Qualifier的数据应该做覆盖,在流计算期间就可以减少写入的数据量,可以带来更加高效的写入体验。

为此,我们首先写一个SealedPut类,用来替代原生的Put类,这个类在必要的时候可以直接生成Put。

SealedPut.javagist.github.com


随后我们再编写一个PutCollection,用于管理不同表的Put。

PutCollection.javagist.github.com

在加窗以后,我们可以在Process里面获取一大堆PutCollection,而我们需要做的,就是将它合成一个:

someIntKeyedWindowStreamOfPutCollection.process[PutCollection](
   new ProcessWindowFunction[(PutCollection, Int), PutCollection, Int, TimeWindow] {
       override def process(key: Int, context: Context, elements: Iterable[(PutCollection, Int)], out: Collector[PutCollection]): Unit = {
           if (elements.size > 1) {
               out.collect(
                   PutCollection.merge(
                       elements.map(_._1).toSeq: _*
                   )
               )
           } else {
               elements.map(_._1).foreach(out.collect)
           }
           LOGGER.debug("Merging {} elements (k={}).", elements.size, key)
       }
   })


版权申明:本博文版权归博主所有,转载请注明地址!如有侵权、违法,请联系admin@php.cn举报处理!

全部评论

文明上网理性发言,请遵守新闻评论服务协议

条评论
  • Vue.js是一个轻巧、高性能、可组件MVVM库,VUE适用于具有复杂交互逻辑前端应用,也适用于基础抽象架构,还可以通AJAX进行,保证前端用户体验。
    对象关系映射是通使用描述对象和库之间映射,将面向对象语言程序中对象自动到关系库中。本质上就是将从一种形式转换到另外一种形式。
    vue.js主要应用方面有:1、针对具有复杂交互逻辑前端应用;2、它可以提供基础架构抽象;3、可以通AJAX,保证前端用户体验。
    Redis是完全开源免费,用C语言编写,遵守BSD协议一个高性能(key/value)分布式内存库,也是基于内存运行并支NoSQL库,是当前最热门NoSql库之一,也被人们称为结构服务器
    vue.js一般用地方是:1、针对于移动端,首选vue入门成本低,快速上手;2、针对于维护较少,组件复用要求不高项目;3、针对具有复杂交互逻辑前端应用;4、可以提供基础架构抽象;5、可以通AJAX