Flink开发实践

流式数据处理概念

如果你对大数据流式处理有以下需求,

Google Dataflow

批流一体的理论基石。

基本概念

本地状态,存储中间信息、缓存信息。

窗口操作

事件驱动

管道 允许输入流数据,输出流数据,交给下一个任务

DAG

数据传递方式 hash / rebalance / forward / shuffle / rescale

广播变量 getRuntimeContext().getBroadcastVariable / withBroadcastSet

Exactly-Once Exactly-Once是流处理系统核心特性之一,它保证每一条消息只被流处理系统处理一次,通过借鉴Chandy和Lamport在1985年发表的一篇关于分布式快照的论文,Flink实现了Exactly-Once特性。

JobGraph JobGraph是通过 Flink 各类API构建起来的一张任务执行图。 当 JobGraph 提交给 Flink 集群后,能够以 Local、Standalone、Yarn 和 Kubernetes 四种模式运行。

JobManager

JobManager 相当于整个集群的 Master 节点,且整个集群有且只有一个活跃的 JobManager ,负责整个集群的任务管理和资源管理。

TaskManager

TaskManager 相当于整个集群的 Slave 节点,负责具体的任务执行和对应任务在每个节点上的资源申请和管理。一个TM对应一个JVM。

任务槽是Flink计算资源的基本单位. Task Manager 的一个 Slot 代表一个可用线程,该线程具有固定的内存,注意 Slot 只对内存隔离,没有对 CPU 隔离。 每个任务槽可以在同一时间执行一个Task,而TaskManager可以拥有一个或者多个任务槽。 任务槽可以实现TaskManager中不同Task的资源隔离,不过是逻辑隔离,并且只隔离内存,亦即在调度层面认为每个任务槽“应该”得到taskmanager.heap.size的N分之一大小的内存。CPU资源不算在内。

Checkpoint 和 Savepoint

Flink中的每个方法或算子都可以是有状态的,我们称之为state。Checkpoint是把State数据定时持久化存储

这两者都是用于恢复作用。尤其是checkpoint用于恢复异常失败的作业。

异步快照

在Flink做分布式快照过程中核心是Barriers的使用。这些Barriers是在数据接入到Flink之初就注入到数据流中,并随着数据流向每个算子。

Barrierr 会周期性地注入数据流中,作为数据流的一部分,从上游到下游被算子处理。Barrier 会严格保证顺序,不会超过其前边的数据。Barrier 将记录分割成记录集,两个 Barrier 之间的数据流中的数据隶属于同一个检查点。每一个 Barrier 都携带一个其所属快照的 ID 编号。

Barrier 随着数据向下流动,不会打断数据流,因此非常轻量。

作业 Failover、 容错、 灾备

Checkpoint 在作业failover的时候自动使用。 Flink 的容错机制主要分为从 checkpoint 恢复状态和重流数据两步,这也是为什么 Flink 通常要求数据源的数据是可以重复读取的。对于重启后的新 Task 来说,它可以通过读取 checkpoint 很容易地恢复状态信息,但是却不能独立地重流数据,因为 checkpoint 是不包含数据的。

通过 Flink 配置文件 flink-conf.yaml 中的 jobmanager.execution.failover-strategy 配置项进行配置Failover策略:

  1. 全图重启 full
  2. 基于Region的局部重启 region

如何从checkpoint恢复启动作业? 常用的重启策略 (1)固定间隔策略 (Fixed delay) (2)失败率策略 (Failure rate) (3)无重启 (No restart)

What happens if a task manager is lost? https://ververica.zendesk.com/hc/en-us/articles/360002262919-What-happens-if-a-task-manager-is-lost-

Task-Local Recovery

Flink 单点恢复 https://segmentfault.com/a/1190000025168779

Flink和HDFS打交道主要有两类情况(当然这不是唯一方式),一类是Checkpoint,一类是Hdfs-Sink

