06-Flink DateStrame API

Flink 官网主页地址:https://flink.apache.org
Flink 官方中文地址:https://nightlies.apache.org/flink/flink-docs-stable/zh/

DataStream API

DataStream API 是 Flink 核心层 API。一个 Flink 程序,其实就是对 DataStream 的各种转换。具体来说,代码基本上都由以下几部分构成:

1、执行环境(Execution Environment)

Flink 程序可以在各种上下文环境中运行;我们可以在本地 JVM 中执行程序,也可以提交到远程集群上运行。

不同的环境,代码提交运行的过程会有所不同。这就要求我们在提交作业执行计算时,首先必须获取当前 Flink 的运行环境,从而建立起与 Flink 框架之间的联系。

1.1、创建执行环境

我们要获取的执行环境,是 StreamExecutionEnviroment 类的对象,这是所有 Flink 程序的基础。在代码中成绩执行环境的方式,就是调用这个类的静态方法,具体有一下三种。

  1. getExecutionEnviroment

    最简单的方式,就是直接调用 getExecutionEnvironment 方法。它会根据当前运行的上下文直接得到正确的结果:如果程序是独立运行的,就返回一个本地执行环境;如果是创建了 jar 包,然后从命令行调用它并提交到集群执行,那么就返回集群的执行环境。也就是说,这个方法会根据当前运行的方式,自行决定该返回什么样的运行环境。

    1
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    这种方式,用起来简单高效,是最常用的一种创建执行环境的方式。

  2. createLocalEnviroment

    这个方法返回一个本地执行环境。可以在调用时传入一个参数,指定默认的并行度;如果不传入,则没人并行度就是本地的 CPU 核心数。

    1
    StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment();
  3. createRemoteEnvironment

    这个方法返回集群执行环境。需要在调用时指定 JobManager 的主机名和端口号,并指定要在集群中运行的Jar包。

    1
    2
    3
    4
    5
    6
    StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment
    .createRemoteEnvironment(
    "host", // JobManager主机名
    1234, // JobManager进程端口号
    "path/to/jarFile.jar" // 提交给JobManager的JAR包
    );

    在获取到程序执行环境后,我们还可以对执行环境进行灵活的设置。比如可以全局设置程序的并行度、禁用算子链,还可以定义程序的时间语义、配置容错机制。

1.2、执行模式(Execution Mode)

从 Flink1.12 开始,官方推荐的做法是直接使用 DataStream API,在提交任务时通过将执行模式设为 BATCH 来进行批处理。不建议使用 DataSet API。

1
2
// 流处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream API 执行模式包括:流执行模式、批执行模式和自动模式

  • 流执行模式(Streaming)
    这是 DataStream API 最经典的模式,一般用于需要持续实时处理的无界数据流。默认情况下,程序使用的就是 Streaming 执行模式。

  • 批执行模式(Batch)
    专门用于批处理的执行模式

  • 自动模式(AutoMatic)
    在这种模式下,将由程序根据输入数据源是否有界,来自动选择执行模式。
    批执行模式的使用。主要有两种方式:

    1. 通过命令行配置

      1
      bin/flink run -Dexecution.runtime-mode=BATCH ...
    2. 通过代码配置

      1
      2
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      env.setRuntimeMode(RuntimeExecutionMode.BATCH);

1.3、触发程序执行

需要注意的是,写完输出(Sink)操作并不代表程序已经结束。因为当 main() 方法被调用时,其实只是定义了作业的每个执行操作如何添加到数据流图中;这时候并没有真正处理数据。因为数据可能还没有来。

Flink 是由事件驱动的,只有等到数据到来,才会触发真正的计算,这也被称为“延迟执行”或“懒执行”。所以我们需要显示地调用执行环境的 execute() 方法来触发程序的执行。execute() 方法讲一直等待作业完成,返回一个执行结果(JobExecutionResult)。

1
env.execute();

2、源算子(Source)

Fink 可以从各种来源获取数据,如何构建 DataStream 进行转换处理。一般将数据地输入来源称为数据源(data source),而读取数据的算子就是源算子。所以,source 就是我们整个处理程序的输入端。

  • 在 Flink1.12 以前,旧的添加 source 的方法是调用执行环境的 addSource() 方法:

    1
    DataStream<String> stream = env.addSource(...);

    方法传入的参数是一个“源函数”(source function),需要实现 SourceFunction 接口。

  • 从 Flink1.12 开始,主要使用流批一体的新 Source 架构:

    1
    DataStreamSource<String> stream = env.fromSource(…)

Flink 直接提供了很多预实现的接口,此外还有很多外部连接工具也帮我们实现了对应的 Source,通常情况下足以应对我们的实际需求。

2.1、准备工作

为了方便练习,我们创建 WaterSensor 类作为数据模型。

字段名 数据类型 说明
id String 水位传感器类型
ts Long 传感器记录时间戳
vc Integer 水位记录

具体代码如下:

1
2
3
4
5
6
7
8
@Data
@NoArgsConstructor
@AllArgsConstructor
public class WaterSensor {
private String id;
private Long ts;
private Integer vc;
}

这里需要注意的点:

  • 类是公共的
  • 所有属性都是公有的
  • 所有属性的类型都是可以序列化的

Flink 会把这样的类作为一种特殊的 POJO(Plain Ordinary Java Object 简单的 Java 对象,实际就是普通 JavaBeans)数据类型来对待,方便数据的解析和序列化。

我们这里自定义的 POJO 类会在后面的代码中频繁使用,所以在后面的代码中碰到,把这里的 POJO 类导入就好了。

2.2、从集合中读取数据

最简单的读取数据的方式,就是在代码中直接创建一个 Java 集合,然后调用执行环境的 fromCollection 方法进行读取。这相当于将数据临时存储到内存中,形成特殊的数据结构后,作为数据源使用,一般用于测试。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class Demo01_CollectionSource {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.setInteger("rest.port", 3333);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

//从集合中获取数据,用于测试代码的逻辑是否有bug
env.fromCollection(Arrays.asList(1,2,3,4,5,6)).print();

//基于元素成绩
env.fromElements(7,8,9,10,11,12).printToErr();

env.execute();
}
}

2.3、从文件中读取数据

真正的实际应用中,自然不会直接将数据写在代码中。通常情况下,我们会从存储介质中获取数据,一个比较常见的方式就是读取日志文件。这也是批处理中最常见的读取方式。
读取文件,需要添加文件连接器依赖:

1
2
3
4
5
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
</dependency>

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class Demo02_FileSource {
public static void main(String[] args) throws Exception {
//创建Flink配置类(空参创建的话都是默认值)
Configuration configuration = new Configuration();
//修改配置类中的WebUI端口号
configuration.setInteger("rest.port",3333);
//创建Flink环境(并且传入配置对象)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);

/**
* 引入file-connector
*
* forRecordStreamFormat(
* final StreamFormat<T> streamFormat, hadoop中的输入格式。代表输入的每一行数据的格式。
* 压缩的文本,也能识别(部分)
* final Path...paths:读取的文件的路径,可以是单个文件,也可以是一个目录,可以是本地磁盘的目录,也可以是hdfs上的文件
* 如果读取hdfs上的文件,需要引入hadoop-client的依赖
* )
*/

//获取数据源
FileSource<String> source = FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("data/ws.json")).build();

//使用就算环境,调用source算子去读数据
DataStreamSource<String> streamSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "myfile");

