08-Flink 时间语义和水位线

Flink 官网主页地址:https://flink.apache.org
Flink 官方中文地址:https://nightlies.apache.org/flink/flink-docs-stable/zh/

1、时间语义

1.2、哪种时间语义更重要

为了更加清晰地说明两种语义的区别,我们来举一个非常经典的例子:电影《星球大战》。

如上图所示,我们会发现,看电影其实就是处理影片中数据的过程,所以影片的上映时间就相当于“处理时间”;而影片的数据就是所描述的故事,它所发生的背景时间就相当于“事件时间”。两种时间语义都有各自的用途,适用于不同的场景。

在实际应用中,事件时间语义会更为常见。一般情况下,业务日志数据中都会记录数据生成的时间戳(timestamp),它就可以作为事件时间的判断基础。

在 Flink 中,由于处理时间比较简单,早期版本默认的时间语义是处理时间;而考虑到事件时间在实际应用中更为广泛,从 Flink1.12 版本开始,Flink 已经将事件时间作为默认的时间语义了。

2、水位线(Watermark)

2.1、事件时间和窗口

2.2、什么是水位线

在 Flink 中,用来衡量事件时间进展的标记,就被称作“水位线”(Watermark)。

具体实现上,水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点,主要内容就是一个时间戳,用来指示当前的事件时间。而它插入流中的位置,就应该是在某个数据到来之后;这样就可以从这个数据中提取时间戳,作为当前水位线的时间戳了。

  1. 有序流中的水位线

  2. 乱序流中的水位线


  3. 水位线特性

    • 水位线是插入到数据流中的一个标记,可以认为是一个特殊的数据
    • 水位线主要的内容是一个时间戳,用来表示当前事件时间的进展
    • 水位线是基于数据的时间戳产生的
    • 水位线的时间戳必须单调递增,以确保任务的事件时间时钟一直向前推进
    • 水位线可以通过设置延迟来确保正确处理乱序数据
    • 一个水位线Watermark(t)表示当前流事件时间已经达到了时间戳,这代表t之前所有数据都到齐了,之后流中不会出现时间戳t<=t的数据

水位线是 Flink 流处理中保证结果正确性的核心机制,它往往会跟窗口一起配合,完成对乱序数据的正确处理。

2.3、水位线和窗口的工作原理



注意:Flink 中窗口并不是静态准备好的,而是动态创建——当有落在这个窗口区间范围的数据达到时,才创建对应的窗口。另外,这里我们认为到达窗口结束时间时,窗口就触发计算并关闭,事实上“触发计算”和“窗口关闭”两个行为也可以分开。

2.4、生成水位线

2.4.1、生成水位线的总体原则

完美的水位线是“绝对正确”的,也就是一个水位线一旦出现就表示这个时间之前的数据已经全部到齐、之后再也不会出现了。不过如果要保证绝对正确,就必须等足够长的时间,这会带来更高的延迟。

如果我们希望处理得更快、实时性更强,那么可以将水位线延迟设得低一些。这种情况下,可能很多迟到数据会在水位线之后才到达,就会导致窗口遗漏数据,计算结果不准确。当然,如果我们对正确性完全不考虑、一味地追求处理速度,可以直接使用处理时间语义,这在理论上可以得到最低的延迟。
所以 Flink 中的水位线,其实是流处理中对低延迟和结果正确性的一个权衡机制,而且把控制的权利交给了程序员,我们可以在代码中定义水位线的生产策略。

2.4.2、水位线生成策略

在 Flink 的 DataStream API 中,有一个单独用于生成水位线的方法:.assignTimestampsAndWatermarks(),它主要用来为流中的数据分配时间戳,并生成水位线来指示事件时间。具体使用如下:

1
2
DataStream<Event> stream = env.addSource(new ClickSource());
DataStream<Event> withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks(<watermark strategy>);

说明:WatermarkStrategy 作为参数,这就是所谓的“水位线生成策略”。WatermarkStrategy 是一个接口,该接口中包含了一个“时间戳分配器”TimestampAssigner 和一个“水位线生成器” WatermarkGenerator。

1
2
3
4
5
6
7
8
9
10
11
12
public interface WatermarkStrategy<T> 
extends TimestampAssignerSupplier<T>,
WatermarkGeneratorSupplier<T>{

// 负责从流中数据元素的某个字段中提取时间戳,并分配给元素。时间戳的分配是生成水位线的基础。
@Override
TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);