flink的cp会备份到hdfs去,当作业并发量大(TM多)时,HDFS的压力会大: 1)大量的 RPC 请求会影响 RPC 的响应时间; 2)大量文件对 NameNode 内存造成很大压力; 3) 大量产出小文件,其他任务读取小文件数据的成本也增加;

减小Checkpointing对HDFS的压力

参考:https://www.infoq.cn/article/OLlJNzQpTOHfyrgOG8xq

HDFS-Sink避免小文件过多

批处理和流处理都支持,批流一体

batch application stream application

Flink如何以Table的概念支持批流一体

Flink 数据处理流水线开始于 source 表。source 表产生在查询执行期间可以被操作的行;它们是查询时 FROM 子句中引用的表。这些表可能是 Kafka 的 topics,数据库,文件系统,或者任何其它 Flink 知道如何消费的系统。

流到动态表的转换

动态表 是 Flink 的支持流数据的Table的核心概念。动态表的查询是一种永不停止的查询,动态表的查询结果也是一种永在变更的结果。

查询动态表将生成一个 连续查询 。一个连续查询永远不会终止,结果会生成一个动态表。查询不断更新其(动态)结果表,以反映其(动态)输入表上的更改。

动态表 (Dynamic Table): https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/streaming/dynamic_tables.html

在创建Table和从Datatream转换为Table时,可以有一个处理时间字段(以 .proctime 为后缀),可以有一个事件时间字段(以 .rowtime 为后缀)。

动态表到流的转换

Retraction 机制

又名 Changelog 机制。Retraction 是流式数据处理中撤回过早下发(Early Firing)数据的一种机制,类似于传统数据库的Update 操作。 retract是流式计算场景下对数据更新的处理方式。

keygroup

Flink的状态分为两类:Keyed State和Operator State。前者与每个键相关联,后者与每个算子的并行实例(即Sub-Task)相关联。

Key Group是Flink状态机制中的一个重要设计. Key Group是Keyed State分配的原子单位,且Flink作业内Key Group的数量与最大并行度相同,也就是说Key Group的索引位于[0, maxParallelism - 1]的区间内。 从这里可以看到key-group与最大并发数有关系,如果key-group分配不均匀的话,状态分配也会不均匀。

Catalog

Catalog 提供了元数据信息,例如数据库、表、分区、视图以及数据库或其他外部系统中存储的函数和信息。

Metastore 即元数据服务,是Hive用来管理库表元数据的一个服务

HiveCatalog(String catalogName, String defaultDatabase, String hiveConfDir, String hadoopConfDir, String hiveVersion)

Catalog 函数

Flink功能

官网是最好的,你会发现百分之99的网页内容其实都出自于官网:https://nightlies.apache.org/flink/flink-docs-master/zh/ 不过请注意,不同大版本之间(比如1.12/1.13/1.14/1.15)之间的API可能有明显差异,版本对应的官方文档内容也有差异。

Environment

不管是本地执行还是集群执行,不管是流式模式还是批式模式,都需要特定的environment

LocalEnvironment vs RemoteEnvironment

StreamExecutionEnvironment

每一个flink应用都需要一个执行环境, 对于流处理程序,使用的执行环境类是 StreamExecutionEnvironment. 这是Flink单纯做流式处理所用的执行环境。 当 env.execute() 方法执行之后,代码所设计的一张执行图就会被打包发送到Flink Master,进行任务拆解和并行化,分配到TaskManager执行。

env.setStateBackend 设置状态后端的存储机制

TableEnvironment

TableEnvironment是 Table/SQL API的运行环境。

TableConfig 对象用于设置Table/SQL API程序的配置。

StatementSet 对象,把几个sql 放到一个 statement 里面,以一个作业的方式去执行,能够实现节点的复用。 Table (通过 StatementSet.addInsert() 输出给某个 Sink)和 INSERT 语句 (通过调用 StatementSet.addInsertSql())会先被缓存到 StatementSet 中。

真正的执行发生在sink时, 当 ?? 执行之后, 执行图会发送到master端。 StatementSet.execute() 方法被调用时,所有的 sink 会被优化成一张有向无环图。

StreamTableEnvironment

