diff --git a/doc/_toc.yml b/doc/_toc.yml index e493f19..2efcf09 100644 --- a/doc/_toc.yml +++ b/doc/_toc.yml @@ -44,6 +44,12 @@ subtrees: - file: ch-state-checkpoint/savepoint - file: ch-state-checkpoint/state - file: ch-state-checkpoint/exercise-state + - file: ch-flink-connectors/index + entries: + - file: ch-flink-connectors/Exactly-Once-guarantee + - file: ch-flink-connectors/custom-source-and-sink + - file: ch-flink-connectors/flink-connector + - file: ch-flink-connectors/exercise-stock-price-data-stream - file: ch-table-sql/index entries: - file: ch-table-sql/table-overview @@ -53,4 +59,12 @@ subtrees: - file: ch-table-sql/sql-ddl - file: ch-table-sql/system-function - file: ch-table-sql/catalog-function - - file: ch-table-sql/exercise-iot \ No newline at end of file + - file: ch-table-sql/exercise-iot + - file: ch-deployment-and-configuration/index + entries: + - file: ch-deployment-and-configuration/flink-deployment-and-configuration + - file: ch-deployment-and-configuration/configuration-file + - file: ch-deployment-and-configuration/operator-chaining-and-slot-sharing + - file: ch-deployment-and-configuration/flink-command-line-interface-guide + - file: ch-deployment-and-configuration/hadoop-flink + - file: ch-deployment-and-configuration/experiment-job-coding-packaging-and-submission \ No newline at end of file diff --git a/doc/ch-big-data-intro/batch-processing-and-stream-processing.md b/doc/ch-big-data-intro/batch-processing-and-stream-processing.md index 76e9414..588b81a 100644 --- a/doc/ch-big-data-intro/batch-processing-and-stream-processing.md +++ b/doc/ch-big-data-intro/batch-processing-and-stream-processing.md @@ -5,7 +5,7 @@ 在大数据的5个“V”中我们已经提到,数据量大且产生速度快。从时间维度来讲,数据源源不断地产生,形成一个无界的数据流(Unbounded Data Stream)。如图1-5所示,单条数据被称为事件(Event),事件按照时序排列会形成一个数据流。例如,我们每时每刻的运动数据都会累积到手机传感器上,金融交易随时随地都在发生,物联网(Internet of Things,IoT)传感器会持续监控并生成数据。 -![图1-5 数据和数据流](./img/) +![图1-5 数据和数据流](./img/data-and-data-stream.png) 数据流中的某段有界数据流(Bounded Data Stream)可以组成一个数据集。我们通常所说的对某份数据进行分析,指的是对某个数据集进行分析。随着数据的产生速度越来越快,数据源越来越多,人们对时效性的重视程度越来越高,如何处理数据流成了大家更为关注的问题。 @@ -35,7 +35,7 @@ 处理流数据一般使用“生产者-消费者”(Producer-Consumer)模型来解决问题。如图1-6所示,生产者生成数据,将数据发送到一个缓存区域(Buffer),消费者从缓存区域中消费数据。这里我们暂且不关心生产者如何生产数据,以及数据如何缓存,我们只关心如何实现消费者。 -![图1-6 生产者-消费者模型](./img/) +![图1-6 生产者-消费者模型](./img/producer-consumer.png) 在股票交易的场景中,我们可以启动一个进程来实现消费者,该进程以10秒为一个时间窗口,统计时间窗口内的交易情况,找到波动最大的那些股票。同时,该进程也对新流入的媒体文本进行分析。这个逻辑看起来很容易实现,但深挖之后会发现问题繁多。 diff --git a/doc/ch-big-data-intro/bigdata.md b/doc/ch-big-data-intro/bigdata.md index fdbd3bc..50b1edd 100644 --- a/doc/ch-big-data-intro/bigdata.md +++ b/doc/ch-big-data-intro/bigdata.md @@ -5,7 +5,7 @@ 大数据,顾名思义,就是拥有庞大体量的数据。关于什么是大数据、如何定义大数据、如何使用大数据等一系列问题,拥有不同领域背景的读者的理解各不相同。通常,业界将大数据的特点归纳为图1-1所示的5个“V”。 -![图1-1 大数据的5个"V"](./img/) +![图1-1 大数据的5个"V"](./img/5V.png) - **Volume**:指数据量大。数据量单位从TB(1 024 GB)、PB(1 024 TB)、EB(1 024 PB)、ZB(1 024 EB)甚至到YB(1 024 ZB)。纽约证券交易所每天产生的交易数据大约在TB级,瑞士日内瓦附近的大型强子对撞机每年产生的数据约为PB级,而目前全球数据总量已经在ZB级,相当于1 000 000 PB。基于更大规模的数据,我们可以对某个研究对象的历史、现状和未来有更加全面的了解。 - **Velocity**:指数据产生速度快。数据要求的处理速度更快和时效性更强,因为时间就是金钱。金融市场的交易数据必须以秒级的速度进行处理,搜索和推荐引擎需要以分钟级速度将实时新闻推送给用户。更快的数据处理速度可让我们基于最新的数据做出更加实时的决策。 diff --git a/doc/ch-big-data-intro/img/5V.png b/doc/ch-big-data-intro/img/5V.png new file mode 100644 index 0000000..3d2502a Binary files /dev/null and b/doc/ch-big-data-intro/img/5V.png differ diff --git a/doc/ch-big-data-intro/img/data-and-data-stream.png b/doc/ch-big-data-intro/img/data-and-data-stream.png new file mode 100644 index 0000000..11f9db2 Binary files /dev/null and b/doc/ch-big-data-intro/img/data-and-data-stream.png differ diff --git a/doc/ch-big-data-intro/img/hadoop.png b/doc/ch-big-data-intro/img/hadoop.png new file mode 100644 index 0000000..88013b7 Binary files /dev/null and b/doc/ch-big-data-intro/img/hadoop.png differ diff --git a/doc/ch-big-data-intro/img/spark-streaming-mini-batch.png b/doc/ch-big-data-intro/img/spark-streaming-mini-batch.png new file mode 100644 index 0000000..cc707b8 Binary files /dev/null and b/doc/ch-big-data-intro/img/spark-streaming-mini-batch.png differ diff --git a/doc/ch-big-data-intro/img/spark.png b/doc/ch-big-data-intro/img/spark.png new file mode 100644 index 0000000..791292d Binary files /dev/null and b/doc/ch-big-data-intro/img/spark.png differ diff --git a/doc/ch-big-data-intro/representative-big-data-technologies.md b/doc/ch-big-data-intro/representative-big-data-technologies.md index f7c6408..2dfcbd3 100644 --- a/doc/ch-big-data-intro/representative-big-data-technologies.md +++ b/doc/ch-big-data-intro/representative-big-data-technologies.md @@ -20,7 +20,7 @@ Hadoop生态圈的核心组件主要有如下3个。 - **Kafka**:Kafka是一款流处理框架,主要用作消息队列。 - **ZooKeeper**:Hadoop生态圈中很多组件使用动物来命名,形成了一个大型“动物园”,ZooKeeper是这个动物园的管理者,主要负责分布式环境的协调。 -![图1-7 Hadoop生态圈](./img/) +![图1-7 Hadoop生态圈](./img/hadoop.png) ## 1.3.2 Spark @@ -35,11 +35,11 @@ Spark的核心在于计算,主要目的在于优化Hadoop MapReduce计算部 Spark并不能完全取代Hadoop,实际上,从图1-7可以看出,Spark融入了Hadoop生态圈,成为其中的重要一员。一个Spark任务很可能依赖HDFS上的数据,向YARN申请计算资源,将结果输出到HBase上。当然,Spark也可以不用依赖这些组件,独立地完成计算。 -![图1-8 Spark生态圈](./img/) +![图1-8 Spark生态圈](./img/spark.png) Spark主要面向批处理需求,因其优异的性能和易用的接口,Spark已经是批处理界绝对的“王者”。Spark的子模块Spark Streaming提供了流处理的功能,它的流处理主要基于mini-batch的思想。如图1-9所示,Spark Streaming将输入数据流切分成多个批次,每个批次使用批处理的方式进行计算。因此,Spark是一款集批处理和流处理于一体的处理框架。 -![图1-9 Spark Streaming mini-batch处理](./img/) +![图1-9 Spark Streaming mini-batch处理](./img/spark-streaming-mini-batch.png) ## 1.3.3 Apache Kafka @@ -49,7 +49,7 @@ Kafka也是一种面向大数据领域的消息队列框架。在大数据生态 如图1-10所示,企业中不同的应用系统作为数据生产者会产生大量数据流,这些数据流还需要进入不同的数据消费者,Kafka起到数据集成和系统解耦的作用。系统解耦是让某个应用系统专注于一个目标,以降低整个系统的维护难度。在实践上,一个企业经常拆分出很多不同的应用系统,系统之间需要建立数据流管道(Stream Pipeline)。假如没有Kafka的消息队列,M个生产者和N个消费者之间要建立M×N个点对点的数据流管道,Kafka就像一个中介,让数据管道的个数变为M+N,大大减小了数据流管道的复杂程度。 -![图1-10 Kafka可以连接多个应用系统](./img/) +![图1-10 Kafka可以连接多个应用系统](./img/kafka.png) 从批处理和流处理的角度来讲,数据流经Kafka后会持续不断地写入HDFS,积累一段时间后可提供给后续的批处理任务,同时数据流也可以直接流入Flink,被用于流处理。 diff --git a/doc/ch-deployment-and-configuration/configuration-file.md b/doc/ch-deployment-and-configuration/configuration-file.md new file mode 100644 index 0000000..9b36bd0 --- /dev/null +++ b/doc/ch-deployment-and-configuration/configuration-file.md @@ -0,0 +1,129 @@ +(configuration-file)= +# 配置文件 + +在前文的介绍中,我们曾多次提到Flink主目录下的`conf/flink-conf.yaml`文件,这个文件在作业配置中起到了至关重要的作用。 + +`flink-conf.yaml`是一个YAML配置文件,文件里使用Key-Value来设置一些参数。这个文件会被很多Flink进程读取,文件改动后,相关进程必须重启才能生效。例如,9.1节中提到,Standalone集群使用`bin/start-cluster.sh`脚本启动时,会读取Master的IP地址等。从官网下载的`flink-conf.yaml`文件已经对一些参数做了配置,这些配置主要针对的是单机环境,如果用户在集群环境中使用它,就需要修改一些配置。 + +本节将从Java、CPU、内存、磁盘等几大方向来介绍一些常用的配置。由于配置众多,无法一一列举,用户需要阅读Flink官方文档来进行更多个性化配置。 + +## 9.2.1 Java和类加载 + +在安装Java时,我们一般会将Java的路径以`$JAVA_HOME`的形式添加到环境变量`$PATH`中,默认情况下,Flink使用环境变量中的Java来运行程序。或者在`flink-conf.yaml`中设置`env.java.home`参数,使用安装到某个位置的Java。 + +`env.java.opts`设置所有Flink JVM进程参数,`env.java.opts.jobmanager`和`env.java.opts.taskmanager`分别设置JobManager和TaskManager的JVM进程参数。下面的配置使得所有Flink JVM进程使用并发垃圾回收器。 + +```yaml +env.java.opts: -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 +``` + +类加载(Classloading)对于很多应用开发者来说可能不需要过多关注,但是对于框架开发者来说需要非常小心。类加载的具体作用是将Java的`.class`文件加载到JVM虚拟机中。我们知道,当Java程序启动时,要使用`-classpath`参数设置从某些路径上加载所需要的依赖包。 + +```sh +$ java -classpath ".;./lib/*" +``` + +上面的命令将当前目录`.`和当前目录下的文件夹`./lib`两个路径加载进来,两个路径中的包都能被引用。一个Java程序需要引用的类库和包很多,包括JDK核心类库和各种第三方类库,JVM启动时,并不会一次性加载所有JAR包中的`.class`文件,而是动态加载。一个Flink作业一般主要加载下面两种类。 + +- **Java Classpath**:包括JDK核心类库和Flink主目录下`lib`文件夹中的类,其中`lib`文件夹中一般包含一些第三方依赖,比如Hadoop依赖。 +- **用户类(User Code)**:用户编写的应用作业中的类,这些用户源码被打成JAR包,每提交一个作业时,相应的JAR包会被提交。 + +向集群提交一个Flink作业时,Flink会动态加载这些类,隐藏一些不必要的依赖,以尽量避免依赖冲突。常见的类依赖加载策略有两种:子类优先(Child-first)和父类优先(Parent-first)。 + +- **Child-first**:Flink会优先加载用户编写的应用作业中的类,然后再加载Java Classpath中的类。Parent-first:Flink会优先加载Java Classpath中的类。Flink默认使用Child-first策略,`flink-conf.yaml`的配置为:`classloader.resolve-order: child-first`。这种策略的好处是,用户在自己的应用作业中所使用的类库可以和Flink核心类库不一样,在一定程度上避免依赖冲突。这种策略适合绝大多数情况。 + +但是,Child-first策略在个别情况下也有可能出问题,这时候需要使用Parent-first策略,`flink-conf.yaml`的配置为:`classloader.resolve-order: parent-first`。Parent-first也是Java默认的类加载策略。 + +**注意** + +有些类加载的过程中总会使用Parent-first策略。`classloader.parent-first-patterns.default`配置了必须使用Parent-first策略的类,如下。 + +```yaml +java.;scala.;org.apache.flink.;com.esotericsoftware.kryo;org.apache.hadoop.; +javax.annotation.;org.slf4j;org.apache.log4j;org.apache.logging;org.apache. +commons.logging;ch.qos.logback;org.xml;javax.xml;org.apache.xerces;org.w3c +``` + +`classloader.parent-first-patterns.default`列表最好不要随便改动,如果想要添加一些需要使用Parent-first的类,应该将那些类放在`classloader.parent-first-patterns.additional`中,类之间用分号`;`隔开。在加载过程中,`classloader.parent-first-patterns.additional`列表中的类会追加到`classloader.parent-first-patterns.default`列表后。 + +## 9.2.2 并行度与槽位划分 + +在第3章中我们已经介绍过Flink的JobManager和TaskManager的功能,其中TaskManager运行具体的计算。每个TaskManager占用一定的CPU和内存资源,一个TaskManager会被切分为一到多个Slot,Slot是Flink运行具体计算任务的最小单元。如果一个作业不进行任何优化,作业中某个算子的子任务会被分配到一个Slot上,这样会产生资源浪费。9.3节会介绍,多个子任务可以连接到一起,放在一个Slot中运行。最理想的情况下,一个Slot上运行着一个作业所有算子组成的流水线(Pipeline)。 + +前文中我们曾多次提到并行度的概念:如果一个作业的并行度为`parallelism`,那么该作业的每个算子都会被切分为`parallelism`个子任务。如果作业开启了算子链和槽位共享,那么这个作业需要`parallelism`个Slot。所以说,一个并行度为`parallelism`的作业至少需要`parallelism`个Slot。 + +在`flink-conf.yaml`中,`taskmanager.numberOfTaskSlots`配置一个TaskManager可以划分成多少个Slot。默认情况下它的值为1。对于Standalone集群来说,官方建议将参数值配置为与CPU核心数相等或成比例。例如,这个参数值可以配置为CPU核心数或CPU核心数的一半。TaskManager的内存会平均分配给每个Slot,但并没有将某个CPU核心绑定到某个Slot上。或者说,TaskManager中的多个Slot是共享多个CPU核心的,每个Slot获得TaskManager中内存的一部分。 + +关于如何配置`taskmanager.numberOfTaskSlots`参数,其实并没有一个绝对的准则。每个TaskManager下有一个Slot,那么该Slot会独立运行在一个JVM进程中;每个TaskManager下有多个Slot,那么多个Slot同时运行在一个JVM进程中。TaskManager中的多个Slot可以共享TCP连接、“心跳信息”以及一些数据结构,这在一定程度上减少了一些不必要的消耗。但是,我们要知道,Slot是以线程为基本计算单元的,线程的隔离性相对较差,一个线程中的错误可能导致整个JVM进程崩溃,运行在其上的其他Slot也会被波及。假如一个TaskManager下只有一个Slot,因为TaskManager是一个进程,进程之间的隔离度较好,但这种方式下,作业性能肯定会受到影响。 + +Standalone集群一般部署在物理机或多核虚拟机上,对CPU的资源划分粒度比较粗,所以官方建议把`taskmanager. numberOfTaskSlots`参数值配置为CPU核心数。YARN和Kubernetes这些调度平台对资源的划分粒度更细,可以精确地将CPU核心分配给Container,比如可以配置单CPU的Container节点给只有一个Slot的TaskManager使用。Flink YARN可以使用`yarn.containers.vcores`配置每个Container中CPU核心的数量,默认情况下它的值等于`taskmanager.numberOfTaskSlots`。Flink Kubernetes可以使用`kubernetes.taskmanager.cpu`配置单个TaskManager的CPU数量,默认情况下它的值等于`taskmanager.numberOfTaskSlots`。 + +总结下来,关于计算的并行划分,有两个参数是可以配置的:作业并行度和TaskManager中Slot的数量。作业的并行度可以在用户代码中配置,也可以在提交作业时通过命令行参数配置。TaskManager中Slot数量通过`taskManager.numberOfTaskSlots`配置。假设作业开启了算子链和槽位共享,该作业的TaskManager数量为: + +```markdown +可以肯定的是,作业并行划分并不能一蹴而就,需要根据具体情况经过一些调优后才能达到最佳状态。这包括使用何种部署方式、部署时如何给TaskManager划分资源、如何配置`taskManager. +numberOfTaskSlots`,以及如何进行JVM调优等。在YARN和Kubernetes这样的部署环境上,一个简单、易上手的部署方式是:配置`taskmanager.numberOfTaskSlots`为1,给每个Container申请的CPU数量也为1,提交作业时根据作业的数据量大小配置并行度。Flink会根据上述参数分配足够的TaskManager运行该作业。 +``` + +## 9.2.3 内存 + +### 1. 堆区内存和堆外内存 + +内存管理是每个Java开发者绕不开的话题。在JVM中,内存一般分为堆区(On-heap或Heap)内存和堆外(Off-heap)内存。在一个JVM程序中,堆区是被JVM虚拟化之后的内存空间,里面存放着绝大多数Java对象的实例,被所有线程共享。Java使用垃圾回收(Garbage Collection,GC)机制来清理内存中的不再使用的对象,堆区是垃圾回收的主要工作区域。内存经过垃圾回收之后,会产生大量不连续的空间,在某个时间点,JVM必须进行一次彻底的垃圾回收(Full GC)。Full GC时,垃圾回收器会对所有分配的堆区内存进行完整的扫描,扫描期间,绝大多数正在运行的线程会被暂时停止。这意味着,一次Full GC对一个Java应用造成的影响,跟堆区内存所存储的数据多少是成正比的,过大的堆区内存会影响Java应用的性能。例如,一个Java应用的堆区内存大小大于100GB,Full GC会产生分钟级的卡顿。 + +然而,在大数据时代,一个Java应用的堆区内存需求会很大,使用超过100GB大小的内存的情况比比皆是。因此,如果一个程序只使用堆区内存会产生一个悖论,即如果开辟的堆区内存过小,数据超过了内存限制,会抛出OutOfMemoryError异常(简称OOM问题),影响系统的稳定;如果堆区内存过大,GC时会经常卡顿,影响系统的性能。一种解决方案是将一部分内存对象迁移到堆外内存上。堆外内存直接受操作系统管理,可以被其他进程和设备访问,可以方便地开辟一片很大的内存空间,又能解决GC带来的卡顿问题,特别适合读/写操作比较频繁的场景。堆外内存虽然强大,但也有其负面影响,比如:堆外内存的使用、监控和调试更复杂,一些操作在堆外内存上会比较慢。 + +图9-5是对Flink内存模型的划分示意图。无论Flink的JobManager还是TaskManager都是一个JVM进程,整个Flink JVM进程的内存(见图9-5中Total Process Memory部分)包括两大部分:Flink占用的内存(见图9-5中Total Flink Memory部分)和JVM相关内存(见图9-5中JVM Specific Memory部分)。JVM Specific Memory是绝大多数Java程序都需要的一块内存区域,比如各个类的元数据会放在该区域。Total Flink Memory是Flink能使用到的内存,Total Flink Memory又包括JVM堆区内存(见图9-5中JVM Heap部分)和堆外内存(见图9-5中Off-heap Memory部分)。Off-heap Memory包括一些Flink所管理的内存(见图9-5中Flink Managed Memory部分),一般主要在TaskManager上给个别场景使用,Off-heap Memory另外一部分主要给网络通信缓存使用的内存(见图9-5中Direct Memory)。 + +![图9-5 Flink内存模型](./img/memory-model.png) + +**注意** + +Flink 1.10开始对内存管理和设置进行了一次较大改动,相关的配置与之前的版本有明显不同,这里只介绍Flink 1.10版本以后的内存配置方法。从老版本迁移过来的朋友也应该注意修改内存配置,否则会出现错误。 + +从框架的角度来看,Flink将内存管理部分做了封装,用户在绝大多数情况下其实可以不用关注数据到底是如何写入内存的。但对于一些数据量较大的作业,了解Flink的内存模型还是非常有必要的。 + +### 2. Master的内存配置 + +在具体的内存管理问题上,Flink的Master和TaskManager有所区别。Master中的组件虽然比较多,但是整体来说占用内存不大。ResourceManager主要负责计算资源的管理、Dispatcher负责作业分发、JobManager主要协调某个作业的运行,这些组件无须直接处理数据。TaskManager主要负责数据处理。相比之下,Master对内存的需求没有那么苛刻,TaskManager对内存的需求很高。 + +一个最简单的配置方法是设置Master进程的Total Process Memory,参数项为`jobmanager.memory.process.size`。配置好Total Process Memory后,Flink有一个默认的分配比例,会将内存分配给各个子模块。另一个比较方便的方式是设置Total Flink Memory,即Flink可用内存,参数项为`jobmanager.memory.flink.size`。Total Flink Memory主要包括了堆区内存和堆外内存。堆区内存包含了Flink框架运行时本身所占用的内存空间,也包括JobManager运行过程中占用的内存。如果Master进程需要管理多个作业(例如Session部署模式下),或者某个作业比较复杂,作业中有多个算子,可以考虑增大Total Flink Memory。 + +### 3. TaskManager的内存配置 + +因为TaskManager涉及大规模数据处理,TaskManager的内存配置需要用户花费更多的精力。TaskManager的内存模型主要包括图9-6所示的组件。 + +![图9-6 TaskManager内存模型](./img/taskmanager-memory-model.png) + +如图9-6右侧所示单独对Total Flink Memory做了拆解。Total Flink Memory又包括JVM堆区内存和堆外内存。无论是堆区内存还是堆外内存,一部分是Flink框架所占用的,即Framework Heap和Framework Off-heap,这部分内存在计算过程中是给Flink框架使用的,作业实际所用到的Slot无法占用这部分资源。Flink框架所占用的内存一般比较固定。另一部分是当前计算任务所占用的,即Task Heap、Task Off-heap、Flink Managed Memory和Network。一个用户作业的绝大多数用户代码都运行在Task Heap区,因此Task Heap区的大小需要根据用户作业调整。 + +Flink专门开辟了一块堆外内存(见图9-6所示的Flink Managed Memory部分),用来管理一部分特殊的数据。Flink Managed Memory主要用途为:流处理下RocksDB的State Backend,批处理下排序、中间数据缓存等。RocksDB是第三方的插件,它不占用堆区内存。而MemoryStateBackend和FsStateBackend的本地状态是基于Task Heap区域的。如果流处理作业没有使用RocksDB,或者流处理作业没有状态数据,Flink Managed Memory这部分内存可以为零,以避免资源浪费。 + +Flink的网络传输基于Netty库,Netty以一块堆外内存(见图9-6所示的Network部分)作为缓存区。当TaskManager进程之间需要进行数据交换时,例如进行数据重分布或广播操作,数据会先缓存在Network区。假如数据量大,数据交换操作多,Network区的内存压力会明显增大。 + +可以看到,Flink的TaskManager内存模型并不简单。尽管Flink社区希望提供给用户最简单易用的默认配置,但使用一套配置处理各式各样的用户作业并不现实。Flink将内存配置分为不同粒度。 + +- **粗粒度的内存配置方法**:直接配置整个TaskManager JVM进程的内存。确切地说,是配置Total Process Memory或Total Flink Memory两者中的任意一个。这就相当于,我们配置好一个总量,其余各个子模块根据默认的比例获得其相应的内存大小。从图9-6中也可以看到,Total Process Memory比Total Flink Memory多了JVM Specific Memory。对于YARN或Kubernetes这种容器化的部署方式,给Total Process Memory申请内存更精确,相应的内存直接由资源管理器交付给了Container。对于Standalone集群,给Total Flink Memory申请内存更合适,相应的内存直接交付给了Flink本身。其中,Total Process Memory使用参数`taskmanager.memory.process.size`,Total Flink Memory使用参数`taskmanager.memory.flink.size`。 + +- **细粒度的内存配置方法**:同时配置Task Heap和Flink Managed Memory两个内存。根据前文的介绍,Task Heap和Flink Managed Memory不涉及Flink框架所需内存,不涉及JVM所需内存,它们只服务于某个计算任务。这个方法可以更明确地为最需要动态调整内存的地方分配资源,而其他组件会根据比例自动调整。其中,Task Heap由`taskmanager.memory.task.heap.size`参数配置,Flink Managed Memory由`taskmanager.memory.managed.size`参数配置。 + +至此,我们介绍了3种内存配置方法:两种方法从宏观角度配置内存总量,一种方法从用户作业角度配置该作业所需量。涉及下面几个参数。 + +- `taskmanager.memory.process.size`:Total Process Memory,包括Flink内存和JVM内存,是一个进程内存消耗的总量,各子模块会按照比例配置,常用在容器化部署方式上。 +- `taskmanager.memory.flink.size`:Total Flink Memory,不包括JVM内存,只关乎Flink部分,其他模块会按照比例配置,常用在Standalone集群部署方式上。 +- `taskmanager.memory.task.heap.size`和`taskmanager.memory.managed.size`:两个参数必须同时配置,细粒度地配置了一个作业所需内存,其他模块会按照比例配置。 + +**注意** + +这3个参数不要同时配置,否则会引起冲突,导致作业运行失败。我们应该在这3个参数中选择一个来配置。 + +综上,FlinkFlink提供了大量的配置参数帮用户处理内存问题,但是实际场景千变万化,很难一概而论,内存的配置和调优也需要用户不断摸索和尝试。 + + +## 9.2.4 磁盘 +Flink进程会将一部分数据写入本地磁盘,比如:日志信息、RocksDB数据等。 +io.tmp.dirs参数配置了数据写入本地磁盘的位置。该参数所指目录中存储了RocksDB创建的文件、缓存的JAR包,以及一些中间计算结果。默认使用了JVM的参数java.io.tmpdir,而该参数在Linux操作系统一般指的是/tmp目录。YARN、Kubernetes等会使用Container平台的临时目录作为该参数的默认值。 + +**注意** + +io.tmp.dirs中存储的数据并不是用来做故障恢复的,但是如果这里的数据被清理,会对故障恢复产生较大影响。很多Linux发行版默认会定期清理/tmp目录,如果要在该操作系统上部署长期运行的Flink流处理作业,一定要记得将定期清理的开关关掉。 diff --git a/doc/ch-deployment-and-configuration/experiment-job-coding-packaging-and-submission.md b/doc/ch-deployment-and-configuration/experiment-job-coding-packaging-and-submission.md new file mode 100644 index 0000000..2dba468 --- /dev/null +++ b/doc/ch-deployment-and-configuration/experiment-job-coding-packaging-and-submission.md @@ -0,0 +1,36 @@ +(experiment-job-coding-packaging-and-submission)= +# 实验 作业编码、打包与提交 + +本章的重点是集群部署与作业提交,本实验也与此相关。 + +## 一、实验目的 + +熟悉Flink程序打包、部署、参数设置与作业提交流程。 + +## 二、实验内容 + +如果读者有超过一台节点的实验环境,可以按照9.1.1小节中Standalone集群的部署方式来部署一个多节点的集群;如果读者实验环境有限,可以继续使用单机Standalone集群。在集群的配置上,我们需要设置一些重要的参数,例如:`conf/flink-conf.yaml`文件中的`jobmanager.rpc.address`和`taskmanager.numberOfTaskSlots`,`conf/slaves`文件。 + +我们继续在第7章实验的基础上,完善该程序,并在Standalone集群上提交作业。 + +## 三、实验要求 + +- **要求1**: + 对Standalone集群进行配置,记录Standalone集群必要参数的值,启动这个集群。 + +- **要求2**: + 在本节中,我们修改第7章实验中的程序,允许该程序接收来自命令行工具的参数,参数为股票价格数据集文件的绝对路径。 + +- **要求3**: + 使用Maven对程序打包,生成JAR包,在命令行中使用Flink命令行工具提交作业:使用`-p`控制并行度,使用`-c`选择主类,增加数据集绝对路径的参数。在Flink WebUI中查看作业的详细信息:尝试不同的并行度,查看不同作业的执行区别。 + +- **要求4**: + 修改代码,尝试对不同的算子设置Slot Sharing Group。在Flink WebUI中查看作业与之前运行情况的区别,尤其是Slot数量等信息。 + +## 四、实验报告 + +将上述运行过程、程序代码以及Flink WebUI的截图整理并撰写成实验报告。 + +## 本章小结 + +通过本章的学习,读者应该掌握了Flink集群的部署模式、常用配置和作业提交方式。这些操作可以帮助读者在生产环境中执行Flink作业。 \ No newline at end of file diff --git a/doc/ch-deployment-and-configuration/flink-command-line-interface-guide.md b/doc/ch-deployment-and-configuration/flink-command-line-interface-guide.md new file mode 100644 index 0000000..7daa1a8 --- /dev/null +++ b/doc/ch-deployment-and-configuration/flink-command-line-interface-guide.md @@ -0,0 +1,115 @@ +(flink-command-line-interface-guide)= +# 命令行工具 + +在生产环境中,Flink使用命令行工具(Command Line Interface)来管理作业的执行。命令行工具本质上是一个可执行脚本,名为flink,放置在Flink的主目录下的bin文件夹中。它的功能主要包括:提交、取消作业,罗列当前正在执行和排队的作业、获取某个作业的信息,设置Savepoint等。 + +命令行工具完成以上功能的前提是,我们已经启动了一个Flink集群,命令行工具能够直接连接到这个集群上。默认情况下,命令行工具会从conf/flink-conf.yaml里读取配置信息。 + +进入Flink主目录,在Linux命令行中输入`./bin/flink`,屏幕上会输出命令行工具的使用方法。其使用方法如下面的语法所示。 + +```bash +./bin/flink [OPTIONS] [ARGUMENTS] +``` + +其中,`ACTION`包括`run`、`stop`等,分别对应提交和取消作业。`OPTIONS`为一些预置的选项,`ARGUMENTS`是用户传入的参数。由于命令行工具的参数很多,我们只介绍一些经常使用的参数,其他参数可以参考Flink官方文档。 + +## 9.4.1 提交作业 + +提交作业的语法如下。 + +```bash +$ ./bin/flink run [OPTIONS] [ARGUMENTS] +``` + +我们要提供一个打包好的用户作业JAR包。打包需要使用Maven,在自己的Java工程目录下执行`mvn package`,在`target`文件夹下找到相应的JAR包。 + +我们使用Flink给我们提供的WordCount程序来演示。它的JAR包在Flink主目录下:`./examples/streaming/WordCount.jar`。提交作业的命令如下。 + +```bash +$ ./bin/flink run ./examples/streaming/WordCount.jar +``` + +任何一个Java程序都需要一个主类和main方法作为入口,启动WordCount程序时,我们并没有提及主类,因为程序在`pom.xml`文件中设置了主类。确切地说,经过Maven打包生成的JAR包有文件`META-INF/MANIFEST.MF`,该文件里定义了主类。如果我们想明确使用自己所需要的主类,可以使用`-c ` 或`--class `来指定程序的主类。在一个包含众多`main()`方法的JAR包里,必须指定一个主类,否则会报错。 + +```bash +$ ./bin/flink run \ + -c org.apache.flink.streaming.examples.wordcount.WordCount \ + ./examples/streaming/WordCount.jar +``` + +我们也可以往程序中传入参数。 + +```bash +$ ./bin/flink run \ + -c org.apache.flink.streaming.examples.wordcount.WordCount \ + ./examples/streaming/WordCount.jar \ + --input '/tmp/a.log' \ + --output '/tmp/b.log' +``` + +其中,`--input '/tmp/a.log' --output '/tmp/b.log'`为我们传入的参数,和其他Java程序一样,这些参数会写入`main()`方法的参数`String[]`中,以字符串数组的形式存在。参数需要程序代码解析,因此命令行工具与程序代码中的参数要保持一致,否则会出现参数解析错误的情况。 + +我们也可以在命令行中用`-p`选项设置这个作业的并行度。下面的命令给作业设置的并行度为2。 + +```bash +$ ./bin/flink run -p 2 ./examples/streaming/WordCount.jar +``` + +如果用户在代码中使用`setParallelism()`方法明确设置并行度,或有给某个算子设置并行度,那么用户代码中的设置会覆盖命令行中的`-p`设置。 + +提交作业本质上是向Flink的Master提交JAR包,可以用`-m`选项来设置向具体哪个Master提交。下面的命令将作业提交到Hostname为`myJMHost`的节点上,端口号为8081。 + +```bash +$ ./bin/flink run \ + -m myJMHost:8081 \ + ./examples/streaming/WordCount.jar +``` + +如果我们已经启动了一个YARN集群,且当前节点可以连接到YARN集群上,`-m yarn-cluster`会将作业以Per-Job模式提交到YARN集群上。如果我们已经启动了一个Flink YARN Session,可以不用设置`-m`选项,Flink会记住Flink YARN Session的连接信息,默认向这个Flink YARN Session提交作业。 + +因为Flink支持不同类型的部署方式,为了避免提交作业的混乱、设置参数过多,Flink提出了`-e `或`--executor `选项,用户可以通过这两个选项选择使用哪种执行模式(Executor Mode)。可选的执行模式有:`remote`、`local`、`kubernetes-session`、`yarn-per-job`、 `yarn-session`。例如,一个原生Kubernetes Session中提交作业的命令如下。 + +```bash +$ ./bin/flink run \ + -e kubernetes-session \ + -Dkubernetes.cluster-id= \ + examples/streaming/WindowJoin.jar +``` + +上面命令的`-D`用于设置参数。我们用`-D`形式来设置一些配置信息,这些配置的含义和内容和`conf/flink-conf.yaml`中的配置是一致的。 + +无论用以上哪种方式提交作业,Flink都会将一些信息输出到屏幕上,最重要的信息就是作业的ID。 + +## 9.4.2 管理作业 + +罗列当前的作业的命令如下。 + +```bash +$ ./bin/flink list +``` + +触发一个作业执行Savepoint的命令如下。 + +```bash +$ ./bin/flink savepoint [savepointDirectory] +``` + +这行命令会通知作业ID为`jobId`的作业执行Savepoint,可以在后面添加路径,Savepoint会写入对应目录,该路径必须是Flink Master可访问到的目录,例如一个HDFS路径。 + +关停一个Flink作业的命令如下。 + +```bash +$ ./bin/flink cancel +``` + +关停一个带Savepoint的作业的命令如下。 + +```bash +$ ./bin/flink stop +``` + +从一个Savepoint恢复一个作业的命令如下。 + +```bash +$ ./bin/flink run -s [OPTIONS] +``` diff --git a/doc/ch-deployment-and-configuration/flink-deployment-and-configuration.md b/doc/ch-deployment-and-configuration/flink-deployment-and-configuration.md new file mode 100644 index 0000000..2891d06 --- /dev/null +++ b/doc/ch-deployment-and-configuration/flink-deployment-and-configuration.md @@ -0,0 +1,174 @@ +(flink-deployment-and-configuration)= +# Flink集群部署模式 + +当前,信息系统基础设施正在飞速发展,常见的基础设施包括物理机集群、虚拟机集群、容器集群等。为了兼容这些基础设施,Flink曾在1.7版本中做了重构,提出了第3章中所示的Master-Worker架构,该架构可以兼容几乎所有主流信息系统的基础设施,包括Standalone集群、Hadoop YARN集群或Kubernetes集群。 + +## 9.1.1 Standalone集群 + +一个Standalone集群包括至少一个Master进程和至少一个TaskManager进程,每个进程作为一个单独的Java JVM进程。其中,Master节点上运行Dispatcher、ResourceManager和JobManager,Worker节点将运行TaskManager。图9-1展示了一个4节点的Standalone集群,其中,IP地址为192.168.0.1的节点为Master节点,其他3个为Worker节点。 + +![图9-1 Flink Standalone集群](./img/Flink-Standalone-cluster.png) + +第2章的实验中,我们已经展示了如何下载和解压Flink,该集群只部署在本地,结合图9-1,本节介绍如何在一个物理机集群上部署Standalone集群。我们可以将解压后的Flink主目录复制到所有节点的相同路径上;也可以在一个共享存储空间(例如NFS)的路径上部署Flink,所有节点均可以像访问本地目录那样访问共享存储上的Flink主目录。此外,节点之间必须实现免密码登录:基于安全外壳协议(Secure Shell,SSH),将公钥拷贝到待目标节点,可以实现节点之间免密码登录。所有节点上必须提前安装并配置好JDK,将$JAVA_HOME放入环境变量。 + +我们需要编辑`conf/flink-conf.yaml`文件,将`jobmanager.rpc.address`配置为Master节点的IP地址192.168.0.1;编辑`conf/slaves`文件,将192.168.0.2、192.168.0.3和192.168.0.4等Worker节点的IP地址加入该文件中。如果每个节点除了IP地址外,还配有主机名(Hostname),我们也可以用Hostname替代IP地址来做上述配置。 + +综上,配置一个Standalone集群需要注意以下几点: +- 为每台节点分配固定的IP地址,或者配置Hostname,节点之间设置免密码SSH登录。 +- 在所有节点上提前安装配置JDK,将`$JAVA_HOME`添加到环境变量中。 +- 配置`conf/flink-conf.yaml`文件,设置`jobmanager.rpc.address`为Master节点的IP地址或Hostname。配置`conf/slaves`文件,将Worker节点的IP地址或Hostname添加进去。 +- 将Flink主目录同步到所有节点的相同目录下,或者部署在一个共享目录上,共享目录可被所有节点访问。 + +接着,我们回到Master节点,进入Flink主目录,运行`bin/start-cluster.sh`。该脚本会在Master节点启动Master进程,同时读取`conf/slaves`文件,脚本会帮我们SSH登录到各节点上,启动TaskManager。至此,我们启动了一个Flink Standalone集群,我们可以使用Flink Client向该集群的Master节点提交作业。 + +```bash +$ ./bin/flink run -m 192.168.0.1:8081 ./examples/batch/WordCount.jar +``` + +可以使用`bin/stop-cluster.sh`脚本关停整个集群。 + +## 9.1.2 Hadoop YARN集群 + +Hadoop一直是很多公司首选的大数据基础架构,YARN也是经常使用的资源调度器。YARN可以管理一个集群的CPU和内存等资源,MapReduce、Hive或Spark都可以向YARN申请资源。YARN中的基本调度资源是容器(Container)。 + +注意: +YARN Container和Docker Container有所不同。YARN Container只适合JVM上的资源隔离,Docker Container则是更广泛意义上的Container。 + +为了让Flink运行在YARN上,需要提前配置Hadoop和YARN,这包括下载针对Hadoop的Flink,设置`HADOOP_CONF_DIR`和`YARN_CONF_DIR`等与Hadoop相关的配置,启动YARN等。网络上有大量相关教程,这里不赘述Hadoop和YARN的安装方法,但是用户需要按照9.5节介绍的内容来配置Hadoop相关依赖。 + +在YARN上使用Flink有3种模式:Per-Job模式、Session模式和Application模式。Per-Job模式指每次向YARN提交一个作业,YARN为这个作业单独分配资源,基于这些资源启动一个Flink集群,该作业运行结束后,相应的资源会被释放。Session模式在YARN上启动一个长期运行的Flink集群,用户可以向这个集群提交多个作业。Application模式在Per-Job模式上做了一些优化。图9-2展示了Per-Job模式的作业提交流程。 + +![图9-2 Per-Job模式的作业提交流程](./img/Per-Job.png) + +Client首先将作业提交给YARN的ResourceManager,YARN为这个作业生成一个ApplicationMaster以运行Fink Master,ApplicationMaster是YARN中承担作业资源管理等功能的组件。ApplicationMaster中运行着JobManager和Flink-YARN ResourceManager。JobManager会根据本次作业所需资源向Flink-YARN ResourceManager申请Slot资源。 + +注意: +这里有两个ResourceManager,一个是YARN的ResourceManager,它是YARN的组件,不属于Flink,它负责整个YARN集群全局层面的资源管理和任务调度;一个是Flink-YARN ResourceManager,它是Flink的组件,它负责当前Flink作业的资源管理。 + +Flink-YARN ResourceManager会向YARN申请所需的Container,YARN为之分配足够的Container作为TaskManager。TaskManager里有Flink计算所需的Slot,TaskManager将这些Slot注册到Flink-YARN ResourceManager中。注册成功后,JobManager将作业的计算任务部署到各TaskManager上。 + +下面的命令使用Per-Job模式启动单个作业。 + +```bash +$ ./bin/flink run -m yarn-cluster ./examples/batch/WordCount.jar +``` + +`-m yarn-cluster`表示该作业使用Per-Job模式运行在YARN上。 + +图9-3展示了Session模式的作业提交流程。 + +![图9-3 Session模式的作业提交流程](./img/session.png) + +Session模式将在YARN上启动一个Flink集群,用户可以向该集群提交多个作业。 + +首先,我们在Client上,用`bin/yarn-session.sh`启动一个YARN Session。Flink会先向YARN ResourceManager申请一个ApplicationMaster,里面运行着Dispatcher和Flink-YARN ResourceManager,这两个组件将长期对外提供服务。当提交一个具体的作业时,作业相关信息被发送给了Dispatcher,Dispatcher会启动针对该作业的JobManager。 + +接下来的流程就与Per-Job模式几乎一模一样:JobManager申请Slot,Flink-YARN ResourceManager向YARN申请所需的Container,每个Container里启动TaskManager,TaskManager向Flink-YARN ResourceManager注册Slot,注册成功后,JobManager将计算任务部署到各TaskManager上。如果用户提交下一个作业,那么Dispatcher启动新的JobManager,新的JobManager负责新作业的资源申请和任务调度。 + +下面的命令本启动了一个Session,该Session的JobManager内存大小为1024MB,TaskManager内存大小为4096MB。 + +```bash +$ ./bin/yarn-session.sh -jm 1024m -tm 4096m +``` + +启动后,屏幕上会显示Flink WebUI的连接信息。例如,在一个本地部署的YARN集群上创建一个Session后,假设分配的WebUI地址为:`http://192.168.31.167:54680/`。将地址复制到浏览器,打开即显示Flink WebUI。 + +之后我们可以使用`bin/flink`在该Session上启动一个作业。 + +```bash +$ ./bin/flink run ./examples/batch/WordCount.jar +``` + +上述提交作业的命令没有特意指定连接信息,所提交的作业会直接在Session中运行,这是因为Flink已经将Session的连接信息记录了下来。从 Flink WebUI 页面上可以看到,刚开始启动时,UI上显示 Total/Available Task Slots 为0,Task Managers也为0。随着作业的提交,资源会动态增加:每提交一个新的作业,Flink-YARN ResourceManager会动态地向YARN ResourceManager申请资源。 + +比较Per-Job模式和Session模式发现:Per-Job模式下,一个作业运行完后,JobManager、TaskManager都会退出,Container资源会释放,作业会在资源申请和释放上消耗时间;Session模式下,Dispatcher和Flink-YARN ResourceManager是可以被多个作业复用的。无论哪种模式,每个作业都有一个JobManager与之对应,该JobManager负责单个作业的资源申请、任务调度、Checkpoint等协调性功能。Per-Job模式更适合长时间运行的作业,作业对启动时间不敏感,一般是长期运行的流处理任务。Session模式更适合短时间运行的作业,一般是批处理任务。 + +除了Per-Job模式和Session模式,Flink还提供了一个Application模式。Per-Job和Session模式作业提交的过程比较依赖Client,一个作业的main()方法是在Client上执行的。main()方法会将作业的各个依赖下载到本地,生成JobGraph,并将依赖以及JobGraph发送到Flink集群。在Client上执行main()方法会导致Client的负载很重,因为下载依赖和将依赖打包发送到Flink集群都对网络带宽有一定要求,执行main()方法会加重CPU的负担。而且在很多企业,多个用户会共享一个Client,多人共用加重了Client的压力。为了解决这个问题,Flink的Application模式允许main()方法在JobManager上执行,这样可以分担Client的压力。在资源隔离层面上,Application模式与Per-Job模式基本一样,相当于为每个作业应用创建一个Flink集群。 + +具体而言,我们可以用下面的代码,基于Application模式提交作业。 + +```bash +$ ./bin/flink run-application -t yarn-application \ + -Djobmanager.memory.process.size=2048m \ + -Dtaskmanager.memory.process.size=4096m \ + -Dyarn.provided.lib.dirs="hdfs://myhdfs/my-remote-flink-dist-dir" \ + ./examples/batch/WordCount.jar +``` + +在上面这段提交作业的代码中,`run-application`表示使用Application模式,`-D`前缀加上参数配置来设置一些参数,这与Per-Job模式和Session模式的参数设置稍有不同。为了让作业下载各种依赖,可以向HDFS上传一些常用的JAR包,本例中上传路径是`hdfs://myhdfs/my-remote-flink-dist-dir`,然后使用`-Dyarn.provided.lib.dirs`告知Flink上传JAR包的地址,Flink的JobManager会前往这个地址下载各种依赖。 + +## 9.1.3 Kubernetes集群 + +Kubernetes(简称K8s)是一个开源的Container编排平台。近年来,Container以及Kubernetes大行其道,获得了业界的广泛关注,很多信息系统正在逐渐将业务迁移到Kubernetes上。 + +在Flink 1.10之前,Flink的Kubernetes部署需要用户对Kubernetes各组件和工具有一定的了解,而Kubernetes涉及的组件和概念较多,学习成本较高。和YARN一样,Flink Kubernetes部署方式支持Per-Job和Session两种模式。为了进一步减小Kubernetes部署的难度,Flink 1.10提出了原生Kubernetes部署,同时也保留了之前的模式。新的Kubernetes部署非常简单,将会成为未来的趋势,因此本小节只介绍这种原生Kubernetes部署方式。 + +注意: +原生Kubernetes部署是Flink 1.10推出的新功能,还在持续迭代中,一些配置文件和命令行参数有可能在未来的版本迭代中发生变化,读者使用前最好阅读最新的官方文档。 + +在使用Kubernetes之前,需要确保Kubernetes版本为1.9以上,配置`~/.kube/config`文件,提前创建用户,并赋予相应权限。 + +Flink原生Kubernetes部署目前支持Session模式和Application模式。Session模式是在Kubernetes集群上启动Session,然后在Session中提交多个作业。未来的版本将支持原生Kubernetes Per-Job模式。图9-4所示为一个原生Kubernetes Session模式的作业提交流程。 + +![图9-4 原生Kubernetes Session模式的作业提交流程](./img/Kubernetes-Session.png) + +如图9-4中所示的第1步,我们用`bin/kubernetes-session.sh`启动一个Kubernetes Session,Kubernetes相关组件将进行初始化,Kubernetes Master、ConfigMap和Kubernetes Service等模块生成相关配置,剩下的流程与YARN的Session模式几乎一致。Client提交作业到Dispatcher,Dispatcher启动一个JobManager,JobManager向Flink-Kubernetes ResourceManager申请Slot,Flink-Kubernetes ResourceManager进而向Kubernetes Master申请资源。Kubernetes Master分配资源,启动Kubernetes Pod,运行TaskManager,TaskManager向Flink-Kubernetes ResourceManager注册Slot,这个作业可以基于这些资源进行部署。 + +如图9-4中所示的第1步,我们需要启动一个Flink Kubernetes Session,其他参数需要参考Flink官方文档中的说明,相关命令如下。 + +```bash +$ ./bin/kubernetes-session.sh \ + -Dkubernetes.cluster-id= \ + -Dkubernetes.container.image= \ + -Dtaskmanager.memory.process.size=4096m \ + -Dkubernetes.taskmanager.cpu=2 \ + -Dtaskmanager.numberOfTaskSlots=4 \ + -Dresourcemanager.taskmanager-timeout=3600000 +``` + +上面的命令启动了一个名为ClusterId的Flink Kubernetes Session集群,集群中的每个TaskManager有2个CPU、4096MB的内存、4个Slot。ClusterId是该Flink Kubernetes Session集群的标识,实际使用时我们需要设置一个名字,如果不进行设置,Flink会给我们分配一个名字。 + +为了使用Flink WebUI,可以使用下面的命令进行端口转发。 + +```bash +$ kubectl port-forward service/ 8081 +``` + +在浏览器中打开地址`http://127.0.0.1:8001`,就能看到Flink的WebUI了。与Flink YARN Session一样,刚开始所有的资源都是0,随着作业的提交,Flink会动态地向Kubernetes申请更多资源。 + +我们继续使用`bin/flink`向这个Session集群中提交作业。 + +```bash +$ ./bin/flink run -d -e kubernetes-session \ + -Dkubernetes.cluster-id= examples/streaming/WindowJoin.jar +``` + +可以使用下面的命令关停这个Flink Kubernetes Session集群。 + +```bash +$ echo 'stop' | ./bin/kubernetes-session.sh \ + -Dkubernetes.cluster-id= \ + -Dexecution.attached=true +``` + +原生Kubernetes也有Application模式,Kubernetes Application模式与YARN Application模式类似。使用时,需要先将作业打成JAR包,放到Docker镜像中,代码如下。 + +```dockerfile +FROM flink +RUN mkdir -p $FLINK_HOME/usrlib +COPY /path/of/my-flink-job-*.jar /opt/flink/usrlib/my-flink-job.jar +``` + +然后使用下面的代码行提交作业。 + +```bash +$ ./bin/flink run-application -p 8 -t kubernetes-application \ + -Dkubernetes.cluster-id= \ + -Dtaskmanager.memory.process.size=4096m \ + -Dkubernetes.taskmanager.cpu=2 \ + -Dtaskmanager.numberOfTaskSlots=4 \ + -Dkubernetes.container.image= \ + local:///opt/flink/usrlib/my-flink-job.jar +``` + +其中,`-Dkubernetes.container.image`用来配置自定义的镜像,`local:///opt/flink/usrlib/my-flink-job.jar`表示JAR包在镜像中的位置。 diff --git a/doc/ch-deployment-and-configuration/hadoop-flink.md b/doc/ch-deployment-and-configuration/hadoop-flink.md new file mode 100644 index 0000000..5db9cbd --- /dev/null +++ b/doc/ch-deployment-and-configuration/hadoop-flink.md @@ -0,0 +1,64 @@ +(hadoop-flink)= +# 与Hadoop集成 + +Flink可以和Hadoop生态圈的组件紧密结合,比如9.1节中提到,Flink可以使用YARN作为资源调度器,或者读取HDFS、HBase中的数据。在使用Hadoop前,我们需要确认已经安装了Hadoop,并配置了环境变量`HADOOP_CONF_DIR`,如下环境变量配置是Hadoop安装过程所必需的。 + +```bash +HADOOP_CONF_DIR=/path/to/etc/hadoop +``` + +此外,Flink与Hadoop集成时,需要将Hadoop的依赖包添加到Flink中,或者说让Flink能够获取到Hadoop类。比如,使用`bin/yarn-session.sh`启动一个Flink YARN Session时,如果没有设置Hadoop依赖,将会出现下面的报错。 + +```java +java.lang.ClassNotFoundException: org.apache.hadoop.yarn.exceptions.YarnException +``` + +这是因为Flink源码中引用了Hadoop YARN的代码,但是在Flink官网提供的Flink下载包中,新版本的Flink已经不提供Hadoop集成,或者说,Hadoop相关依赖包不会放入Flink包中。Flink将Hadoop剔除的主要原因是Hadoop发布和构建的时间过长,不利于Flink的迭代。Flink鼓励用户自己根据需要引入Hadoop依赖包,具体有如下两种方式。 + +1. 在环境变量中添加Hadoop Classpath,Flink从Hadoop Classpath中读取所需依赖包。 +2. 将所需的Hadoop 依赖包添加到Flink主目录下的lib目录中。 + +## 9.5.1 添加Hadoop Classpath + +Flink使用环境变量`$HADOOP_CLASSPATH`来存储Hadoop相关依赖包的路径,或者说,`$HADOOP_CLASSPATH`中的路径会添加到`-classpath`参数中。很多Hadoop发行版以及一些云环境默认情况下并不会设置这个变量,因此,执行Hadoop的各节点应该在其环境变量中设置`$HADOOP_CLASSPATH`。 + +```bash +export HADOOP_CLASSPATH=`hadoop classpath` +``` + +上面的命令中,`hadoop`是Hadoop提供的二进制命令工具,使用前必须保证`hadoop`命令添加到了环境变量`$PATH`中,`classpath`是`hadoop`命令的一个参数选项。`hadoop classpath`可以返回Hadoop所有相关的依赖包,将这些路径输出。如果在一台安装了Hadoop的节点上执行`hadoop classpath`,下面是部分返回结果。 + +```plaintext +/path/to/hadoop/etc/hadoop:/path/to/hadoop/share/hadoop/common/lib/*:/path/to/hadoop/share/hadoop/yarn/lib/*:... +``` + +Flink启动时,会从`$HADOOP_CLASSPATH`中寻找所需依赖包。这些依赖包来自节点所安装的Hadoop,也就是说Flink可以和已经安装的Hadoop紧密结合起来。但Hadoop的依赖错综复杂,Flink所需要的依赖和Hadoop提供的依赖有可能发生冲突。 +该方式只需要设置`$HADOOP_CLASSPATH`,简单快捷,缺点是有依赖冲突的风险。 + +## 9.5.2 将Hadoop依赖包添加到lib目录中 + +Flink主目录下有一个`lib`目录,专门存放各类第三方的依赖包。Flink程序启动时,会将`lib`目录加载到Classpath中。我们可以将所需的Hadoop 依赖包添加到`lib`目录中。具体有两种获取Hadoop 依赖包的方式:一种是从Flink官网下载预打包的Hadoop依赖包,一种是从源码编译。 + +Flink社区帮忙编译生成了常用Hadoop版本的Flink依赖包,比如Hadoop 2.8.3、Hadoop 2.7.5等,使用这些Hadoop版本的用户可以直接下载这些依赖包,并放置到`lib`目录中。例如,Hadoop 2.8.3的用户可以下载`flink-shaded-Hadoop-2-uber-2.8.3-10.0.jar`,将这个依赖包添加到Flink主目录下的`lib`目录中。 + +如果用户使用的Hadoop版本比较特殊,不在下载列表里,比如是Cloudera等厂商发行的Hadoop,用户需要自己下载`flink-shaded`工程源码,基于源码和自己的Hadoop版本自行编译生成依赖包。编译命令如下。 + +```bash +$ mvn clean install -Dhadoop.version=2.6.1 +``` + +上面的命令编译了针对Hadoop 2.6.1的`flink-shaded`工程。编译完成后,将名为`flink-shaded-hadoop-2-uber`的依赖包添加到Flink主目录的`lib`目录中。 +该方式没有依赖冲突的风险,但源码编译需要用户对Maven和Hadoop都有一定的了解。 + +## 9.5.3 本地调试 + +9.5.1小节和9.5.2小节介绍的是针对Flink集群的Hadoop依赖设置方式,如果我们仅想在本地的IntelliJ IDEA里调试Flink Hadoop相关的程序,我们可以将下面的Maven依赖添加到`pom.xml`中。 + +```xml + + org.apache.hadoop + hadoop-client + 2.8.3 + provided + +``` \ No newline at end of file diff --git a/doc/ch-deployment-and-configuration/img/Flink-Standalone-cluster.png b/doc/ch-deployment-and-configuration/img/Flink-Standalone-cluster.png new file mode 100644 index 0000000..9d7454c Binary files /dev/null and b/doc/ch-deployment-and-configuration/img/Flink-Standalone-cluster.png differ diff --git a/doc/ch-deployment-and-configuration/img/Kubernetes-Session.png b/doc/ch-deployment-and-configuration/img/Kubernetes-Session.png new file mode 100644 index 0000000..e4706a0 Binary files /dev/null and b/doc/ch-deployment-and-configuration/img/Kubernetes-Session.png differ diff --git a/doc/ch-deployment-and-configuration/img/Per-Job.png b/doc/ch-deployment-and-configuration/img/Per-Job.png new file mode 100644 index 0000000..ae017ac Binary files /dev/null and b/doc/ch-deployment-and-configuration/img/Per-Job.png differ diff --git a/doc/ch-deployment-and-configuration/img/default-slot-sharing-pipeline.png b/doc/ch-deployment-and-configuration/img/default-slot-sharing-pipeline.png new file mode 100644 index 0000000..a3f9b15 Binary files /dev/null and b/doc/ch-deployment-and-configuration/img/default-slot-sharing-pipeline.png differ diff --git a/doc/ch-deployment-and-configuration/img/memory-model.png b/doc/ch-deployment-and-configuration/img/memory-model.png new file mode 100644 index 0000000..c170ffc Binary files /dev/null and b/doc/ch-deployment-and-configuration/img/memory-model.png differ diff --git a/doc/ch-deployment-and-configuration/img/session.png b/doc/ch-deployment-and-configuration/img/session.png new file mode 100644 index 0000000..1a3c544 Binary files /dev/null and b/doc/ch-deployment-and-configuration/img/session.png differ diff --git a/doc/ch-deployment-and-configuration/img/taskmanager-memory-model.png b/doc/ch-deployment-and-configuration/img/taskmanager-memory-model.png new file mode 100644 index 0000000..9c4d875 Binary files /dev/null and b/doc/ch-deployment-and-configuration/img/taskmanager-memory-model.png differ diff --git a/doc/ch-deployment-and-configuration/img/window-aggregation-slot-sharing-group.png b/doc/ch-deployment-and-configuration/img/window-aggregation-slot-sharing-group.png new file mode 100644 index 0000000..f633a5b Binary files /dev/null and b/doc/ch-deployment-and-configuration/img/window-aggregation-slot-sharing-group.png differ diff --git a/doc/ch-deployment-and-configuration/index.md b/doc/ch-deployment-and-configuration/index.md new file mode 100644 index 0000000..4b1b6fa --- /dev/null +++ b/doc/ch-deployment-and-configuration/index.md @@ -0,0 +1,6 @@ +# Flink的部署和配置 + +通过对前文的学习,我们已经学习了如何编写Flink程序,包括使用DataStream API和使用Table API & SQL来编写程序。本章将重点介绍如何部署和配置Flink作业,主要内容如下: + +```{tableofcontents} +``` \ No newline at end of file diff --git a/doc/ch-deployment-and-configuration/operator-chaining-and-slot-sharing.md b/doc/ch-deployment-and-configuration/operator-chaining-and-slot-sharing.md new file mode 100644 index 0000000..9a1fc9e --- /dev/null +++ b/doc/ch-deployment-and-configuration/operator-chaining-and-slot-sharing.md @@ -0,0 +1,62 @@ +(operator-chaining-and-slot-sharing)= +# 算子链与槽位共享 + +在第3章中我们曾介绍了算子链和槽位共享的概念。默认情况下,这两个功能都是开启的。 + +## 9.3.1 设置算子链 + +Flink会使用算子链将尽可能多的上、下游算子链接到一起,链接到一起的上、下游算子会被捆绑到一起,作为一个线程执行。假如两个算子不进行链接,那么这两个算子间的数据通信存在序列化和反序列化,通信成本较高,所以说算子链可以在一定程度上提高资源利用率。 + +**注意** + +Flink无法把所有算子都链接到一起。上游算子将所有数据前向传播到下游算子上,数据不进行任何交换,那么这两个算子可以被链接到一起。比如,先进行`filter()`,再进行`map()`,这两个算子可以被链接到一起。Flink源码`org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator`中的`isChainable()`方法定义了何种情况可以进行链接,感兴趣的读者可以阅读一下相关代码。 + +另外一些情况下,算子不适合链接在一起,比如两个算子的负载都很高,这时候应该让两个算子拆分到不同的Slot上执行。下面的代码从整个执行环境层面关闭了算子链。 + +```java +StreamExecutionEnvironment env = ... + +env.disableOperatorChaining(); +``` + +关闭算子链之后,我们可以使用`startNewChain()`方法,根据需要对特定的算子进行链接。 + +```java +DataStream result = input + .filter(new Filter1()) + .map(new Map1()) + // 开启新的算子链 + .map(new Map2()).startNewChain() + .filter(new Filter2()); +``` + +上面的例子中,`Filter1`和`Map1`被链接到了一起,`Map2`和`Filter2`被链接到了一起。 +也可以使用`disableChaining()`方法,对当前算子禁用算子链。 + +```java +DataStream result = input + .filter(new Filter1()) + .map(new Map1()) + // 禁用算子链 + .map(new Map2()).disableChaining(); +``` + +上面的例子中,`Filter1`和`Map1`被链接到了一起,`Map2`被分离出来。 + +## 9.3.2 设置槽位共享 + +第3章中我们提到,Flink默认开启了槽位共享,从Source到Sink的所有算子子任务可以共享一个Slot,共享计算资源。或者说,从Source到Sink的所有算子子任务组成的Pipeline共享一个Slot。我们仍然以第3章使用的WordCount程序为例,整个TaskManager下有4个Slot,我们设置作业的并行度为2,其作业的执行情况如图9-7所示。可以看到,一个Slot中包含了从Source到Sink的整个Pipeline。图9-7中Source和FlatMap两个算子被放在一起是因为默认开启了算子链。 + +![图9-7 默认情况下,槽位共享使得整个算子的Pipeline可以放在一个Slot中执行](./img/default-slot-sharing-pipeline.png) + +跟算子链一样,过多的计算任务集中在一个Slot,有可能导致该Slot的负载过大。每个算子都有一个槽位共享组(Slot Sharing Group)。默认情况下,算子都会被分到`default`组中,也就意味着在最终的物理执行图中,从Source到Sink上、下游的算子子任务可以共享一个Slot。我们可以用`slotSharingGroup(String)`方法将某个算子分到特定的组中。例如,下面的代码把WordCount程序中的`WindowAggregation`算子划分到名为`A`的组中。 + +```java +stream.timeWindow(...).sum(...).slotSharingGroup("A"); +``` + +图9-8 展示了这个作业的执行情况,Window Aggregation和Sink都被划分到另外的Slot里执行。这里需要注意的是,我们没有明确给Sink设置Slot Sharing Group,Sink继承了前序算子(Window Aggregation)的Slot Sharing Group,与之一起划分到同一组。 + +第3章中我们提到,未开启算子链和槽位共享的情况下,一个算子子任务应该占用一个Slot。算子链和槽位共享可以让更多算子子任务共享一个Slot。默认情况下算子链和槽位共享是开启的,所以可以让图9-7中所示的从Source到Sink的Pipeline都共享一个Slot。如果一个作业的并行度为`parallelism`,该作业至少需要个数为`parallelism`的Slot。自定义算子链和槽位共享会打断算子子任务之间的共享,当然也会使该作业所需要的Slot数量大于`parallelism`。 + +![图9-8 给Window Aggreagtion设置Slot Sharing Group后,该算子及之后的算子被划分到其他Slot](./img/window-aggregation-slot-sharing-group.png) \ No newline at end of file diff --git a/doc/ch-flink-connectors/Exactly-Once-guarantee.md b/doc/ch-flink-connectors/Exactly-Once-guarantee.md new file mode 100644 index 0000000..1c6e99c --- /dev/null +++ b/doc/ch-flink-connectors/Exactly-Once-guarantee.md @@ -0,0 +1,36 @@ +(Exactly-Once-guarantee)= +# Flink端到端的Exactly-Once保障 + +## 7.1.1 故障恢复与一致性保障 + +在流处理系统中,确保每条数据只被处理一次(Exactly-Once)是一种理想情况。然而,现实中的系统经常因各种意外因素发生故障,如流量激增、网络抖动等。Flink通过重启作业、读取Checkpoint数据、恢复状态和重新执行计算来处理这些故障。 + +Checkpoint和故障恢复过程保证了内部状态的一致性,但可能导致数据重发。如图7-1所示,假设最近一次Checkpoint的时间戳是3,系统在时间戳10处发生故障。在3到10之间处理的数据(如时间戳5和8的数据)需要重新处理。 + +Flink的Checkpoint过程保证了作业内部的数据一致性,主要通过备份以下两类数据: +1. 作业中每个算子的状态。 +2. 输入数据的偏移量Offset。 + +![图7-1 Checkpoint和故障恢复过程会有数据重发问题](./img/data-redundancy-issues.png) + +数据重发类似于观看直播比赛的重播(Replay),但这可能导致时间戳3至10之间的数据被重发,从而引发At-Least-Once问题。为了实现端到端的Exactly-Once保障,需要依赖Source的重发功能和Sink的幂等写或事务写。 + +## 7.1.2 幂等写 + +幂等写(Idempotent Write)是指多次向系统写入数据只产生一次结果影响。例如,向HashMap插入同一个(Key, Value)二元组,只有第一次插入会改变HashMap,后续插入不会改变结果。 + +Key-Value数据库如Cassandra、HBase和Redis常作为Sink来实现端到端的Exactly-Once保障。但幂等写要求(Key, Value)必须是确定性计算的。例如,如果Key是`name + curTimestamp`,每次重发时生成的Key不同,导致多次结果。如果Key是`name + eventTimestamp`,则即使重发,Key也是确定的。 + +Key-Value数据库作为Sink可能遇到时间闪回问题。例如,重启后,之前提交的数据可能被错误地认为是新的操作,导致数据不一致。只有当所有数据重发完成后,数据才恢复一致性。 + +## 7.1.3 事务写 + +事务(Transaction)是数据库系统解决的核心问题。Flink借鉴了数据库中的事务处理技术,结合Checkpoint机制来保证Sink只对外部输出产生一次影响。 + +Flink的事务写(Transaction Write)是指,Flink先将待输出的数据保存,暂时不提交到外部系统;等到Checkpoint结束,所有算子的数据一致时,再将之前保存的数据提交到外部系统。如图7-2所示,使用事务写可以避免时间戳5的数据多次产生输出并提交到外部系统。 + +![图7-2 Flink的事务写](./img/transactional-write.png) + +Flink提供了两种事务写实现方式:预写日志(Write-Ahead-Log,WAL)和两阶段提交(Two-Phase-Commit,2PC)。这两种方式的主要区别在于:WAL方式使用Operator State缓存待输出的数据;如果外部系统支持事务(如Kafka),可以使用2PC方式,待输出数据被缓存在外部系统。 + +事务写能提供端到端的Exactly-Once保障,但牺牲了延迟,因为输出数据不再实时写入外部系统,而是分批次提交。开发者需要权衡不同需求。 \ No newline at end of file diff --git a/doc/ch-flink-connectors/custom-source-and-sink.md b/doc/ch-flink-connectors/custom-source-and-sink.md new file mode 100644 index 0000000..ab1541e --- /dev/null +++ b/doc/ch-flink-connectors/custom-source-and-sink.md @@ -0,0 +1,389 @@ +(custom-source-and-sink)= +# 自定义Source和Sink + +本节将从原理和实现两个方面来介绍Flink的Source和Sink。 + +## 7.2.1 Flink 1.11之前的Source + +Flink 1.11重构了Source接口,是一个非常大的改动,新的Source接口提出了一些新的概念,在使用方式上与老Source接口有较大区别。这里将先重点介绍老的Source接口,因为老的Source接口更易于理解和实现,之后会简单介绍新的Source接口的原理。 + +### 实现SourceFunction + +在本书提供的示例程序中曾大量使用各类自定义的Source,Flink提供了自定义Source的公开接口:SourceFunction的接口和RichSourceFunction的Rich函数类。自定义Source时必须实现两个方法。 + +```java +// Source启动后调用run()方法,生成数据并将其向下游发送 +void run(SourceContext ctx) throws Exception; + +// 停止 +void cancel(); +``` + +run()方法在Source启动后开始执行,一般都会在方法中使用循环,在循环内不断向下游发送数据,发送数据时使用SourceContext.collect()方法。cancel()方法停止向下游继续发送数据。由于run()方法内一般会使用循环,可以使用一个boolean类型的标志位来标记Source是否在执行。当停止Source时,也要修改这个标志位。代码清单 7-1自定义Source,从0开始计数,将数字发送到下游。 + +```java +private static class SimpleSource +implements SourceFunction> { + + private int offset = 0; + private boolean isRunning = true; + + @Override + public void run(SourceContext> ctx) throws Exception { + while (isRunning) { + Thread.sleep(500); + ctx.collect(new Tuple2<>("" + offset, offset)); + offset++; + if (offset == 1000) { + isRunning = false; + } + } + } + + @Override + public void cancel() { + isRunning = false; + } +} +``` + +在主逻辑中调用这个Source。 + +```java +DataStream> countStream = env.addSource(new SimpleSource()); +``` + +与第4章中介绍的DataStream API类似,RichSourceFunction提供了RuntimeContext,以及增加了open()方法用来初始化资源,close()方法用来关闭资源。RuntimeContext指运行时上下文,包括并行度、监控项MetricGroup等。比如,我们可以使用getRuntimeContext().getIndexOfThisSubtask()获取当前子任务是多个并行子任务中的哪一个。 + +### 可恢复的Source + +对于代码清单7-1所示的示例中,假如遇到故障,整个作业重启,Source每次从0开始,没有记录遇到故障前的任何信息,所以它不是一个可恢复的Source。我们在7.1节中讨论过,Source需要支持数据重发才能支持端到端的Exactly-Once保障。如果想支持数据重发,需要满足如下两点。 + +1. Flink开启Checkpoint机制,Source将数据Offset定期写到Checkpoint中。作业重启后,Flink Source从最近一次的Checkpoint中恢复Offset数据。 +2. Flink所连接的上游系统支持从某个Offset开始重发数据。如果上游是Kafka,它是支持Offset重发的。如果上游是一个文件系统,读取文件时可以直接跳到Offset所在的位置,从该位置重新读取数据。 + +在第6章中我们曾详细讨论Flink的Checkpoint机制,其中提到Operator State经常用来在Source或Sink中记录Offset。我们在代码清单7-1的基础上做了一些修改,让整个Source能够支持Checkpoint,即使遇到故障,也可以根据最近一次Checkpoint中的数据进行恢复,如代码清单 7-2所示。 + +```java +private static class CheckpointedSource + extends RichSourceFunction> + implements CheckpointedFunction { + + private int offset; + private boolean isRunning = true; + private ListState offsetState; + + @Override + public void run(SourceContext> ctx) throws Exception { + while (isRunning) { + Thread.sleep(100); + // 使用同步锁,当触发某次Checkpoint时,不向下游发送数据 + synchronized (ctx.getCheckpointLock()) { + ctx.collect(new Tuple2<>("" + offset, 1)); + offset++; + } + if (offset == 1000) { + isRunning = false; + } + } + } + + @Override + public void cancel() { + isRunning = false; + } + + @Override + public void snapshotState(FunctionSnapshotContext snapshotContext) throws +Exception { + // 清除上次状态 + offsetState.clear(); + // 将最新的Offset添加到状态中 + offsetState.add(offset); + } + + @Override + public void initializeState(FunctionInitializationContext initializationContext) throws Exception { + // 初始化offsetState + ListStateDescriptor desc = new +ListStateDescriptor("offset", Types.INT); + offsetState = +initializationContext.getOperatorStateStore().getListState(desc); + + Iterable iter = offsetState.get(); + if (iter == null || !iter.iterator().hasNext()) { + // 第一次初始化,从0开始计数 + offset = 0; + } else { + // 从状态中恢复Offset + offset = iter.iterator().next(); + } + } +} +``` + +代码清单 7-2继承并实现了CheckpointedFunction,可以使用Operator State。整个作业第一次执行时,Flink会调用initializeState()方法,offset被设置为0,之后每隔一定时间触发一次Checkpoint,触发Checkpoint时会调用snapshotState()方法来更新状态到State Backend。如果遇到故障,重启后会从offsetState状态中恢复上次保存的Offset。 + +在run()方法中,我们增加了一个同步锁ctx.getCheckpointLock(),是为了当触发这次Checkpoint时,不向下游发送数据。或者说,等本次Checkpoint触发结束,snapshotState()方法执行完,再继续向下游发送数据。如果没有这个步骤,有可能会导致run()方法中Offset和snapshotState()方法中Checkpoint的Offset不一致。 + +需要注意的是,主逻辑中需要开启Checkpoint机制,如代码清单 7-3所示。 + +```java +public static void main(String[] args) throws Exception { + + Configuration conf = new Configuration(); + // 访问 http://localhost:8082 可以看到Flink WebUI + conf.setInteger(RestOptions.PORT, 8082); + // 设置本地执行环境,并行度为1 + StreamExecutionEnvironment env = +StreamExecutionEnvironment.createLocalEnvironment(1, conf); + // 每隔2秒触发一次Checkpoint + env.getCheckpointConfig().setCheckpointInterval(2 * 1000); + + DataStream> countStream = env.addSource(new +CheckpointedSource()); + // 每隔一定时间模拟一次故障 + DataStream> result = countStream.map(new +FailingMapper(20)); + result.print(); + env.execute("checkpointed source"); +} +``` + +上述代码使用FailingMapper模拟了一次故障。即使发生了故障,Flink仍然能自动重启,并从最近一次的Checkpoint数据中恢复状态。 + +### 时间戳和Watermark + +在5.1.3小节,我们曾经介绍过如何设置一个基于Event Time数据流的时间戳和Watermark,其中一种办法就是在Source中设置。在自定义Source的过程中,SourceFunction.SourceContext提供了相应的方法。 + +```java +// 设置element的时间戳为timestamp,并将element发送出去 +void collectWithTimestamp(T element, long timestamp); + +// 发送一个Watermark +void emitWatermark(Watermark mark); +``` + +其中,SourceContext.collectWithTimestamp()是一种针对Event Time的发送数据的方法,它是SourceContext.collect()的一种特例。比如,我们可以将计数器Source中的run()方法修改如下。 + +```java +@Override +public void run(SourceContext> ctx) throws Exception { + while (isRunning) { + Thread.sleep(100); + // 将系统当前时间作为该数据的时间戳,并发送出去 + ctx.collectWithTimestamp(new Tuple2<>("" + offset, offset), +System.currentTimeMillis()); + offset++; + // 每隔一段时间,发送一个Watermark + if (offset % 100 == 0) { + ctx.emitWatermark(new Watermark(System.currentTimeMillis())); + } + if (offset == 1000) { + isRunning = false; + } + } +} +``` + +如果使用Event Time时间语义,越早设置时间戳和Watermark,越能保证整个作业在时间序列上的准确性和健壮性。 + +我们在5.1.3小节也曾介绍过,对于Event Time时间语义,算子有一个Watermark对齐的过程,某些上游数据源没有数据,将导致下游算子一直等待,无法继续处理新数据。这时候要及时使用SourceContext.markAsTemporarilyIdle()方法将该Source标记为空闲。比如,在实现Flink Kafka Source时,源码如下。 + +```java +public void run(SourceContext sourceContext) throws Exception { + ... + // 如果当前Source没有数据,将当前Source标记为空闲 + // 如果当前Source发现有新数据流入,会自动回归活跃状态 + if (subscribedPartitionsToStartOffsets.isEmpty()) { + sourceContext.markAsTemporarilyIdle(); + } + ... +} +``` + +### 并行版本 + +上面提到的Source都是并行度为1的版本,或者说启动后只有一个子任务在执行。如果需要在多个子任务上并行执行的Source,可以实现ParallelSourceFunction和RichParallelSourceFunction两个类。 + +## 7.2.2 Flink 1.11之后的Source + +仔细分析上面的Source接口,可以发现这样的设计只适合进行流处理,批处理需要另外的接口。Flink在1.11之后提出了一个新的Source接口,主要目的是统一流处理和批处理两大计算模式,提供更大规模并行处理的能力。新的Source接口仍然处于实验阶段,一些Connnector仍然基于老的Source接口来实现的,本书只介绍大概的原理,暂时不从代码层面做具体展示。相信在不久的未来,更多Connector将使用新的Source接口来实现。 + +新的Source接口提出了3个重要组件。 + +- **分片(Split)**:Split是将数据源切分后的一小部分。如果数据源是文件系统上的一个文件夹,Split可以是文件夹里的某个文件;如果数据源是一个Kafka数据流,Split可以是一个Kafka Partition。因为对数据源做了切分,Source就可以启动多个实例并行地读取。 +- **读取器(SourceReader)**:SourceReader负责Split的读取和处理,SourceReader运行在TaskManager上,可以分布式地并行运行。比如,某个SourceReader可以读取文件夹里的单个文件,多个SourceReader实例共同完成读取整个文件夹的任务。 +- **分片枚举器(SplitEnumerator)**:SplitEnumerator负责发现和分配Split。SplitEnumerator运行在JobManager上,它会读取数据源的元数据并构建Split,然后按照负载均衡策略将多个Split分配给多个SourceReader。 + +图7-3展示了这3个组件之间的关系。其中,Master进程中的JobManager运行着SplitEnumerator,各个TaskManager中运行着SourceReader,SourceReader每次向SplitEnumerator请求Split,SplitEnumerator会分配Split给各个SourceReader。 + +![图7-3 新Source接口中的3个重要组件](./img/three-key-components.png) + +## 7.2.3 自定义Sink + +对于Sink,Flink提供的API为SinkFunction接口和RichSinkFunction函数类。使用时需要实现下面的虚方法。 + +```java +// 每条数据到达Sink后都会调用invoke()方法,发送到下游外部系统 +// value为待输出数据 +void invoke(IN value, Context context) +``` + +如7.1节所讨论的问题,如果想提供端到端的Exactly-Once保障,需要使用幂等写和事务写两种方式。 + +### 幂等写 + +幂等写需要综合考虑业务系统的设计和下游外部系统的选型等多方面因素。数据流的一条数据经过Flink可能产生一到多次计算(因为故障恢复),但是最终输出的结果必须是可确定的,不能因为多次计算,导致一些变化。比如我们在前文中提到的,结果中使用系统当前时间戳作为Key就不是一个可确定的计算,因为每次计算的结果会随着系统当前时间戳发生变化。另外,写入外部系统一般是采用更新插入(Upsert)的方式,即将原有数据删除,将新数据插入,或者说将原有数据覆盖。一些Key-Value数据库经常被用来实现幂等写,幂等写也是一种实现成本相对比较低的方式。 + +### 事务写 + +另外一种提供端到端Exactly-Once保障的方式是事务写,并且有两种具体的实现方式:Write-Ahead-Log和Two-Phase-Commit。两者非常相似,下面分别介绍两种方式的原理,并重点介绍Two-Phase-Commit的具体实现。 + +#### Write-Ahead-Log协议的原理 + +Write-Ahead-Log是一种广泛应用在数据库和分布式系统中的保证事务一致性的协议。Write-Ahead-Log的核心思想是,在数据写入下游系统之前,先把数据以日志(Log)的形式缓存下来,等收到明确的确认提交信息后,再将Log中的数据提交到下游系统。由于数据都写到了Log里,即使出现故障恢复,也可以根据Log中的数据决定是否需要恢复、如何进行恢复。图7-4所示为Flink的Write-Ahead-Log流程。 + +![图7-4 Flink的Write-Ahead-Log流程](./img/write-ahead-log.png) + +在Flink中,上游算子会不断向Sink发送待输出数据,这些待输出数据暂时存储在状态中,如图7-4的第0步所示。两次Checkpoint之间的待输出数据组成一个待输出的批次,会以Operator State的形式保存和备份。当Sink接收到一个新Checkpoint Barrier时,意味着Sink需要执行新一次Checkpoint,它会开启一个新的批次,新流入数据都进入该批次。同时,Sink准备将之前未提交的批次提交给外部系统。图7-4所示的第1步和第2步展示了这个过程。数据提交的过程又分为如下3步。 + +1. Sink向CheckpointCommitter查询某批次是否已经提交,通常CheckpointCommitter是一个与外部系统紧密相连的插件,里面存储了各批次数据是否已经写入外部系统的信息。比如,Cassandra的CassandraCommitter使用了一个单独的表存储某批次数据是否已经提交。如果还未提交,则返回false。如果外部系统是一个文件系统,我们用一个文件存储哪些批次数据已经提交。总之,CheckpointCommitter依赖外部系统,它依靠外部系统存储了是否提交的信息。这个过程如图7-4的第3步所示。 +2. Sink得知某批次数据还未提交,则使用sendValues()方法,提交待输出数据到外部系统,即图7-4的第4步。此时,数据写入外部系统,同时也要在CheckpointCommitter中更新本批次数据已被提交的确认信息。 +3. 数据提交成功后,Sink会删除Operator State中存储的已经提交的数据。 + +Write-Ahead-Log仍然无法提供百分之百的Exactly-Once保障,原因如下。 + +1. sendValues()中途可能崩溃,导致部分数据已提交,部分数据还未提交。 +2. sendValues()成功,但是本批次数据提交的确认信息未能更新到CheckpointCommitter中。 + +这两种原因会导致故障恢复后,某些数据可能会被多次写入外部系统。 + +Write-Ahead-Log的方式相对比较通用,目前Flink的Cassandra Sink使用这种方式提供Exactly-Once保障。 + +#### Two-Phase-Commit协议的原理和实现 + +Two-Phase-Commit是另一种广泛应用在数据库和分布式系统中的事务协议。与刚刚介绍的Write-Ahead-Log相比,Flink中的Two-Phase-Commit协议不将数据缓存在Operator State,而是将数据直接写入外部系统,比如支持事务的Kafka。图7-4为Flink的Two-Phase-Commit流程图。 + +![图7-5 Flink的Two-Phase-Commit流程图](./img/two-phase-commit.png) + +如图7-5所示,上游算子将数据发送到Sink后,Sink直接将待输出数据写入外部系统的第k次事务(Transaction)中。接着Checkpoint Barrier到达,新一次Checkpoint开始执行。如图7-5的第2步所示,Flink执行preCommit(),将第k次Transaction的数据预提交到外部系统中,预提交时,待提交数据已经写入外部系统,但是为了保证数据一致性,这些数据由于还没有得到确认提交的信息,对于外部系统的使用者来说,还是不可见的。之所以使用预提交而非提交,是因为Flink无法确定多个并行实例是否都完成了数据写入外部系统的过程,有些实例已经将数据写入,其他实例未将数据写入。一旦发生故障恢复,写入实例的那些数据还有可能再次被写入外部系统,这就影响了Exactly-Once保障的数据一致性。 + +接着,Flink会执行beginTransaction()方法,开启下一次Transaction(Transaction k+1),之后上游算子流入的待输出数据都将流入新的Transaction,如图7-5的第3步。当所有并行实例都执行图7-5中的第2步和第3步之后,本次Checkpoint已经完成,Flink将预提交的数据最终提交到外部系统,至此待输出数据在外部系统最终可见。 + +接下来我们使用具体的例子来演示整个数据写入的过程,这里继续使用本章之前一直使用的数据流DataStream>,我们将这个数据流写入文件。为此,我们准备两个文件夹,一个名为flink-sink-commited,这是数据最终要写入的文件夹,需要保证一条数据从Source到Sink的Exactly-Once一致性;第二个文件夹名为flink-sink-precommit,存储临时文件,主要为事务机制所使用。数据先经过flink-sink-precommit,等得到确认后,再将数据从此文件夹写入flink-sink-commited。结合上面所述的数据写入过程,我们需要继承TwoPhaseCommitSinkFunction,并实现下面的4个方法。 +1. beginTransaction():开启一次新的Transaction。我们为每次Transaction创建一个新的文件缓存,文件缓存名以当前时间命名,新流入数据都写入这个文件缓存。假设当前为第k次Transaction,文件名为k。文件缓存的数据在内存中,还未写入磁盘。 +2. preCommit():数据预提交。文件缓存k从内存写入flink-sink-precommit文件夹,数据持久化到磁盘中。一旦preCommit()方法被执行,Flink会调用beginTransaction()方法,开启下一次Transaction,生成名为k+1的文件缓存。 +3. commit():得到确认后,提交数据。将文件k从flink-sink-precommit文件夹移动到flink-sink-commited。 +4. abort():遇到异常,操作终止。将flink-sink-precommit中的文件删除。 + +除此之外,还需要实现Sink最基本的数据写入方法invoke(),将数据写入文件缓存。代码清单 7-4展示了整个过程。 +```java +public static class TwoPhaseFileSink +extends TwoPhaseCommitSinkFunction, String, Void> { +// 缓存 +private BufferedWriter transactionWriter; +private String preCommitPath; +private String commitedPath; + + public TwoPhaseFileSink(String preCommitPath, String commitedPath) { + super(StringSerializer.INSTANCE, VoidSerializer.INSTANCE); + this.preCommitPath = preCommitPath; + this.commitedPath = commitedPath; + } + + @Override + public void invoke(String transaction, Tuple2 in, Context context) throws Exception { + transactionWriter.write(in.f0 + " " + in.f1 + "\n"); + } + + @Override + public String beginTransaction() throws Exception { + String time = +LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME); +int subTaskIdx = getRuntimeContext().getIndexOfThisSubtask(); +String fileName = time + "-" + subTaskIdx; +Path preCommitFilePath = Paths.get(preCommitPath + "/" + fileName); +// 创建一个存储本次Transaction的文件 +Files.createFile(preCommitFilePath); +transactionWriter = Files.newBufferedWriter(preCommitFilePath); +System.out.println("transaction File: " + preCommitFilePath); + + return fileName; + } + + @Override + public void preCommit(String transaction) throws Exception { + // 将当前数据由内存写入磁盘 + transactionWriter.flush(); + transactionWriter.close(); + } + + @Override + public void commit(String transaction) { + Path preCommitFilePath = Paths.get(preCommitPath + "/" + transaction); + if (Files.exists(preCommitFilePath)) { + Path commitedFilePath = Paths.get(commitedPath + "/" + transaction); + try { + Files.move(preCommitFilePath, commitedFilePath); + } catch (Exception e) { + System.out.println(e); + } + } + } + + @Override + public void abort(String transaction) { + Path preCommitFilePath = Paths.get(preCommitPath + "/" + transaction); + + // 如果中途遇到异常,将文件删除 + if (Files.exists(preCommitFilePath)) { + try { + Files.delete(preCommitFilePath); + } catch (Exception e) { + System.out.println(e); + } + } + } +} +``` + +代码清单 7-4 实现了TwoPhaseCommitSinkFunction的Sink + +TwoPhaseCommitSinkFunction接收如下3个泛型。 +- IN为上游算子发送过来的待输出数据类型。 +- TXN为Transaction类型,本例中是类型String,Kafka中是一个封装了Kafka Producer的数据类型,我们可以往Transaction中写入待输出的数据。 +- CONTEXT为上下文类型,是个可选选项。本例中我们没有使用上下文,所以这里使用了Void,即空类型。 +TwoPhaseCommitSinkFunction的构造函数需要传入TXN和CONTEXT的序列化器。在主逻辑中,我们创建了两个目录,一个为预提交目录,一个为最终的提交目录。我们可以比较使用未加任何保护的print()和该Sink:print()直接将结果输出到标准输出,会有数据重发现象;而使用了Two-Phase-Commit协议,待输出结果写到了目标文件夹内,即使发生了故障恢复,也不会有数据重发现象,代码清单 7-5展示了在主逻辑中使用Two-Phase-Commit的Sink。 + +```java +// 每隔5秒进行一次Checkpoint +env.getCheckpointConfig().setCheckpointInterval(5 * 1000); + +DataStream> countStream = env.addSource(new + CheckpointedSourceExample.CheckpointedSource()); +// 每隔一定时间模拟一次失败 +DataStream> result = countStream.map(new + CheckpointedSourceExample.FailingMapper(20)); + +// 类UNIX操作系统的临时文件夹在/tmp下 +// Windows用户需要修改该目录 +String preCommitPath = "/tmp/flink-sink-precommit"; +String commitedPath = "/tmp/flink-sink-commited"; + +if (!Files.exists(Paths.get(preCommitPath))) { + Files.createDirectory(Paths.get(preCommitPath)); + } + if (!Files.exists(Paths.get(commitedPath))) { + Files.createDirectory(Paths.get(commitedPath)); + } +// 使用Exactly-Once语义的Sink,执行本程序时可以查看相应的输出目录 + result.addSink(new TwoPhaseFileSink(preCommitPath, commitedPath)); +//输出数据,无Exactly-Once保障,有数据重发现象 + result.print(); +``` + +代码清单 7-5 在主逻辑中使用Two-Phase-Commit的Sink + +Flink的Kafka Sink中的FlinkKafkaProducer.Semantic.EXACTLY_ONCE选项就使用这种方式实现,因为Kafka提供了事务机制,开发者可以通过“预提交-提交”的两阶段提交方式将数据写入Kafka。但是需要注意的是,这种方式理论上能够提供百分之百的Exactly-Once保障,但实际执行过程中,这种方式比较依赖Kafka和Flink之间的协作,如果Flink作业的故障恢复时间过长会导致超时,最终会导致数据丢失。因此,这种方式只能在理论上提供百分之百的Exactly-Once保障。 +将转化为markdown格式,输出源代码: \ No newline at end of file diff --git a/doc/ch-flink-connectors/exercise-stock-price-data-stream.md b/doc/ch-flink-connectors/exercise-stock-price-data-stream.md new file mode 100644 index 0000000..fc5a6a3 --- /dev/null +++ b/doc/ch-flink-connectors/exercise-stock-price-data-stream.md @@ -0,0 +1,26 @@ +(exercise-stock-price-data-stream)= +# 实验 读取并输出股票价格数据流 + +经过本章的学习,读者应该基本了解了Flink Connector的使用方法,本节我们继续以股票交易场景来模拟数据流的输入和输出。 + +## 一、实验目的 + +结合股票交易场景,学习如何使用Source和Sink,包括如何自定义Source、如何调用Kafka Sink。 + +## 二、实验内容 + +在第4章和第5章的实验中,我们都使用了股票交易数据,其中使用了StockPrice的数据结构,读取数据集中的数据来模拟一个真实数据流。这里我们将修改第4章实验中的Source,在读取数据集时使用一个Offset,保证Source有故障恢复的能力。 + +基于第5章中的对股票数据xVWAP的计算程序,使用Kafka Sink,将结果输出到Kafka。输出之前,需要在Kafka中建立对应的Topic。 + +## 三、实验要求 + +整个程序启用Flink的Checkpoint机制,计算xVWAP,需要重新编写Source,使其支持故障恢复,计算结果被发送到Kafka。计算结果可以使用JSON格式进行序列化。在命令行中启动一个Kafka Consumer来接收数据,验证程序输出的正确性。 + +## 四、实验报告 + +将思路和程序撰写成实验报告。 + +## 本章小结 + +通过本章的学习,读者应该可以了解Flink Connector的原理和使用方法,包括:端到端Exactly-Once的含义、自定义Source和Sink以及常用Flink Connector使用方法。相信通过本章的学习,读者已经可以将从Source到Sink的一整套流程串联起来。 diff --git a/doc/ch-flink-connectors/flink-connector.md b/doc/ch-flink-connectors/flink-connector.md new file mode 100644 index 0000000..5042cc7 --- /dev/null +++ b/doc/ch-flink-connectors/flink-connector.md @@ -0,0 +1,331 @@ +(flink-connector)= +# Flink中常用的Connector + +本节将对Flink常用的Connector做一些概括性的介绍,主要包括内置输入/输出(Input/Output,I/O)接口、flink-connector项目所涉及的Connector、Apache Bahir所提供的Connector等,如图7-5所示。 + +![图7-6 Flink中常用的Connector](./img/Connector.png) + +Flink支持了绝大多数的常见大数据系统,从系统的类型上,包括了消息队列、数据库、文件系统等;从具体的技术上,包括了Kafka、Elasticsearch、HBase、Cassandra、JDBC、Kinesis、Redis等。各个大数据系统使用起来略有不同,接下来将重点介绍一下Flink内置I/O接口和Flink Kafka Connector,这两类Connector被广泛应用在很多业务场景中,具有很强的代表性。 + +## 7.3.1 内置I/O接口 + +之所以给这类Connector起名为内置I/O接口,是因为这些接口直接集成在了Flink的核心代码中,无论在任何环境中,我们都可以调用这些接口进行数据输入/输出操作。与内置I/O接口相对应的是flink-connector子项目以及Apache Bahir项目中的Connector,flink-connector虽然是Flink开源项目的一个子项目,但是并没有直接集成到二进制包(我们在第2章下载安装的Flink安装包)中。因此,使用Flink的内置I/O接口,一般不需要额外添加依赖,使用其他Connector需要添加相应的依赖。 + +Flink的内置I/O接口如下: + +- 基于Socket的Source和Sink。 +- 基于内存集合的Source。 +- 输出到标准输出的Sink。 +- 基于文件系统的Source和Sink。 + +在前文中,我们其实已经使用过这里提到的接口,比如从内存集合中创建数据流并将结果输出到标准输出。像Socket、内存集合和打印这3类接口非常适合调试。此外,文件系统被广泛用于大数据的持久化,是大数据架构中经常涉及的一种组件。下面我们将再次梳理一下这些接口,并重点介绍一下基于文件系统的Source和Sink。 + +### 1. 基于Socket的Source和Sink + +我们可以从Socket数据流中读取和写入数据。 + +```java +// 读取Socket中的数据,数据流数据之间用\n来切分 +env.socketTextStream(hostname, port, "\n"); + +// 向Socket中写数入据,数据以SimpleStringSchema序列化 +stream.writeToSocket(outputHost, outputPort, new SimpleStringSchema()); +``` + +由于Socket不能保存Offset,也无法实现数据重发,因此以它作为Connector可能会导致故障恢复时的数据丢失,只能提供At-Most-Once的投递保障。这种方式非常适合用来调试,开源工具nc可以创建Socket数据流,结合Flink的Socket接口可以用来快速验证一些逻辑。 + +此外,Socket Source输入数据具有时序性,适合用来调试与时间和窗口有关的程序。 + +注意,使用Socket时,需要提前启动相应的Socket端口,以便Flink能够建立Socket连接,否则将抛出异常。 + +### 2. 基于内存集合的Source + +最常见调试方式是在内存中创建一些数据列表,并直接写入Flink的Source。 + +```java +DataStream sourceDataStream = env.fromElements(1, 2, 3); +``` + +它内部调用的是:`fromCollection(Collection data, TypeInformation typeInfo)`。`fromCollection()`基于Java的Collection接口。对于一些复杂的数据类型,我们用Java的Collection来创建数据,并写到Flink的Source里。 + +```java +// 获取数据类型 +TypeInformation typeInfo = ... +DataStream collectionStream = env.fromCollection(Arrays.asList(data), typeInfo); +``` + +### 3. 输出到标准输出的Sink + +`print()`和`printToErr()`分别将数据流输出到标准输出流(STDOUT)和标准错误流(STDERR)。这两个方法会调用数据的`toString()`方法,将内存对象转换成字符串,因此如果想进行调试、查看结果,一定要实现数据的`toString()`方法。Java的POJO类要重写`toString()`方法,Scala的case class已经有内置的`toString()`方法,无须实现。 + +```java +public class StockPrice { + public String symbol; + public double price; + public long ts; + public int volume; + public String mediaStatus; + + ... + + @Override + public String toString() { + return "(" + this.symbol + "," + + this.price + "," + this.ts + + "," + this.volume + "," + + this.mediaStatus + ")"; + } +} +``` + +`print()`和`printToErr()`方法实际在TaskManager上执行,如果并行度大于1,Flink会将算子子任务的ID一起输出。比如,在IntelliJ IDEA中执行程序,可以得到类似下面的结果,每行输出前都有一个数字,该数字表示相应方法实际在哪个算子子任务上执行。 + +``` +1> 490894,1061719,4874384,pv,1512061207 +1> 502030,4129946,1567637,pv,1512061207 +4> 226011,4228265,3159480,pv,1512057930 +4> 228530,3404444,64179,pv,1512057930 +6> 694940,4531940,4217906,pv,1512058952 +... +``` + +### 4. 基于文件系统的Source和Sink + +#### (1) 基于文件系统的Source + +文件系统一般用来存储数据,为批处理提供输入或输出,是大数据架构中最为重要的组件之一。比如,消息队列可能将一些日志写入文件系统进行持久化,批处理作业从文件系统中读取数据进行分析等。在Flink中,基于文件系统的Source和Sink可以从文件系统中读取和输出数据。 + +Flink对各类文件系统都提供了支持,包括本地文件系统以及挂载到本地的网络文件系统(Network File System,NFS)、Hadoop HDFS、Amazon S3、阿里云OSS等。Flink通过路径中的文件系统描述符来确定该文件路径使用什么文件系统,例如`file:///some/local/file`或者`hdfs://host:port/file/path`。 + +下面的代码从一个文件系统中读取一个文本文件,文件读入后以字符串的形式存在,并生成一个`DataStream`。 + +```java +StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +String textPath = ... +// readTextFile()方法默认以UTF-8编码格式读取文件 +DataStream text = env.readTextFile(textPath); +``` + +Flink在内部实际调用的是一个支持更多参数的接口。 + +```java +/** + * 从filePath文件中读取数据 + * FileInputFormat 定义文件的格式 + * watchType 检测文件路径下的内容是否有更新 + * interval 检测间隔 + */ +public DataStreamSource readFile( + FileInputFormat inputFormat, + String filePath, + FileProcessingMode watchType, + long interval); +``` + +上述方法可以读取一个路径下的所有文件。`FileInputFormat`定义了输入文件的格式,比如一个纯文本文件`TextInputFormat`,后文还将详细介绍这个接口。参数`filePath`是文件路径。如果这个路径指向一个文件,Flink将读取这个文件,如果这个路径是一个目录,Flink将读取目录下的文件。基于`FileProcessingMode`,Flink提供了如下两种不同的读取文件的模式。 + +- `FileProcessingMode.PROCESS_ONCE`模式只读取一遍某个目录下的内容,读取完后随即退出。 +- `FileProcessingMode.PROCESS_CONTINUOUSLY`模式每隔`interval`毫秒周期性地检查`filePath`路径下的内容是否有更新,如果有更新,重新读取里面的内容。 + +下面的代码展示了如何调用`FileInputFormat`接口。 + +```java +// 文件路径 +String filePath = ... + +// 文件为纯文本格式 +TextInputFormat textInputFormat = new TextInputFormat(new org.apache.flink.core.fs.Path(filePath)); + +// 每隔100毫秒检测一遍 +DataStream inputStream = env.readFile(textInputFormat, filePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100); +``` + +Flink在实现文件读取时,增加了一个专门检测文件路径的线程。这个线程启动后定时检测路径下的任何修改,比如是否有文件被修改,或文件夹是否添加了新内容。确切地说,这个线程检测文件的修改时间(Modified Time)是否发生了变化。`FileProcessingMode.PROCESS_CONTINUOUSLY`模式下Flink每隔`interval`毫秒周期性地检测文件的修改时间;`FileProcessingMode.PROCESS_ONCE`只检测一次,不周期性地检测。 + +注意 + +重新读取文件内容会影响端到端的Exactly-Once一致性。因为检测更新是基于文件的修改时间,如果我们往一个文件中追加数据,文件的修改时间会发生变化,该文件下次检测时会被重新读取,导致一条数据可能会被多次处理。 + +`FileInputFormat`是读取文件的基类,继承这个基类可以实现不同类型的文件读取,包括纯文本文件。`TextInputFormat`是`FileInputFormat`的一个实现,`TextInputFormat`按行读取文件,文件以纯文本的序列化方式打开。Flink也提供了`AvroInputFormat`、`OrcInputFormat`、`ParquetInputFormat`等其他大数据架构所采用的文件格式,这些文件格式比起纯文本文件的性能更好,它们的读/写方式也各有不同。 + +考虑到数据的容量比较大,在实现文件读取的过程中,Flink会判断`filePath`路径下的文件能否切分。假设这个作业的并行度是`n`,而且文件能够切分,检测线程会将读入的文件切分成`n`份,后续启动`n`个并行的文件读取实例读取这`n`份切分文件。 + +#### (2) 基于文件系统的Sink + +我们可以使用`writeAsText(String path)`、`writeAsText(String path, WriteMode writeMode)`和`writeUsingOutputFormat(OutputFormat format)`等方法来将文件输出到文件系统。`WriteMode`可以为`NO_OVERWRITE`和`OVERWRITE`,即是否覆盖原来路径里的内容。`OutputFormat`与`FileInputFormat`类似,表示目标文件的文件格式。在最新的Flink版本中,这几个输出到文件系统的方法被标记为`@Deprecated`,表示未来将被弃用,主要考虑到这些方法没有参与Flink的Checkpoint过程中,无法提供Exactly-Once保障。这些方法适合用于本地调试。 + +在生产环境中,为了保证数据的一致性,官方建议使用`StreamingFileSink`接口。下面这个例子展示了如何将一个文本数据流输出到一个目标路径上。这里用到的是一个非常简单的配置,包括一个文件路径和一个`Encoder`。`Encoder`可以将数据编码以便对数据进行序列化。 + +```java +DataStream
stream = env.addSource(...); + +// 使用StreamingFileSink将DataStream输出为一个文本文件 +StreamingFileSink fileSink = StreamingFileSink + .forRowFormat(new Path("/file/base/path"), new SimpleStringEncoder("UTF-8")) + .build(); +stream.addSink(fileSink); +``` + +`StreamingFileSink`主要支持两类文件,一种是行式存储,一种是列式存储。我们平时见到的很多数据是行式存储的,即在文件的末尾追加新的行。列式存储在某些场景下的性能很高,它将一批数据收集起来,批量写入。行式存储和列式存储的接口如下。 + +- 行式存储:`StreamingFileSink.forRowFormat(basePath, rowEncoder)`。 +- 列式存储:`StreamingFileSink.forBulkFormat(basePath, bulkWriterFactory)`。 + +回到刚才的例子上,它使用了行式存储,`SimpleStringEncoder`是Flink提供的预定义的`Encoder`,它通过数据流的`toString()`方法将内存数据转换为字符串,将字符串按照UTF-8编码写入输出中。`SimpleStringEncoder`可以用来编码转换字符串数据流,`SimpleStringEncoder`可以用来编码转换长整数数据流。 + +如果数据流比较复杂,我们需要自己实现一个`Encoder`。代码清单 7-7中的数据流是一个`DataStream>`,我们需要实现`encode()`方法,将每个数据编码。 + +```java +// 将一个二元组数据流编码并序列化 +static class Tuple2Encoder implements Encoder> { + @Override + public void encode(Tuple2 element, OutputStream stream) throws IOException { + stream.write((element.f0 + '@' + element.f1).getBytes(StandardCharsets.UTF_8)); + stream.write('\n'); + } +} +``` + +对于列式存储,也需要一个类似的`Encoder`,Flink称之为`BulkWriter`,本质上将数据序列化为列式存储所需的格式。比如我们想使用Parquet格式,代码如下。 + +```java +DataStream stream = ...; + +StreamingFileSink fileSink = StreamingFileSink + .forBulkFormat(new Path("/file/base/path"), ParquetAvroWriters.forReflectRecord(Datum.class)) + .build(); + +stream.addSink(fileSink); +``` + +考虑到大数据场景下,输出数据量会很大,而且流处理作业需要长时间执行,`StreamingFileSink`的具体实现过程中使用了桶的概念。桶可以理解为输出路径的一个子文件夹。如果不做其他设置,Flink按照时间来将输出数据分桶,会在输出路径下生成类似下面的文件夹结构。 + +``` +/file/base/path +└── 2020-02-25--15 + ├── part-0-0.inprogress.92c7be6f-8cfc-4ca3-905b-91b0e20ba9a9 + ├── part-1-0.inprogress.18f9fa71-1525-4776-a7bc-fe02ee1f2dda +``` + +目录和文件名实际上是按照下面的结构来命名的。 + +``` +[base-path]/[bucket-path]/part-[task-id]-[id] +``` + +最顶层的文件夹是我们设置的输出目录,第二层是桶,Flink将当前的时间作为`bucket-path`桶名。实际输出时,Flink会启动多个并行的实例,每个实例有自己的`task-id`,`task-id`被添加在了`part`之后。 + +我们也可以自定义数据分配的方式,将某一条数据分配到相应的桶中。 + +```java +StreamingFileSink fileSink = StreamingFileSink + .forRowFormat(new Path("/file/path"), new SimpleStringEncoder("UTF-8")) + .withBucketAssigner(new DateTimeBucketAssigner<>()) + .build(); +``` + +上述的文件夹结构中,有“inprogress”字样,这与`StreamingFileSink`能够提供的Exactly-Once保障有关。一份数据从生成到最终可用需要经过3个阶段:进行中(In-progress)、等待(Pending)和结束(Finished)。当数据刚刚生成时,文件处于In-progress阶段;当数据已经准备好(比如单个part文件足够大),文件被置为Pending阶段;下次Checkpoint执行完,整个作业的状态数据是一致的,文件最终被置为Finished阶段,Finished阶段的文件名没有“inprogress”的字样。从这个角度来看,`StreamingFileSink`和Checkpoint机制结合,能够提供Exactly-Once保障。 + +## 7.3.2 Flink Kafka Connector + +在第1章中我们曾提到,Kafka是一个消息队列,它可以在Flink的上游向Flink发送数据,也可以在Flink的下游接收Flink的输出。Kafka是一个很多公司都采用的消息队列,因此非常具有代表性。 + +Kafka的API经过不断迭代,已经趋于稳定,我们接下来主要介绍基于稳定版本的Kafka Connector。如果仍然使用较旧版本的Kafka(0.11或更旧的版本),可以通过官方文档来了解具体的使用方法。由于Kafka Connector并没有内置在Flink核心程序中,使用之前,我们需要在Maven中添加依赖。 + +```xml + + org.apache.flink + flink-connector-kafka_${scala.binary.version} + ${flink.version} + +``` + +### 1. Flink Kafka Source + +Kafka作为一个Flink作业的上游,可以为该作业提供数据,我们需要一个可以连接Kafka的Source读取Kafka中的内容,这时Kafka是一个Producer,Flink作为Kafka的Consumer来消费Kafka中的数据。代码清单 7-8展示了如何初始化一个Kafka Source Connector。 + +```java +// Kafka参数 +Properties properties = new Properties(); +properties.setProperty("bootstrap.servers", "localhost:9092"); +properties.setProperty("group.id", "flink-group"); +String inputTopic = "Shakespeare"; + +// Source +FlinkKafkaConsumer consumer = + new FlinkKafkaConsumer(inputTopic, new SimpleStringSchema(), properties); +DataStream stream = env.addSource(consumer); +``` + +代码清单 7-8 初始化Kafka Source Consumer + +代码清单7-8创建了一个FlinkKafkaConsumer,它需要3个参数:Topic、反序列化方式和Kafka相关参数。Topic是我们想读取的具体内容,是一个字符串,并且可以支持正则表达式。Kafka中传输的是二进制数据,需要提供一个反序列化方式,将数据转化为具体的Java或Scala对象。Flink已经提供了一些序列化实现,比如:SimpleStringSchema按照字符串进行序列化和反序列化,JsonNodeDeserializationSchema使用Jackson对JSON数据进行序列化和反序列化。如果数据类型比较复杂,我们需要实现DeserializationSchema或者KafkaDeserializationSchema接口。最后一个参数Properties是Kafka相关的设置,用来配置Kafka的Consumer,我们需要配置bootstrap.servers和group.id,其他的参数可以参考Kafka的文档进行配置。 + +Flink Kafka Consumer可以配置从哪个位置读取消息队列中的数据。默认情况下,从Kafka Consumer Group记录的Offset开始消费,Consumer Group是根据group.id所配置的。其他配置可以参考下面的代码。 + +```java +StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + +FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<>(...); +consumer.setStartFromGroupOffsets(); // 默认从Kafka记录中的Offset开始 +consumer.setStartFromEarliest(); // 从最早的数据开始 +consumer.setStartFromLatest(); // 从最近的数据开始 +consumer.setStartFromTimestamp(...); // 从某个时间戳开始 + +DataStream stream = env.addSource(consumer); +``` + +**注意** + +上述代码中配置消费的起始位置只影响作业第一次启动时所应读取的位置,不会影响故障恢复时重新消费的位置。 + +如果作业启用了Flink的Checkpoint机制,Checkpoint时会记录Kafka Consumer消费到哪个位置,或者说记录了Consumer Group在该Topic下每个分区的Offset。如果遇到故障恢复,Flink会从最近一次的Checkpoint中恢复Offset,并从该Offset重新消费Kafka中的数据。可见,Flink Kafka Consumer是支持数据重发的。 + +### 2. Flink Kafka Sink + +Kafka作为Flink作业的下游,可以接收Flink作业的输出,这时我们可以通过Kafka Sink将处理好的数据输出到Kafka中。在这种场景下,Flink是生成数据的Producer,向Kafka输出。 +比如我们将WordCount程序结果输出到一个Kafka数据流中。 + +```java +DataStream> wordCount = ... + +FlinkKafkaProducer> producer = new +FlinkKafkaProducer> ( + outputTopic, + new KafkaWordCountSerializationSchema(outputTopic), + properties, + FlinkKafkaProducer.Semantic.EXACTLY_ONCE); +wordCount.addSink(producer); +``` + +上面的代码创建了一个FlinkKafkaProducer,它需要4个参数:Topic、序列化方式、连接Kafka的相关参数以及选择什么样的投递保障。这些参数中,Topic和连接的相关Kafka参数与前文所述的内容基本一样。 + +序列化方式与前面提到的反序列化方式相对应,它主要将Java或Scala对象转化为可在Kafka中传输的二进制数据。这个例子中,我们要传输的是一个Tuple2,需要提供对这个数据类型进行序列化的代码,例如代码清单 7-9的序列化代码。 + +```java +public static class KafkaWordCountSerializationSchema implements +KafkaSerializationSchema> { + + private String topic; + + public KafkaWordCountSerializationSchema(String topic) { + super(); + this.topic = topic; + } + + @Override + public ProducerRecord serialize(Tuple2 element, Long timestamp) { + return new ProducerRecord(topic, (element.f0 + ": " + +element.f1).getBytes(StandardCharsets.UTF_8)); + } +} +``` + +代码清单 7-9 将数据写到Kafka Sink时,需要进行序列化 + +最后一个参数决定了Flink Kafka Sink以什么样的语义来保障数据写入Kafka,它接受FlinkKafkaProducer.Semantic的枚举类型,有3种类型:NONE、AT_LEAST_ONCE和EXACTLY_ONCE。 +- None:不提供任何保障,数据可能会丢失也可能会重复。 +- AT_LEAST_ONCE:保证不丢失数据,但是有可能会重复。 +- EXACTLY_ONCE:基于Kafka提供的事务写功能,一条数据最终只写入Kafka一次。 + +其中,EXACTLY_ONCE基于Kafka提供的事务写功能,使用了我们提到的Two-Phase-Commit协议,它保证了数据端到端的Exactly-Once保障。当然,这个类型的代价是输出延迟会增大。实际执行过程中,这种方式比较依赖Kafka和Flink之间的协作,如果Flink作业的故障恢复时间过长,Kafka不会长时间保存事务中的数据,有可能发生超时,最终也可能会导致数据丢失。AT_LEAST_ONCE是默认的,它不会丢失数据,但数据有可能是重复的。 diff --git a/doc/ch-flink-connectors/img/Connector.png b/doc/ch-flink-connectors/img/Connector.png new file mode 100644 index 0000000..f622432 Binary files /dev/null and b/doc/ch-flink-connectors/img/Connector.png differ diff --git a/doc/ch-flink-connectors/img/data-redundancy-issues.png b/doc/ch-flink-connectors/img/data-redundancy-issues.png new file mode 100644 index 0000000..5e48731 Binary files /dev/null and b/doc/ch-flink-connectors/img/data-redundancy-issues.png differ diff --git a/doc/ch-flink-connectors/img/three-key-components.png b/doc/ch-flink-connectors/img/three-key-components.png new file mode 100644 index 0000000..b72710b Binary files /dev/null and b/doc/ch-flink-connectors/img/three-key-components.png differ diff --git a/doc/ch-flink-connectors/img/transactional-write.png b/doc/ch-flink-connectors/img/transactional-write.png new file mode 100644 index 0000000..2f55a7c Binary files /dev/null and b/doc/ch-flink-connectors/img/transactional-write.png differ diff --git a/doc/ch-flink-connectors/img/two-phase-commit.png b/doc/ch-flink-connectors/img/two-phase-commit.png new file mode 100644 index 0000000..4c96de1 Binary files /dev/null and b/doc/ch-flink-connectors/img/two-phase-commit.png differ diff --git a/doc/ch-flink-connectors/img/write-ahead-log.png b/doc/ch-flink-connectors/img/write-ahead-log.png new file mode 100644 index 0000000..a35b334 Binary files /dev/null and b/doc/ch-flink-connectors/img/write-ahead-log.png differ diff --git a/doc/ch-flink-connectors/index.md b/doc/ch-flink-connectors/index.md new file mode 100644 index 0000000..47671ef --- /dev/null +++ b/doc/ch-flink-connectors/index.md @@ -0,0 +1,6 @@ +# Flink连接器 + +经过前文的学习,我们已经了解了Flink如何对一个数据流进行有状态的计算。在实际生产环境中,数据可能存放在不同的系统中,比如文件系统、数据库或消息队列。一个完整的Flink作业包括Source和Sink两大模块,Source和Sink肩负着Flink与外部系统进行数据交互的重要功能,它们又被称为外部连接器(Connector)。本章将详细介绍Flink的Connector相关知识,主要内容如下。 + +```{tableofcontents} +``` \ No newline at end of file diff --git a/doc/ch-programming-basics/exercise-Flink-development-environment.md b/doc/ch-programming-basics/exercise-Flink-development-environment.md index feeee0e..08cf75c 100644 --- a/doc/ch-programming-basics/exercise-Flink-development-environment.md +++ b/doc/ch-programming-basics/exercise-Flink-development-environment.md @@ -1,5 +1,5 @@ (exercise-Flink-development-environment)= -# 2.4 案例实战 Flink开发环境搭建 +# 案例实战 Flink开发环境搭建 本案例实战主要带领读者完成对Flink开发环境的搭建。 @@ -35,6 +35,8 @@ $ ./bin/start-cluster.sh # 启动 Flink 集群 成功启动后,打开浏览器,输入`http://localhost:8081`,可以进入Flink集群的仪表盘(WebUI),如图2-4所示。Flink WebUI可以对Flink集群进行管理和监控。 +![图2-4 Flink WebUI](./img/flink-WebUI.png) + ## 2.4.3 创建Flink工程 我们使用Maven从零开始创建一个Flink工程。 @@ -57,16 +59,28 @@ archetype是Maven提供的一种项目模板,是别人提前准备好了的项 如图2-5所示,在IntelliJ IDEA里依次单击“File”→“New”→“Project”,创建一个新工程。 +![图2-5 在IntelliJ IDEA中创建新工程](./img/new-project.png) + 如图2-6所示,选择左侧的“Maven”,并勾选“Create from archetype”,并单击右侧的“Add Archetype”按钮。 +![图2-6 添加Maven项目](./img/Maven.png) + 如图2-7所示,在弹出的窗口中填写archetype信息。其中GroupId为org.apache.flink,ArtifactId为flink-quickstart-java,Version为1.11.2,然后单击“OK”。这里主要是告诉Maven去资源库中下载哪个版本的模板。随着Flink的迭代开发,Version也在不断更新,读者可以在Flink的Maven资源库中查看最新的版本。GroupId、ArtifactId、Version可以唯一表示一个发布出来的Java程序包。配置好后,单击Next按钮进入下一步。 +![图2-7 填写archetype信息](./img/archetype.png) + 如图2-8所示,这一步是建立你自己的Maven工程,以区别其他Maven工程,GroupId是你的公司或部门名称(可以随意填写),ArtifactId是工程发布时的Java归档(Java Archive,JAR)包名,Version是工程的版本。这些配置主要用于区别不同公司所发布的不同包,这与Maven和版本控制相关,Maven的教程中都会介绍这些概念,这里不赘述。 +![图2-8 配置你的工程信息](./img/project-info.png) + 接下来可以继续单击“Next”按钮,注意最后一步选择你的工程所在的磁盘位置,单击“Finish”按钮,如图2-9所示。至此,一个Flink模板就下载好了。 +![图2-9 配置本工程的位置](./img/project-location.png) + 工程结构如图2-10所示。左侧的“Project”栏是工程结构,其中src/main/java文件夹是Java代码文件存放位置,src/main/scala是Scala代码文件存放位置。我们可以在StreamingJob这个文件上继续修改,也可以重新创建一个新文件。 +![图2-10 工程结构](./img/project-structure.png) + 注意,开发前要单击右下角的“Import Changes”,让Maven导入所依赖的包。 ## 2.4.4 调试和运行Flink程序 @@ -215,6 +229,10 @@ StreamExecutionEnvironment.getExecutionEnvironment(); - 在IntelliJ IDEA中,单击绿色运行按钮,运行这个程序。图2-11所示的两个绿色运行按钮中的任意一个都可以运行这个程序。 - IntelliJ IDEA下方的“Run”栏会显示程序的输出,包括本次需要输出的结果,如图2-12所示。 +![图2-11 在IntelliJ IDEA中运行Flink程序](./img/run.png) + +![图2-12 WordCount程序运行结果](./img/result.png) + 恭喜你,你的第一个Flink程序运行成功! **提示** @@ -242,6 +260,8 @@ com.flink.tutorials.java.api.projects.wordcount.WordCountKafkaInStdOut 如图2-13所示,这时,Flink WebUI上就多了一个Flink作业。 +![图2-13 Flink WebUI中多了一个Flink作业](./img/flink-WebUI-job.png) + 程序的输出会保存到Flink主目录下面的log目录下的.out文件中,可以使用下面的命令查看结果。 ```bash @@ -260,3 +280,6 @@ Flink开发和调试过程中,一般有如下几种方式运行程序。 - 使用Flink提供的其他命令行工具,比如针对Scala、Python和SQL的交互式环境。 对于新手,可以先使用IntelliJ IDEA提供的内置运行按钮,熟练后再使用命令行工具。 + +## 本章小结 +本章中,我们回顾了Flink开发经常用到的继承和多态、泛型和函数式编程等概念,在本地搭建了一个Flink集群,创建了第一个Flink工程,并学会了如何运行Flink程序。 diff --git a/doc/ch-programming-basics/img/Maven.png b/doc/ch-programming-basics/img/Maven.png new file mode 100644 index 0000000..517b60b Binary files /dev/null and b/doc/ch-programming-basics/img/Maven.png differ diff --git a/doc/ch-programming-basics/img/archetype.png b/doc/ch-programming-basics/img/archetype.png new file mode 100644 index 0000000..88ee86f Binary files /dev/null and b/doc/ch-programming-basics/img/archetype.png differ diff --git a/doc/ch-programming-basics/img/flink-WebUI-job.png b/doc/ch-programming-basics/img/flink-WebUI-job.png new file mode 100644 index 0000000..8ff5668 Binary files /dev/null and b/doc/ch-programming-basics/img/flink-WebUI-job.png differ diff --git a/doc/ch-programming-basics/img/flink-WebUI.png b/doc/ch-programming-basics/img/flink-WebUI.png new file mode 100644 index 0000000..fc652e9 Binary files /dev/null and b/doc/ch-programming-basics/img/flink-WebUI.png differ diff --git a/doc/ch-programming-basics/img/new-project.png b/doc/ch-programming-basics/img/new-project.png new file mode 100644 index 0000000..4c528d3 Binary files /dev/null and b/doc/ch-programming-basics/img/new-project.png differ diff --git a/doc/ch-programming-basics/img/project-info.png b/doc/ch-programming-basics/img/project-info.png new file mode 100644 index 0000000..e7a15ff Binary files /dev/null and b/doc/ch-programming-basics/img/project-info.png differ diff --git a/doc/ch-programming-basics/img/project-location.png b/doc/ch-programming-basics/img/project-location.png new file mode 100644 index 0000000..d17a5bd Binary files /dev/null and b/doc/ch-programming-basics/img/project-location.png differ diff --git a/doc/ch-programming-basics/img/project-structure.png b/doc/ch-programming-basics/img/project-structure.png new file mode 100644 index 0000000..0fa1917 Binary files /dev/null and b/doc/ch-programming-basics/img/project-structure.png differ diff --git a/doc/ch-programming-basics/img/result.png b/doc/ch-programming-basics/img/result.png new file mode 100644 index 0000000..6444615 Binary files /dev/null and b/doc/ch-programming-basics/img/result.png differ diff --git a/doc/ch-programming-basics/img/run.png b/doc/ch-programming-basics/img/run.png new file mode 100644 index 0000000..cd317f4 Binary files /dev/null and b/doc/ch-programming-basics/img/run.png differ