// 主要负责按照既定的方式,基于时间戳生成水位线
@Override
WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}

1、有序流中内置水位线设置

对于有序流,主要特点就是时间戳单调增长,所以永远不会出现迟到数据的问题。这是周期性生成水位线的最简单的场景,直接调用 WatermarkStrategy.forMonotonousTimestamps() 方法就可以实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/**
* 在中间环节产生水印,需要使用:assignTimestampsAndWatermarks(WatermarkStrategy x)
*
* WatermarkStrategy:水印策略。
* 包含以下信息:
* (1)水印的特征
* (a)无水印:watermarkStrategy.noWatermarks()
* (b)自定义水印特征 WatermarkStrategy.forGenerator()
* 选择系统已经提供:
* (c)连续水印:数据中提取时间属性-1ms-0
* WatermarkStrategy.forMonotonousTimeStamps()
* (d)乱序水印:数据中提前的时间属性-1ms-自定义间隔时间
* WatermarkStrategy.forBoundedOutOfOrderNess()
* (2)水印的计算方式
* 水印从数据的时间属性中计算得到
* 计算方式的核心功能就是告诉算子,数据中的哪个属性是事件属性
* ----------------------------------------------------------------------------
* 一开始玩,一定要把并行度设置为1
*/
public class Demo01_ShowWaterMark {
public static void main(String[] args) throws Exception {
//创建Flink配置类(空参创建的话都是默认值)
Configuration configuration = new Configuration();
//修改配置类中的WebUI端口号
configuration.setInteger("rest.port",3333);
//创建Flink环境(并且传入配置对象)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);

//不合并操作算子
env.disableOperatorChaining();
//设置并行度是1
env.setParallelism(1);
//自定义水印策略
WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy.<WaterSensor>forMonotonousTimestamps()
.withTimestampAssigner((e, ts) -> e.getTs());

env.socketTextStream("hadoop102",9999)
.map(new WaterSensorFunction())
.assignTimestampsAndWatermarks(watermarkStrategy)
.keyBy(WaterSensor::getId)
.process(new KeyedProcessFunction<String, WaterSensor, String>() {
@Override
public void processElement(WaterSensor value, KeyedProcessFunction<String, WaterSensor, String>.Context ctx, Collector<String> out) throws Exception {
out.collect(value+"="+ctx.timerService().currentWatermark());
}
})
.print();

env.execute();
}
}

测试截图:

2、乱序流中内置水位线设置

由于乱序流中需要等待迟到数据到齐,所以必须设置一个固定量的延迟时间。这时生成水位线的时间戳,就是当前数据流中最大的时间戳减去延迟的结果,相当于把表调慢,当前时钟会滞后于数据的最大时间戳。调用 WatermarkStrategy. forBoundedOutOfOrderness() 方法就可以实现。这个方法需要传入一个 maxOutOfOrderness 参数,表示“最大乱序程度”,它表示数据流中乱序数据时间戳的最大差值;如果我们能确定乱序程度,那么设置对应时间长度的延迟,就可以等到所有的乱序数据了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public class WatermarkOutOfOrdernessDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);


SingleOutputStreamOperator<WaterSensor> sensorDS = env
.socketTextStream("hadoop102", 7777)
.map(new WaterSensorMapFunction());


// TODO 1.定义Watermark策略
WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
// 1.1 指定watermark生成:乱序的,等待3s
.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
// 1.2 指定 时间戳分配器,从数据中提取
.withTimestampAssigner(
(element, recordTimestamp) -> {
// 返回的时间戳,要 毫秒
System.out.println("数据=" + element + ",recordTs=" + recordTimestamp);
return element.getTs() * 1000L;
});

// TODO 2. 指定 watermark策略
SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);


sensorDSwithWatermark.keyBy(sensor -> sensor.getId())
// TODO 3.使用 事件时间语义 的窗口
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.process(
new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {

@Override
public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
long startTs = context.window().getStart();
long endTs = context.window().getEnd();
String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");

long count = elements.spliterator().estimateSize();

out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());
}
}
)
.print();

env.execute();
}
}

测试截图:

2.4.4、自定义水位线生成器