DataStream 和 Table 之间的转换(面向 StreamTableEnvironment )

一旦 Table 被转化为 DataStream,必须使用 StreamExecutionEnvironment 的 execute 方法执行该 DataStream 作业。

几种stream

DataStream

SingleOutputStreamOperator

Flink特有的数据结构

数据重分布

keyBy、broadcast、rebalance、rescale、shuffle、global

partitionCustom

StatusBackend

Flink 提供了内存、文件系统、RocksDB 三种 StateBackends.

关于超大状态存储策略选择,生产环境状态存储 Backend 有两种方式:

参考: https://cloud.tencent.com/developer/article/1592441

参考: https://ci.apache.org/projects/flink/flink-docs-master/zh/docs/dev/datastream/fault-tolerance/checkpointing/

Checkpointed Function

import org.apache.flink.api.common.state.ReducingStateDescriptor

import org.apache.flink.api.common.state.MapState; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor;

Checkpoint

Checkpoint 配置

Checkpoint 状态

问题:怎么判断flink任务是从cp启动的还是从0启动的? Dashboard的checkpoints页面里有一个”Latest Restore”:

Checkpoint 外部化文件

外部化checkpoint文件的构成: 很长的一个jobID作为一个的文件名,这个jobid与flinkdashboard上的任务运行的ID一致(每一次手工启动时产生),目录下三个文件夹:

需要注意的是,不同jobid下的文件可能是有依赖关系的,如果使用增量快照(state.backend.incremental : true)因为rockdb增量存储,如果一次手动启动是从前一次cp文件的话,那么这两个jobid的文件就是有依赖关系的。 详见 Flink 清理过期 Checkpoint 目录的正确姿势: https://blog.csdn.net/tzs_1041218129/article/details/104421686

Operator算子 DataStream Transformations

ProcessFunction / KeyedProcessFunction /

filter 过滤器,对数据流中的每个元素进行过滤判断,判断为true的元素进入下一个数据流 RichFilterFunction

flatmap 可以理解为将元素摊平,每个元素可以变为0个、1个、或者多个元素。 RichFlatMapFunction

map 可以理解为映射,对每个元素进行一定的变换后,映射为另一个元素。

mapPartition 维护每个并行分区内记录的顺序, RichMapPartitionFunction

name 方法Sets the name of the current data stream.

returns 方法Adds a type information hint about the return type of this operator.

keyby DataStream → KeyedStream

key Agg

getSideOutput 侧输出

Evictor:可以译为“驱逐者”。在Trigger触发之后,在窗口被处理之前,Evictor(如果有Evictor的话)会用来剔除窗口中不需要的元素,相当于一个filter。

数据关联算子 join / coGroup / intervalJoin

https://developer.aliyun.com/article/778485

Timer 定时器

Time Characteristic

setStreamTimeCharacteristic

onTimer

https://help.aliyun.com/document_detail/470392.html

window

Window是无限数据流处理的核心。 Window将一个无限的stream拆分成有限大小的”buckets”桶,我们可以在这些桶上做计算操作。 在有限批处理的概念里看起来没有窗口的概念,但可以看做整个一批就是一次窗口。 官网对window的介绍 https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/joining.html 中文介绍 https://blog.csdn.net/dafei1288/article/details/98919202

典型模式: 分组的流 vs 非分组的流。唯一的区别是分组的stream调用keyBy(…)和window(…),而非分组的stream中window()换成了windowAll(…) stream.keyby(…).window(….).trigger(new XXX).apply(…)

窗口内处理

ProcessWindowFunction

窗口的触发器

Trigger抽象类,定义了窗口是何时被触发并同时决定触发行为(对窗口进行清理或者计算)。

TriggerContext 接口(定义在Trigger类中),用于维持状态,注册定时器等:

TriggerResult 枚举类,用于决定窗口在触发后的行为:

多次触发的用法: 实现一个Trigger的派生类XXX,将其作用在窗口操作后的.trigger(new XXX)中。比如stream.keyby(…).window(….).trigger(new XXX).apply(…)