//简单打印
streamSource.print();

//执行
env.execute();
}
}

说明:

  • 参数可以是目录,也可以是文件;还可以从 HDFS 目录下读取,使用路径 hdfs://…
  • 路径可以是相对路径,也可以是绝对路径
  • 相对路径是从系统属性 user.dir 获取路径;idea 下是 project 的根目录,standalone 模式下是集群节点根目录。

2.4、从 Soceket 读取数据

不论从集合还是文件,我们读取的其实都是有界数据。在流处理的场景中,数据往往是无界的。

我们之前用到的读取 socket 文本流,就是流处理场景。但是这种方式由于吞吐量小、稳定性较差,一般也是用于测试。

1
DataStream<String> stream = env.socketTextStream("hadoop102", 9999);

2.5、从 Kafka 读取数据

Flink 官方提供了连接工具 flink-connector-kafka,帮我们实现了一个消费者 FlinkKafkaConsumer,它就是用来读取 Kafka 数据的 SourceFunction。

所以想要以 Kafka 作为数据源获取数据,我们只需要引入 Kafka 连接器的依赖。Flink 官方提供的是一个通用的 Kafka 连接器,它会自动跟踪最新版本的 Kafka 客户端。目前最新版本只支持 0.10.0 版本以上的 Kafka。这里我们需要导入的依赖如下。

1
2
3
4
5
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public class Demo03_KafkaSource {
public static void main(String[] args) throws Exception {
//创建Flink配置类(空参创建的话都是默认值)
Configuration configuration = new Configuration();
//修改配置类中的WebUI端口号
configuration.setInteger("rest.port",3333);
//创建Flink环境(并且传入配置对象)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);

/**
* 构造一个Source
* kafkaSource.<OUT>builder()
* OUT指kafka中读取的数据的value类型
*/
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
//声明kafka集群地址
.setBootstrapServers("hadoop102:9092")
//声明读取的主题
.setTopics("FlinkTest")
/**设置消费者组 Source算子可以有多个并行度,每个并行度都会被创建为一个Task,每个Task都是一个消费者组,但是多个消费者组属于同一个组
* 一个Source的Task可以消费一个主题的N个分区
*/
.setGroupId("flink")
/**
* 设置消费者的消费策略
* 从头消费:earliest
* 从尾消费:latest
*/
//没有设置策略的话是从头消费
// flink 程序在启动的时候,从之前备份的状态中读取offsets,从offsets位置继续往后消费!
//如果没有备份,此时参考消费策略
//从头消费
// .setStartingOffsets(OffsetsInitializer.earliest())
//从 kafka 读取当前组上次提交的 offset 位置,如果这个组没有提交过,再从头消费
//.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))


/**
* 如果你消费的数据,是没有 key 的,只需要设置 value 的反序列化器:setValueOnlyDeserializer
* 如果你消费的数据,有 key,需要设置 key-value 的反序列化器:setDeserializer
*/
.setValueOnlyDeserializer(new SimpleStringSchema())
//设置是否自动提交消费的 offset
.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
//设置自动提交的时间间隔
.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")
.build();

env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(),"kafka").print();

env.execute();
}
}

2.6、从数据生成器读取数据

Flink 从 1.11 开始提供了一个内置的 DataGen 连接器,主要是用于生成一些随机数,用于在没有数据源的时候,进行流任务的测试以及性能测试等。1.17 提供了新的Source写法,需要导入依赖:

1
2
3
4
5
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-datagen</artifactId>
<version>${flink.version}</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public class Demo04_DataGenSource {
public static void main(String[] args) throws Exception {
//创建Flink配置类(空参创建的话都是默认值)
Configuration configuration = new Configuration();
//修改配置类中的WebUI端口号
configuration.setInteger("rest.port",3333);
//创建Flink环境(并且传入配置对象)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);

String[] ids={"s1","s2","s3"};

/**
* 模拟源源不断的WaterSensor
*
* DataGeneratorSource(
* GeneratorFunction<Long,OUT> generationFunction,函数帮你模拟想要的数据OUT,你要模拟的数据类型
* Long count,一共要模拟多少条数据
* RateLimiterStrategy rateLimiterStrategy,速率限制
* TypeInformation<OUT> typeInfo 补充OUT的类型信息
* )
*/

//模拟数据
DataGeneratorSource<WaterSensor> source = new DataGeneratorSource<WaterSensor>(
new GeneratorFunction<Long,WaterSensor>()
{
@Override
public WaterSensor map(Long aLong) throws Exception {
return new WaterSensor(
ids[RandomUtils.nextInt(0,ids.length)],
System.currentTimeMillis(),
RandomUtils.nextInt(100,30000)
);
}
},
Long.MAX_VALUE,
RateLimiterStrategy.perSecond(5d),
TypeInformation.of(WaterSensor.class)

);

//使用计算环境,调用fromSource算子去读数据
DataStreamSource<WaterSensor> sensorDataStreamSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "dg");

//打印
sensorDataStreamSource.print();

//执行
env.execute();
}
}

1、Flink 的类型系统

Flink 使用”类型信息“(TypeInformation)来统一表示数据类型。TypeInformation 类是 Flink 中所有类型描述符的基类。它涵盖了类型的一些基本属性,并为每个数据类型生成特定的序列化器、反序列化器和比较器。

2、Flink 支持的数据类型

对于常见的 Java 和 Scala 数据类型,Flink 都是支持的。Flink 在内部,Flink 对支持不同的类型进行了划分,这些类型可以在Types工具类中找到:

  1. 基本类型。所有 Java 基本类型及其包装类,再加上 Void、String、Date、BigDecimal 和 BigInteger。

  2. 数组类型。包括基本类型数组(Primitive_Array)和对象数组(Object_Array)。

  3. 复合数据类型。

    • Java 元组类型(Tuple):这是 Flink 内置的元组类型,是 Java API 的一部分。最多 25 个字段,也就是从Tuple0~Tuple25,不支持空字段。
    • Scala 样例类及 Scala 元组;不支持空字段。
    • 行类型(ROW):可以认为是具有任意个字段的元组,并支持空字段。
    • POJO:Flink 自定义的类似于 Java Bean 模式的类。
  4. 辅助类型。Option、Either、List、Map 等

  5. 泛型类型(Generic)。Flink 支持所有的 Java 类和 Scala 类。不过如果没有按照上面 POJO 类型的要求来定义,就会被 Flink 当作泛型类来处理。Flink 会把泛型类型当作黑盒,无法获取它们内部的属性;它们也不是由 Flink 本身序列化的,而是由 Kryo 序列化的。

    在这些类型中,元组类型和 POJO 类型最为灵活,因为它们支持创建复杂类型。而相比之下,POJO 还支持在键(key)的定义中直接使用字段名,这会让我们的代码可读性大大增加。所以,在项目实践中,往往会将流处理程序中的元素类型定为 Flink 的 POJO 类型。

    Flink 对 POJO 类型的要求如下:

    • 类是公有(public)的
    • 有一个无参的构造方法
    • 所有属性都是公有(public)的
    • 所有属性的类型都是可以序列化的