1、周期性水位生成器(Periodic Generator)

周期性生成器一般是通过 onEvent() 观察判断输入的事件,而在 onPeriodicEmit() 里触发水位线。 下面是一段自定义周期性生成水位线的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// 自定义水位线的产生
public class CustomPeriodicWatermarkExample {

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env
.addSource(new ClickSource())
.assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
.print();

env.execute();
}

public static class CustomWatermarkStrategy implements WatermarkStrategy<Event> {

@Override
public TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {

return new SerializableTimestampAssigner<Event>() {

@Override
public long extractTimestamp(Event element,long recordTimestamp) {
return element.timestamp; // 告诉程序数据源里的时间戳是哪一个字段
}
};
}

@Override
public WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
return new CustomBoundedOutOfOrdernessGenerator();
}
}

public static class CustomBoundedOutOfOrdernessGenerator implements WatermarkGenerator<Event> {

private Long delayTime = 5000L; // 延迟时间
private Long maxTs = -Long.MAX_VALUE + delayTime + 1L; // 观察到的最大时间戳

@Override
public void onEvent(Event event,long eventTimestamp,WatermarkOutput output) {
// 每来一条数据就调用一次
maxTs = Math.max(event.timestamp,maxTs); // 更新最大时间戳
}

@Override
public void onPeriodicEmit(WatermarkOutput output) {
// 发射水位线,默认200ms调用一次
output.emitWatermark(new Watermark(maxTs - delayTime - 1L));
}
}
}

我们在 onPeriodicEmit() 里调用 output.emitWatermark(),就可以发出水位线了;这个方法由系统框架周期性地调用,默认 200ms 一次。

如果想修改默认周期时间,可以通过下面方法修改。例如:修改为400ms

1
env.getConfig().setAutoWatermarkInterval(400L);

2、断点式水位生成器(Punctuated Generator)

断点式生成器会不停地检测 onEvent() 中的事件,当发现带有水位线信息的事件时,就立即发出水位线。我们把发射水位线的逻辑写在 onEvent 方法当中即可。

3、在数据源中发送水位线

我们也可以在自定义的数据源中抽取事件时间,然后发送水位线。这里要注意的是,在自定义数据源中发送了水位线以后,就不能再在程序中使用 assignTimestampsAndWatermarks 方法来生成水位线了。在自定义数据源中生成水位线和在程序中使用 assignTimestampsAndWatermarks 方法生成水位线二者只能取其一。示例程序如下:

1
2
3
env.fromSource(
kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafkasource"
)

2.5、水位线的传递

在流处理中,上游任务处理完水位线、时钟改变之后,要把当前的水位线再次发出,广播给所以得下游子任务。而当一个任务接受到多个上游并行任务传递来的水位线时,应该以最小的那个作为当前任务的事件时钟。

水位线在上下游任务之间的传递,非常巧妙地避免了分布式系统中没有统一时钟的问题,每个任务都以“处理完之前所有数据”为标准来确定自己的时钟。

2.6、迟到数据的处理

2.6.1、推迟水印推进

在水印产生时,设置一个乱序容忍度,推迟系统时间的推进,保证窗口计算被延迟执行,为乱序的数据争取更多的时间进入窗口。

1
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10));

2.6.2、设置窗口延迟关闭

Flink 的窗口,也允许迟到数据。当触发了窗口计算后,会先计算当前的结果,但是此时并不会关闭窗口。以后每来一条迟到数据,就触发一次这条数据所在窗口计算(增量计算)。直到 wartermark 超过了窗口结束时间+推迟时间,此时窗口会真正关闭。

1
2
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.seconds(3))

2.6.3、使用测流接受迟到的数据

1
2
3
.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.seconds(3))
.sideOutputLateData(lateWS)