窗口的驱逐器

在窗口apply前后允许删除窗口里特定的元素。

watermark

watermark是和Event Time一起使用的一个概念。由于消息自身的时间和消息被flink处理的时间往往是不同的,为了准确地表达数据的处理进度,出现了水印的概念。

水印就是一个时间戳,可以给每个消息添加一个 允许一定延迟 的时间戳。

watermark是用于处理乱序事件的,通常用watermark机制结合window来实现。

水印是Flink判断迟到数据的标准,同时也是窗口触发的标记。

DataStream.assignTimestampsAndWatermarks()方法来提取事件时间并同时产生水印。

当我们把消息生产的时间戳赋值给水印值,就意味着水印值一定能够表示消息生产的先后顺序。

AscendingTimestampExtractor 的作用? AscendingTimestampExtractor 产生的时间戳和水印必须是单调非递减的,用户通过覆写extractAscendingTimestamp()方法抽取时间戳.

BoundedOutOfOrdernessTimestampExtractor 的作用? BoundedOutOfOrdernessTimestampExtractor 产生的时间戳和水印是允许“有界乱序”的,构造它时传入的参数maxOutOfOrderness就是乱序区间的长度,而实际发射的水印为通过覆写extractTimestamp()方法提取出来的时间戳减去乱序区间,相当于让水印把步调“放慢一点”。这是Flink为迟到数据提供的第一重保障。

需要深刻理解这几个概念才行: https://www.jianshu.com/p/c612e95a5028

source

理论上,flink任务的输入可以从任何介质来。

流式source

flink中的source作为整个stream中的入口,而sink作为整个stream的终点。

SourceFunction为所有flink中source的根接口,其定义了run()方法和cancel()方法。

addSource(sourceFunction)

Source表

source表的出处可能是 Kafka 的 topics,数据库,文件系统,或者任何其它 Flink 知道如何消费的系统。

一个table必须被注册(createTemporaryView)到 TableEnvironment 里去才可以被后续查询使用。

一个Table可以来自于 TableSource, Table, CREATE TABLE statement, DataStream. 或者可以register catalogs in a TableEnvironment to specify the location of the data sources.

sink

理论上,flink任务的输出可以写入到任何介质去。

流式sink

Sink是流的重点,根接口是sinkFunction。

其重要的方法是invoke()方法,用以实现结果数据的处理逻辑

SinkFunction 是一个接口,类似于SourceFunction接口。SinkFunction中主要包含一个方法,那就是用于数据输出的invoke 方法,每条记录都会执行一次invoke方法,用于执行输出操作。

addSink(sinkFunction)

表sink

TableSink 是一个通用接口,用于支持以多种文件格式(如 CSV、Apache Parquet、Apache Avro),向不同存储系统(如 JDBC、Apache HBase、Apache Cassandra、Elasticsearch)或消息队列系统(如 Apache Kafka、RabbitMQ)输出。

Flink BulkWriter

Connector 与外部系统的连接器

用于支持与其他组件数据连接的 source 和 sink。比如和kafka连接,比如和Hadoop连接,比如和RaddbitMQ连接。

其中最为常用的当属Flink kafka connector。 此外,Apache Bahir 项目中也提供了更多连接器。

针对不同的Flink API也有不同的连接器, Datastream Connector 和 Table API Connector。

FlinkKafkaConsumer

1.12官方文档: https://nightlies.apache.org/flink/flink-docs-release-1.12/zh/dev/connectors/kafka.html

kafka 中数据都是以二进制 byte 形式存储的。读到 Flink 系统中之后,需要将二进制数据转化为具体的 java、scala 对象。所以需要实现一个 schema 类,定义如何序列化和反序列数据。

反序列化时需要实现 DeserializationSchema 接口,并重写 deserialize(byte[] message) 函数。 如果是反序列化 kafka 中 kv 的数据时,需要实现 KeyedDeserializationSchema 接口,并重写 deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) 函数。