3、类型提示(Type Hints)

Flink 还具有一个类型提取系统,可以分析函数的输入和返回类型,自动获取类型信息,从而获得对应的序列化器和反序列化器。但是,由于 Java 中泛型擦除的存在,在某些特殊情况下(比如 Lambda 表达式中),自动提前的信息是不够精细的,只告诉 Flink 当前的元素由“船头、船身、船尾”构成,根本无法重建出“大船”的模样;这时就需要显式地提供类型信息,才能使应用程序正常工作或提高其性能。

为了解决这类问题,Java API提供了专门的“类型同时”(type hints)

回忆一下之前的 word count 流处理程序,我们在将 String 类型的每个词转换成(word,count)二元组后,就明确地用 returns 指定了返回的类型。因为对于 map 里传入了 Lambda 表达式,系统只能推断出返回的是Tuple2 类型,而无法得到 Tuple<String,Long>。只有显示地告诉系统当前的返回类型,才能正确地解析出完整数据。

1
2
.map(word -> Tuple2.of(word, 1L))
.returns(Types.TUPLE(Types.STRING, Types.LONG));

Flink 还专门提供了 TypeHint 类,它可以捕获泛型的类型信息,并且一直记录下来,为运行时通过足够的信息。我们同样可以通过 returns() 方法,明确地指定转换之后的 DataStream 里元素的类型。

1
returns(new TypeHint<Tuple2<Integer, SomeType>>(){})

3、转换算子(Transformation)

数据源读入数据之后,我们就可以使用各种转换算子,将一个或多个 DataStream 转换为新的 DataStream。

3.1、基本转换算子(map/filter/flatMap)

3.1.1、映射(map)

map 是大家非常熟悉的大数据操作算子,主要用于将数据流中的数据进行转换,形成新的数据流。简单来说,就是一个“一一映射”,消费一个元素就产出一个元素。

我们只需要基于 DataStream 调用 map() 方法就可以进行转换处理。方法需要传入的参数是接口 MapFunction 的实现;返回值类型还是 DataStream,不过泛型(流中的元素类型)可能改变。

下面是模拟读取数据库数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
public class Demo01_Map {
public static void main(String[] args) throws Exception {
//创建Flink配置类(空参创建的话都是默认值)
Configuration configuration = new Configuration();
//修改配置类中的WebUI端口号
configuration.setInteger("rest.port",3333);
//创建Flink环境(并且传入配置对象)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);

//设置并行度是1
env.setParallelism(1);

List<Integer> nums = Arrays.asList(1, 2, 3, 4, 5);

// 方式一:传入匿名类,实现 MapFunction
//env.fromCollection(nums).map(new MapFunction<WaterSensor, String>() {
// @Override
// public String map(WaterSensor e) throws Exception {
// return e.id;
// }
//}).print();

// 方式二:传入 MapFunction 的实现类
env.fromCollection(nums)
.map(new MyMapFunction())
.print();
env.execute();

}

private static class MyMapFunction extends RichMapFunction<Integer, String> {

private String conn;
@Override
public String map(Integer integer) throws Exception {

//使用连接,读取数据库
System.out.println(integer + "使用" + conn);
return integer.toString();
}

/**
* RichFunction
* 在Task被创建的时候,执行一次
* @param parameters
* @throws Exception
*/
@Override
public void open(Configuration parameters) throws Exception {
//模拟创建连接
conn="连接";
System.out.println("创建好了连接..................");
}

/**
* RichFunction
* 在Task关闭的时候,执行一次
* @throws Exception
*/
@Override
public void close() throws Exception {
//关闭连接
System.out.println("关闭了连接...................");
}
}
}

上面代码中,MapFuntion 实现类的泛型类型与输入数据类型和输出数据的类型有关。在实现 MapFunction 接口的时候,需要指定两个泛型,分别是输入事件和输出事件的类型,还需要重写一个 map() 方法,定义从一个输入事件转换为另一个输出事件的具体逻辑。

3.1.2、过滤(filter)

filter 转换操作,顾名思义是对数据流执行一个过滤,通过一个布尔条件表达式设置过滤条件,对于每一个流内元素进行判断,若为 true 则元素正常输出,若为 false 则元素被过滤掉。

进行 filter 转换之后的新数据流的数据类型与原数据流是相同的。filter 转换需要传入的参数需要实现FilterFunction 接口,而 FilterFunction 内要实现 filter() 方法,就相当于一个返回布尔类型的条件表达式。

输出偶数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class Demo02_Filter {
public static void main(String[] args) throws Exception {
//创建Flink配置类(空参创建的话都是默认值)
Configuration configuration = new Configuration();
//修改配置类中的WebUI端口号
configuration.setInteger("rest.port",3333);
//创建Flink环境(并且传入配置对象)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);

List<Integer> nums = Arrays.asList(1, 2, 3, 4, 5, 6);

env.fromCollection(nums)
.filter(integer -> integer%2==0)
.print();

env.execute();
}
}

3.1.3、扁平映射(flatMap)

flatMap 操作又称为扁平映射,主要是将数据流中的整体(一般是集合类型)拆分成一个一个的个体使用。消费一个元素,可以产生 0 到多个元素。flatMap 可以认为是“扁平化”(flatten)和“映射”(map)两步操作的结合,也就是先按照某种规则对数据进行打散拆分,再对拆分后的元素做转换处理。

同 map 一样,flatMap 也可以使用 Lambda 表达式或者 FlatMapFunction 接口实现类的方式来进行传参,返回值类型取决于所传参数的具体逻辑,可以与原数据流相同,也可以不同。

输出偶数,并且输出多次

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class Demo03_FlatMap {
public static void main(String[] args) throws Exception {
//创建Flink配置类(空参创建的话都是默认值)
Configuration configuration = new Configuration();
//修改配置类中的WebUI端口号
configuration.setInteger("rest.port",3333);
//创建Flink环境(并且传入配置对象)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);


List<Integer> nums = Arrays.asList(1, 2, 3, 4, 5, 6);

//炸裂(留下偶数)
env.fromCollection(nums)
.flatMap(new FlatMapFunction<Integer, Integer>() {
@Override
public void flatMap(Integer integer, Collector<Integer> collector) throws Exception {
if (integer%2==0){
collector.collect(integer);
collector.collect(integer);
collector.collect(integer);
collector.collect(integer);
}
}
})
.print();

env.execute();
}
}

3.2、聚合算子

计算的结果不仅依赖当前数据,还跟之前的数据有关,相当于要把所有数据聚在一起进行汇总合并—这就是所谓的“聚合”(Aggregation),类似于 MapReduce 中的 Reduce 操作。

3.2.1、按键分区(KeyBy),要做聚合,需要先进行分区

对于 Flink 而言,DataStream 是没有直接进行聚合的 API 的。因为我们对海量数据做聚合肯定要进行分区并行处理,这样才能提高效率。所以在 Flink 中,要做聚合,需要先进行分区;这个操作就是通过 KeyBy 来完成的。

KeyBy 是聚合前必须用到的一个算子。KeyBy 通过指定键(key),可以将一条流从逻辑上划分为不同的分区(partitions)。这里所说的分区,其实就是并行处理的子任务。