测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
/**
* 正常情况下,数据的产生一定是从早到晚。
* 早产生的数据,一定是先到达系统的
* 很少会有迟到的情况。
*
* -------------------------------------
* 水印的意义:如果当前的算子的水印是x,那么意味着正常情况下,x之前的数据都已经到达了
*
* 迟到:数据的时间属性<水印
*
* 处理迟到的数据:
* 迟到的数据,在对应的窗口关闭之前到达,是不受影响的
* 迟到的数据,在对应的窗口关闭之后到达,是无法进入窗口的,也无法被计算,此时可以这样处理:
* 1、调慢水印时间
* 操作的是水印策略
* WatermarkStrategy.forBoundedOutOfOrderNess(Duration.ofSeconds(5))
*
* 2、如果1无法解决,可以延迟窗口的关闭时间
* 操作的对象是窗口
* 窗口是到点就计算,然后关闭
* 当我延迟以后,窗口到点就计算,但是不关闭,会延迟一段时间再关闭。在此期间如果有数据,会再次触发窗口计算
*
* 3、如果2无法解决,可以使用测流接受迟到的数据
* 操作的对象是窗口
* 窗口关闭之后的数据,可以导入到一个测流中。后续再处理
*
* 如果测流中的数据过多,说明当前的系统有问题。排查
* 或数据在产生时,无法保证时序,对于无法保证时序的数据,建议批处理
*
*/
public class Demo03_HandleLate {
public static void main(String[] args) throws Exception {
//创建Flink配置类(空参创建的话都是默认值)
Configuration configuration = new Configuration();
//修改配置类中的WebUI端口号
configuration.setInteger("rest.port",3333);
//创建Flink环境(并且传入配置对象)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);

//设置周期性发送水印(单位是毫秒)
env.getConfig().setAutoWatermarkInterval(2000);
//关闭算子链
env.disableOperatorChaining();
//设置并行度
env.setParallelism(1);
//设置水印特征
WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
//设置是乱序水印
.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(5))
//从数据中抽取时间戳
.withTimestampAssigner(
(e, ts) -> e.getTs()
);

//设置测流标记
OutputTag<WaterSensor> lateTag = new OutputTag<>("late", TypeInformation.of(WaterSensor.class));

SingleOutputStreamOperator<String> process = env.socketTextStream("hadoop102", 9999)
.map(new WaterSensorFunction())
//设置水印策略
.assignTimestampsAndWatermarks(watermarkStrategy)
//全局时间滚动窗口
/**
* 5s的滚动窗口。
* 滚动窗口,从1970-1-1 0:0:0开始当作第一个窗口的起始时间计算。
*
* 第一个窗口: [0,5000) 等价于 [0,4999]
* 4999就触发第一次运算,但是不关闭。
* 6999关闭
* 第二个窗口: [5000,9999]
* 9999就触发第一次运算,但是不关闭
* 11999关闭
*/
.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
//运行延迟2s
.allowedLateness(Time.seconds(2))
//添加测流
.sideOutputLateData(lateTag)
//运算逻辑
.process(new ProcessAllWindowFunction<WaterSensor, String, TimeWindow>() {
@Override
public void process(ProcessAllWindowFunction<WaterSensor, String, TimeWindow>.Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
TimeWindow window = context.window();
out.collect(window + ":" + MyUtils.paresToList(elements));
}
});

//主流打印
process.print();
//测流打印
process.getSideOutput(lateTag).printToErr("迟到");

//执行
env.execute();
}
}

测试截图:

3、基于时间的合流——双流联结(Join)

可以发现,根据某个 key 合并两条流,与 关系型数据库中的表的 join 操作非常近似。事实上,Flink 中两条流的 connect 操作,就可以通过 keyBy 指定键进行分组后合并,实现了类似于 SQL 中的 join 操作;另外 connect 支持处理函数,可以使用自定义实现各种需求,其实已经能够处理双流 join 的大多数场景。

不过处理函数是底层接口,所以尽管 connect 能做的事情多,但在一些具体应用场景下还是显得太过抽象了。比如,如果我们希望统计固定时间内两条流数据的匹配情况,那就需要自定义来实现——其实这完全可以用窗口(window)来表示。为了更方便地实现基于时间的合流操作,Flink 的 DataStrema API 提供了内置的 join 算子。

3.1、窗口联结(Window Join)

Flink 为基于一段时间的双流合并专门提供了一个窗口联结算子,可以定义时间窗口,并将两条流中共享一个公共键(key)的数据放在窗口中进行配对处理。

1、窗口联结的调用

窗口联结在代码中的实现,首先需要调用 DataStream 的 .join() 方法来合并两条流,得到一个 JoinedStreams;接着通过 .where().equalTo() 方法指定两条流中联结的 key;然后通过 .window() 开窗口,并调用 apply() 方法传入联结窗口函数进行处理计算。通用调用形式如下:

