09-Flink 处理函数之广播流

Flink 官网主页地址:https://flink.apache.org
Flink 官方中文地址:https://nightlies.apache.org/flink/flink-docs-stable/zh/

Broadcast State 是 Flink 1.5 引入的功能。Flink 中的广播流(Broadcast Stream)是一种特殊的数据流,它允许将一个流的数据广播到其他所有的流上。从而实现在不同任务间共享数据的目的。广播流在处理配置信息、小数据集或者全局变量等场景下特别有用,因为这些数据需要在所有任务中保持一致且实时更新。

使用场景示例:

  • 应用配置更新:将配置信息广播到多个数据流,当配置发生变化时,可以实时更新所有使用该配置的数据流处理逻辑。

    • 例如,我们需要依赖一个不断变化的控制规则来处理主数据流的数据,主数据流数据量比较大,只能分散到多个算子实例上,控制规则数据相对比较小,可以分发到所有的算子实例上。
    • 例如,当需要从 MySQL 数据库中实时查询和更新某些关键字过滤规则时,如果直接在计算函数中进行查询,可能会阻塞整个计算过程甚至导致任务停止。通过使用广播流,可以将这些配置信息广播到所有相关任务的实例中,然后在计算过程中直接使用这些配置信息,从而提高计算效率和实时性。
  • 查询规则更新:将查询规则广播到数据流,进行实时的规则检测和响应。

  • 缓存数据更新:将缓存的数据或者参数广播到数据流,进行实时的数据缓存更新。

Broadcast State 与直接在时间窗口进行两个数据流的 Join 的不同点在于,控制规则数据量较小,可以直接放到每个算子实例里,这样可以大大提高主数据流的处理速度。

广播流的使用通常涉及以下步骤:

  1. 定义MapStateDescriptor:首先需要定义一个 MapStateDescriptor 来描述要广播的数据的格式。这个描述器指定了数据的键值对类型。

  2. 创建广播流:然后,需要将一个普通的流转换为广播流。这通常通过调用流的 broadcast() 方法实现,并将 MapStateDescriptor 作为参数传入。

  3. 连接广播流与非广播流:一旦有了广播流,就可以将其与一个或多个非广播流(无论是 Keyed 流还是 Non-Keyed 流)连接起来。这通过调用非广播流的 connect() 方法完成,并将广播流作为参数传入。连接后的流是一个 BroadcastConnectedStream,它提供了 process() 方法用于处理数据。

  4. 处理数据:在 process() 方法中,可以编写逻辑来处理非广播流和广播流的数据。根据非广播流的类型(Keyed或Non-Keyed),需要传入相应的 KeyedBroadcastProcessFunctionBroadcastProcessFunction 类型的处理函数。

总的来说,Flink 的广播流提供了一种有效的方式来实现不同任务间的数据共享和实时更新,适用于各种需要全局数据或配置的场景。

案例

将用户信息进行广播,从 Kafka 中读取用户访问记录,判断访问用户是否存在

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import org.apache.flink.util.Collector;

import flink.demo.data.UserVo;
/**
* 多流connect,并进行join
*
*/
public class BroadcastTest{

public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties proterties = new Properties();
proterties.setProperty("bootstrap.servers", "10.168.88.88:9092");
proterties.setProperty("group.id", "test");
proterties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
proterties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// proterties.setProperty("auto.offset.reset", "latest");

FlinkKafkaConsumer<ObjectNode> consumerVisit= new FlinkKafkaConsumer<>("test", new JSONKeyValueDeserializationSchema(false), proterties);
DataStreamSource<ObjectNode> streamSource = env.addSource(consumerVisit);

DataStreamSource<Tuple2<String, List<UserVo>>> userStreamSource = env.addSource(new UserListSource());


MapStateDescriptor<String, List<UserVo>> descriptor =
new MapStateDescriptor<>(
"userStream",
BasicTypeInfo.STRING_TYPE_INFO,
TypeInformation.of(new TypeHint<List<UserVo>>() {}));

BroadcastStream<Tuple2<String, List<UserVo>>> broadcastStream = userStreamSource.broadcast(descriptor);
// 将数据流和控制流进行连接,利用控制流中的数据来控制字符串的输出
BroadcastConnectedStream<ObjectNode, Tuple2<String, List<UserVo>>> tmp=streamSource.connect(broadcastStream);
tmp.process(new UserPvProcessor()).print();

env.execute("kafkaTest");
}