基于不同的 key,流中的数据将被分配到不同的分区中;这样一来,所有具有相同的 key 的数据,都将被发往同一个分区。

在内部,是通过计算 key 的哈希值(hash code),对分区进行取模运算来实现的,所以这里 key 如果是 POJO 的话,必须重写 hashCode 方法。

keyBy() 方法需要传入一个参数,这个参数指定了一个或一组 key。有很多不同的方法来指定 key;
比如 Tuple 数据类型,可以指定字段的位置或多个位置的组合;
对于 POJO 类型,可以指定字段的名称(String);
另外,还可以传入 Lambda 表达式或者实现一个键选择器(KeySelector),用于说明从数据中提取 key 的逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class Demo01_CommonAgg {
public static void main(String[] args) throws Exception {
//获取Flink环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamSource<WaterSensor> stream = env.fromElements(
new WaterSensor("sensor_1", 1, 1),
new WaterSensor("sensor_1", 2, 2),
new WaterSensor("sensor_2", 2, 2),
new WaterSensor("sensor_3", 3, 3)
);

// 方式一:使用Lambda表达式
KeyedStream<WaterSensor, String> keyedStream = stream.keyBy(e -> e.id);

// 方式二:使用匿名类实现KeySelector
KeyedStream<WaterSensor, String> keyedStream1 = stream.keyBy(new KeySelector<WaterSensor, String>() {
@Override
public String getKey(WaterSensor e) throws Exception {
return e.id;
}
});

env.execute();
}
}

运行截图:

需要注意的是,keyBy 得到的结果将不再是 DataStream,而是会将 DataStream 转换为 KeyedStream。KeyedStream 可以认为是“分区流”或者“键控流”,它是对 DataStream 按照 key 的一个逻辑分区,所以泛型有两个类型:除去当前流中的元素类型外,还需要指定 key 的类型。

KeyedStream 也继承自 DataStream,所以基于它的操作也都归属于 DataStream API。但它跟之前的转换操作得到的 SingleOutputStreamOperator 不同,只是一个流的分区操作,并不是一个转换算子。KeyedStream 是一个非常重要的数据结构,只有基于它才可以做后续的聚合操作(比如 sum,reduce)。

3.2.2、简单聚合(Sum/Min/MinBy/MaxBy)

有了按键分区的数据流 KeyedStream,我们可以就可以给予它进行聚合操作了。Flink 为我们内置实现了一些最基本、最简单的聚合 API,主要有以下几种:

  • sum():在输入流上,对指定的字段做叠加求和的操作。
  • min():在输入流上,对指定的字段求最小值
  • max():在输入流上,对指定的字段求最大值
  • minBy():与 min() 类似,在输入流上针对指定字段求最小值。不同的是,min() 只计算指定字段的最小值,其他字段会保留最初第一个数据的值;而 minBy() 则会返回包含字段最小值的整条数据
  • maxBy():与 max() 类似,在输入流上针对指定字段求最大值。两者区别与 min()MinBy() 的区别一样。

简单聚合算子使用很方便,语义也非常明确。这些聚合方法调用时,也需要传入参数;但并不像基本转换算子那样需要实现自定义函数,只要说明聚合指定字段就可以了。指定字段的方式有两种:指定位置和指定名称

对于元组类型的数据,可以使用这两种方式来指定字段。需要注意的是,元组中字段的名称是以 f0、f1、f2、… 来命名的。

如果数据流的类型是 POJO 类,那么就只能通过指定字段名称来指定,不能通过位置来指定了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class TransAggregation {
public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<WaterSensor> stream = env.fromElements(
new WaterSensor("sensor_1", 1, 1),
new WaterSensor("sensor_1", 2, 2),
new WaterSensor("sensor_2", 2, 2),
new WaterSensor("sensor_3", 3, 3)
);
stream.keyBy(e -> e.id).max("vc"); // 指定字段名称
env.execute();
}
}

简单聚合算子返回的同样是一个 SingleOutputStreamOperator,也就是从 KeyedStream 又转换成了常规的 DataStream。所以可以这样理解:keyBy 和聚合是成对出现的,先分区、后聚合,得到的依然是一个 DataStream。而且经过简单聚合之后的数据流,元素的数据类型保持不变。

一个聚合算子,会为每一个 key 保存一个聚合的值,在 Flink 中我们把它叫作“状态”(state)。所以每当有一个新的数据输入,算子就会更新保存的聚合结果,并发送一个带有更新聚合值的事件到下游算子。对于无界流来说,这些状态是永远不会被清除的,所以我们使用聚合算子,一个只用在含有有限个 key 的数据流上。

3.2.3、归约聚合(Reduce)

Reduce 可以对已有的数据进行归约处理,把每一个新输入的数据和当前已经归约出来的值再做一个聚合计算。

Reduce 操作也会将 KeyedStream 转换为 DataStream。它不会改变流的元素数据类型,所以输出类型和输入类型是一样的。

调用 KeyedStream 的 Reduce 方法时,需要传入一个参数,实现 ReduceFunction 接口。接口在源码中的定义如下:

1
2
3
public interface ReduceFunction<T> extends Function, Serializable {
T reduce(T value1, T value2) throws Exception;
}

ReduceFunction 接口里需要实现 Reduce() 方法,这个方法接受两个输入事件,经过转换处理之后输出一个相同类型的事件。在流处理的底层实现过程中,实际上是将中间“合并的结果”作为任务的一个状态保存起来;之后每来一个新的数据,就和之前的聚合状态进一步做归约。

我们可以单独定义一个函数类实现 ReduceFunction 接口,也可以直接传入一个匿名类。当然,同样也可以通过传入 Lambda 表达式实现类似的功能。

定义一个 WaterSensorMapFunction:

1
2
3
4
5
6
7
8
public class WaterSensorMapFunction implements MapFunction<String, WaterSensor> {

@Override
public WaterSensor map(String s) throws Exception {
String[] fileds = s.split(",");
return new WaterSensor(fileds[0],Long.valueOf(fileds[1]),Integer.valueOf(fileds[2]));
}
}

案例:使用 Reduce 实现取最小值功能:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
/**
* 规律:无KeyBy(分组),无聚合
*
* reduce(ReduceFunction x):
* ReduceFunction的逻辑由用户自己编写,灵活点
* 特点:
* 输入的类型和输出的类型必须是一致的
* 两两聚合。每两条数据,执行一次聚合
*/
public class Demo02_Reduce {
public static void main(String[] args) throws Exception {
//获取Flink环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

//获取数据源
DataStreamSource<String> ds = env.socketTextStream("hadoop102", 9999);
//设置并行度是1
env.setParallelism(1);

/**
* SingleOutputStreamOperator:这个算子是不具有聚合功能的
*/
//一对一映射为WaterSensor对象
SingleOutputStreamOperator<WaterSensor> map = ds.map(new WaterSensorMapFunction());

//KeyedStream:有聚合功能
KeyedStream<WaterSensor, String> keyBy = map.keyBy(WaterSensor::getId);

/**
* 如果是第一条数据的话是不走方法的,而是直接将数据储存在value1中
* 后面的每个数据都会走这个方法,然后返回值会重新写入到value1中
*/
keyBy.reduce(new ReduceFunction<WaterSensor>() {
/**
* value1:上一次计算的结果
* value2:当前最新到达的数据
*/
@Override
public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
//实现minBy(vc)的效果
// if (value1.getVc() <= value2.getVc()){
//实现minBy(vc,false)的效果
if (value1.getVc()<value2.getVc()){
return value1;
}else {
return value2;
}
}
})
.print();

env.execute();
}
}

