Flink 程序开发步骤(Java语言)

开发流程

Flink 的流处理 API(DataStream API)位于 org.apache.flink.streaming.api.scala 包(Scala语言)或 org.apache.flink.streaming.api 包(Java语言)中。本文以 java 为例。

我们如果要使用Flink进行计算开发,一个完整的开发步骤是怎样的呢?

什么叫有界数据流,什么叫无界数据流(何为流处理,何为批处理)?

  • Batch Analytics 批量计算。统一收集数据->存储到DB->对数据进行批量处理,对数据实时性邀请不高,比如生成离线报表、月汇总,支付宝年度账单(一年结束批处理计算)

  • Streaming Analytics 流式计算,顾名思义,就是对数据流进行处理,如使用流式分析引擎如 Storm,Flink 实时处理分析数据,应用较多的场景如 实时报表、车辆实时报警计算等等。

每个Flink程序都由相同的基本部分组成:

  • 获取一个执行环境;
  • 加载/创建初始数据;
  • 指定对该数据的转换;
  • 指定计算结果放在哪里;
  • 触发程序执行。

1、开发程序所需依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
<properties>
<encoding>UTF-8</encoding>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<java.version>1.8</java.version>
<scala.version>2.12</scala.version>
<flink.version>1.12.2</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/java</sourceDirectory>
<plugins>
<!-- 编译插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<!--<encoding>${project.build.sourceEncoding}</encoding>-->
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.18.1</version>
<configuration>
<useFile>false</useFile>
<disableXmlReport>true</disableXmlReport>
<includes>
<include>**/*Test.*</include>
<include>**/*Suite.*</include>
</includes>
</configuration>
</plugin>
<!-- 打包插件(会包含所有依赖) -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<!--
zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF -->
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<!-- 设置jar包的入口类(可选) -->
<mainClass></mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

2、获取执行环境

Flink 程序开发,首要的便是需要获取其执行环境!根据我们的目的不同,Flink支持以下几种方式:

  • 获得一个已经存在的Flink环境
  • 创建本地环境
  • 创建远程环境

Flink 流程序的入口点是 StreamExecutionEnvironment 类的一个实例,它定义了程序执行的上下文。StreamExecutionEnvironment 是所有 Flink 程序的基础。可以能过使用以下这些静态方法获得一个StreamExecutionEnvironment 的实例:

1
2
3
StreamExecutionEnvironment.getExecutionEnvironment()
StreamExecutionEnvironment.createLocalEnvironment()
StreamExecutionEnvironment.createRemoteEnvironment(String host, int port, String... jarFiles)

要获得执行环境,通常只需要调用 getExecutionEnvironment() 方法。这将根据我们的上下文选择正确的执行环境。如果正在 IDE 中的本地环境上执行,那么它将启动一个本地执行环境。如果是从程序中创建了一个JAR文件,并通过命令行调用它,那么 Flink 集群管理器将执行 main() 方法, getExecutionEnvironment() 将返回用于在集群上以分布式方式执行程序的执行环境。

我们使用以下语句来获得流程序的执行环境:

1
2
3
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 或者:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

使用StreamExecutionEnvironment 默认是流式处理环境,但是flink1.12 开始,流批一体,我们可以自己指定当前计算程序的环境模式指定为自动模式:AUTOMATIC,此设置后,flink将会自动识别数据源类型

有界数据流,则会采用批方式进行数据处理

无界束流,则会采用流方式进行数据处理

1
2
3
4
5
6
7
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

// 强制指定为批数据处理模式:`BATCH`
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

// 强制指定为流数据处理模式:`STREAMING`
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

注意:在 Flink 中,有界与无界数据流都可以强指定为流式运行环境,但是,如果明知一个数据来源为流式数据,就必须设置环境为AUTOMATICSTREAMING,不可以指定为BATCH否则程序会报错!

3、加载/创建数据源

Flink 可以从多种场景读取加载数据,例如 各类DB 如 MysqlSQL SERVERMongoDB、各类MQ 如 KafkaRabbitMQ、以及很多常用数据存储场景 如 redis文件(本地文件/HDFS)scoket

我们在加载数据源的时候,便知道,该数据是有界还是无界了!

Flink 读取rabbitMQ消息,是有界还是无界呢?当然是无界!因为 Flink 程序启动时,能通过连接知道什么时候 MQ 中有数据,什么时候没有数据吗?不知道,因为本身 MQ 中是否有消息或者消息有多少就是一个不能肯定确定的因素,因此其不得不保持一个类似于长连接的形式,一直等待 MQ 中有数据到来,然后处理。

从集合中读取数据,是无界数据还是有界数据呢?很明显,有界数据!因为数据就这么多,当前数据源在读取时不会再凭空产生数据了。

1
DataStream<String> elementsSource = env.fromElements("java,scala,php,c++","java,scala,php", "java,scala", "java");

scoket中读取数据,是无界数据还是有界数据呢?很明显,无界数据!因为scoket一旦连接,Flink不会知道其数据源什么时候会数据结束,其不得不保持一个类似于长连接的状态,一直等待Scoket中有数据到来,然后处理。

1
DataStreamSource<String> elementsSource= env.socketTextStream("10.50.40.131", 9999);

文件 中读取数据,是有界还是无界呢?当然是有界!因为文件中数据,flink读取会做记录,当文件内容读完了,数据源就相当于没有新的数据来到了嘛!

1
DataStream<String> text = env.readTextFile("file:///path/to/file");

数据被读取到内存后,Flink会将它们组织到DataStream中,这是Flink中用来表示流数据的特殊类。