问题:kafka生产限速的时候,flink sink是什么效果? ERROR-Expiring xx record(s) for xxx:600346 ms has passed since batch creation

kafka offset checkpoint

读kafka重要的是设置offset:

Flink的kafka consumer一共有三种offset commit模式(FlinkKafkaConsumer基类的成员):

StreamingFileSink checkpoint

https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/streamfile_sink/

文件系统重要的是bucketassign和rollingPolicy

TableFactory

TableFactory 用来创建与table相关的实例工厂接口,实例的来源来自字符串形式的properties。

实现该接口的类应该被这样添加: Classes that implement this interface can be added to the “META_INF/services/org.apache.flink.table.factories.TableFactory” file of a JAR file in the current classpath to be found.

和传统SQL不同,Flink SQL设计成的是一个批流一体的SQL。 使得查询对于批(DataSet)和流(DataStream)的输入有相同的语义,也会产生同样的计算结果。 一直以来SQL都是用来处理关系型批量数据的,而不是处理流式数据。尽管存在这些差异,但是使用关系查询和 SQL 处理流并不是不可能的。

Flink SQL 工作机制: https://zhuanlan.zhihu.com/p/150473300

Flink SQL如何实现数据流的join: http://www.whitewood.me/2019/12/15/Flink-SQL-%E5%A6%82%E4%BD%95%E5%AE%9E%E7%8E%B0%E6%95%B0%E6%8D%AE%E6%B5%81%E7%9A%84-Join/

Streaming SQL

批式模型和流式模式

Table API 和 SQL API使用的差别

批流一体的概念中,SQL是真正一体的,Environment、Source、Sink并不是一体的。

将流式概念转为批流一体的Table概念 不管流式数据源还是批式数据源,进入到Flink SQL里,都以table的概念来表达。

table概念还可以再转回datastream

tableEnv.connect(new FileSystem().path(filePath))
        .withFormat(new Csv()) //withFormat 是用来告诉flink我应该怎么处理来源用的每一条数据 比如按csv的格式,号分割
        .withSchema(new Schema() //withSchema 是声明创建表的表结构 要按照解析得到的数据的先后顺序对应
        .field("id", DataTypes.STRING())
        .field("time", DataTypes.BIGINT())
        .field("temp", DataTypes.DOUBLE()))
        .createTemporaryTable("inputTable");

https://www.cnblogs.com/21airuirui1123/p/14644933.html

自定义 SQL Connector

  1. 自定义Factory,根据需要实现StreamTableSourceFactory和StreamTableSinkFactory
  2. 根据需要继承 ConnectorDescriptorValidator ,定义自己的connector参数(with 后面跟的那些)
  3. Factory中的requiredContext、supportedProperties都比较重要,框架中对Factory的过滤和检查需要他们
  4. 需要自定义个TableSink,根据你需要连接的中间件选择是AppendStreamTableSink、Upsert、Retract,并重写consumeDataStream方法
  5. 自定义一个SinkFunction,在invoke方法中实现将数据写入到外部中间件。

CollectionTableFactory

https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sourcessinks/

SQL语法扩展

https://www.jianshu.com/p/623266b941de

DataSet API

从flink-1.12版本开始DataSetAPI正在废弃中。

Python API

PythonAPI,就是 PyFlink, 换了开发语言,其API也分为 PyFlink Table API 和 PyFlink DataStream API 两种。

UDF

Table API/SQL UDF

DataStreamAPI UDF

Flink部署

standalone mode

mac环境下: 用brew安装flink, $ brew install apache-flink

yarn mode

用户任务通过flink命令提交到 yarn 管理的集群上

kubernetes mode

用户任务通过flink命令提交到 k8s 管理的集群上

java/python/sql 任务提交方式

Flink监控

Flink自带的dashboard

flink自定义metric

flink metric类型分为Counter、Gauge、Histogram、Meter

