Flink 官网主页地址:https://flink.apache.org
Flink 官方中文地址:https://nightlies.apache.org/flink/flink-docs-stable/zh/
1、时间语义
1.1、Flink 中的时间语义
1.2、哪种时间语义更重要
为了更加清晰地说明两种语义的区别,我们来举一个非常经典的例子:电影《星球大战》。
如上图所示,我们会发现,看电影其实就是处理影片中数据的过程,所以影片的上映时间就相当于“处理时间”;而影片的数据就是所描述的故事,它所发生的背景时间就相当于“事件时间”。两种时间语义都有各自的用途,适用于不同的场景。
在实际应用中,事件时间语义会更为常见。一般情况下,业务日志数据中都会记录数据生成的时间戳(timestamp),它就可以作为事件时间的判断基础。
在 Flink 中,由于处理时间比较简单,早期版本默认的时间语义是处理时间;而考虑到事件时间在实际应用中更为广泛,从 Flink1.12 版本开始,Flink 已经将事件时间作为默认的时间语义了。
2、水位线(Watermark)
2.1、事件时间和窗口
2.2、什么是水位线
在 Flink 中,用来衡量事件时间进展的标记,就被称作“水位线”(Watermark)。
具体实现上,水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点,主要内容就是一个时间戳,用来指示当前的事件时间。而它插入流中的位置,就应该是在某个数据到来之后;这样就可以从这个数据中提取时间戳,作为当前水位线的时间戳了。
有序流中的水位线
乱序流中的水位线
水位线特性
- 水位线是插入到数据流中的一个标记,可以认为是一个特殊的数据
- 水位线主要的内容是一个时间戳,用来表示当前事件时间的进展
- 水位线是基于数据的时间戳产生的
- 水位线的时间戳必须单调递增,以确保任务的事件时间时钟一直向前推进
- 水位线可以通过设置延迟来确保正确处理乱序数据
- 一个水位线Watermark(t)表示当前流事件时间已经达到了时间戳,这代表t之前所有数据都到齐了,之后流中不会出现时间戳t<=t的数据
水位线是 Flink 流处理中保证结果正确性的核心机制,它往往会跟窗口一起配合,完成对乱序数据的正确处理。
2.3、水位线和窗口的工作原理
注意:Flink 中窗口并不是静态准备好的,而是动态创建——当有落在这个窗口区间范围的数据达到时,才创建对应的窗口。另外,这里我们认为到达窗口结束时间时,窗口就触发计算并关闭,事实上“触发计算”和“窗口关闭”两个行为也可以分开。
2.4、生成水位线
2.4.1、生成水位线的总体原则
完美的水位线是“绝对正确”的,也就是一个水位线一旦出现就表示这个时间之前的数据已经全部到齐、之后再也不会出现了。不过如果要保证绝对正确,就必须等足够长的时间,这会带来更高的延迟。
如果我们希望处理得更快、实时性更强,那么可以将水位线延迟设得低一些。这种情况下,可能很多迟到数据会在水位线之后才到达,就会导致窗口遗漏数据,计算结果不准确。当然,如果我们对正确性完全不考虑、一味地追求处理速度,可以直接使用处理时间语义,这在理论上可以得到最低的延迟。
所以 Flink 中的水位线,其实是流处理中对低延迟和结果正确性的一个权衡机制,而且把控制的权利交给了程序员,我们可以在代码中定义水位线的生产策略。
2.4.2、水位线生成策略
在 Flink 的 DataStream API 中,有一个单独用于生成水位线的方法:.assignTimestampsAndWatermarks()
,它主要用来为流中的数据分配时间戳,并生成水位线来指示事件时间。具体使用如下:
1 2
| DataStream<Event> stream = env.addSource(new ClickSource()); DataStream<Event> withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks(<watermark strategy>);
|
说明:WatermarkStrategy
作为参数,这就是所谓的“水位线生成策略”。WatermarkStrategy 是一个接口,该接口中包含了一个“时间戳分配器”TimestampAssigner 和一个“水位线生成器” WatermarkGenerator。
1 2 3 4 5 6 7 8 9 10 11 12
| public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T>{
@Override TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);
@Override WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context); }
|
2.4.3、Flink 内置水位线
1、有序流中内置水位线设置
对于有序流,主要特点就是时间戳单调增长,所以永远不会出现迟到数据的问题。这是周期性生成水位线的最简单的场景,直接调用 WatermarkStrategy.forMonotonousTimestamps()
方法就可以实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
|
public class Demo01_ShowWaterMark { public static void main(String[] args) throws Exception { Configuration configuration = new Configuration(); configuration.setInteger("rest.port",3333); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
env.disableOperatorChaining(); env.setParallelism(1); WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy.<WaterSensor>forMonotonousTimestamps() .withTimestampAssigner((e, ts) -> e.getTs());
env.socketTextStream("hadoop102",9999) .map(new WaterSensorFunction()) .assignTimestampsAndWatermarks(watermarkStrategy) .keyBy(WaterSensor::getId) .process(new KeyedProcessFunction<String, WaterSensor, String>() { @Override public void processElement(WaterSensor value, KeyedProcessFunction<String, WaterSensor, String>.Context ctx, Collector<String> out) throws Exception { out.collect(value+"="+ctx.timerService().currentWatermark()); } }) .print();
env.execute(); } }
|
测试截图:
2、乱序流中内置水位线设置
由于乱序流中需要等待迟到数据到齐,所以必须设置一个固定量的延迟时间。这时生成水位线的时间戳,就是当前数据流中最大的时间戳减去延迟的结果,相当于把表调慢,当前时钟会滞后于数据的最大时间戳。调用 WatermarkStrategy. forBoundedOutOfOrderness()
方法就可以实现。这个方法需要传入一个 maxOutOfOrderness 参数,表示“最大乱序程度”,它表示数据流中乱序数据时间戳的最大差值;如果我们能确定乱序程度,那么设置对应时间长度的延迟,就可以等到所有的乱序数据了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| public class WatermarkOutOfOrdernessDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1);
SingleOutputStreamOperator<WaterSensor> sensorDS = env .socketTextStream("hadoop102", 7777) .map(new WaterSensorMapFunction());
WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) .withTimestampAssigner( (element, recordTimestamp) -> { System.out.println("数据=" + element + ",recordTs=" + recordTimestamp); return element.getTs() * 1000L; });
SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);
sensorDSwithWatermark.keyBy(sensor -> sensor.getId()) .window(TumblingEventTimeWindows.of(Time.seconds(10))) .process( new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
@Override public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception { long startTs = context.window().getStart(); long endTs = context.window().getEnd(); String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS"); String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");
long count = elements.spliterator().estimateSize();
out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString()); } } ) .print();
env.execute(); } }
|
测试截图:
2.4.4、自定义水位线生成器
1、周期性水位生成器(Periodic Generator)
周期性生成器一般是通过 onEvent()
观察判断输入的事件,而在 onPeriodicEmit()
里触发水位线。 下面是一段自定义周期性生成水位线的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
| public class CustomPeriodicWatermarkExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env .addSource(new ClickSource()) .assignTimestampsAndWatermarks(new CustomWatermarkStrategy()) .print();
env.execute(); }
public static class CustomWatermarkStrategy implements WatermarkStrategy<Event> {
@Override public TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
return new SerializableTimestampAssigner<Event>() {
@Override public long extractTimestamp(Event element,long recordTimestamp) { return element.timestamp; } }; }
@Override public WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) { return new CustomBoundedOutOfOrdernessGenerator(); } }
public static class CustomBoundedOutOfOrdernessGenerator implements WatermarkGenerator<Event> {
private Long delayTime = 5000L; private Long maxTs = -Long.MAX_VALUE + delayTime + 1L;
@Override public void onEvent(Event event,long eventTimestamp,WatermarkOutput output) { maxTs = Math.max(event.timestamp,maxTs); }
@Override public void onPeriodicEmit(WatermarkOutput output) { output.emitWatermark(new Watermark(maxTs - delayTime - 1L)); } } }
|
我们在 onPeriodicEmit()
里调用 output.emitWatermark()
,就可以发出水位线了;这个方法由系统框架周期性地调用,默认 200ms 一次。
如果想修改默认周期时间,可以通过下面方法修改。例如:修改为400ms
1
| env.getConfig().setAutoWatermarkInterval(400L);
|
2、断点式水位生成器(Punctuated Generator)
断点式生成器会不停地检测 onEvent()
中的事件,当发现带有水位线信息的事件时,就立即发出水位线。我们把发射水位线的逻辑写在 onEvent 方法当中即可。
3、在数据源中发送水位线
我们也可以在自定义的数据源中抽取事件时间,然后发送水位线。这里要注意的是,在自定义数据源中发送了水位线以后,就不能再在程序中使用 assignTimestampsAndWatermarks 方法来生成水位线了。在自定义数据源中生成水位线和在程序中使用 assignTimestampsAndWatermarks 方法生成水位线二者只能取其一。示例程序如下:
1 2 3
| env.fromSource( kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafkasource" )
|
2.5、水位线的传递
在流处理中,上游任务处理完水位线、时钟改变之后,要把当前的水位线再次发出,广播给所以得下游子任务。而当一个任务接受到多个上游并行任务传递来的水位线时,应该以最小的那个作为当前任务的事件时钟。
水位线在上下游任务之间的传递,非常巧妙地避免了分布式系统中没有统一时钟的问题,每个任务都以“处理完之前所有数据”为标准来确定自己的时钟。
2.6、迟到数据的处理
2.6.1、推迟水印推进
在水印产生时,设置一个乱序容忍度,推迟系统时间的推进,保证窗口计算被延迟执行,为乱序的数据争取更多的时间进入窗口。
1
| WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10));
|
2.6.2、设置窗口延迟关闭
Flink 的窗口,也允许迟到数据。当触发了窗口计算后,会先计算当前的结果,但是此时并不会关闭窗口。以后每来一条迟到数据,就触发一次这条数据所在窗口计算(增量计算)。直到 wartermark 超过了窗口结束时间+推迟时间,此时窗口会真正关闭。
1 2
| .window(TumblingEventTimeWindows.of(Time.seconds(5))) .allowedLateness(Time.seconds(3))
|
2.6.3、使用测流接受迟到的数据
1 2 3
| .windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) .allowedLateness(Time.seconds(3)) .sideOutputLateData(lateWS)
|
测试代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
|
public class Demo03_HandleLate { public static void main(String[] args) throws Exception { Configuration configuration = new Configuration(); configuration.setInteger("rest.port",3333); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
env.getConfig().setAutoWatermarkInterval(2000); env.disableOperatorChaining(); env.setParallelism(1); WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(5)) .withTimestampAssigner( (e, ts) -> e.getTs() );
OutputTag<WaterSensor> lateTag = new OutputTag<>("late", TypeInformation.of(WaterSensor.class));
SingleOutputStreamOperator<String> process = env.socketTextStream("hadoop102", 9999) .map(new WaterSensorFunction()) .assignTimestampsAndWatermarks(watermarkStrategy)
.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) .allowedLateness(Time.seconds(2)) .sideOutputLateData(lateTag) .process(new ProcessAllWindowFunction<WaterSensor, String, TimeWindow>() { @Override public void process(ProcessAllWindowFunction<WaterSensor, String, TimeWindow>.Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception { TimeWindow window = context.window(); out.collect(window + ":" + MyUtils.paresToList(elements)); } });
process.print(); process.getSideOutput(lateTag).printToErr("迟到");
env.execute(); } }
|
测试截图:
3、基于时间的合流——双流联结(Join)
可以发现,根据某个 key 合并两条流,与 关系型数据库中的表的 join 操作非常近似。事实上,Flink 中两条流的 connect 操作,就可以通过 keyBy 指定键进行分组后合并,实现了类似于 SQL 中的 join 操作;另外 connect 支持处理函数,可以使用自定义实现各种需求,其实已经能够处理双流 join 的大多数场景。
不过处理函数是底层接口,所以尽管 connect 能做的事情多,但在一些具体应用场景下还是显得太过抽象了。比如,如果我们希望统计固定时间内两条流数据的匹配情况,那就需要自定义来实现——其实这完全可以用窗口(window)来表示。为了更方便地实现基于时间的合流操作,Flink 的 DataStrema API 提供了内置的 join 算子。
3.1、窗口联结(Window Join)
Flink 为基于一段时间的双流合并专门提供了一个窗口联结算子,可以定义时间窗口,并将两条流中共享一个公共键(key)的数据放在窗口中进行配对处理。
1、窗口联结的调用
窗口联结在代码中的实现,首先需要调用 DataStream 的 .join()
方法来合并两条流,得到一个 JoinedStreams;接着通过 .where()
和 .equalTo()
方法指定两条流中联结的 key;然后通过 .window()
开窗口,并调用 apply()
方法传入联结窗口函数进行处理计算。通用调用形式如下:
1 2 3 4 5
| stream1.join(stream2) .where(<KeySelector>) .equalTo(<KeySelector>) .window(<WindowAssigner>) .apply(<JoinFunction>)
|
上面代码中 .where()
的参数是键选择器(KeySelector),用来指定第一条流中的 key;而 .equalTo()
传入的 KeySelector 则指定了第二条流中的 key。两者相同的元素,如果在同一窗口中,就可以匹配起来,并通过一个“联结函数”(JoinFunction)进行处理了。
这里 .window()
传入的就是窗口分配器,之前讲到的三种时间窗口都可以用在这里:滚动窗口(tumbling window)、滑动窗口(sliding window)和会话窗口(session window)。
而后面调用 .apply()
可以看作实现了一个特殊的窗口函数。注意这里只能调用 .apply()
,没有其他替代的方法。
传入的 JoinFunction 也是一个函数类接口,使用时需要实现内部的 .join()
方法。这个方法有两个参数,分别表示两条流中成对匹配的数据。
其实仔细观察可以发现,窗口 join 的调用语法和我们熟悉的 SQL 中表的 join 非常相似:
1
| SELECT * FROM table1 t1, table2 t2 WHERE t1.id = t2.id;
|
这句 SQL 中 where 子句的表达,等价于 inner join … on,所以本身表示的是两张表基于 id 的“内连接”(inner join)。而 Flink 中的 window join,同样类似于 inner join。也就是说,最后处理输出的,只有两条流中数据按 key 配对成功的那些;如果某个窗口中一条流的数据没有任何另一条流的数据匹配,那么就不会调用 JoinFunction 的 .join()
方法,也就没有任何输出了。
2、窗口联结实例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
| public class WindowJoinDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1);
SingleOutputStreamOperator<Tuple2<String, Integer>> ds1 = env .fromElements( Tuple2.of("a", 1), Tuple2.of("a", 2), Tuple2.of("b", 3), Tuple2.of("c", 4) ) .assignTimestampsAndWatermarks( WatermarkStrategy .<Tuple2<String, Integer>>forMonotonousTimestamps() .withTimestampAssigner((value, ts) -> value.f1 * 1000L) );
SingleOutputStreamOperator<Tuple3<String, Integer,Integer>> ds2 = env .fromElements( Tuple3.of("a", 1,1), Tuple3.of("a", 11,1), Tuple3.of("b", 2,1), Tuple3.of("b", 12,1), Tuple3.of("c", 14,1), Tuple3.of("d", 15,1) ) .assignTimestampsAndWatermarks( WatermarkStrategy .<Tuple3<String, Integer,Integer>>forMonotonousTimestamps() .withTimestampAssigner((value, ts) -> value.f1 * 1000L) );
DataStream<String> join = ds1.join(ds2) .where(r1 -> r1.f0) .equalTo(r2 -> r2.f0) .window(TumblingEventTimeWindows.of(Time.seconds(10))) .apply(new JoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {
@Override public String join(Tuple2<String, Integer> first, Tuple3<String, Integer, Integer> second) throws Exception { return first + "<----->" + second; } });
join.print();
env.execute(); }
|
运行截图:
3.2、间隔联结(Interval Join)
在有些场景下,我们要处理的时间间隔可能并不是固定的。这时显然不应该用滚动窗口或滑动窗口来处理——因为匹配的两个数据有可能刚好“卡在”窗口边缘两侧,于是窗口内就都没有匹配了;会话窗口虽然时间不固定,但也明显不适合这个场景。基于时间的窗口联结已经无能为力了。
为了应对这样的需求,Flink 提供了一种叫作“间隔联结”(interval join)的合流操作。顾名思义,间隔联结的思路就是针对一条流的每个数据,开辟出其时间戳前后的一段时间间隔,看这期间是否有来自另一条流的数据匹配。
1、间隔联结的原理
间隔联结具体的定义方式是,我们给定两个时间点,分别叫作间隔的“上界”(upperBound)和“下界”(lowerBound);于是对于一条流(不妨叫作A)中的任意一个数据元素 a,就可以开辟一段时间间隔:[a.timestamp + lowerBound, a.timestamp + upperBound]
,即以 a 的时间戳为中心,下至下界点、上至上界点的一个闭区间:我们就把这段时间作为可以匹配另一条流数据的“窗口”范围。所以对于另一条流(不妨叫B)中的数据元素 b,如果它的时间戳落在了这个区间范围内,a 和 b 就可以成功配对,进而进行计算输出结果。所以匹配的条件为:a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound
这里需要注意,做间隔联结的两条流 A 和 B,也必须基于相同的 key;下界 lowerBound 应该小于等于上界 upperBound,两者都可正可负;间隔联结目前只支持事件时间语义。
如下图所示,我们可以清楚地看到间隔联结的方式:
下方的流A 去间隔联结上方的流B,所以基于 A 的每个数据元素,都可以开辟一个间隔区间。我们这里设置下界为-2毫秒,上界为1毫秒。于是对于时间戳为 2 的 A 中元素,它的可匹配区间就是 [0, 3]
,流 B 中有时间戳为 0、1 的两个元素落在这个范围内,所以就可以得到匹配数据对(2, 0)和(2, 1)。同样地,A 中时间戳为3的元素,可匹配区间为 [1, 4]
,B 中只有时间戳为1的一个数据可以匹配,于是得到匹配数据对(3, 1)。
所以我们可以看到,间隔联结同样是一种内连接(inner join)。与窗口联结不同的是,interval join 做匹配的时间段是基于流中数据的,所以并不确定;而且流 B 中的数据可以不只在一个区间内被匹配。
2、间隔联结的调用
间隔联结在代码中,是基于 KeyedStream 的联结(join)操作。DataStream 在 keyBy 得到 KeyedStream 之后,可以调用 .intervalJoin()
来合并两条流,传入的参数同样是一个 KeyedStream,两者的 key 类型应该一致;得到的是一个 IntervalJoin 类型。后续的操作同样是完全固定的:先通过 .between()
方法指定间隔的上下界,再调用 .process()
方法,定义对匹配数据对的处理操作。调用 .process()
需要传入一个处理函数,这是处理函数家族的最后一员:“处理联结函数”ProcessJoinFunction。
通用调用形式如下:
1 2 3 4 5 6 7 8 9 10 11
| stream1 .keyBy(<KeySelector>) .intervalJoin(stream2.keyBy(<KeySelector>)) .between(Time.milliseconds(-2), Time.milliseconds(1)) .process (new ProcessJoinFunction<Integer, Integer, String(){
@Override public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) { out.collect(left + "," + right); } });
|
可以看到,抽象类 ProcessJoinFunction 就像是 ProcessFunction 和 JoinFunction 的结合,内部同样有一个抽象方法 .processElement()
。与其他处理函数不同的是,它多了一个参数,这自然是因为有来自两条流的数据。参数中 left 指的就是第一条流中的数据,right 则是第二条流中与它匹配的数据。每当检测到一组匹配,就会调用这里的 .processElement()
方法,经处理转换之后输出结果。
3、间隔连接实例
案例需求:在电商网站中,某些用户行为往往会有短时间内的强关联。我们这里举一个例子,我们有两条流,一条是下订单的流,一条是浏览数据的流。我们可以针对同一个用户,来做这样一个联结。也就是使用一个用户的下订单的事件和这个用户的最近十分钟的浏览数据进行一个联结查询。
(1)代码实现:正常使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
| public class IntervalJoinDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1);
SingleOutputStreamOperator<Tuple2<String, Integer>> ds1 = env .fromElements( Tuple2.of("a", 1), Tuple2.of("a", 2), Tuple2.of("b", 3), Tuple2.of("c", 4) ) .assignTimestampsAndWatermarks( WatermarkStrategy .<Tuple2<String, Integer>>forMonotonousTimestamps() .withTimestampAssigner((value, ts) -> value.f1 * 1000L) );
SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> ds2 = env .fromElements( Tuple3.of("a", 1, 1), Tuple3.of("a", 11, 1), Tuple3.of("b", 2, 1), Tuple3.of("b", 12, 1), Tuple3.of("c", 14, 1), Tuple3.of("d", 15, 1) ) .assignTimestampsAndWatermarks( WatermarkStrategy .<Tuple3<String, Integer, Integer>>forMonotonousTimestamps() .withTimestampAssigner((value, ts) -> value.f1 * 1000L) );
KeyedStream<Tuple2<String, Integer>, String> ks1 = ds1.keyBy(r1 -> r1.f0); KeyedStream<Tuple3<String, Integer, Integer>, String> ks2 = ds2.keyBy(r2 -> r2.f0);
ks1.intervalJoin(ks2) .between(Time.seconds(-2), Time.seconds(2)) .process( new ProcessJoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {
@Override public void processElement(Tuple2<String, Integer> left, Tuple3<String, Integer, Integer> right, Context ctx, Collector<String> out) throws Exception { out.collect(left + "<------>" + right); } }) .print(); env.execute(); } }
|
(2)代码实现,处理迟到的数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
| public class IntervalJoinWithLateDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1);
SingleOutputStreamOperator<Tuple2<String, Integer>> ds1 = env .socketTextStream("hadoop102", 7777) .map(new MapFunction<String, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(String value) throws Exception { String[] datas = value.split(","); return Tuple2.of(datas[0], Integer.valueOf(datas[1])); } }) .assignTimestampsAndWatermarks( WatermarkStrategy .<Tuple2<String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3)) .withTimestampAssigner((value, ts) -> value.f1 * 1000L) );
SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> ds2 = env .socketTextStream("hadoop102", 8888) .map(new MapFunction<String, Tuple3<String, Integer, Integer>>() { @Override public Tuple3<String, Integer, Integer> map(String value) throws Exception { String[] datas = value.split(","); return Tuple3.of(datas[0], Integer.valueOf(datas[1]), Integer.valueOf(datas[2])); } }) .assignTimestampsAndWatermarks( WatermarkStrategy .<Tuple3<String, Integer, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3)) .withTimestampAssigner((value, ts) -> value.f1 * 1000L) );
KeyedStream<Tuple2<String, Integer>, String> ks1 = ds1.keyBy(r1 -> r1.f0); KeyedStream<Tuple3<String, Integer, Integer>, String> ks2 = ds2.keyBy(r2 -> r2.f0);
OutputTag<Tuple2<String, Integer>> ks1LateTag = new OutputTag<>("ks1-late", Types.TUPLE(Types.STRING, Types.INT)); OutputTag<Tuple3<String, Integer, Integer>> ks2LateTag = new OutputTag<>("ks2-late", Types.TUPLE(Types.STRING, Types.INT, Types.INT)); SingleOutputStreamOperator<String> process = ks1.intervalJoin(ks2) .between(Time.seconds(-2), Time.seconds(2)) .sideOutputLeftLateData(ks1LateTag) .sideOutputRightLateData(ks2LateTag) .process( new ProcessJoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {
@Override public void processElement(Tuple2<String, Integer> left, Tuple3<String, Integer, Integer> right, Context ctx, Collector<String> out) throws Exception { out.collect(left + "<------>" + right); } });
process.print("主流"); process.getSideOutput(ks1LateTag).printToErr("ks1迟到数据"); process.getSideOutput(ks2LateTag).printToErr("ks2迟到数据"); env.execute(); } }
|