1
2
3
4
5
stream1.join(stream2)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(<WindowAssigner>)
.apply(<JoinFunction>)

上面代码中 .where() 的参数是键选择器(KeySelector),用来指定第一条流中的 key;而 .equalTo() 传入的 KeySelector 则指定了第二条流中的 key。两者相同的元素,如果在同一窗口中,就可以匹配起来,并通过一个“联结函数”(JoinFunction)进行处理了。

这里 .window() 传入的就是窗口分配器,之前讲到的三种时间窗口都可以用在这里:滚动窗口(tumbling window)、滑动窗口(sliding window)和会话窗口(session window)。

而后面调用 .apply() 可以看作实现了一个特殊的窗口函数。注意这里只能调用 .apply(),没有其他替代的方法。

传入的 JoinFunction 也是一个函数类接口,使用时需要实现内部的 .join() 方法。这个方法有两个参数,分别表示两条流中成对匹配的数据。

其实仔细观察可以发现,窗口 join 的调用语法和我们熟悉的 SQL 中表的 join 非常相似:

1
SELECT * FROM table1 t1, table2 t2 WHERE t1.id = t2.id;

这句 SQL 中 where 子句的表达,等价于 inner join … on,所以本身表示的是两张表基于 id 的“内连接”(inner join)。而 Flink 中的 window join,同样类似于 inner join。也就是说,最后处理输出的,只有两条流中数据按 key 配对成功的那些;如果某个窗口中一条流的数据没有任何另一条流的数据匹配,那么就不会调用 JoinFunction 的 .join() 方法,也就没有任何输出了。

2、窗口联结实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public class WindowJoinDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

SingleOutputStreamOperator<Tuple2<String, Integer>> ds1 = env
.fromElements(
Tuple2.of("a", 1),
Tuple2.of("a", 2),
Tuple2.of("b", 3),
Tuple2.of("c", 4)
)
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Tuple2<String, Integer>>forMonotonousTimestamps()
.withTimestampAssigner((value, ts) -> value.f1 * 1000L)
);


SingleOutputStreamOperator<Tuple3<String, Integer,Integer>> ds2 = env
.fromElements(
Tuple3.of("a", 1,1),
Tuple3.of("a", 11,1),
Tuple3.of("b", 2,1),
Tuple3.of("b", 12,1),
Tuple3.of("c", 14,1),
Tuple3.of("d", 15,1)
)
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Tuple3<String, Integer,Integer>>forMonotonousTimestamps()
.withTimestampAssigner((value, ts) -> value.f1 * 1000L)
);

// TODO window join
// 1. 落在同一个时间窗口范围内才能匹配
// 2. 根据keyby的key,来进行匹配关联
// 3. 只能拿到匹配上的数据,类似有固定时间范围的inner join
DataStream<String> join = ds1.join(ds2)
.where(r1 -> r1.f0) // ds1的keyby
.equalTo(r2 -> r2.f0) // ds2的keyby
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.apply(new JoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {
/**
* 关联上的数据,调用join方法
* @param first ds1的数据
* @param second ds2的数据
* @return
* @throws Exception
*/
@Override
public String join(Tuple2<String, Integer> first, Tuple3<String, Integer, Integer> second) throws Exception {
return first + "<----->" + second;
}
});

join.print();

env.execute();
}

运行截图:

3.2、间隔联结(Interval Join)

在有些场景下,我们要处理的时间间隔可能并不是固定的。这时显然不应该用滚动窗口或滑动窗口来处理——因为匹配的两个数据有可能刚好“卡在”窗口边缘两侧,于是窗口内就都没有匹配了;会话窗口虽然时间不固定,但也明显不适合这个场景。基于时间的窗口联结已经无能为力了。

为了应对这样的需求,Flink 提供了一种叫作“间隔联结”(interval join)的合流操作。顾名思义,间隔联结的思路就是针对一条流的每个数据,开辟出其时间戳前后的一段时间间隔,看这期间是否有来自另一条流的数据匹配。

1、间隔联结的原理