第一,常用的如 Counter,写过 mapreduce 作业的开发人员就应该很熟悉 Counter,其实含义都是一样的,就是对一个计数器进行累加,即对于多条数据和多兆数据一直往上加的过程。 第二,Gauge,Gauge 是最简单的 Metrics,它反映一个值。比如要看现在 Java heap 内存用了多少,就可以每次实时的暴露一个 Gauge,Gauge 当前的值就是 heap 使用的量。 第三,Meter,Meter 是指统计吞吐量和单位时间内发生“事件”的次数。它相当于求一种速率,即事件次数除以使用的时间。 第四,Histogram,Histogram 比较复杂,也并不常用,Histogram 用于统计一些数据的分布,比如说 Quantile、Mean、StdDev、Max、Min 等。

https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html

日志

JM日志

TM日志

flink cluster log

flink jobs log

configuration trace log

Flink资源规划

CPU和内存

Task slot

每个Flink TaskManager在集群中提供处理槽。 插槽的数量通常与每个TaskManager的可用CPU内核数成比例。一般情况下你的slot数是你每个TM的cpu的核数。

apus.slotmanager.slot-placement-policy SLOT

并行度设定

设置parallelism的防范优先级是:算子(operator)级别 > 运行环境级别 > 客户端级别 > 系统级别

setParallelism 设置一个job或一个算子op的并发度。

setMaxParallelism 控制的是状态后端中keyed-state可以被分配的task最大个数。

特殊配置项

集群配置项

参考: https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/deployment/config.html 参考: https://nightlies.apache.org/flink/flink-docs-master/zh/docs/deployment/config/

Table 环境配置项

https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/config/

https://cloud.tencent.com/developer/article/2026134

Flink程序调试

问题:Flink程序出了问题怎么定位原因? 现象->本质

背压/反压的原因

反压和checkpoint的关联

https://stackoverflow.com/questions/61311010/flink-checkpoints-causes-backpressure

checkpoint失败原因

checkpoint慢/超时的原因

https://flink.apache.org/2020/04/15/flink-serialization-tuning-vol.-1-choosing-your-serializer-if-you-can/ 任何在Flink算子之间流转的数据、任何在算子状态内缓存的数据,都必须提供序列化和反序列化机制。

Flink HelloWorld

下面是一些最为简单的例子程序

Basic Commands

cd /usr/local/Cellar/apache-flink/1.9.1 && ./libexec/bin/start-cluster.sh

./bin/flink run -c com.aaa.worldcount xxx.jar --host localhost --port 7777

./bin/flink list --all

./bin/flink cancel :job_id

./bin/flink run -s :savepointPath

scala code

import org.apache.flink.api.scala._

object FlinkWordCount {
  def main(args:Array[String]):Unit = {
    //val env = ExecutionEnvironment.getExecutionEnvironment;
    val env = ExecutionEnvironment.createRemoteEnvironment("flink-master", 6123, "D:\\CodeBase\\jvmlearning\\flink-learning\\target\\flink-learning-1.0-SNAPSHOT.jar")
    
	val text = env.readTextFile("hdfs://flink-master:9000/user/flink/input/SogouQ.sample")
    
	println(text.count())
    
	val counts = text.flatMap {  _.toLowerCase.split("\\W+") }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)
    
	//env.execute()
    println(counts.count())
    //println(env.getExecutionPlan());
    //counts.print()
  }
}

java code

这是我在IDEA上编译运行的第一个flink程序(maven构建)。

可以通过在命令行 $ nc -lk 9000 来往flink程序里输入字节流。

package myflink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWordCount {
    public static void main(String[] args) throws Exception {
        // Create the execution environment.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Get the input data by connecting the socket.
        // Here it is connected to the local port 9000.
        // If the port 9000 has been already occupied, change to another port.
        DataStream<String> text = env.socketTextStream("localhost", 9000, "\n");
        // Parse the data, and group, windowing and aggregate it by word.
        DataStream<Tuple2<String, Integer>> windowCounts = text
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(Tuple2.of(word, 1));
                        }
                    }
                })
                .keyBy(0)
                .timeWindow(Time.seconds(10))
                .sum(1);
        // Print the results to the console, note that here it uses the single-threaded printing instead of multi-threading
        windowCounts.print().setParallelism(1);
        env.execute("Socket Window WordCount");
    }
}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 添加自定义数据源
        DataStreamSource<Person> data = env.addSource(new MyMysqlSource());
        data.print().setParallelism(2);
        data.addSink(new MyMysqlSink());
        // 提交执行任务
