流式数据处理概念
为什么选Flink?
如果你对大数据流式处理有以下需求,
- 乱序数据流有序化处理
- 处理过程有状态
- 处理过程容错,需无缝从失败中恢复
- 超大数据规模(需分布式支持)
- 期望用一套框架支持批式处理和流式处理 那么可供你选择的成熟框架并不多。
Google Dataflow
批流一体的理论基石。
基本概念
本地状态,存储中间信息、缓存信息。
窗口操作
事件驱动
管道 允许输入流数据,输出流数据,交给下一个任务
DAG
数据传递方式 hash / rebalance / forward / shuffle / rescale
广播变量 getRuntimeContext().getBroadcastVariable / withBroadcastSet
Exactly-Once Exactly-Once是流处理系统核心特性之一,它保证每一条消息只被流处理系统处理一次,通过借鉴Chandy和Lamport在1985年发表的一篇关于分布式快照的论文,Flink实现了Exactly-Once特性。
JobGraph JobGraph是通过 Flink 各类API构建起来的一张任务执行图。 当 JobGraph 提交给 Flink 集群后,能够以 Local、Standalone、Yarn 和 Kubernetes 四种模式运行。
JobManager
JobManager 相当于整个集群的 Master 节点,且整个集群有且只有一个活跃的 JobManager ,负责整个集群的任务管理和资源管理。
TaskManager
TaskManager 相当于整个集群的 Slave 节点,负责具体的任务执行和对应任务在每个节点上的资源申请和管理。一个TM对应一个JVM。
任务槽是Flink计算资源的基本单位. Task Manager 的一个 Slot 代表一个可用线程,该线程具有固定的内存,注意 Slot 只对内存隔离,没有对 CPU 隔离。 每个任务槽可以在同一时间执行一个Task,而TaskManager可以拥有一个或者多个任务槽。 任务槽可以实现TaskManager中不同Task的资源隔离,不过是逻辑隔离,并且只隔离内存,亦即在调度层面认为每个任务槽“应该”得到taskmanager.heap.size的N分之一大小的内存。CPU资源不算在内。
Checkpoint 和 Savepoint
Flink中的每个方法或算子都可以是有状态的,我们称之为state。Checkpoint是把State数据定时持久化存储
这两者都是用于恢复作用。尤其是checkpoint用于恢复异常失败的作业。
- Checkpoint 是增量做的,每次的时间较短,数据量较小,只要在程序里面启用后会自动触发,用户无须感知;Checkpoint 是作业 failover 的时候自动使用,不需要用户指定。
-
Savepoint 是全量做的,每次的时间较长,数据量较大,需要用户主动去触发(用户手动执行,是指向Checkpoint的指针,不会过期)。Savepoint 一般用于程序的版本更新,Bug 修复,A/B Test 等场景,需要用户指定。
- 内部化的 Checkpoint – 一种由 Flink 自动执行的快照,其目的是能够从故障中恢复。Checkpoints 可以是增量的,并为快速恢复进行了优化。
- 外部化的 Checkpoint – 通常 checkpoints 不会被用户操纵。Flink 只保留作业运行时的最近的 n 个 checkpoints(n 可配置),并在作业取消时删除它们。但你可以将它们配置为保留,在这种情况下,你可以手动从中恢复。 习惯上,我们常把最后一个自动cp保留作为手动启动的起点。
-
Savepoint – 用户出于某种操作目的(例如有状态的重新部署/升级/缩放操作)手动(或 API 调用)触发的快照。Savepoints 始终是完整的,并且已针对操作灵活性进行了优化。
- 概念比较 https://www.ververica.com/blog/differences-between-savepoints-and-checkpoints-in-flink
- CP原理介绍 https://www.infoq.cn/article/pb8pxvssiw2evebhpz7e , https://juejin.cn/post/6844904147494371342
-
CP使用介绍: https://ci.apache.org/projects/flink/flink-docs-master/zh/docs/dev/datastream/fault-tolerance/checkpointing/ , https://nightlies.apache.org/flink/flink-docs-master/zh/docs/ops/state/checkpoints/
- 问题:Failover启动之后的再一次Checkpoint时间是怎么确定?
- 问题:Checkpoint的成本是什么?Checkpoint在生成和恢复的时候都会消耗资源。
- 问题:Checkpoint 里面存的是什么内容? _metadata
- 问题:Checkpoint 太大了会怎么样?
异步快照
在Flink做分布式快照过程中核心是Barriers的使用。这些Barriers是在数据接入到Flink之初就注入到数据流中,并随着数据流向每个算子。
Barrierr 会周期性地注入数据流中,作为数据流的一部分,从上游到下游被算子处理。Barrier 会严格保证顺序,不会超过其前边的数据。Barrier 将记录分割成记录集,两个 Barrier 之间的数据流中的数据隶属于同一个检查点。每一个 Barrier 都携带一个其所属快照的 ID 编号。
Barrier 随着数据向下流动,不会打断数据流,因此非常轻量。
作业 Failover、 容错、 灾备
Checkpoint 在作业failover的时候自动使用。 Flink 的容错机制主要分为从 checkpoint 恢复状态和重流数据两步,这也是为什么 Flink 通常要求数据源的数据是可以重复读取的。对于重启后的新 Task 来说,它可以通过读取 checkpoint 很容易地恢复状态信息,但是却不能独立地重流数据,因为 checkpoint 是不包含数据的。
通过 Flink 配置文件 flink-conf.yaml 中的 jobmanager.execution.failover-strategy 配置项进行配置Failover策略:
- 全图重启 full
- 基于Region的局部重启 region
如何从checkpoint恢复启动作业? 常用的重启策略 (1)固定间隔策略 (Fixed delay) (2)失败率策略 (Failure rate) (3)无重启 (No restart)
- 问题:自动failover和手动从cp启动,其起始点的差异是什么?
What happens if a task manager is lost? https://ververica.zendesk.com/hc/en-us/articles/360002262919-What-happens-if-a-task-manager-is-lost-
Task-Local Recovery
Flink 单点恢复 https://segmentfault.com/a/1190000025168779
Flink 与 HDFS
Flink和HDFS打交道主要有两类情况(当然这不是唯一方式),一类是Checkpoint,一类是Hdfs-Sink
flink的cp会备份到hdfs去,当作业并发量大(TM多)时,HDFS的压力会大: 1)大量的 RPC 请求会影响 RPC 的响应时间; 2)大量文件对 NameNode 内存造成很大压力; 3) 大量产出小文件,其他任务读取小文件数据的成本也增加;
减小Checkpointing对HDFS的压力
参考:https://www.infoq.cn/article/OLlJNzQpTOHfyrgOG8xq
HDFS-Sink避免小文件过多
- 减少并行度:回顾一下文件生成格式:part + subtaskIndex + connter,其中subtaskIndex代表着任务并行度的序号,也就是代表着当前的一个写task,越大的并行度代表着越多的subtaskIndex,数据就越分散,如果我们减小并行度,数据写入由更少的task来执行,写入就相对集中,这个在一定程度上减少的文件的个数,但是在减少并行的同时意味着任务的并发能力下降;
- 增大checkpoint周期或者文件滚动周期:以parquet写分析为例,parquet写文件由processing状态变为pending状态发生在checkpoint的snapshotState阶段中,如果checkpoint周期时间较短,就会更快发生文件滚动,增大checkpoint周期,那么文件就能积累更多数据之后发生滚动,但是这种增加时间的方式带来的是数据的一定延时;
- 下游任务合并处理:待Flink将数据写入hdfs后,下游开启一个hive或者spark定时任务,通过改变分区的方式,将文件写入新的目录中,后续任务处理读取这个新的目录数据即可,同时还需要定时清理产生的小文件,这种方式虽然增加了后续的任务处理成本,但是其即合并了小文件提升了后续任务分析速度,也将小文件清理了减小了对NameNode的压力,相对于上面两种方式更加稳定,因此也比较推荐这种方式。
批处理和流处理都支持,批流一体
batch application stream application
Flink如何以Table的概念支持批流一体
Flink 数据处理流水线开始于 source 表。source 表产生在查询执行期间可以被操作的行;它们是查询时 FROM 子句中引用的表。这些表可能是 Kafka 的 topics,数据库,文件系统,或者任何其它 Flink 知道如何消费的系统。
流到动态表的转换
动态表 是 Flink 的支持流数据的Table的核心概念。动态表的查询是一种永不停止的查询,动态表的查询结果也是一种永在变更的结果。
查询动态表将生成一个 连续查询 。一个连续查询永远不会终止,结果会生成一个动态表。查询不断更新其(动态)结果表,以反映其(动态)输入表上的更改。
动态表 (Dynamic Table): https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/streaming/dynamic_tables.html
在创建Table和从Datatream转换为Table时,可以有一个处理时间字段(以 .proctime 为后缀),可以有一个事件时间字段(以 .rowtime 为后缀)。
动态表到流的转换
- Append-only 流
- Retract 流
- Upsert 流: 与 retract 流的主要区别在于 UPDATE 操作是用单个 message 编码的,因此效率更高。
Retraction 机制
又名 Changelog 机制。Retraction 是流式数据处理中撤回过早下发(Early Firing)数据的一种机制,类似于传统数据库的Update 操作。 retract是流式计算场景下对数据更新的处理方式。
keygroup
Flink的状态分为两类:Keyed State和Operator State。前者与每个键相关联,后者与每个算子的并行实例(即Sub-Task)相关联。
Key Group是Flink状态机制中的一个重要设计. Key Group是Keyed State分配的原子单位,且Flink作业内Key Group的数量与最大并行度相同,也就是说Key Group的索引位于[0, maxParallelism - 1]的区间内。 从这里可以看到key-group与最大并发数有关系,如果key-group分配不均匀的话,状态分配也会不均匀。
Catalog
Catalog 提供了元数据信息,例如数据库、表、分区、视图以及数据库或其他外部系统中存储的函数和信息。
Metastore 即元数据服务,是Hive用来管理库表元数据的一个服务
HiveCatalog(String catalogName, String defaultDatabase, String hiveConfDir, String hadoopConfDir, String hiveVersion)
Catalog 函数
Flink功能
官网是最好的,你会发现百分之99的网页内容其实都出自于官网:https://nightlies.apache.org/flink/flink-docs-master/zh/ 不过请注意,不同大版本之间(比如1.12/1.13/1.14/1.15)之间的API可能有明显差异,版本对应的官方文档内容也有差异。
Environment
不管是本地执行还是集群执行,不管是流式模式还是批式模式,都需要特定的environment
LocalEnvironment vs RemoteEnvironment
StreamExecutionEnvironment
每一个flink应用都需要一个执行环境, 对于流处理程序,使用的执行环境类是 StreamExecutionEnvironment. 这是Flink单纯做流式处理所用的执行环境。 当 env.execute() 方法执行之后,代码所设计的一张执行图就会被打包发送到Flink Master,进行任务拆解和并行化,分配到TaskManager执行。
env.setStateBackend 设置状态后端的存储机制
TableEnvironment
TableEnvironment是 Table/SQL API的运行环境。
TableConfig 对象用于设置Table/SQL API程序的配置。
StatementSet 对象,把几个sql 放到一个 statement 里面,以一个作业的方式去执行,能够实现节点的复用。 Table (通过 StatementSet.addInsert() 输出给某个 Sink)和 INSERT 语句 (通过调用 StatementSet.addInsertSql())会先被缓存到 StatementSet 中。
真正的执行发生在sink时, 当 ?? 执行之后, 执行图会发送到master端。 StatementSet.execute() 方法被调用时,所有的 sink 会被优化成一张有向无环图。
StreamTableEnvironment
DataStream 和 Table 之间的转换(面向 StreamTableEnvironment )
- toRetractStream : 任何情形都可以使用此模式。它使用 boolean 值对 INSERT 和 DELETE 操作的数据进行标记。
- toAppendStream : 仅当动态 Table 仅通过INSERT更改进行修改时,才可以使用此模式,即,它仅是追加操作,并且之前输出的结果永远不会更新。
- toDataStream
- fromValues
- fromDataStream
一旦 Table 被转化为 DataStream,必须使用 StreamExecutionEnvironment 的 execute 方法执行该 DataStream 作业。
几种stream
DataStream
SingleOutputStreamOperator
Flink特有的数据结构
- org.apache.flink.table.data.RowData
- org.apache.flink.types.Row
数据重分布
keyBy、broadcast、rebalance、rescale、shuffle、global
partitionCustom
StatusBackend
Flink 提供了内存、文件系统、RocksDB 三种 StateBackends.
- MemoryStateBackend: 状态信息是存储在 TaskManager 的堆内存中的,checkpoint 的时候将状态保存到 JobManager 的堆内存中。
- FsStateBackend: TaskManager会定期地把state存到HDFS上。也就是checkpoint 的时候将状态保存到指定的文件中 (HDFS 等文件系统)。
- RocksDBStateBackend:状态信息存储在 RocksDB 数据库 (key-value 的数据存储服务), 最终保存在本地文件中。checkpoint 的时候将状态保存到指定的文件中 (HDFS 等文件系统)。
关于超大状态存储策略选择,生产环境状态存储 Backend 有两种方式:
- FsStateBackend: State 存储在内存, Checkpoint 时持久化到 HDFS;
- RocksDBStateBackend: State 存储在 RocksDB 实例,可增量 Checkpoint ,适合超大 State。在推荐广告搜索等场景下展现流 20 分钟数据有 1 TB 以上。
参考: https://cloud.tencent.com/developer/article/1592441
参考: https://ci.apache.org/projects/flink/flink-docs-master/zh/docs/dev/datastream/fault-tolerance/checkpointing/
Checkpointed Function
import org.apache.flink.api.common.state.ReducingStateDescriptor
import org.apache.flink.api.common.state.MapState; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor;
Checkpoint
- Aligned Checkpoint: 一旦Operator从输入流接收到CheckPoint barrier n,它就不能处理来自该流的任何数据记录,直到它从其他所有输入接收到barrier n为止。否则,它会混合属于快照n的记录和属于快照n + 1的记录.
- Unaligned Checkpoint: 如果您的 Checkpointing 由于背压导致周期非常的长,您应该使用非对齐 Checkpoint。这样,Checkpointing 时间基本上就与端到端延迟无关。
- 请注意,非对齐 Checkpointing 会增加状态存储的 I/O,因此当状态存储的 I/O 是 整个 Checkpointing 过程当中真正的瓶颈时,您不应当使用非对齐 Checkpointing。
- 增量快照 :
- 不同于产生一个包含所有数据的全量备份,增量快照中只包含自上一次快照完成之后被修改的记录,因此可以显著减少快照完成的耗时。一个增量快照是基于(通常多个)前序快照构建的。由于 RocksDB 内部存在 compaction 机制对 sst 文件进行合并,Flink 的增量快照也会定期重新设立起点(rebase),因此增量链条不会一直增长,旧快照包含的文件也会逐渐过期并被自动清理。
- 和基于全量快照的恢复时间相比,如果网络带宽是瓶颈,那么基于增量快照恢复可能会消耗更多时间,因为增量快照包含的 sst 文件之间可能存在数据重叠导致需要下载的数据量变大;而当 CPU 或者 IO 是瓶颈的时候,基于增量快照恢复会更快,因为从增量快照恢复不需要解析 Flink 的统一快照格式来重建本地的 RocksDB 数据表,而是可以直接基于 sst 文件加载。
- 需要注意的是,一旦启用了增量快照,网页上展示的 Checkpointed Data Size 只代表增量上传的数据量,而不是一次快照的完整数据量。
- 本地(局部)checkpoint
- 远端(完整)checkpoint
Checkpoint 配置
- checkpoint interval
- checkpoint Timeout
- pause between checkpoints
- number of concurrent checkpoints
- Tolerable Failed Checkpoints
- Persist Checkpoints Externally是什么意思???
- setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
-
cp失败的时候是否抛异常? setFailOnCheckpointingErrors , setTolerableCheckpointFailureNumber(X)
- 问题:Checkpoint失败与作业失败的关联?如何应对 Exceeded checkpoint tolerable failure threshold ?
- 问题:cp失败的影响是什么? 为什么要抛异常重启?
Checkpoint 状态
- End to End Duration
- Checkpoint Counts: 包含 Triggered: 1018 In Progress: 1 Completed: 1016 Failed: 1 Restored: 1
问题:怎么判断flink任务是从cp启动的还是从0启动的? Dashboard的checkpoints页面里有一个”Latest Restore”:
- 如果是None的话,一定是从0启动的。
- 如果有记录的话,且jobid和当前运行的jobid一致,说明是自动failover从cp恢复的。
- 如果有记录的话,且jobid和当前运行的jobid不一致,说明是手工从cp启动的。
Checkpoint 外部化文件
外部化checkpoint文件的构成: 很长的一个jobID作为一个的文件名,这个jobid与flinkdashboard上的任务运行的ID一致(每一次手工启动时产生),目录下三个文件夹:
- chk-xx 其中xx表明这是第xx次checkpoint
- shared Checkpoint 过程中所有 sstable 文件都保存在当前 job Checkpoint 目录下的 shared 目录里
- taskowned
需要注意的是,不同jobid下的文件可能是有依赖关系的,如果使用增量快照(state.backend.incremental : true)因为rockdb增量存储,如果一次手动启动是从前一次cp文件的话,那么这两个jobid的文件就是有依赖关系的。 详见 Flink 清理过期 Checkpoint 目录的正确姿势: https://blog.csdn.net/tzs_1041218129/article/details/104421686
Operator算子 DataStream Transformations
ProcessFunction / KeyedProcessFunction /
filter 过滤器,对数据流中的每个元素进行过滤判断,判断为true的元素进入下一个数据流 RichFilterFunction
flatmap 可以理解为将元素摊平,每个元素可以变为0个、1个、或者多个元素。 RichFlatMapFunction
map 可以理解为映射,对每个元素进行一定的变换后,映射为另一个元素。
mapPartition 维护每个并行分区内记录的顺序, RichMapPartitionFunction
name 方法Sets the name of the current data stream.
returns 方法Adds a type information hint about the return type of this operator.
keyby DataStream → KeyedStream
key Agg
getSideOutput 侧输出
Evictor:可以译为“驱逐者”。在Trigger触发之后,在窗口被处理之前,Evictor(如果有Evictor的话)会用来剔除窗口中不需要的元素,相当于一个filter。
数据关联算子 join / coGroup / intervalJoin
- connect
- join:是一个流join另一个流,需要设置窗口,2个流join需要的key字段。使用的是innerJoin。对Processing Time和Event Time都支持。
- coGroup:和join类似,不过CoGroupFunction和JoinFunction的参数不一样。coGroup是需要自己组装数据。
-
intervalJoin:是一个流join另一个流,不需要设置窗口,但是需要设置流join的时间范围(需要时间字段),仅支持Event Time的计算。
- Join转换使用来自两个输入的匹配记录对调用JoinFunction,这两个输入具有相同的键字段值.此行为与相等的内部联接非常相似.
- CoGroup转换在具有相同键值字段的两个输入的所有记录上调用带有迭代器的CoGroupFunction.如果输入没有某个键值的记录,则传递空迭代器.除了别的以外,CoGroup转换可以用于内部和外部的相等连接.因此它比Join变换更通用.
- intervalJoin
https://developer.aliyun.com/article/778485
Timer 定时器
Time Characteristic
setStreamTimeCharacteristic
- TimeCharacteristic.EventTime :事件产生的时间,即数据产生时自带时间戳
- TimeCharacteristic.ProcessingTime: 与数据本身的时间戳无关,即在window窗口内计算的时间(默认的Time)
- TimeCharacteristic.IngestionTime: 数据进入到Flink的时间
onTimer
https://help.aliyun.com/document_detail/470392.html
window
Window是无限数据流处理的核心。 Window将一个无限的stream拆分成有限大小的”buckets”桶,我们可以在这些桶上做计算操作。 在有限批处理的概念里看起来没有窗口的概念,但可以看做整个一批就是一次窗口。 官网对window的介绍 https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/joining.html 中文介绍 https://blog.csdn.net/dafei1288/article/details/98919202
- 滚动窗口合并 TumblingEventTimeWindows
- 固定大小窗口,不重叠,一个贴一个,同一元素不会分配到多个窗口。
- 滑动窗口合并 SlidingEventTimeWindows
- 固定大小窗口,多了一个滑动参数,每次滑动都按滑动参数大小,窗口大小>滑动大小的话,就会有一部分重叠,落入重叠的元素会分配到多个窗口。
- Session窗口合并 EventTimeSessionWindows
- 不同流之间的窗口不是按时间分组的,而是按各自的session分组。session的划分是当固定时间周期内不受到元素,则窗口关闭。
- 间隔关联合并
- 间隔关联目前只支持eventtime,只支持inner join.
典型模式: 分组的流 vs 非分组的流。唯一的区别是分组的stream调用keyBy(…)和window(…),而非分组的stream中window()换成了windowAll(…) stream.keyby(…).window(….).trigger(new XXX).apply(…)
窗口内处理
ProcessWindowFunction
窗口的触发器
Trigger抽象类,定义了窗口是何时被触发并同时决定触发行为(对窗口进行清理或者计算)。
- onElement:每个元素到达触发的回调方法;
- onProcessingTime:基于处理时间定时器触发的回调方法;
- onEventTime:基于事件时间定时器触发的回调方法;
- onMerge:窗口在合并时触发的回调方法(会话窗口分配器assigner);
TriggerContext 接口(定义在Trigger类中),用于维持状态,注册定时器等:
- registerXXXTimeTimer:注册(处理/事件)时间定时器;
- deleteXXXTimeTimer:删除(处理/事件)时间定时器;
- getPartitionedState:从Flink状态存储终端获取状态;
TriggerResult 枚举类,用于决定窗口在触发后的行为:
- CONTINUE:不作任何处理;
- FIRE_AND_PURGE:触发窗口计算并输出结果同时清理并释放窗口(该值只会被清理触发器PurgingTrigger使用);
- FIRE:触发窗口计算并输出结果,但窗口并没有被释放并且数据仍然保留;
- PURGE:不触发窗口计算,不输出结果,只清除窗口中的所有数据并释放窗口
多次触发的用法: 实现一个Trigger的派生类XXX,将其作用在窗口操作后的.trigger(new XXX)中。比如stream.keyby(…).window(….).trigger(new XXX).apply(…)
- onElement: 每个元素到达
窗口的驱逐器
在窗口apply前后允许删除窗口里特定的元素。
- evictBefore
- evictAfter
watermark
watermark是和Event Time一起使用的一个概念。由于消息自身的时间和消息被flink处理的时间往往是不同的,为了准确地表达数据的处理进度,出现了水印的概念。
水印就是一个时间戳,可以给每个消息添加一个 允许一定延迟 的时间戳。
watermark是用于处理乱序事件的,通常用watermark机制结合window来实现。
水印是Flink判断迟到数据的标准,同时也是窗口触发的标记。
DataStream.assignTimestampsAndWatermarks()方法来提取事件时间并同时产生水印。
当我们把消息生产的时间戳赋值给水印值,就意味着水印值一定能够表示消息生产的先后顺序。
- 窗口可以继续计算一定时间范围内延迟的消息
- 添加水印后,窗口会等 5 秒,再执行计算。若超过5秒,则舍弃。
- 窗口执行计算时间由 水印时间 来触发,当接收到消息的 watermark >= endtime ,触发计算
AscendingTimestampExtractor 的作用? AscendingTimestampExtractor 产生的时间戳和水印必须是单调非递减的,用户通过覆写extractAscendingTimestamp()方法抽取时间戳.
BoundedOutOfOrdernessTimestampExtractor 的作用? BoundedOutOfOrdernessTimestampExtractor 产生的时间戳和水印是允许“有界乱序”的,构造它时传入的参数maxOutOfOrderness就是乱序区间的长度,而实际发射的水印为通过覆写extractTimestamp()方法提取出来的时间戳减去乱序区间,相当于让水印把步调“放慢一点”。这是Flink为迟到数据提供的第一重保障。
需要深刻理解这几个概念才行: https://www.jianshu.com/p/c612e95a5028
source
理论上,flink任务的输入可以从任何介质来。
流式source
flink中的source作为整个stream中的入口,而sink作为整个stream的终点。
SourceFunction为所有flink中source的根接口,其定义了run()方法和cancel()方法。
- run方法的功能是核心功能,主要用于source往出emit元素
- cancel方法时用于取消run方法的执行,一般来说run方法内部是一个循环,cancel方法中控制run方法的循环不满足条件,从而取消run方法的执行。
addSource(sourceFunction)
- SourceFunction
- ParallelSourceFunction
- RichParallelSourceFunction
Source表
source表的出处可能是 Kafka 的 topics,数据库,文件系统,或者任何其它 Flink 知道如何消费的系统。
一个table必须被注册(createTemporaryView)到 TableEnvironment 里去才可以被后续查询使用。
一个Table可以来自于 TableSource, Table, CREATE TABLE statement, DataStream. 或者可以register catalogs in a TableEnvironment to specify the location of the data sources.
sink
理论上,flink任务的输出可以写入到任何介质去。
流式sink
Sink是流的重点,根接口是sinkFunction。
其重要的方法是invoke()方法,用以实现结果数据的处理逻辑
SinkFunction 是一个接口,类似于SourceFunction接口。SinkFunction中主要包含一个方法,那就是用于数据输出的invoke 方法,每条记录都会执行一次invoke方法,用于执行输出操作。
addSink(sinkFunction)
- RichsinkFunction
表sink
TableSink 是一个通用接口,用于支持以多种文件格式(如 CSV、Apache Parquet、Apache Avro),向不同存储系统(如 JDBC、Apache HBase、Apache Cassandra、Elasticsearch)或消息队列系统(如 Apache Kafka、RabbitMQ)输出。
Flink BulkWriter
Connector 与外部系统的连接器
用于支持与其他组件数据连接的 source 和 sink。比如和kafka连接,比如和Hadoop连接,比如和RaddbitMQ连接。
其中最为常用的当属Flink kafka connector。 此外,Apache Bahir 项目中也提供了更多连接器。
针对不同的Flink API也有不同的连接器, Datastream Connector 和 Table API Connector。
FlinkKafkaConsumer
1.12官方文档: https://nightlies.apache.org/flink/flink-docs-release-1.12/zh/dev/connectors/kafka.html
kafka 中数据都是以二进制 byte 形式存储的。读到 Flink 系统中之后,需要将二进制数据转化为具体的 java、scala 对象。所以需要实现一个 schema 类,定义如何序列化和反序列数据。
反序列化时需要实现 DeserializationSchema 接口,并重写 deserialize(byte[] message) 函数。 如果是反序列化 kafka 中 kv 的数据时,需要实现 KeyedDeserializationSchema 接口,并重写 deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) 函数。
- DeserializationSchema 接口类
下面是三个内置的常用序列化类
- SimpleStringSchema,按字符串方式进行序列化、反序列化。
- TypeInformationSerializationSchema,它可根据 Flink 的 TypeInformation 信息来推断出需要选择的 schema。
- JsonDeserializationSchema 使用 jackson 反序列化 json 格式消息,并返回 ObjectNode,可以使用 .get(“property”) 方法来访问相应字段。
问题:kafka生产限速的时候,flink sink是什么效果? ERROR-Expiring xx record(s) for xxx:600346 ms has passed since batch creation
kafka offset checkpoint
读kafka重要的是设置offset:
- flink消费了kafka数据后,不会更新offset到kafka,直到checkpoint完成。
- flink在没有使用savepoint重启作业的时候,消费kafka的offset还是从kafka自身获取,存在重复消费数据的情况。
- flink使用savepoint重启作业,不会重复消费kafka数据,也会正确更新kafka的offset。
Flink的kafka consumer一共有三种offset commit模式(FlinkKafkaConsumer基类的成员):
-
- OffsetCommitMode.DISABLED: 完全disable offset的commit
-
- OffsetCommitMode.ON_CHECKPOINTS: Flink的默认行为,只有当Flink checkpoint完成时,才会将offset commit到Kafka
-
- OffsetCommitMode.KAFKA_PERIODIC: 使用Kafka的internal client的默认行为,周期性将offset commit到kafka 如果不想借助checkpoint来重置kafka的offset, 注意:该模式的前提是Flink任务不开启checkpoint
- 可以利用 FlinkKafkaConsumerBase 设置 setCommitOffsetsOnCheckpoints(false),以及在kafka properties里面设置 “auto.commit.enable” 为false,这样就相当于没有commit offset(在kafka broker端显示的消费offset一直不变),作业恢复的时候,如果你们设置是从kafka consume from latest,既可以恢复checkpoint中的state(你们应该是想要主要恢复keyed state相关吧),也可以从最新的offset消费。
StreamingFileSink checkpoint
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/streamfile_sink/
文件系统重要的是bucketassign和rollingPolicy
- StreamingFileSink
- forBulkFormat
- withRollingPolicy
- withOutputFileConfig
- withBucketAssigner
- 使用 StreamingFileSink 时需要启用 Checkpoint ,每次做 Checkpoint 时写入完成。如果 Checkpoint 被禁用,部分文件(part file)将永远处于 ‘in-progress’ 或 ‘pending’ 状态,下游系统无法安全地读取。
- 按行编码存储 Row-encoded sink: StreamingFileSink.forRowFormat(basePath, rowEncoder) 需要指明一个encoder
- 批量编码存储 Bulk-encoded sink: StreamingFileSink.forBulkFormat(basePath, bulkWriterFactory) 需要指明一个BulkWriter.Factory
- ParquetWriterFactory
- AvroWriterFactory
- SequenceFileWriterFactory
- CompressWriterFactory
- OrcBulkWriterFactory
TableFactory
TableFactory 用来创建与table相关的实例工厂接口,实例的来源来自字符串形式的properties。
实现该接口的类应该被这样添加: Classes that implement this interface can be added to the “META_INF/services/org.apache.flink.table.factories.TableFactory” file of a JAR file in the current classpath to be found.
- TableSourceFactory
- tableSinkFactory
Flink SQL && Flink Table API
和传统SQL不同,Flink SQL设计成的是一个批流一体的SQL。 使得查询对于批(DataSet)和流(DataStream)的输入有相同的语义,也会产生同样的计算结果。 一直以来SQL都是用来处理关系型批量数据的,而不是处理流式数据。尽管存在这些差异,但是使用关系查询和 SQL 处理流并不是不可能的。
Flink SQL 工作机制: https://zhuanlan.zhihu.com/p/150473300
Flink SQL如何实现数据流的join: http://www.whitewood.me/2019/12/15/Flink-SQL-%E5%A6%82%E4%BD%95%E5%AE%9E%E7%8E%B0%E6%95%B0%E6%8D%AE%E6%B5%81%E7%9A%84-Join/
Streaming SQL
批式模型和流式模式
- EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
- EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
Table API 和 SQL API使用的差别
- executeSql 执行create drop select insert各种sql, 其返回值是TableResult对象
-
sqlQuery 特定于select sql语句,其返回值是一个Table
- Table table2 = tableEnv.from(“table1”).select(…); // create a Table object from a Table API query
- Table table3 = tableEnv.sqlQuery(“SELECT … FROM table1 … “); // create a Table object from a SQL query
批流一体的概念中,SQL是真正一体的,Environment、Source、Sink并不是一体的。
- 创建表(tablesource):
- 查询表
- 输出表(TableSink): Table 通过写入 TableSink 输出。TableSink 是一个通用接口,用于支持多种文件格式(如 CSV、Apache Parquet、Apache Avro)、存储系统(如 JDBC、Apache HBase、Apache Cassandra、Elasticsearch)或消息队列系统(如 Apache Kafka、RabbitMQ)。批处理 Table 只能写入 BatchTableSink,而流处理 Table 需要指定写入 AppendStreamTableSink,RetractStreamTableSink 或者 UpsertStreamTableSink。
将流式概念转为批流一体的Table概念 不管流式数据源还是批式数据源,进入到Flink SQL里,都以table的概念来表达。
- tableEnv.createTemporaryView: 在 TableEnvironment 中可以将 DataStream 或 DataSet 注册成视图。结果视图的 schema 取决于注册的 DataStream 或 DataSet 的数据类型
- tableEnv.fromDataStream: DataStream 和 DataSet 还可以直接转换成 Table
table概念还可以再转回datastream
- tableEnv.toAppendStream
- tableEnv.toRetractStream
Flink SQL建表
tableEnv.connect(new FileSystem().path(filePath))
.withFormat(new Csv()) //withFormat 是用来告诉flink我应该怎么处理来源用的每一条数据 比如按csv的格式,号分割
.withSchema(new Schema() //withSchema 是声明创建表的表结构 要按照解析得到的数据的先后顺序对应
.field("id", DataTypes.STRING())
.field("time", DataTypes.BIGINT())
.field("temp", DataTypes.DOUBLE()))
.createTemporaryTable("inputTable");
https://www.cnblogs.com/21airuirui1123/p/14644933.html
自定义 SQL Connector
- 自定义Factory,根据需要实现StreamTableSourceFactory和StreamTableSinkFactory
- 根据需要继承 ConnectorDescriptorValidator ,定义自己的connector参数(with 后面跟的那些)
- Factory中的requiredContext、supportedProperties都比较重要,框架中对Factory的过滤和检查需要他们
- 需要自定义个TableSink,根据你需要连接的中间件选择是AppendStreamTableSink、Upsert、Retract,并重写consumeDataStream方法
- 自定义一个SinkFunction,在invoke方法中实现将数据写入到外部中间件。
CollectionTableFactory
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sourcessinks/
SQL语法扩展
https://www.jianshu.com/p/623266b941de
DataSet API
从flink-1.12版本开始DataSetAPI正在废弃中。
Python API
PythonAPI,就是 PyFlink, 换了开发语言,其API也分为 PyFlink Table API 和 PyFlink DataStream API 两种。
UDF
Table API/SQL UDF
- Scalar 函数, 将一个标量数据 转换成 一个标量数据。
- Table 函数, 将一个标量数据 转换成 数据。
- Aggregate 函数, 将多行的标量数据 聚合成为一个标量数据。
- Table aggregate 函数, 将多行的标量数据 聚合成 行数据。
- Async table 函数, lookup。
DataStreamAPI UDF
- 最基础的方式就是 implements MapFunction去实现一个新的类,或者是data.map(后面直接new MapFunction出一个匿名的类)
- Rich Function, 附加了open, close, getRuntimeContext, setRuntimeContext 四个方法。
Flink部署
- Flink 镜像
- Flink 内核程序
- 依赖的包
- Flink 程序
standalone mode
mac环境下: 用brew安装flink, $ brew install apache-flink
-
brew安装的flink会放置在 /usr/local/Cellar/apache-flink/1.9.1/libexec
-
./libexec/bin/start-cluster.sh
-
./libexec/bin/stop-cluster.sh
yarn mode
用户任务通过flink命令提交到 yarn 管理的集群上
kubernetes mode
用户任务通过flink命令提交到 k8s 管理的集群上
- 首先往 Kubernetes 集群提交了资源描述文件后,会启动 Master 和 Worker 的 container。
- Master Container 中会启动 Flink Master Process,包含 Flink-Container ResourceManager、JobManager 和 Program Runner。
- Worker Container 会启动 TaskManager,并向负责资源管理的 ResourceManager 进行注册,注册完成之后,由 JobManager 将具体的任务分给 Container,再由 Container 去执行。
java/python/sql 任务提交方式
Flink监控
Flink自带的dashboard
flink自定义metric
flink metric类型分为Counter、Gauge、Histogram、Meter
第一,常用的如 Counter,写过 mapreduce 作业的开发人员就应该很熟悉 Counter,其实含义都是一样的,就是对一个计数器进行累加,即对于多条数据和多兆数据一直往上加的过程。 第二,Gauge,Gauge 是最简单的 Metrics,它反映一个值。比如要看现在 Java heap 内存用了多少,就可以每次实时的暴露一个 Gauge,Gauge 当前的值就是 heap 使用的量。 第三,Meter,Meter 是指统计吞吐量和单位时间内发生“事件”的次数。它相当于求一种速率,即事件次数除以使用的时间。 第四,Histogram,Histogram 比较复杂,也并不常用,Histogram 用于统计一些数据的分布,比如说 Quantile、Mean、StdDev、Max、Min 等。
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html
Prometheus 监控flink metric
日志
JM日志
TM日志
flink cluster log
flink jobs log
configuration trace log
Flink资源规划
CPU和内存
- JobManager个数
- TaskManager规格
- TaskManager个数
-
单个TaskManager槽位数
- 进程总内存(taskmanager.memory.process.size): JobManager 的总内存,即 Master 进程的内存
- Flink 总内存(taskmanager.memory.flink.size): JobManager 框架的内存,包括 JVM Heap 内存和 Off-Heap 部分内存。
- 其中,JVM Heap 内存 用于 Flink 框架、用户代码(作业提交及 Checkpoint 完成的回调函数);
- Off-Heap 内存用于 Flink 框架依赖(例如 Akka 的网络通信)和包含 Metaspace,其值等于 Total Flink Memory 减去 JVM Heap。
-
进程总内存 - Flink总内存 额外的部分是JVM Metaspace: 主要是加载用户程序中的类
- CPU Cores
- Physical Memory
- JVM Heap Size
- Flink Managed Memory
Task slot
每个Flink TaskManager在集群中提供处理槽。 插槽的数量通常与每个TaskManager的可用CPU内核数成比例。一般情况下你的slot数是你每个TM的cpu的核数。
apus.slotmanager.slot-placement-policy SLOT
并行度设定
设置parallelism的防范优先级是:算子(operator)级别 > 运行环境级别 > 客户端级别 > 系统级别
setParallelism 设置一个job或一个算子op的并发度。
setMaxParallelism 控制的是状态后端中keyed-state可以被分配的task最大个数。
特殊配置项
集群配置项
- JM的内存配置(打开flink dashboard的JM页面你可以看到一张Flink Memory Model图)
- jobmanager.memory.process.size 对应到图中的 Total Process Memory
- jobmanager.memory.flink.size 对应到图中的Total Flink Memory
- jobmanager.memory.heap.size
- jobmanager.memory.task.off-heap.size
- jobmanager.memory.jvm-metaspace.size
- jobmanager.memory.jvm-overhead.fraction:默认值 0.1(Total Process Memory的0.1)
- TM的内存配置(打开flink dashboard的TM页面你可以看到一张Flink Memory Model图)
- taskmanager.memory.process.size 对应到图中的 Total Process Memory
- taskmanager.memory.flink.size 对应到图中的Total Flink Memory
- taskmanager.memory.framework.heap.size
- taskmanager.memory.task.heap.size
- taskmanager.memory.managed.size
- taskmanager.memory.framework.off-heap.size
- taskmanager.memory.task.off-heap.size
- taskmanager.memory.network.fraction
- taskmanager.memory.jvm-metaspace.size
- taskmanager.memory.jvm-overhead.fraction
- TM的network配置
- taskmanager.memory.network.fraction 网络缓冲区也是内存的一部分
- taskmanager.memory.network.max
- taskmanager.memory.network.min
- taskmanager.network.sort-shuffle.min-buffers
- taskmanager.network.blocking-shuffle.compression.enabled
- taskmanager.network.request-backoff.initial
- taskmanager.network.request-backoff.max
- taskmanager.numberOfTaskSlots
- akka.ask.timeout 阻塞操作,可能因为机器繁忙或者网络堵塞导致timeout,可以尝试设置大一点
- akka.framesize JM和TM之间传输的最大消息值
- heartbeat.timeout 300000
- restart-strategy fixed-delay
- restart-strategy.fixed-delay.delay 180s
- restart-strategy.fixed-delay.attempts 1000
- state.backend.local-recovery true 此选项配置此 state backend 的本地恢复。
- state.backend.rocksdb.timer-service.factory ROCKSDB
- env.java.opts.taskmanager -XX:MaxGCPauseMillis=300
- env.java.opts.jobmanager
- cluster.evenly-spread-out-slots true
- jobmanager.execution.failover-strategy
- classloader.resolve-order
- web.checkpoints.history
- containerized.heap-cutoff-ratio
- classloader.parent-first-patterns.default
- state.backend rocksdb/filesystem
- state.backend.rocksdb.ttl.compaction.filter.enabled
- state.backend.rocksdb.memory.managed
- state.checkpoints.dir: hdfs:///flink/checkpoints
- state.savepoints.dir: hdfs:///flink/checkpoints
- state.backend.rocksdb.localdir
- state.backend.rocksdb.predefined-options SPINNING_DISK_OPTIMIZED_HIGH_MEM
- state.backend.incremental
- execution.checkpointing.interval: 5000
- execution.checkpointing.mode: EXACTLY_ONCE
- execution.checkpointing.timeout: 600000
- execution.checkpointing.min-pause: 500
- execution.checkpointing.max-concurrent-checkpoints: 1
- state.checkpoints.num-retained: 3
- execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION / DELETE_ON_CANCELLATION
- execution.checkpointing.tolerable-failed-checkpoints
- execution.checkpointing.unaligned 默认是false,如果设置true:非对齐checkpoint,以提升性能
- pipeline.max-parallelism The program-wide maximum parallelism used for operators which haven’t specified a maximum parallelism
参考: https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/deployment/config.html 参考: https://nightlies.apache.org/flink/flink-docs-master/zh/docs/deployment/config/
Table 环境配置项
https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/config/
Flink 内存模型 (JobManager)
https://cloud.tencent.com/developer/article/2026134
Flink 内存模型 (TaskManager)
- Framework Heap: taskmanager.memory.framework.heap.size 用于 Flink 框架的 JVM 堆内存
- Task Heap: taskmanager.memory.task.heap.size 用于 Flink 应用的算子及用户代码的 JVM 堆内存
- Managed Memory : taskmanager.memory.managed.size 由 Flink 管理的用于排序、哈希表、缓存中间结果及 RocksDB State Backend 的本地内存。
- Framework Off-Heap : taskmanager.memory.framework.off-heap.size 用于 Flink 框架的堆外内存(直接内存或本地内存)
- Task Off-Heap : taskmanager.memory.task.off-heap.size 用于 Flink 应用的算子及用户代码的堆外内存(直接内存或本地内存)
- Network: 用于任务之间数据传输的直接内存(例如网络传输缓冲)。该内存部分为基于 Flink 总内存的受限的等比内存部分。
- JVM Metaspace
- JVM Overhead 用于其他 JVM 开销的本地内存,例如栈空间、垃圾回收空间等。该内存部分为基于进程总内存的受限的等比内存部分。
Flink程序调试
问题:Flink程序出了问题怎么定位原因? 现象->本质
背压/反压的原因
- 定位反压节点:
- 解决反压首先要做的是定位到造成反压的节点: 借助反压面板,寻找第一个出现反压的节点,那么反压根源要么是就这个节点,要么是它紧接着的下游节点。
- 确定反压原因:
- 数据倾斜造成
- 用户代码的执行效率问题(频繁被阻塞或者性能问题)
- TaskManager 的内存以及 GC 问题也可能会导致反压
- 观察 Task Thread 阻塞在哪里,可能是用户函数本身有些同步的调用,可能是 checkpoint 或者 GC 等系统活动导致的暂时系统暂停
反压和checkpoint的关联
- 反压会影响到两项指标: checkpoint 时长和 state 大小。
- 前者是因为 checkpoint barrier 是不会越过普通数据的,数据处理被阻塞也会导致 checkpoint barrier 流经整个数据管道的时长变长,因而 checkpoint 总体时间(End to End Duration)变长。
- 后者是因为为保证 EOS(Exactly-Once-Semantics,准确一次),对于有两个以上输入管道的 Operator,checkpoint barrier 需要对齐(Alignment),接受到较快的输入管道的 barrier 后,它后面数据会被缓存起来但不处理,直到较慢的输入管道的 barrier 也到达,这些被缓存的数据会被放到state 里面,导致 checkpoint 变大。
- 为了缓解反压给 checkpoint 造成的压力,社区提出了 FLIP-76: Unaligned Checkpoints 来解耦反压和checkpoint
https://stackoverflow.com/questions/61311010/flink-checkpoints-causes-backpressure
checkpoint失败原因
- 如果 Checkpoint 做的非常慢,超过了 timeout 还没有完成,则整个 Checkpoint 也会失败。 Checkpoint慢的原因需要细查。
- 用户代码逻辑没有对于异常处理,让其直接在运行中抛出。比如解析 Json 异常,没有捕获,导致 Checkpoint 失败,或者调用 Dubbo 超时异常等等。
- 依赖外部存储系统,在进行数据交互时,出错,异常没有处理。比如输出数据到 Kafka、Redis、HBase 等,客户端抛出了超时异常,没有进行捕获,Flink 任务容错机制会再次重启。
- 内存不足,频繁 GC,超出了 GC 负载的限制。比如 OOM 异常
- 网络问题、机器不可用问题等等。
- 参考:Flink Checkpoint 问题排查实用指南
checkpoint慢/超时的原因
- 背压导致chechkpoint超时: 缓冲区的数据处理不过来,barrier流动慢,导致checkpoint生成时间长, 出现超时的现象.
- 在 task 端,所有的处理都是单线程的,数据处理和 barrier 处理都由主线程处理,如果主线程在处理太慢(比如使用 RocksDBBackend,state 操作慢导致整体处理慢),导致 barrier 处理的慢,也会影响整体 Checkpoint 的进度
Flink 传递数据的序列化方式
https://flink.apache.org/2020/04/15/flink-serialization-tuning-vol.-1-choosing-your-serializer-if-you-can/ 任何在Flink算子之间流转的数据、任何在算子状态内缓存的数据,都必须提供序列化和反序列化机制。
- Flink-provided special serializers :
- for basic types (Java primitives and their boxed form), arrays, composite types (tuples, Scala case classes, Rows), and a few auxiliary types (Option, Either, Lists, Maps, …),
- POJOs:
- a public, standalone class with a public no-argument constructor and all non-static, non-transient fields in the class hierarchy either public or with a public getter- and a setter-method; see POJO Rules,
- Generic types:
- user-defined data types that are not recognized as a POJO and then serialized via Kryo.
- Avro
- Kryo
- Protobuf (via Kryo)
- Apache Thrift (via Kryo)
Flink HelloWorld
下面是一些最为简单的例子程序
Basic Commands
cd /usr/local/Cellar/apache-flink/1.9.1 && ./libexec/bin/start-cluster.sh
./bin/flink run -c com.aaa.worldcount xxx.jar --host localhost --port 7777
./bin/flink list --all
./bin/flink cancel :job_id
./bin/flink run -s :savepointPath
scala code
import org.apache.flink.api.scala._
object FlinkWordCount {
def main(args:Array[String]):Unit = {
//val env = ExecutionEnvironment.getExecutionEnvironment;
val env = ExecutionEnvironment.createRemoteEnvironment("flink-master", 6123, "D:\\CodeBase\\jvmlearning\\flink-learning\\target\\flink-learning-1.0-SNAPSHOT.jar")
val text = env.readTextFile("hdfs://flink-master:9000/user/flink/input/SogouQ.sample")
println(text.count())
val counts = text.flatMap { _.toLowerCase.split("\\W+") }
.map { (_, 1) }
.groupBy(0)
.sum(1)
//env.execute()
println(counts.count())
//println(env.getExecutionPlan());
//counts.print()
}
}
java code
这是我在IDEA上编译运行的第一个flink程序(maven构建)。
可以通过在命令行 $ nc -lk 9000 来往flink程序里输入字节流。
package myflink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class SocketWindowWordCount {
public static void main(String[] args) throws Exception {
// Create the execution environment.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Get the input data by connecting the socket.
// Here it is connected to the local port 9000.
// If the port 9000 has been already occupied, change to another port.
DataStream<String> text = env.socketTextStream("localhost", 9000, "\n");
// Parse the data, and group, windowing and aggregate it by word.
DataStream<Tuple2<String, Integer>> windowCounts = text
.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
for (String word : value.split("\\s")) {
out.collect(Tuple2.of(word, 1));
}
}
})
.keyBy(0)
.timeWindow(Time.seconds(10))
.sum(1);
// Print the results to the console, note that here it uses the single-threaded printing instead of multi-threading
windowCounts.print().setParallelism(1);
env.execute("Socket Window WordCount");
}
}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 添加自定义数据源
DataStreamSource<Person> data = env.addSource(new MyMysqlSource());
data.print().setParallelism(2);
data.addSink(new MyMysqlSink());
// 提交执行任务
env.execute("MySourceMysql");
Flink Job
首先你得把java或scala程序变成jar包,直接使用mave的package功能(注意maven指定的jdk版本要和运行时的版本一致)。
打开 http://localhost:8081/#/overview ,在Web界面提交job。
然后在Task Manager里面就可以看到自己提交的job,其日志和标准输出都可以看到。
Flink源码剖析
从 StreamGraph 转化为 JobGrpah 的过程 : 节点(StreamNode)合并,避免无意义的跨节点通信
StreamingJobGraphGenerator: 生成 JobGraph
Flink SQL的编译过程
Flink SQL 利用 Apache Calcite 将 SQL 翻译为关系代数表达式,使用表达式折叠(Expression Reduce),下推优化(Predicate / Projection Pushdown )等优化技术生成物理执行计划(Physical Plan),利用 Codegen 技术生成高效执行代码。
基于calcite的编译流程
- sql 解析阶段:calcite parser 解析(sql -> AST,AST 即 SqlNode Tree)
- SqlNode 验证阶段:calcite validator 校验(SqlNode -> SqlNode,语法、表达式、表信息)
- 语义分析阶段:SqlNode 转换为 RelNode,RelNode 即 Logical Plan(SqlNode -> RelNode)
- 优化阶段:calcite optimizer 优化(RelNode -> RelNode,剪枝、谓词下推等)
- 物理计划生成阶段:Logical Plan 转换为 Physical Plan(等同于 RelNode 转换成 DataSet\DataStream API)
- 后续的运行逻辑与 datastream 一致
SQL背后对应的执行算子
- MultipleInput
- HashJoin
- Calc(select=[], where=[])
- OverAggregate
- HashAggregate
- LocalHashAggregate
- Sort(orderBy=[])
Flink背后的依赖
- 组件间通信 akka ,JobManager和TaskManager之间的控制通信
- 数据传输 netty, 比如Operator之间的传输数据 https://github.com/wangzhiwubigdata/God-Of-BigData/tree/master/Netty
- SQL解析 Calcite, Flink parses SQL using Apache Calcite, which supports standard ANSI SQL.
Flink流控机制
https://www.jianshu.com/p/c8b3e32a9fa3
Flink周边
Alink
Alink是基于Flink的通用算法平台 https://github.com/alibaba/Alink
Alink 最大的亮点是有流式算法和在线学习
AlgoOperator: AlgoOperator有两个子类,BatchOperator和StreamOperator:所有的批式机器学习算法都会继承BatchOperator,所有的流式机器学习算法都会继承StreamOperator
Alink算法库中最重要的是 Iterative Communication/Computation Queue (简称ICQ),是我们面向迭代计算场景总结的一套迭代通信计算框架,它集成了内存缓存技术和内存数据通信技术。我们把每个迭代步抽象为多个ComQueueItem(通信模块与计算模块)串联形成的队列。
BatchOperator
FlatMapStreamOp
linkFrom && link
FlinkML 和 Alink 的关系?
FlinkML是1.8之前的一个机器学习组件,在1.9开始已经不存在了。 https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/libs/ml/index.html
PyAlink 和 Alink 的关系?
PyAlink 和 PyFlink 的关系?
https://zhuanlan.zhihu.com/p/114717285 https://github.com/uncleguanghui/pyflink_learn/blob/master/examples/README.md
PyFlink是 apache-flink项目里的一个还未成熟的部分。
flink-ai-extended 和 ai-flow
Tensorflow 官方有java版本的调用库,JVM单机训练、单机预测都没问题。如何分布式训练是个问题。 flink-ai-extended
- TFConfig
- MLContext
参考学习
- 最权威的使用文档是:https://flink.apache.org/
- Flink社区: https://cwiki.apache.org/confluence/display/FLINK/Apache+Flink+Home
- 最权威的底层实现文档是: Flink Internals https://cwiki.apache.org/confluence/display/FLINK/Flink+Internals
- flink基本概念介绍 https://www.jianshu.com/p/2ee7134d7373
- 如何正确使用 flink connector https://yq.aliyun.com/articles/716838
- idea+maven打jar包 https://blog.csdn.net/branwel/article/details/79918018
- 官网的内容超级全 https://ci.apache.org/projects/flink/flink-docs-release-1.10/
- Flink如何支持特征工程、在线学习、在线预测等AI场景? https://mp.weixin.qq.com/s/C2Uft-IuzgiKa1aDlROIng
- 快手的Flink实践 https://new.qq.com/omn/20190717/20190717A0HNBE00.html*
Flink Forward
https://www.flink-forward.org/
Flink Forward China 2020
failover单点恢复而不进行全局恢复
实时数仓
hudi 支持基于主键的upsert/delete
数据湖框架
PyFlink
Flink AL Extended
Flink Forward China 2021
应用API
14支持同一个应用中混合使用有界流和无界流
批执行模式现在支持在同一应用中混合使用 DataStream API 和 SQL/Table API(此前仅支持单独使用 DataStream API 或 SQL/Table API)
状态
state-backend优化
cp、snapshot优化: 不被流动缓慢的中间数据阻塞 unaligned checkpoint ; 更少的中间数据 buffer debloating; Log Based(类似WAL) 将snapshot和uploading剥离开, cp不再受限于具体的state backend;
容错处理:机器学习场景对数据一致性的要求是弱化的, Task-Local Recovery; rescaling vs failover
流批一体
面向流批一体的 Flink Runtime: 流执行模式(增量处理)、批执行模式(算子逐级允许)
流批一体API
流批一体存储: iceberg
流批一体认知: 只实现流批计算统一、只实现流批存储统一、 计算和存储均统一
插件化shuffle api, remote shuffle
flink cdc(Change Data Capture)
从各种数据库中获取变更流并接入到Flink
上游 mysql、mongodb
下游 clickhouse、hudi、iceberg
实时物化视图
实时数仓→流式数仓
基于 Flink Dynamic Table 构建流批一体数仓
Flink Forward China 2022
Flink tableStore 流批一体动态表存储 具体是怎么样的实现?
Retry Lookup