间隔联结具体的定义方式是,我们给定两个时间点,分别叫作间隔的“上界”(upperBound)和“下界”(lowerBound);于是对于一条流(不妨叫作A)中的任意一个数据元素 a,就可以开辟一段时间间隔:[a.timestamp + lowerBound, a.timestamp + upperBound],即以 a 的时间戳为中心,下至下界点、上至上界点的一个闭区间:我们就把这段时间作为可以匹配另一条流数据的“窗口”范围。所以对于另一条流(不妨叫B)中的数据元素 b,如果它的时间戳落在了这个区间范围内,a 和 b 就可以成功配对,进而进行计算输出结果。所以匹配的条件为:a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound

这里需要注意,做间隔联结的两条流 A 和 B,也必须基于相同的 key;下界 lowerBound 应该小于等于上界 upperBound,两者都可正可负;间隔联结目前只支持事件时间语义。

如下图所示,我们可以清楚地看到间隔联结的方式:

下方的流A 去间隔联结上方的流B,所以基于 A 的每个数据元素,都可以开辟一个间隔区间。我们这里设置下界为-2毫秒,上界为1毫秒。于是对于时间戳为 2 的 A 中元素,它的可匹配区间就是 [0, 3],流 B 中有时间戳为 0、1 的两个元素落在这个范围内,所以就可以得到匹配数据对(2, 0)和(2, 1)。同样地,A 中时间戳为3的元素,可匹配区间为 [1, 4],B 中只有时间戳为1的一个数据可以匹配,于是得到匹配数据对(3, 1)。
所以我们可以看到,间隔联结同样是一种内连接(inner join)。与窗口联结不同的是,interval join 做匹配的时间段是基于流中数据的,所以并不确定;而且流 B 中的数据可以不只在一个区间内被匹配。

2、间隔联结的调用

间隔联结在代码中,是基于 KeyedStream 的联结(join)操作。DataStream 在 keyBy 得到 KeyedStream 之后,可以调用 .intervalJoin() 来合并两条流,传入的参数同样是一个 KeyedStream,两者的 key 类型应该一致;得到的是一个 IntervalJoin 类型。后续的操作同样是完全固定的:先通过 .between() 方法指定间隔的上下界,再调用 .process() 方法,定义对匹配数据对的处理操作。调用 .process() 需要传入一个处理函数,这是处理函数家族的最后一员:“处理联结函数”ProcessJoinFunction。

通用调用形式如下:

1
2
3
4
5
6
7
8
9
10
11
stream1
.keyBy(<KeySelector>)
.intervalJoin(stream2.keyBy(<KeySelector>))
.between(Time.milliseconds(-2), Time.milliseconds(1))
.process (new ProcessJoinFunction<Integer, Integer, String(){

@Override
public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
out.collect(left + "," + right);
}
});

可以看到,抽象类 ProcessJoinFunction 就像是 ProcessFunction 和 JoinFunction 的结合,内部同样有一个抽象方法 .processElement()。与其他处理函数不同的是,它多了一个参数,这自然是因为有来自两条流的数据。参数中 left 指的就是第一条流中的数据,right 则是第二条流中与它匹配的数据。每当检测到一组匹配,就会调用这里的 .processElement() 方法,经处理转换之后输出结果。

3、间隔连接实例

案例需求:在电商网站中,某些用户行为往往会有短时间内的强关联。我们这里举一个例子,我们有两条流,一条是下订单的流,一条是浏览数据的流。我们可以针对同一个用户,来做这样一个联结。也就是使用一个用户的下订单的事件和这个用户的最近十分钟的浏览数据进行一个联结查询。

(1)代码实现:正常使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
public class IntervalJoinDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

SingleOutputStreamOperator<Tuple2<String, Integer>> ds1 = env
.fromElements(
Tuple2.of("a", 1),
Tuple2.of("a", 2),
Tuple2.of("b", 3),
Tuple2.of("c", 4)
)
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Tuple2<String, Integer>>forMonotonousTimestamps()
.withTimestampAssigner((value, ts) -> value.f1 * 1000L)
);


SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> ds2 = env
.fromElements(
Tuple3.of("a", 1, 1),
Tuple3.of("a", 11, 1),
Tuple3.of("b", 2, 1),
Tuple3.of("b", 12, 1),
Tuple3.of("c", 14, 1),
Tuple3.of("d", 15, 1)
)
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Tuple3<String, Integer, Integer>>forMonotonousTimestamps()
.withTimestampAssigner((value, ts) -> value.f1 * 1000L)
);