private static class UserPvProcessor extends BroadcastProcessFunction<ObjectNode, Tuple2<String, List<UserVo>>, String> {
private static final long serialVersionUID = 1L;
MapStateDescriptor<String, List<UserVo>> descriptor =
new MapStateDescriptor<>(
"userStream",
BasicTypeInfo.STRING_TYPE_INFO,
TypeInformation.of(new TypeHint<List<UserVo>>() {}));

@Override
// 用户信息处理,当 broadcastStream 流有新数据时,更新 descriptor
public void processBroadcastElement(Tuple2<String, List<UserVo>> value, Context ctx, Collector<String> out) throws Exception {
// 将接收到的控制数据放到 broadcast state 中
ctx.getBroadcastState(descriptor).put(value.f0, value.f1);
// 打印控制信息
System.out.println(Thread.currentThread().getName() + " 接收到用户信息 : "+value.f0+" " + value.f1);
}

@Override
//数据流
public void processElement(ObjectNode element, ReadOnlyContext ctx, Collector<String> out) throws Exception {
// 从 broadcast state 中拿到用户列表信息
List<UserVo> userList = ctx.getBroadcastState(descriptor).get("userList");
String time=LocalDateTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss"));
if(userList!=null&&userList.size()>0) {

Map<String,String> userMap=new HashMap<>();
for(UserVo vo:userList) {
userMap.put(vo.getUserid(), vo.getUserName());
}
// System.out.println(userMap);
JsonNode value = element.get("value");
String userid=value.get("user").asText();
String userName=userMap.get(userid);
if (StringUtils.isNotBlank(userName)) {
out.collect(Thread.currentThread().getName()+"存在用户"+userid+" "+userName +" "+time);
}else {
out.collect(Thread.currentThread().getName()+"不存在用户"+userid+" "+time );
}
}else {
out.collect(Thread.currentThread().getName()+"不存在用户"+element.get("value")+" "+time );
}
}
}
}

总结下来,使用 Broadcast State 需要进行下面三步:

  1. 接收一个普通数据流,并使用 broadcast() 方法将其转换为 BroadcastStream,因为 Broadcast State 目前只支持 Key-Value 结构,需要使用 MapStateDescriptor 描述它的数据结构。
  2. BroadcastStream 与一个 DataStreamKeyedStream 使用 connect() 方法连接到一起。
  3. 实现一个 ProcessFunction,如果主流是 DataStream,则需要实现 BroadcastProcessFunction;如果主流是 KeyedStream,则需要实现 KeyedBroadcastProcessFunction。这两种函数都提供了时间和状态的访问方法。

KeyedBroadcastProcessFunction 个函数类中,有两个函数需要实现:

  • processElement:处理主数据流(非Broadcast流)中的每条元素,输出零到多个数据。ReadOnlyContext 可以获取时间和状态,但是只能以只读的形式读取Broadcast State,不能修改,以保证每个算子实例上的Broadcast State都是相同的。
  • processBroadcastElement:处理流入的广播流,可以输出零到多个数据,一般用来更新Broadcast State。

此外,在 KeyedBroadcastProcessFunction 中可以注册 Timer,并在 onTimer 方法中实现回调逻辑。本例中为了保持代码简洁,没有使用,一般可以用来清空状态,避免状态无限增长下去。

Broadcast 使用注意事项

  • 同一个 operator 的各个 task 之间没有通信,广播流侧(processBroadcastElement)可以能修改 broadcast state,而数据流侧(processElement)只能读 broadcast state.;
  • 需要保证所有 Operator task 对 broadcast state 的修改逻辑是相同的,否则会导致非预期的结果;
  • Operator tasks 之间收到的广播流元素的顺序可能不同:虽然所有元素最终都会下发给下游 tasks,但是元素到达的顺序可能不同,所以更新 state 时不能依赖元素到达的顺序;
  • 每个 task 对各自的 Broadcast state 都会做快照,防止热点问题;
  • 目前不支持 RocksDB 保存 Broadcast state:Broadcast state 目前只保存在内存中,需要为其预留合适的内存。

Reference


09-Flink 处理函数之广播流
https://flepeng.github.io/045-Flink-31-基础-09-Flink-处理函数之广播流/
作者
Lepeng
发布于
2021年3月8日
许可协议