开发流程
Flink 的流处理 API(DataStream API)位于 org.apache.flink.streaming.api.scala 包(Scala语言)或 org.apache.flink.streaming.api 包(Java语言)中。本文以 java 为例。
我们如果要使用Flink进行计算开发,一个完整的开发步骤是怎样的呢?
什么叫有界数据流,什么叫无界数据流(何为流处理,何为批处理)?
每个Flink程序都由相同的基本部分组成:
- 获取一个执行环境;
 
- 加载/创建初始数据;
 
- 指定对该数据的转换;
 
- 指定计算结果放在哪里;
 
- 触发程序执行。
 
1、开发程序所需依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109
   | <properties>     <encoding>UTF-8</encoding>     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>     <maven.compiler.source>1.8</maven.compiler.source>     <maven.compiler.target>1.8</maven.compiler.target>     <java.version>1.8</java.version>     <scala.version>2.12</scala.version>     <flink.version>1.12.2</flink.version> </properties> <dependencies>     <dependency>         <groupId>org.apache.flink</groupId>         <artifactId>flink-clients_2.12</artifactId>         <version>${flink.version}</version>     </dependency>     <dependency>         <groupId>org.apache.flink</groupId>         <artifactId>flink-scala_2.12</artifactId>         <version>${flink.version}</version>     </dependency>     <dependency>         <groupId>org.apache.flink</groupId>         <artifactId>flink-java</artifactId>         <version>${flink.version}</version>     </dependency>     <dependency>         <groupId>org.apache.flink</groupId>         <artifactId>flink-streaming-scala_2.12</artifactId>         <version>${flink.version}</version>     </dependency>     <dependency>         <groupId>org.apache.flink</groupId>         <artifactId>flink-streaming-java_2.12</artifactId>         <version>${flink.version}</version>     </dependency>     <dependency>         <groupId>org.apache.flink</groupId>         <artifactId>flink-table-api-scala-bridge_2.12</artifactId>         <version>${flink.version}</version>     </dependency>     <dependency>         <groupId>org.apache.flink</groupId>         <artifactId>flink-table-api-java-bridge_2.12</artifactId>         <version>${flink.version}</version>     </dependency> </dependencies> <build>     <sourceDirectory>src/main/java</sourceDirectory>     <plugins>                  <plugin>             <groupId>org.apache.maven.plugins</groupId>             <artifactId>maven-compiler-plugin</artifactId>             <version>3.5.1</version>             <configuration>                 <source>1.8</source>                 <target>1.8</target>                 {project.build.sourceEncoding}             </configuration>         </plugin>         <plugin>             <groupId>org.apache.maven.plugins</groupId>             <artifactId>maven-surefire-plugin</artifactId>             <version>2.18.1</version>             <configuration>                 <useFile>false</useFile>                 <disableXmlReport>true</disableXmlReport>                 <includes>                     <include>**/*Test.*</include>                     <include>**/*Suite.*</include>                 </includes>             </configuration>         </plugin>                  <plugin>             <groupId>org.apache.maven.plugins</groupId>             <artifactId>maven-shade-plugin</artifactId>             <version>2.3</version>             <executions>                 <execution>                     <phase>package</phase>                     <goals>                         <goal>shade</goal>                     </goals>                     <configuration>                         <filters>                             <filter>                                 <artifact>*:*</artifact>                                 <excludes>                                                                          zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF -->                                     <exclude>META-INF/*.SF</exclude>                                     <exclude>META-INF/*.DSA</exclude>                                     <exclude>META-INF/*.RSA</exclude>                                 </excludes>                             </filter>                         </filters>                         <transformers>                             <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">                                                                  <mainClass></mainClass>                             </transformer>                         </transformers>                     </configuration>                 </execution>             </executions>         </plugin>     </plugins> </build>
 
  | 
 
2、获取执行环境
Flink 程序开发,首要的便是需要获取其执行环境!根据我们的目的不同,Flink支持以下几种方式:
- 获得一个已经存在的Flink环境
 
- 创建本地环境
 
- 创建远程环境
 
Flink 流程序的入口点是 StreamExecutionEnvironment 类的一个实例,它定义了程序执行的上下文。StreamExecutionEnvironment 是所有 Flink 程序的基础。可以能过使用以下这些静态方法获得一个StreamExecutionEnvironment 的实例:
1 2 3
   | StreamExecutionEnvironment.getExecutionEnvironment() StreamExecutionEnvironment.createLocalEnvironment() StreamExecutionEnvironment.createRemoteEnvironment(String host, int port, String... jarFiles)
 
  | 
 
要获得执行环境,通常只需要调用 getExecutionEnvironment() 方法。这将根据我们的上下文选择正确的执行环境。如果正在 IDE 中的本地环境上执行,那么它将启动一个本地执行环境。如果是从程序中创建了一个JAR文件,并通过命令行调用它,那么 Flink 集群管理器将执行 main() 方法, getExecutionEnvironment() 将返回用于在集群上以分布式方式执行程序的执行环境。
我们使用以下语句来获得流程序的执行环境:
1 2 3
   | ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
  | 
 
使用StreamExecutionEnvironment 默认是流式处理环境,但是flink1.12 开始,流批一体,我们可以自己指定当前计算程序的环境模式指定为自动模式:AUTOMATIC,此设置后,flink将会自动识别数据源类型
有界数据流,则会采用批方式进行数据处理
无界束流,则会采用流方式进行数据处理
1 2 3 4 5 6 7
   | env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
 
  env.setRuntimeMode(RuntimeExecutionMode.BATCH);
 
  env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
 
  | 
 
注意:在 Flink 中,有界与无界数据流都可以强指定为流式运行环境,但是,如果明知一个数据来源为流式数据,就必须设置环境为AUTOMATIC 或STREAMING,不可以指定为BATCH否则程序会报错!
3、加载/创建数据源
Flink 可以从多种场景读取加载数据,例如 各类DB 如 Mysql、SQL SERVER、MongoDB、各类MQ 如 Kafka、RabbitMQ、以及很多常用数据存储场景 如 redis、文件(本地文件/HDFS)、scoket…
我们在加载数据源的时候,便知道,该数据是有界还是无界了!
Flink 读取rabbitMQ消息,是有界还是无界呢?当然是无界!因为 Flink 程序启动时,能通过连接知道什么时候 MQ 中有数据,什么时候没有数据吗?不知道,因为本身 MQ 中是否有消息或者消息有多少就是一个不能肯定确定的因素,因此其不得不保持一个类似于长连接的形式,一直等待 MQ 中有数据到来,然后处理。
从集合中读取数据,是无界数据还是有界数据呢?很明显,有界数据!因为数据就这么多,当前数据源在读取时不会再凭空产生数据了。
1
   | DataStream<String> elementsSource = env.fromElements("java,scala,php,c++","java,scala,php", "java,scala", "java");
 
  | 
 
从scoket中读取数据,是无界数据还是有界数据呢?很明显,无界数据!因为scoket一旦连接,Flink不会知道其数据源什么时候会数据结束,其不得不保持一个类似于长连接的状态,一直等待Scoket中有数据到来,然后处理。
1
   | DataStreamSource<String> elementsSource= env.socketTextStream("10.50.40.131", 9999);
 
  | 
 
从文件 中读取数据,是有界还是无界呢?当然是有界!因为文件中数据,flink读取会做记录,当文件内容读完了,数据源就相当于没有新的数据来到了嘛!
1
   | DataStream<String> text = env.readTextFile("file:///path/to/file");
 
  | 
 
数据被读取到内存后,Flink会将它们组织到DataStream中,这是Flink中用来表示流数据的特殊类。
Flink有特殊的类DataSet和DataStream来表示程序中的数据。可以将它们视为不可变的[分布式数据集合],其中可能包含重复的数据。当数据有限时,使用DataSet;当数据无限时(元素的数量是无界的),使用DataStream。可以在其上应用转换来创建新的派生DataSet或DataStream。
4、数据转换处理
每个Flink程序都对分布式数据集合执行转换。Flink的DataStream API提供了多种数据转换功能,包括过滤、映射、连接、分组和聚合。
数据转换处理,就是Flink使用算子,对从数据源中获取的数据进行数据加工处理(例如:数据转换,计算等等)
例如:开窗口、低阶处理函数ProcessFuction、各种算子:map(映射,与java8流中Map效果类似),flatmap(元素摊平,与java8流中Map效果类似)等等。
示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
   | DataStreamSource<String> elementsSource = env.fromElements("java,scala,php,c++",                                                            "java,scala,php", "java,scala", "java");
  DataStream<String> flatMap = elementsSource.flatMap(new FlatMapFunction<String, String>() {     @Override     public void flatMap(String element, Collector<String> out) throws Exception {         String[] wordArr = element.split(",");         for (String word : wordArr) {             out.collect(word);         }     } });
  flatMap.map(new MapFunction<String, String>() {     @Override     public String map(String value) throws Exception {         return value.toUpperCase();     } });
 
  | 
 
这里不必了解每个转换的具体含义,后面我们会详细介绍每个转换。需要强调的是,Flink中的转换是惰性的,在调用sink操作之前不会真正执行。
5、处理后数据放置/输出
将计算后的数据,进行放置(输出/存储),有很多地方,例如:输出到文件,输出到控制台,输出到MQ,输出到DB,输出到scoket…
输出到控制台
1 2 3
   | flatMap.print();
  flatMap.printToErr("错误打印:")
 
  | 
 
Flink 中的接收器(sink)操作触发流的执行,以生成程序所需的结果,例如将结果保存到文件系统或将其打印到标准输出。上面的示例使用 flatMap.print()将结果打印到任务管理器日志中(在IDE中运行时,任务管理器日志将显示在IDE的控制台中)。这将对流的每个元素调用其toString()。
6、执行计算程序
一旦写好了程序处理逻辑,就需要通过调用 StreamExecutionEnvironment 上的 execute() 来触发程序执行。根据 ExecutionEnvironment 的类型,执行将在本地机器上触发,或将程序提交到集群上执行。
Flink程序需要启动才能执行任务,正如,spring-boot启动程序需要nohup java -jar xxxx.jar & 或者编译器中点击图标按钮启动
示例:
1 2 3 4 5 6 7 8 9
   |  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
 
 
 
  env.execute();
 
 
  | 
 
在应用程序中进行的 DataStream API 调用将构建一个附加到StreamExecutionEnvironment的作业图。调用env.execute()时,此图被打包并发送到Flink Master,该Master并行化作业并将其片段分发给Task Managers以供执行。作业的每个并行片段将在一个task slot(任务槽)中执行。
这个分布式运行时要求Flink应用程序是可序列化的。它还要求集群中的每个节点都可以使用所有依赖项。
注意,如果不调用execute(),应用程序将不会运行。StreamExecutionEnvironment上的execute()方法将等待作业完成,然后返回一个JobExecutionResult,其中包含执行时间和累加器结果。
如果不想等待作业完成,可以通过调用StreamExecutionEnvironment上的executeAysnc()来触发异步作业执行。它将返回一个JobClient,可以使用它与刚才提交的作业进行通信。例如,下面是如何通过使用executeAsync()实现execute()的语义。
1 2
   | final JobClient jobClient = env.executeAsync(); final JobExecutionResult jobExecutionResult = jobClient.getJobExecutionResult(userClassloader).get();
 
  | 
 
完整示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
   | public class FlinkDemo {     public static void main(String[] args) throws Exception {                  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();                  env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);                  DataStreamSource<String> elementsSource = env.fromElements("java,scala,php,c++",                 "java,scala,php", "java,scala", "java");                  DataStream<String> flatMap = elementsSource.flatMap(new FlatMapFunction<String, String>() {             @Override             public void flatMap(String element, Collector<String> out) throws Exception {                 String[] wordArr = element.split(",");                 for (String word : wordArr) {                     out.collect(word);                 }             }         });                  SingleOutputStreamOperator<String> source = flatMap.map(new MapFunction<String, String>() {             @Override             public String map(String value) throws Exception {                 return value.toUpperCase();             }         });                  source.print();                  env.execute("flink-hello-world");     } }
 
  | 
 
web 界面执行 job
1、将写好的 jar 打包
使用命令 mvn package 进行打包
2、上传
- 在 submit new job 界面上传 jar 文件,该 jar 文件需包含依赖。
 
- 在 Entry Class 里填入类的路径,如com.alert.*
 
报错
java.lang.NullPointerException
报错如下:
1
   | 2021-12-02 10:51:55,644 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source (1/3)#4 (92d97e5b78862c632d32b272b843bf4b) switched from RUNNING to FAILED with failure cause: java.lang.NullPointerException
 
  | 
 
解决方案
做一下非空判断
1 2 3
   | StringUtils.isNotBlank(str) StringUtils.isNotEmpty(str) object != null
 
  | 
 
在调用对象方法前先判断对象是否为空(不管是实体、变量、常量、map、list还是其他什么)
在判断对象是否为空时注意不要直接调用判空方法,判空方法特么也是方法
尽量不要用 str.toString(),改用 String.valueOf()。比如你要判断一个str是否等于”test”,你不要 str.equals(“test”),可以反过来 "test".equals(str)
Reference