// TODO interval join
//1. 分别做keyby,key其实就是关联条件
KeyedStream<Tuple2<String, Integer>, String> ks1 = ds1.keyBy(r1 -> r1.f0);
KeyedStream<Tuple3<String, Integer, Integer>, String> ks2 = ds2.keyBy(r2 -> r2.f0);

//2. 调用 interval join
ks1.intervalJoin(ks2)
.between(Time.seconds(-2), Time.seconds(2))
.process(
new ProcessJoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {
/**
* 两条流的数据匹配上,才会调用这个方法
* @param left ks1的数据
* @param right ks2的数据
* @param ctx 上下文
* @param out 采集器
* @throws Exception
*/
@Override
public void processElement(Tuple2<String, Integer> left, Tuple3<String, Integer, Integer> right, Context ctx, Collector<String> out) throws Exception {
// 进入这个方法,是关联上的数据
out.collect(left + "<------>" + right);
}
})
.print();
env.execute();
}
}

(2)代码实现,处理迟到的数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
public class IntervalJoinWithLateDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

SingleOutputStreamOperator<Tuple2<String, Integer>> ds1 = env
.socketTextStream("hadoop102", 7777)
.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
String[] datas = value.split(",");
return Tuple2.of(datas[0], Integer.valueOf(datas[1]));
}
})
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Tuple2<String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner((value, ts) -> value.f1 * 1000L)
);

SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> ds2 = env
.socketTextStream("hadoop102", 8888)
.map(new MapFunction<String, Tuple3<String, Integer, Integer>>() {
@Override
public Tuple3<String, Integer, Integer> map(String value) throws Exception {
String[] datas = value.split(",");
return Tuple3.of(datas[0], Integer.valueOf(datas[1]), Integer.valueOf(datas[2]));
}
})
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Tuple3<String, Integer, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner((value, ts) -> value.f1 * 1000L)
);

/**
* TODO Interval join
* 1、只支持事件时间
* 2、指定上界、下界的偏移,负号代表时间往前,正号代表时间往后
* 3、process中,只能处理 join上的数据
* 4、两条流关联后的watermark,以两条流中最小的为准
* 5、如果 当前数据的事件时间 < 当前的watermark,就是迟到数据, 主流的process不处理
* => between后,可以指定将 左流 或 右流 的迟到数据 放入侧输出流
*/

//1. 分别做keyby,key其实就是关联条件
KeyedStream<Tuple2<String, Integer>, String> ks1 = ds1.keyBy(r1 -> r1.f0);
KeyedStream<Tuple3<String, Integer, Integer>, String> ks2 = ds2.keyBy(r2 -> r2.f0);

//2. 调用 interval join
OutputTag<Tuple2<String, Integer>> ks1LateTag = new OutputTag<>("ks1-late", Types.TUPLE(Types.STRING, Types.INT));
OutputTag<Tuple3<String, Integer, Integer>> ks2LateTag = new OutputTag<>("ks2-late", Types.TUPLE(Types.STRING, Types.INT, Types.INT));
SingleOutputStreamOperator<String> process = ks1.intervalJoin(ks2)
.between(Time.seconds(-2), Time.seconds(2))
.sideOutputLeftLateData(ks1LateTag) // 将 ks1的迟到数据,放入侧输出流
.sideOutputRightLateData(ks2LateTag) // 将 ks2的迟到数据,放入侧输出流
.process(
new ProcessJoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {
/**
* 两条流的数据匹配上,才会调用这个方法
* @param left ks1的数据
* @param right ks2的数据
* @param ctx 上下文
* @param out 采集器
* @throws Exception
*/
@Override
public void processElement(Tuple2<String, Integer> left, Tuple3<String, Integer, Integer> right, Context ctx, Collector<String> out) throws Exception {
// 进入这个方法,是关联上的数据
out.collect(left + "<------>" + right);
}
});

process.print("主流");
process.getSideOutput(ks1LateTag).printToErr("ks1迟到数据");
process.getSideOutput(ks2LateTag).printToErr("ks2迟到数据");
env.execute();
}
}

08-Flink 时间语义和水位线
https://flepeng.github.io/044-Flink-42-核心概念-08-Flink-时间语义和水位线/
作者
Lepeng
发布于
2021年3月8日
许可协议