Reduce 和简单聚合算子一样,也要针对每一个 key 保存状态。因为状态不会清空,所以我们需要将 Reduce 算子作用在一个有限 key 的流上。

3.3、用户自定义函数(user-defined function,UDF)

用户自定义函数(user-defined function,UDF),即用户可以根据自身需求,重新实现算子的逻辑。

用户自定义函数分为:函数类、匿名函数、富函数类。

3.3.1 函数类(Function Classes)

Flink 暴露了所有 UDF 函数的接口,具体实现方式为接口或者抽象类,例如 MapFunction、FilterFunction、ReduceFunction 等。所以用户可以自定义一个函数类,实现对应的接口。

需求:用来从用户的点击数据中筛选包含“sensor_1”的内容:

方式一:实现 FilterFunction 接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class TransFunctionUDF {

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamSource<WaterSensor> stream = env.fromElements(
new WaterSensor("sensor_1", 1, 1),
new WaterSensor("sensor_1", 2, 2),
new WaterSensor("sensor_2", 2, 2),
new WaterSensor("sensor_3", 3, 3)
);

DataStream<String> filter = stream.filter(new UserFilter());

filter.print();
env.execute();
}

public static class UserFilter implements FilterFunction<WaterSensor> {
@Override
public boolean filter(WaterSensor e) throws Exception {
return e.id.equals("sensor_1");
}
}
}

方式二:通过匿名类来实现 FilterFunction 接口:

1
2
3
4
5
6
DataStream<String> stream = stream.filter(new FilterFunction< WaterSensor>() {
@Override
public boolean filter(WaterSensor e) throws Exception {
return e.id.equals("sensor_1");
}
});

方法二的优化:为了类可以更加通用,我们还可以将用于过滤的关键字 “sensor_1” 抽象出来作为类的属性,调用构造方法时传进去。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
DataStreamSource<WaterSensor> stream = env.fromElements(        
new WaterSensor("sensor_1", 1, 1),
new WaterSensor("sensor_1", 2, 2),
new WaterSensor("sensor_2", 2, 2),
new WaterSensor("sensor_3", 3, 3)
);

DataStream<String> stream = stream.filter(new FilterFunctionImpl("sensor_1"));

public static class FilterFunctionImpl implements FilterFunction<WaterSensor> {
private String id;

FilterFunctionImpl(String id) { this.id=id; }

@Override
public boolean filter(WaterSensor value) throws Exception {
return thid.id.equals(value.id);
}
}

方式三:采用匿名函数(Lambda)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class TransFunctionUDF {

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamSource<WaterSensor> stream = env.fromElements(
new WaterSensor("sensor_1", 1, 1),
new WaterSensor("sensor_1", 2, 2),
new WaterSensor("sensor_2", 2, 2),
new WaterSensor("sensor_3", 3, 3)
);

// map 函数使用 Lambda 表达式,不需要进行类型声明
SingleOutputStreamOperator<String> filter = stream.filter(sensor -> "sensor_1".equals(sensor.id));

filter.print();

env.execute();
}
}

3.3.2 富函数类(Rich Function Classes)

“富函数类”也是 DataStream API 提供的一个函数类的接口,所有的 Flink 函数类都有其 Rich 版本。富函数类一般是以抽象类的形式出现的。例如:RichMapFunction、RichFilterFunction、RichReduceFunction 等。

与常规函数类的不同主要在于,富函数类可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。

Rich Function 有生命周期的概念。典型的生命周期方法有:

  • open() 方法:Rich Function 的初始化方法,也就是会开启一个算子的生命周期。当一个算子的实际工作方法例如 map() 或者 filter() 方法被调用之前,open() 会首先被调用。
  • close() 方法:生命周期中的最后一个调用的方法,类似于结束方法。一般用来做一些清理工作。

需要注意的是,这里的生命周期方法,对于一个并行子任务来说只会调用一次;而对应的,实际工作方法,例如RichMapFunction 中的 map,在每条数据来到后都会触发一次调用。

来看一个例子说明:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class RichFunctionExample {

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);

env
.fromElements(1,2,3,4)
.map(new RichMapFunction<Integer, Integer>() {
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
System.out.println("索引是:" + getRuntimeContext().getIndexOfThisSubtask() + " 的任务的生命周期开始");
}

@Override
public Integer map(Integer integer) throws Exception {
return integer + 1;
}

@Override
public void close() throws Exception {
super.close();
System.out.println("索引是:" + getRuntimeContext().getIndexOfThisSubtask() + " 的任务的生命周期结束");
}
})
.print();

env.execute();
}
}

运行截图:

3.4、物理分区算子(Physical Partitioning)

常见的物理分区策略有:随机分配(Random)、轮询分配(Round-Robin)、重缩放(Rescale)和广播(Broadcast)

3.4.1、随机分区(Shuffle)

最简单的重分区方式就是直接“洗牌”。通过调用 DataStream 的 .shuffle() 方法,将数据随机地分配到下游算子的并行任务中去。

随机分区服从均匀分布(uniform distribution),所以可以把流中的数据随机打乱,均匀地传递到下游任务分区。因为是完全随机的,所以对于同样的输入数据, 每次执行得到的结果也不会相同。

经过随机分区之后,得到的依然是一个 DataStream。

我们可以做个简单测试:将数据读入之后直接打印到控制台,将输出的并行度设置为 2,中间经历一次 shuffle。执行多次,观察结果是否相同。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class ShuffleExample {
public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(2);

DataStreamSource<String> stream = env.socketTextStream("hadoop102", 9999);

stream.shuffle().print();

env.execute();
}
}

第一次运行截图:

第二次运行截图:

3.4.2、轮询分区(Round-Robin)

轮询,简单来说就是“发牌”,按照先后顺序将数据做依次分发。通过调用 DataStream 的 .rebalance() 方法,就可以实现轮询重分区。rebalance 使用的是 Round-Robin 负载均衡算法,可以将输入流数据平均分配到下游的并行任务中去。

3.4.3、重缩放分区(rescale)

重缩放分区和轮询分区非常相似。当调用 rescale() 方法时,其实底层也是使用 Round-Robin 算法进行轮询,但是只会将数据轮询发送到下游并行任务的一部分中。rescale 的做法是分成小团体,发牌人只给自己团体内的所有人轮流发牌。

3.4.4、广播(BroadCast)

这种方式其实不应该叫做“重分区”,因为经过广播之后,数据会在不同的分区都保留一份,可能进行重复处理。可以通过调用 DataStream的broadcast() 方法,将输入数据复制并发送到下游算子的所有并行任务中去。

1
stream.broadcast()

3.4.5、全局分区(Global)