env.execute("MySourceMysql");

首先你得把java或scala程序变成jar包,直接使用mave的package功能(注意maven指定的jdk版本要和运行时的版本一致)。

打开 http://localhost:8081/#/overview ,在Web界面提交job。

然后在Task Manager里面就可以看到自己提交的job,其日志和标准输出都可以看到。

Flink源码剖析

从 StreamGraph 转化为 JobGrpah 的过程 : 节点(StreamNode)合并,避免无意义的跨节点通信

StreamingJobGraphGenerator: 生成 JobGraph

Flink SQL 利用 Apache Calcite 将 SQL 翻译为关系代数表达式,使用表达式折叠(Expression Reduce),下推优化(Predicate / Projection Pushdown )等优化技术生成物理执行计划(Physical Plan),利用 Codegen 技术生成高效执行代码。

基于calcite的编译流程

SQL背后对应的执行算子

Flink背后的依赖

Flink流控机制

https://www.jianshu.com/p/c8b3e32a9fa3

Flink周边

Alink是基于Flink的通用算法平台 https://github.com/alibaba/Alink

Alink 最大的亮点是有流式算法和在线学习

AlgoOperator: AlgoOperator有两个子类,BatchOperator和StreamOperator:所有的批式机器学习算法都会继承BatchOperator,所有的流式机器学习算法都会继承StreamOperator

Alink算法库中最重要的是 Iterative Communication/Computation Queue (简称ICQ),是我们面向迭代计算场景总结的一套迭代通信计算框架,它集成了内存缓存技术和内存数据通信技术。我们把每个迭代步抽象为多个ComQueueItem(通信模块与计算模块)串联形成的队列。

BatchOperator

FlatMapStreamOp

FlinkML是1.8之前的一个机器学习组件,在1.9开始已经不存在了。 https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/libs/ml/index.html

https://zhuanlan.zhihu.com/p/114717285 https://github.com/uncleguanghui/pyflink_learn/blob/master/examples/README.md

PyFlink是 apache-flink项目里的一个还未成熟的部分。

Tensorflow 官方有java版本的调用库,JVM单机训练、单机预测都没问题。如何分布式训练是个问题。 flink-ai-extended

参考学习

https://www.flink-forward.org/

failover单点恢复而不进行全局恢复

实时数仓

hudi 支持基于主键的upsert/delete

数据湖框架

PyFlink

Flink AL Extended

应用API

14支持同一个应用中混合使用有界流和无界流

批执行模式现在支持在同一应用中混合使用 DataStream API 和 SQL/Table API(此前仅支持单独使用 DataStream API 或 SQL/Table API)

状态

state-backend优化

cp、snapshot优化: 不被流动缓慢的中间数据阻塞 unaligned checkpoint ; 更少的中间数据 buffer debloating; Log Based(类似WAL) 将snapshot和uploading剥离开, cp不再受限于具体的state backend;

容错处理:机器学习场景对数据一致性的要求是弱化的, Task-Local Recovery; rescaling vs failover

流批一体

面向流批一体的 Flink Runtime: 流执行模式(增量处理)、批执行模式(算子逐级允许)

流批一体API

流批一体存储: iceberg

流批一体认知: 只实现流批计算统一、只实现流批存储统一、 计算和存储均统一

插件化shuffle api, remote shuffle

从各种数据库中获取变更流并接入到Flink

上游 mysql、mongodb

下游 clickhouse、hudi、iceberg

实时物化视图

实时数仓→流式数仓

基于 Flink Dynamic Table 构建流批一体数仓

Flink tableStore 流批一体动态表存储 具体是怎么样的实现?

Retry Lookup

*****
Written by Lu.dev on 05 March 2020