开发流程
Flink 的流处理 API(DataStream API)位于 org.apache.flink.streaming.api.scala
包(Scala语言)或 org.apache.flink.streaming.api
包(Java语言)中。本文以 java 为例。
我们如果要使用Flink进行计算开发,一个完整的开发步骤是怎样的呢?
什么叫有界数据流,什么叫无界数据流(何为流处理,何为批处理)?
每个Flink程序都由相同的基本部分组成:
- 获取一个执行环境;
- 加载/创建初始数据;
- 指定对该数据的转换;
- 指定计算结果放在哪里;
- 触发程序执行。
1、开发程序所需依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109
| <properties> <encoding>UTF-8</encoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <java.version>1.8</java.version> <scala.version>2.12</scala.version> <flink.version>1.12.2</flink.version> </properties> <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.12</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-scala_2.12</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_2.12</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.12</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-scala-bridge_2.12</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java-bridge_2.12</artifactId> <version>${flink.version}</version> </dependency> </dependencies> <build> <sourceDirectory>src/main/java</sourceDirectory> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.5.1</version> <configuration> <source>1.8</source> <target>1.8</target> {project.build.sourceEncoding} </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-surefire-plugin</artifactId> <version>2.18.1</version> <configuration> <useFile>false</useFile> <disableXmlReport>true</disableXmlReport> <includes> <include>**/*Test.*</include> <include>**/*Suite.*</include> </includes> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>2.3</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <filters> <filter> <artifact>*:*</artifact> <excludes> zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF --> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass></mainClass> </transformer> </transformers> </configuration> </execution> </executions> </plugin> </plugins> </build>
|
2、获取执行环境
Flink 程序开发,首要的便是需要获取其执行环境!根据我们的目的不同,Flink支持以下几种方式:
- 获得一个已经存在的Flink环境
- 创建本地环境
- 创建远程环境
Flink 流程序的入口点是 StreamExecutionEnvironment 类的一个实例,它定义了程序执行的上下文。StreamExecutionEnvironment 是所有 Flink 程序的基础。可以能过使用以下这些静态方法获得一个StreamExecutionEnvironment 的实例:
1 2 3
| StreamExecutionEnvironment.getExecutionEnvironment() StreamExecutionEnvironment.createLocalEnvironment() StreamExecutionEnvironment.createRemoteEnvironment(String host, int port, String... jarFiles)
|
要获得执行环境,通常只需要调用 getExecutionEnvironment()
方法。这将根据我们的上下文选择正确的执行环境。如果正在 IDE 中的本地环境上执行,那么它将启动一个本地执行环境。如果是从程序中创建了一个JAR文件,并通过命令行调用它,那么 Flink 集群管理器将执行 main()
方法, getExecutionEnvironment()
将返回用于在集群上以分布式方式执行程序的执行环境。
我们使用以下语句来获得流程序的执行环境:
1 2 3
| ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
|
使用StreamExecutionEnvironment
默认是流式处理环境,但是flink1.12
开始,流批一体,我们可以自己指定当前计算程序的环境模式指定为自动模式:AUTOMATIC
,此设置后,flink将会自动识别数据源类型
有界数据流,则会采用批方式进行数据处理
无界束流,则会采用流方式进行数据处理
1 2 3 4 5 6 7
| env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
|
注意:在 Flink 中,有界与无界数据流都可以强指定为流式运行环境,但是,如果明知一个数据来源为流式数据,就必须设置环境为AUTOMATIC
或STREAMING
,不可以指定为BATCH
否则程序会报错!
3、加载/创建数据源
Flink 可以从多种场景读取加载数据,例如 各类DB 如 Mysql
、SQL SERVER
、MongoDB
、各类MQ 如 Kafka
、RabbitMQ
、以及很多常用数据存储场景 如 redis
、文件(本地文件/HDFS)
、scoket
…
我们在加载数据源的时候,便知道,该数据是有界还是无界了!
Flink 读取rabbitMQ
消息,是有界还是无界呢?当然是无界!因为 Flink 程序启动时,能通过连接知道什么时候 MQ 中有数据,什么时候没有数据吗?不知道,因为本身 MQ 中是否有消息或者消息有多少就是一个不能肯定确定的因素,因此其不得不保持一个类似于长连接的形式,一直等待 MQ 中有数据到来,然后处理。
从集合中读取数据,是无界数据还是有界数据呢?很明显,有界数据!因为数据就这么多,当前数据源在读取时不会再凭空产生数据了。
1
| DataStream<String> elementsSource = env.fromElements("java,scala,php,c++","java,scala,php", "java,scala", "java");
|
从scoket
中读取数据,是无界数据还是有界数据呢?很明显,无界数据!因为scoket
一旦连接,Flink不会知道其数据源什么时候会数据结束,其不得不保持一个类似于长连接的状态,一直等待Scoket
中有数据到来,然后处理。
1
| DataStreamSource<String> elementsSource= env.socketTextStream("10.50.40.131", 9999);
|
从文件
中读取数据,是有界还是无界呢?当然是有界!因为文件中数据,flink读取会做记录,当文件内容读完了,数据源就相当于没有新的数据来到了嘛!
1
| DataStream<String> text = env.readTextFile("file:///path/to/file");
|
数据被读取到内存后,Flink会将它们组织到DataStream中,这是Flink中用来表示流数据的特殊类。
Flink有特殊的类DataSet和DataStream来表示程序中的数据。可以将它们视为不可变的[分布式数据集合],其中可能包含重复的数据。当数据有限时,使用DataSet;当数据无限时(元素的数量是无界的),使用DataStream。可以在其上应用转换来创建新的派生DataSet或DataStream。
4、数据转换处理
每个Flink程序都对分布式数据集合执行转换。Flink的DataStream API提供了多种数据转换功能,包括过滤、映射、连接、分组和聚合。
数据转换处理,就是Flink使用算子,对从数据源中获取的数据进行数据加工处理(例如:数据转换,计算等等)
例如:开窗口、低阶处理函数ProcessFuction
、各种算子:map(映射,与java8流中Map效果类似),flatmap(元素摊平,与java8流中Map效果类似)等等。
示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| DataStreamSource<String> elementsSource = env.fromElements("java,scala,php,c++", "java,scala,php", "java,scala", "java");
DataStream<String> flatMap = elementsSource.flatMap(new FlatMapFunction<String, String>() { @Override public void flatMap(String element, Collector<String> out) throws Exception { String[] wordArr = element.split(","); for (String word : wordArr) { out.collect(word); } } });
flatMap.map(new MapFunction<String, String>() { @Override public String map(String value) throws Exception { return value.toUpperCase(); } });
|
这里不必了解每个转换的具体含义,后面我们会详细介绍每个转换。需要强调的是,Flink中的转换是惰性的,在调用sink操作之前不会真正执行。
5、处理后数据放置/输出
将计算后的数据,进行放置(输出/存储),有很多地方,例如:输出到文件,输出到控制台,输出到MQ,输出到DB,输出到scoket
…
输出到控制台
1 2 3
| flatMap.print();
flatMap.printToErr("错误打印:")
|
Flink 中的接收器(sink)操作触发流的执行,以生成程序所需的结果,例如将结果保存到文件系统或将其打印到标准输出。上面的示例使用 flatMap.print()将结果打印到任务管理器日志中(在IDE中运行时,任务管理器日志将显示在IDE的控制台中)。这将对流的每个元素调用其toString()。
6、执行计算程序
一旦写好了程序处理逻辑,就需要通过调用 StreamExecutionEnvironment 上的 execute()
来触发程序执行。根据 ExecutionEnvironment 的类型,执行将在本地机器上触发,或将程序提交到集群上执行。
Flink程序需要启动才能执行任务,正如,spring-boot启动程序需要nohup java -jar xxxx.jar &
或者编译器中点击图标按钮启动
示例:
1 2 3 4 5 6 7 8 9
| StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.execute();
|
在应用程序中进行的 DataStream API 调用将构建一个附加到StreamExecutionEnvironment的作业图。调用env.execute()时,此图被打包并发送到Flink Master,该Master并行化作业并将其片段分发给Task Managers以供执行。作业的每个并行片段将在一个task slot(任务槽)中执行。
这个分布式运行时要求Flink应用程序是可序列化的。它还要求集群中的每个节点都可以使用所有依赖项。
注意,如果不调用execute(),应用程序将不会运行。StreamExecutionEnvironment上的execute()方法将等待作业完成,然后返回一个JobExecutionResult,其中包含执行时间和累加器结果。
如果不想等待作业完成,可以通过调用StreamExecutionEnvironment上的executeAysnc()来触发异步作业执行。它将返回一个JobClient,可以使用它与刚才提交的作业进行通信。例如,下面是如何通过使用executeAsync()实现execute()的语义。
1 2
| final JobClient jobClient = env.executeAsync(); final JobExecutionResult jobExecutionResult = jobClient.getJobExecutionResult(userClassloader).get();
|
完整示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| public class FlinkDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); DataStreamSource<String> elementsSource = env.fromElements("java,scala,php,c++", "java,scala,php", "java,scala", "java"); DataStream<String> flatMap = elementsSource.flatMap(new FlatMapFunction<String, String>() { @Override public void flatMap(String element, Collector<String> out) throws Exception { String[] wordArr = element.split(","); for (String word : wordArr) { out.collect(word); } } }); SingleOutputStreamOperator<String> source = flatMap.map(new MapFunction<String, String>() { @Override public String map(String value) throws Exception { return value.toUpperCase(); } }); source.print(); env.execute("flink-hello-world"); } }
|
web 界面执行 job
1、将写好的 jar 打包
使用命令 mvn package
进行打包
2、上传
- 在 submit new job 界面上传 jar 文件,该 jar 文件需包含依赖。
- 在 Entry Class 里填入类的路径,如com.alert.*
报错
java.lang.NullPointerException
报错如下:
1
| 2021-12-02 10:51:55,644 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source (1/3)#4 (92d97e5b78862c632d32b272b843bf4b) switched from RUNNING to FAILED with failure cause: java.lang.NullPointerException
|
解决方案
做一下非空判断
1 2 3
| StringUtils.isNotBlank(str) StringUtils.isNotEmpty(str) object != null
|
在调用对象方法前先判断对象是否为空(不管是实体、变量、常量、map、list还是其他什么)
在判断对象是否为空时注意不要直接调用判空方法,判空方法特么也是方法
尽量不要用 str.toString()
,改用 String.valueOf()
。比如你要判断一个str是否等于”test”,你不要 str.equals(“test”)
,可以反过来 "test".equals(str)
Reference