全局分区也是一种特殊的分区方式。这种做法非常极端,通过调用 .global() 方法,会将所有的输入流数据都发送到下游算子的第一个并行子任务中去(0号)。这就相当于强行让下游任务并行度变成了 1,所以使用这个操作需要非常谨慎,可能对程序造成很大的压力。

3.4.6、自定义分区(Custom)

当 Flink 提供的所有分区策略都不能满足用户的需求时,我们可以通过使用 partitionCustom() 方法来自定义分区策略。

1、自定义分区器

1
2
3
4
5
6
7
public class MyPartitioner implements Partitioner<String> {

@Override
public int partition(String key, int numPartitions) {
return Integer.parseInt(key) % numPartitions;
}
}

2、使用自定义分区

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class PartitionCustomDemo {
public static void main(String[] args) throws Exception {
// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

env.setParallelism(2);

DataStreamSource<String> socketDS = env.socketTextStream("hadoop102", 7777);

DataStream<String> myDS = socketDS
.partitionCustom(
new MyPartitioner(),
value -> value);

myDS.print();
env.execute();
}
}

3.5、分流

所谓“分流”,就是将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个 DataStream,定义一些筛选条件,将符合条件的数据拣选出来放到对应的流里。

3.5.1、简单实现(filter)

其实根据条件筛选数据的需求,本身非常容易实现:只要针对同一条流多次独立调用 .filter() 方法进行筛选,就可以得到拆分之后的流了。

案例需求:读取一个整数数字流,将数据流划分为奇数流和偶数流。

代码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class Demo01_FilterDivide {
public static void main(String[] args) throws Exception {
//创建Flink配置对象
Configuration configuration = new Configuration();
//修改Flink的webUI端口配置
configuration.setInteger("rest.port",8888);
//创建Flink环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);

//设置并行度是2
env.setParallelism(2);
//不能进行算子链的合并
env.disableOperatorChaining();

//获取网络传输数据源,并且映射为数字
SingleOutputStreamOperator<Integer> map = env.socketTextStream("hadoop102", 9999)
.name("s")
.map(Integer::valueOf);

//奇数一个流
map.filter(s -> s % 2 == 1).print("奇数");
//偶数一个流
map.filter(s -> s % 2 == 0).print("偶数");

//执行
env.execute();
}
}

这种实现非常简单,但代码显得有些冗余——我们的处理逻辑对拆分出的三条流其实是一样的,却重复写了三次。而且这段代码背后的含义,是将原始数据流 stream 复制三份,然后对每一份分别做筛选;这明显是不够高效的。我们自然想到,能不能不用复制流,直接用一个算子就把它们都拆分开呢?

3.5.2、使用测输出流

简单来说,只需要调用上下文 ctx 的 .output() 方法,就可以输出任意类型的数据了。而侧输出流的标记和提取,都离不开一个“输出标签”(OutputTag),指定了侧输出流的id和类型。

代码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class SplitStreamByOutputTag {    
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

SingleOutputStreamOperator<WaterSensor> ds = env.socketTextStream("hadoop102", 7777)
.map(new WaterSensorMapFunction());


OutputTag<WaterSensor> s1 = new OutputTag<>("s1", Types.POJO(WaterSensor.class)){};
OutputTag<WaterSensor> s2 = new OutputTag<>("s2", Types.POJO(WaterSensor.class)){};
//返回的都是主流
SingleOutputStreamOperator<WaterSensor> ds1 = ds.process(new ProcessFunction<WaterSensor, WaterSensor>()
{
@Override
public void processElement(WaterSensor value, Context ctx, Collector<WaterSensor> out) throws Exception {

if ("s1".equals(value.getId())) {
ctx.output(s1, value);
} else if ("s2".equals(value.getId())) {
ctx.output(s2, value);
} else {
//主流
out.collect(value);
}
}
});

ds1.print("主流,非s1,s2的传感器");
SideOutputDataStream<WaterSensor> s1DS = ds1.getSideOutput(s1);
SideOutputDataStream<WaterSensor> s2DS = ds1.getSideOutput(s2);

s1DS.printToErr("s1");
s2DS.printToErr("s2");

env.execute();
}
}

3.6、基本合流操作

在实际应用中,我们经常会遇到来源不同的多条流,需要将它们的数据进行联合处理。所以 Flink 中合流的操作会更加普遍,对应的 API 也更加丰富。

3.6.1、联合(union)

最简单的合流操作,就是直接将多条流合在一起,叫作流的“联合”(union)。联合操作要求必须流中的数据类型必须相同,合并之后的新流会包括所有流中的元素,数据类型不变。

在代码中,我们只要基于 DataStream 直接调用 .union() 方法,传入其他 DataStream 作为参数,就可以实现流的联合了;得到的依然是一个 DataStream:

1
stream1.union(stream2, stream3, ...)

注意:union() 的参数可以是多个 DataStream,所以联合操作可以实现多条流的合并。

代码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class Demo01_Union {
public static void main(String[] args) throws Exception {
//创建Flink配置对象
Configuration configuration = new Configuration();
//修改Flink的webUI端口配置
configuration.setInteger("rest.port",8888);
//创建Flink环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);

env.setParallelism(1);
//创建四个流(前三个流的元素是一致类型)
DataStreamSource<Integer> ds = env.fromElements(1, 2, 3, 4, 5);
DataStreamSource<Integer> ds1 = env.fromElements(11, 12, 13, 14, 15);
DataStreamSource<Integer> ds2 = env.fromElements(21, 22, 23, 24, 25);
DataStreamSource<String> ds3 = env.fromElements("1","2","3");

//合并为一个流 使用union算子的前提是流中的元素类型是一致的,要不然会报错导致用不了
DataStream<Integer> unionDS = ds.union(ds1, ds2);
unionDS.print();

env.execute();
}
}

运行截图:

3.6.2、连接(Connect)

流的联合虽然简单,不过受限于数据类型不能改变,灵活性大打折扣,所以实际应用较少出现。除了联合(union),Flink 还提供了另外一种方便的合流操作——连接(connect)。

1、连接流(Connect)

代码实现:需要分为两步:首先基于一条 DataStream 调用 .connect() 方法,传入另外一条DataStream 作为参数,将两条流连接起来,得到一个 ConnectedStreams;然后再调用同处理方法得到 DataStream。这里可以的调用的同处理方法有 .map()/.flatMap(),以及 .process() 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class ConnectDemo {

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

// DataStreamSource<Integer> source1 = env.fromElements(1, 2, 3);
// DataStreamSource<String> source2 = env.fromElements("a", "b", "c");

SingleOutputStreamOperator<Integer> source1 = env
.socketTextStream("hadoop102", 9999)
.map(i -> Integer.parseInt(i));

DataStreamSource<String> source2 = env.socketTextStream("hadoop102", 9998);

/**
* TODO 使用 connect 合流
* 1、一次只能连接 2条流
* 2、流的数据类型可以不一样
* 3、连接后可以调用 map、flatmap、process来处理,但是各处理各的
*/
ConnectedStreams<Integer, String> connect = source1.connect(source2);

SingleOutputStreamOperator<String> result = connect.map(new CoMapFunction<Integer, String, String>() {
@Override
public String map1(Integer value) throws Exception {
return "来源于数字流:" + value.toString();
}

@Override
public String map2(String value) throws Exception {
return "来源于字母流:" + value;
}
});

result.print();

env.execute();
}
}