Flink有特殊的类DataSet和DataStream来表示程序中的数据。可以将它们视为不可变的[分布式数据集合],其中可能包含重复的数据。当数据有限时,使用DataSet;当数据无限时(元素的数量是无界的),使用DataStream。可以在其上应用转换来创建新的派生DataSet或DataStream。

4、数据转换处理

每个Flink程序都对分布式数据集合执行转换。Flink的DataStream API提供了多种数据转换功能,包括过滤、映射、连接、分组和聚合。

数据转换处理,就是Flink使用算子,对从数据源中获取的数据进行数据加工处理(例如:数据转换,计算等等)

例如:开窗口、低阶处理函数ProcessFuction、各种算子:map(映射,与java8流中Map效果类似),flatmap(元素摊平,与java8流中Map效果类似)等等。

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
DataStreamSource<String> elementsSource = env.fromElements("java,scala,php,c++",
"java,scala,php", "java,scala", "java");
// 数据处理
DataStream<String> flatMap = elementsSource.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String element, Collector<String> out) throws Exception {
String[] wordArr = element.split(",");
for (String word : wordArr) {
out.collect(word);
}
}
});

flatMap.map(new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
return value.toUpperCase();
}
});

这里不必了解每个转换的具体含义,后面我们会详细介绍每个转换。需要强调的是,Flink中的转换是惰性的,在调用sink操作之前不会真正执行。

5、处理后数据放置/输出

将计算后的数据,进行放置(输出/存储),有很多地方,例如:输出到文件,输出到控制台,输出到MQ,输出到DB,输出到scoket

输出到控制台

1
2
3
flatMap.print();
//控制台显示为红色
flatMap.printToErr("错误打印:")

Flink 中的接收器(sink)操作触发流的执行,以生成程序所需的结果,例如将结果保存到文件系统或将其打印到标准输出。上面的示例使用 flatMap.print()将结果打印到任务管理器日志中(在IDE中运行时,任务管理器日志将显示在IDE的控制台中)。这将对流的每个元素调用其toString()。

6、执行计算程序

一旦写好了程序处理逻辑,就需要通过调用 StreamExecutionEnvironment 上的 execute() 来触发程序执行。根据 ExecutionEnvironment 的类型,执行将在本地机器上触发,或将程序提交到集群上执行。

Flink程序需要启动才能执行任务,正如,spring-boot启动程序需要nohup java -jar xxxx.jar & 或者编译器中点击图标按钮启动

示例:

1
2
3
4
5
6
7
8
9
// 1.准备环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置模式 (流、批、自动)
// 2.加载数据源
// 3.数据转换
// 4.数据输出
// 5.执行程序
env.execute();
//或者 env.execute("指定当前计算程序名"); // 参数是程序名称,会显示在Web UI界面上

在应用程序中进行的 DataStream API 调用将构建一个附加到StreamExecutionEnvironment的作业图。调用env.execute()时,此图被打包并发送到Flink Master,该Master并行化作业并将其片段分发给Task Managers以供执行。作业的每个并行片段将在一个task slot(任务槽)中执行。

这个分布式运行时要求Flink应用程序是可序列化的。它还要求集群中的每个节点都可以使用所有依赖项。

注意,如果不调用execute(),应用程序将不会运行。StreamExecutionEnvironment上的execute()方法将等待作业完成,然后返回一个JobExecutionResult,其中包含执行时间和累加器结果。

如果不想等待作业完成,可以通过调用StreamExecutionEnvironment上的executeAysnc()来触发异步作业执行。它将返回一个JobClient,可以使用它与刚才提交的作业进行通信。例如,下面是如何通过使用executeAsync()实现execute()的语义。

1
2
final JobClient jobClient = env.executeAsync();
final JobExecutionResult jobExecutionResult = jobClient.getJobExecutionResult(userClassloader).get();

完整示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class FlinkDemo {
public static void main(String[] args) throws Exception {
// 1.准备环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置运行模式
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
// 2.加载数据源
DataStreamSource<String> elementsSource = env.fromElements("java,scala,php,c++",
"java,scala,php", "java,scala", "java");
// 3.数据转换
DataStream<String> flatMap = elementsSource.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String element, Collector<String> out) throws Exception {
String[] wordArr = element.split(",");
for (String word : wordArr) {
out.collect(word);
}
}
});
//DataStream 下边为DataStream子类
SingleOutputStreamOperator<String> source = flatMap.map(new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
return value.toUpperCase();
}
});
// 4.数据输出
source.print();
// 5.执行程序
env.execute("flink-hello-world");
}
}

web 界面执行 job

1、将写好的 jar 打包

使用命令 mvn package 进行打包

2、上传

  • 在 submit new job 界面上传 jar 文件,该 jar 文件需包含依赖。
  • 在 Entry Class 里填入类的路径,如com.alert.*

报错

java.lang.NullPointerException

报错如下:

1
2021-12-02 10:51:55,644 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source (1/3)#4 (92d97e5b78862c632d32b272b843bf4b) switched from RUNNING to FAILED with failure cause: java.lang.NullPointerException

解决方案

做一下非空判断

1
2
3
StringUtils.isNotBlank(str)
StringUtils.isNotEmpty(str)
object != null

在调用对象方法前先判断对象是否为空(不管是实体、变量、常量、map、list还是其他什么)

在判断对象是否为空时注意不要直接调用判空方法,判空方法特么也是方法

尽量不要用 str.toString(),改用 String.valueOf()。比如你要判断一个str是否等于”test”,你不要 str.equals(“test”),可以反过来 "test".equals(str)

Reference


Flink 程序开发步骤(Java语言)
https://flepeng.github.io/044-Flink-31-开发-Flink-程序开发步骤(Java语言)/
作者
Lepeng
发布于
2021年3月8日
许可协议