运行截图:

上面的代码中,ConnectedStreams 有两个类型参数,分别表示内部包含的两条流各自的数据类型;由于需要“一国两制”,因此调用 .map() 方法时传入的不再是一个简单的 MapFunction,而是一个 CoMapFunction,表示分别对两条流中的数据执行 map 操作。这个接口有三个类型参数,依次表示第一条流、第二条流,以及合并后的流中的数据类型。需要实现的方法也非常直白:.map1() 就是对第一条流中数据的 map 操作,.map2() 则是针对第二条流。

2、CoProcessFunction

与 CoMapFunction 类似,如果是调用 .map() 就需要传入一个 CoMapFunction,需要实现 map1()map2() 两个方法;而调用 .process() 时,传入的则是一个 CoProcessFunction。它也是“处理函数”家族中的一员,用法非常相似。它需要实现的就是 processElement1()processElement2() 两个方法,在每个数据到来时,会根据来源的流调用其中的一个方法进行处理。
值得一提的是,ConnectedStreams 也可以直接调用 .keyBy() 进行按键分区的操作,得到的还是一个 ConnectedStreams:

1
connectedStreams.keyBy(keySelector1, keySelector2);

这里传入两个参数 keySelector1 和 keySelector2,是两条流中各自的键选择器;当然也可以直接传入键的位置值(keyPosition),或者键的字段名(field),这与普通的 keyBy 用法完全一致。ConnectedStreams 进行 keyBy 操作,其实就是把两条流中 key 相同的数据放到了一起,然后针对来源的流再做各自处理,这在一些场景下非常有用。

案例需求:连接两条流,输出能根据 id 匹配上的数据(类似inner join效果)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
public class ConnectKeybyDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);

DataStreamSource<Tuple2<Integer, String>> source1 = env.fromElements(
Tuple2.of(1, "a1"),
Tuple2.of(1, "a2"),
Tuple2.of(2, "b"),
Tuple2.of(3, "c")
);
DataStreamSource<Tuple3<Integer, String, Integer>> source2 = env.fromElements(
Tuple3.of(1, "aa1", 1),
Tuple3.of(1, "aa2", 2),
Tuple3.of(2, "bb", 1),
Tuple3.of(3, "cc", 1)
);

ConnectedStreams<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>> connect = source1.connect(source2);

// 多并行度下,需要根据 关联条件 进行 keyby,才能保证 key 相同的数据到一起去,才能匹配上
ConnectedStreams<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>> connectKey = connect.keyBy(s1 -> s1.f0, s2 -> s2.f0);

SingleOutputStreamOperator<String> result = connectKey.process(
new CoProcessFunction<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>, String>() {
// 定义 HashMap,缓存来过的数据,key=id,value=list<数据>
Map<Integer, List<Tuple2<Integer, String>>> s1Cache = new HashMap<>();
Map<Integer, List<Tuple3<Integer, String, Integer>>> s2Cache = new HashMap<>();

@Override
public void processElement1(Tuple2<Integer, String> value, Context ctx, Collector<String> out) throws Exception {
Integer id = value.f0;
// TODO 1.来过的s1数据,都存起来
if (!s1Cache.containsKey(id)) {
// 1.1 第一条数据,初始化 value的list,放入 hashmap
List<Tuple2<Integer, String>> s1Values = new ArrayList<>();
s1Values.add(value);
s1Cache.put(id, s1Values);
} else {
// 1.2 不是第一条,直接添加到 list中
s1Cache.get(id).add(value);
}

//TODO 2.根据id,查找s2的数据,只输出 匹配上 的数据
if (s2Cache.containsKey(id)) {
for (Tuple3<Integer, String, Integer> s2Element : s2Cache.get(id)) {
out.collect("s1:" + value + "<--------->s2:" + s2Element);
}
}
}

@Override
public void processElement2(Tuple3<Integer, String, Integer> value, Context ctx, Collector<String> out) throws Exception {
Integer id = value.f0;
// TODO 1.来过的s2数据,都存起来
if (!s2Cache.containsKey(id)) {
// 1.1 第一条数据,初始化 value的list,放入 hashmap
List<Tuple3<Integer, String, Integer>> s2Values = new ArrayList<>();
s2Values.add(value);
s2Cache.put(id, s2Values);
} else {
// 1.2 不是第一条,直接添加到 list中
s2Cache.get(id).add(value);
}

//TODO 2.根据id,查找s1的数据,只输出 匹配上 的数据
if (s1Cache.containsKey(id)) {
for (Tuple2<Integer, String> s1Element : s1Cache.get(id)) {
out.collect("s1:" + s1Element + "<--------->s2:" + value);
}
}
}
});

result.print();

env.execute();
}
}

运行截图:

4、输出算子(Sink)

Flink 作为数据处理框架,最终还是要把计算处理的结果写入外部储存,为外部应用提供支持。

4.1、连接到外部系统

Flink 的 DataStream API 专门提供了向外部提供写入数据的方法:addSink。与 addSource 类似,addSink 方法对应着一个“Sink”算子,主要就是用来实现与外部系统连接、并将数据提交写入的。Flink 程序中所有对外的输出操作,一般都是利用 Sink 算子完成的。

Flink1.12 以前,Sink 算子的创建是通过调用 DataStream 的 .addSink() 方法实现的。

1
stream.addSink(new SinkFunction(…));

addSink 方法同样需要传入一个参数,实现的是 SinkFunction 接口。在这个接口中只需要重写一个方法 invoke(),用来将指定的值写入到外部系统中。这个方法在每条数据记录到来时都会调用。

Flink1.12 开始,同样重构了Sink架构,

1
stream.sinkTo(…)

当然,Sink 多数情况下同样并不需要我们自己实现。之前我们一直在使用的 print 方法其实就是一种 Sink,它表示将数据流写入标准控制台打印输出。Flink 官方为我们提供了一部分的框架的 Sink 连接器。如下图所示,列出了 Flink 官方目前支持的第三方系统连接器:

我们可以看到,像 Kafka 之类流式系统,Flink 提供了完美对接,source/sink 两端都能连接,可读可写;而对于 Elasticsearch、JDBC 等数据存储系统,则只提供了输出写入的 sink 连接器。

除 Flink 官方之外,Apache Bahir 框架,也实现了一些其他第三方系统与 Flink 的连接器。

除此以外,就需要用户自定义实现 sink 连接器了。

4.2、输出到文件

Flink 专门提供了一个流式文件系统的连接器:FileSink,为批处理和流处理提供了一个统一的 Sink,它可以将分区文件写入 Flink 支持的文件系统。

FileSink 支持行编码(Row-encoded)和批量编码(Bulk-encoded)格式。这两种不同的方式都有各自的构建器(builder),可以直接调用 FileSink 的静态方法:

  • 行编码: FileSink.forRowFormat(basePath,rowEncoder)。
  • 批量编码: FileSink.forBulkFormat(basePath,bulkWriterFactory)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public class SinkFile {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 每个目录中,都有 并行度个数的 文件在写入
env.setParallelism(2);

// 必须开启checkpoint,否则一直都是 .inprogress
env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);


DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(
new GeneratorFunction<Long, String>() {
@Override
public String map(Long value) throws Exception {
return "Number:" + value;
}
},
Long.MAX_VALUE,
RateLimiterStrategy.perSecond(1000),
Types.STRING
);

DataStreamSource<String> dataGen = env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "data-generator");

// 输出到文件系统
FileSink<String> fieSink = FileSink
// 输出行式存储的文件,指定路径、指定编码
.<String>forRowFormat(new Path("f:/tmp"), new SimpleStringEncoder<>("UTF-8"))
// 输出文件的一些配置: 文件名的前缀、后缀
.withOutputFileConfig(
OutputFileConfig.builder()
.withPartPrefix("atguigu-")
.withPartSuffix(".log")
.build()
)
// 按照目录分桶:如下,就是每个小时一个目录
.withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd HH", ZoneId.systemDefault()))
// 文件滚动策略: 1分钟 或 1m
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(Duration.ofMinutes(1))
.withMaxPartSize(new MemorySize(1024*1024))
.build()
)
.build();

dataGen.sinkTo(fieSink);
env.execute();
}
}

4.3、输出到 Kafka

  1. 添加 Kafka 连接器依赖

    由于我们已经测试过从 Kafka 数据源读取数据,连接器相关依赖已经引入,这里就不重复介绍了。

  2. 启动 Kafka 集群

  3. 编写输出到 Kafka 的示例代码

输出无 key 的 record:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class SinkKafka {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

// 如果是精准一次,必须开启checkpoint(后续章节介绍)
env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);

SingleOutputStreamOperator<String> sensorDS = env
.socketTextStream("hadoop102", 7777);

/**
* Kafka Sink:
* TODO 注意:如果要使用 精准一次 写入Kafka,需要满足以下条件,缺一不可
* 1、开启checkpoint(后续介绍)
* 2、设置事务前缀
* 3、设置事务超时时间: checkpoint间隔 < 事务超时时间 < max的15分钟
*/
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
// 指定 kafka 的地址和端口
.setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")
// 指定序列化器:指定Topic名称、具体的序列化
.setRecordSerializer(
KafkaRecordSerializationSchema.<String>builder()
.setTopic("ws")
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
// 写到kafka的一致性级别: 精准一次、至少一次
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
// 如果是精准一次,必须设置 事务的前缀
.setTransactionalIdPrefix("atguigu-")
// 如果是精准一次,必须设置 事务超时时间: 大于checkpoint间隔,小于 max 15分钟
.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10*60*1000+"")
.build();

sensorDS.sinkTo(kafkaSink);
env.execute();
}
}

自定义序列化器,实现带 key 的 record:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class SinkKafkaWithKey {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
env.setRestartStrategy(RestartStrategies.noRestart());


SingleOutputStreamOperator<String> sensorDS = env
.socketTextStream("hadoop102", 7777);


/**
* 如果要指定写入kafka的key,可以自定义序列化器:
* 1、实现 一个接口,重写 序列化 方法
* 2、指定key,转成 字节数组
* 3、指定value,转成 字节数组
* 4、返回一个 ProducerRecord对象,把key、value放进去
*/
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
.setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")
.setRecordSerializer(
new KafkaRecordSerializationSchema<String>() {
@Nullable
@Override
public ProducerRecord<byte[], byte[]> serialize(String element, KafkaSinkContext context, Long timestamp) {
String[] datas = element.split(",");
byte[] key = datas[0].getBytes(StandardCharsets.UTF_8);
byte[] value = element.getBytes(StandardCharsets.UTF_8);
return new ProducerRecord<>("ws", key, value);
}
}
)
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.setTransactionalIdPrefix("atguigu-")
.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "")
.build();

sensorDS.sinkTo(kafkaSink);
env.execute();
}
}

运行代码,在 Linux 主机启动一个消费者,查看是否收到数据

1
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic ws

4.4、输出到 MySQL(JDBC)

  1. 添加依赖

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    <!--mysql驱动 -->
    <dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.27</version>
    </dependency>
    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc</artifactId>
    <version>3.1.0-1.17</version>
    </dependency>
  2. 启动 MySQL,在 test 库下建表

    1
    2
    3
    4
    5
    6
    CREATE TABLE `ws` (
    `id` varchar(100) NOT NULL,
    `ts` bigint(20) DEFAULT NULL,
    `vc` int(11) DEFAULT NULL,
    PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8
  3. 输出到 MySQL 的示例代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    public class SinkMySQL {
    public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    SingleOutputStreamOperator<WaterSensor> sensorDS = env
    .socketTextStream("hadoop102", 7777)
    .map(new WaterSensorMapFunction());

    /**
    * TODO 写入mysql
    * 1、只能用老的sink写法: addsink
    * 2、JDBCSink的4个参数:
    * 第一个参数: 执行的sql,一般就是 insert into
    * 第二个参数: 预编译sql, 对占位符填充值
    * 第三个参数: 执行选项 ---》 攒批、重试
    * 第四个参数: 连接选项 ---》 url、用户名、密码
    */
    SinkFunction<WaterSensor> jdbcSink = JdbcSink.sink(
    "insert into ws values(?,?,?)",
    new JdbcStatementBuilder<WaterSensor>() {
    @Override
    public void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {
    //每收到一条WaterSensor,如何去填充占位符
    preparedStatement.setString(1, waterSensor.getId());
    preparedStatement.setLong(2, waterSensor.getTs());
    preparedStatement.setInt(3, waterSensor.getVc());
    }
    },
    JdbcExecutionOptions.builder()
    .withMaxRetries(3) // 重试次数
    .withBatchSize(100) // 批次的大小:条数
    .withBatchIntervalMs(3000) // 批次的时间
    .build(),
    new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
    .withUrl("jdbc:mysql://hadoop102:3306/test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8")
    .withUsername("root")
    .withPassword("000000")
    .withConnectionCheckTimeoutSeconds(60) // 重试的超时时间
    .build()
    );

    sensorDS.addSink(jdbcSink);
    env.execute();
    }
    }
  4. 运行代码,用客户端连接 MySQL,查看是否成功写入数据。

4.4、自定义 Sink 输出

如果我们想将数据存储到我们自己的存储设备中,而 Flink 并没有提供可以直接使用的连接器,就只能自定义Sink 进行输出了。与 Source 类似,Flink 为我们提供了通用的 SinkFunction 接口和对应的 RichSinkDunction 抽象类,只要实现它,通过简单地调用 DataStream 的 .addSink() 方法就可以自定义写入任何外部存储。

1
stream.addSink(new MySinkFunction<String>());

在实现 SinkFunction 的时候,需要重写的一个关键方法 invoke(),在这个方法中我们就可以实现将流里的数据发送出去的逻辑。
这种方式比较通用,对于任何外部存储系统都有效;不过自定义 Sink 想要实现状态一致性并不容易,所以一般只在没有其它选择时使用。实际项目中用到的外部连接器 Flink 官方基本都已实现,而且在不断地扩充,因此自定义的场景并不常见。


06-Flink DateStrame API
https://flepeng.github.io/045-Flink-31-基础-06-Flink-DateStrame-API/
作者
Lepeng
发布于
2021年3月8日
许可协议