Flink 使用 Broadcast State 实现流处理配置实时更新

Broadcast State 是 Flink 支持的一种 Operator State。使用 Broadcast State,可以在 Flink 程序的一个 Stream 中输入数据记录,然后将这些数据记录广播(Broadcast)到下游的每个 Task 中,使得这些数据记录能够为所有的 Task 所共享,比如一些用于配置的数据记录。这样,每个 Task 在处理其所对应的 Stream 中记录的时候,读取这些配置,来满足实际数据处理需要。
另外,在一定程度上,Broadcast State 能够使得 Flink Job 在运行过程中与外部的其他系统解耦合。比如,通常 Flink 会使用 YARN 来管理计算资源,使用 Broadcast State 就可以不用直接连接 MySQL 数据库读取相关配置信息了,也无需对 MySQL 做额外的授权操作。因为在一些场景下,会使用 Flink on YARN 部署模式,将 Flink Job 运行的资源申请和释放交给 YARN 去管理,那么就存在 Hadoop 集群节点扩缩容的问题,如新加节点可能需要对一些外部系统的访问,如 MySQL 等进行连接操作授权,如果忘记对 MysQL 访问授权,Flink Job 被调度到新增的某个新增节点上连接并读取 MySQL 配置信息就会出错。

Broadcast State API

通常,我们首先会创建一个 Keyed 或 Non-Keyed 的 Data Stream,然后再创建一个 Broadcasted Stream,最后通过 Data Stream 来连接(调用 connect 方法)到 Broadcasted Stream 上,这样实现将 Broadcast State 广播到 Data Stream 下游的每个 Task 中。
如果 Data Stream 是 Keyed Stream,则连接到 Broadcasted Stream 后,添加处理 ProcessFunction 时需要使用 KeyedBroadcastProcessFunction 来实现,下面是 KeyedBroadcastProcessFunction 的 API,代码如下所示:

public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> extends BaseBroadcastProcessFunction {
    public abstract void processElement(final IN1 value, final ReadOnlyContext ctx, final Collector<OUT> out) throws Exception;
    public abstract void processBroadcastElement(final IN2 value, final Context ctx, final Collector<OUT> out) throws Exception;
}

上面泛型中的各个参数的含义,说明如下:

  • KS:表示 Flink 程序从最上游的 Source Operator 开始构建 Stream,当调用 keyBy 时所依赖的 Key 的类型;
  • IN1:表示非 Broadcast 的 Data Stream 中的数据记录的类型;
  • IN2:表示 Broadcast Stream 中的数据记录的类型;
  • OUT:表示经过 KeyedBroadcastProcessFunction 的 processElement() 和p rocessBroadcastElement() 方法处理后输出结果数据记录的类型。

如果 Data Stream 是 Non-Keyed Stream,则连接到 Broadcasted Stream 后,添加处理 ProcessFunction 时需要使用 BroadcastProcessFunction 来实现,下面是 BroadcastProcessFunction 的 API,代码如下所示:

public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {
    public abstract void processElement(final IN1 value, final ReadOnlyContext ctx, final Collector<OUT> out) throws Exception;
    public abstract void processBroadcastElement(final IN2 value, final Context ctx, final Collector<OUT> out) throws Exception;
}

上面泛型中的各个参数的含义,与前面 KeyedBroadcastProcessFunction 的泛型类型中的后 3 个含义相同,只是没有调用 keyBy 操作对原始 Stream 进行分区操作,就不需要 KS 泛型参数。
具体如何使用上面的 BroadcastProcessFunction,接下来我们会在通过实际编程,来以使用 KeyedBroadcastProcessFunction 为例进行详细说明。

使用场景实践

用户购物路径长度跟踪场景描述

我们先描述一下使用 Broadcast State 的场景:
针对用户在手机 App 上操作行为的事件,通过跟踪用户操作来实时触发指定的操作。假设我们关注一个用户在 App 上经过多次操作之后,比如浏览了几个商品、将浏览过的商品加入购物车、将购物车中的商品移除购物车等等,最后发生了购买行为,那么对于用户从开始到最终达成购买所进行操作的行为的次数,我们定义为用户购物路径长度,通过这个概念假设可以通过推送优惠折扣权限、或者适时地提醒用户使用 App 等运营活动,能够提高用户的复购率,这个是我们要达成的目标。
事件均以指定的格式被实时收集上来,我们统一使用 JSON 格式表示,例如,一个用户在 App 上操作行为我们定义有如下几种:

  • VIEW_PRODUCT
  • ADD_TO_CART
  • REMOVE_FROM_CART
  • PURCHASE

可以很容易根据上面的事件类型定义,理解每种类型的含义。用户在最终达成下单购买操作过程中,会经过一系列操作:VIEW_PRODUCT、ADD_TO_CART、REMOVE_FROM_CART 的不同组合,每个也可以重复操作多次,最终发生购买类型 PURCHASE 的行为,然后我们对该用户计算其购物路径长度,通过计算该度量来为外部业务系统提供运营或分析活动的基础数据,外部系统可以基于该数据对用户进行各种运营活动。
例如,下面是几个示例事件的记录,如下所示:

{"userId":"d8f3368aba5df27a39cbcfd36ce8084f","channel":"APP","eventType":"VIEW_PRODUCT","eventTime":"2018-06-12_09:27:11","data":{"productId":196}}
{"userId":"d8f3368aba5df27a39cbcfd36ce8084f","channel":"APP","eventType":"ADD_TO_CART","eventTime":"2018-06-12_09:43:18","data":{"productId":126}}
{"userId":"d8f3368aba5df27a39cbcfd36ce8084f","channel":"APP","eventType":"VIEW_PRODUCT","eventTime":"2018-06-12_09:27:11","data":{"productId":126}}
{"userId":"d8f3368aba5df27a39cbcfd36ce8084f","channel":"APP","eventType":"PURCHASE","eventTime":"2018-06-12_09:30:28","data":{"productId":196,"price":600.00,"amount":600.00}}

另外,因为 App 注册用户很多,不可能所有的用户发生的购物行为路径都能满足特定条件,假设对于购物路径长度很短的,很可能该用户使用 App 时目的性很强,很快就下单购买,对于这类用户我们暂时先不想对他们做任何运营活动,所以进行流数据处理时需要输入对应的路径长度的配置值,来限制这种情况。而且,随着时间的推移,该值可能会根据实际业务需要而发生变化,我们希望整个 Flink 计算程序能够动态获取并更新对应的配置值,配置字符串也是 JSON 格式,示例如下:

{"channel":"APP","registerDate":"2018-01-01","historyPurchaseTimes":0,"maxPurchasePathLength":3}

这时,使用Flink提供的 Broadcast State 特性就非常方便。另外,我们可以假设存在多个不同的渠道,这里只会以 APP 渠道为例进行说明实践。
假设满足大于配置的最大购物路径长度的用户,我们计算出该用户购物的路径长度,同时将其输出到另一个指定的 Kafka Topic 中,以便其它系统消费该 Topic,从而对这些用户进行个性化运营。例如,计算得到的结果格式,除了一个购物路径长度外,还分别统计了达成购买过程中各个操作行为的个数,JSON 格式字符串如下所示:

{"userId":"a9b83681ba4df17a30abcf085ce80a9b","channel":"APP","purchasePathLength":9,"eventTypeCounts":{"ADD_TO_CART":1,"PURCHASE":1,"VIEW_PRODUCT":7}}

后续外部系统如何使用该结果数据,我们暂时不去过多考虑。

基本设计

基于上面描述的使用场景,为了直观表达系统的技术架构、基本组件和数据处理流程,基本设计如下图所示:
Flink-Broadcase-State-Processing
如上图所示,正是我们计划实现流处理流程,对应的核心要点,描述如下:

  • 用户操作行为事件实时写入到 Kafka 的 Topic 中,通过 input-event-topic 参数指定。
  • 基于 input-event-topic 参数指定的 Topic,创建一个 Flink Source Operator,名称为 kafkaUserEventSource。
  • 基于 kafkaUserEventSource 创建一个 Data Stream,名称为 customerUserEventStream。
  • 渠道配置信息,根据实际业务需要更新,并实时写入到Kafka的 Topic 中,通过 input-config-topic 参数指定。
  • 基于 input-config-topic 参数指定的 Topic,创建一个 Flink Source Operator,名称为 kafkaConfigEventSource。
  • 基于 kafkaConfigEventSource 创建一个 Broadcast Stream,名称为 configBroadcastStream。
  • 将上述创建的两个 Stream,通过 customerUserEventStream 连接到 configBroadcastStream,得到新的 connectedStream。
  • 基于 connectedStream 设置 ProcessFunction 实现,来处理新的 Stream 中的数据记录,可以在每个Task中基于获取到统一的配置信息,进而处理用户事件。
  • 将处理结果发送到 Flink Sink Operator,名称为 kafkaSink。
  • kafkaSink 将处理的结果,保存到 Kafka 的 Topic 中,通过 output-topic 指定 Topic 名称。

另外,在 Flink Job 中开启 Checkpoint 功能,每隔 1 小时对 Flink Job 中的状态进行 Checkpointing,以保证流处理过程发生故障后,也能够恢复。

实现 Flink Job 主流程处理

我们把输入的用户操作行为事件,实时存储到 Kafka 的一个 Topic 中,对于相关的配置也使用一个 Kafka Topic 来存储,这样就会构建了 2 个 Stream:一个是普通的 Stream,用来处理用户行为事件;另一个是 Broadcast Stream,用来处理并更新配置信息。计算得到的最终结果,会保存到另一个 Kafka 的 Topic 中,供外部其他系统消费处理以支撑运营或分析活动。

  • 输入参数

Flink 程序的输入参数格式,代码如下所示:

    LOG.info("Input args: " + Arrays.asList(args));
    // parse input arguments
    final ParameterTool parameterTool = ParameterTool.fromArgs(args);
    if (parameterTool.getNumberOfParameters() < 5) {
      System.out.println("Missing parameters!\n" +
          "Usage: Kafka --input-event-topic <topic> --input-config-topic <topic> --output-topic <topic> " +
          "--bootstrap.servers <kafka brokers> " +
          "--zookeeper.connect <zk quorum> --group.id <some id>");
      return;
    }

其中对应上面描述的 3 个 Topic,配置 Key 分别为 input-event-topic、input-config-topic、output-topic,另外还有与 Kafka 集群建立连接所必需 bootstrap.servers 和 zookeeper.connect 这 2 个参数,具体含义不再详述,可以参考其他文档。

  • 配置 Flink 环境

需要对 Flink 的相关运行配置进行设置,包括 Checkpoint 相关配置,代码如下所示:

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStateBackend(new FsStateBackend(
        "hdfs://namenode01.td.com/flink-checkpoints/customer-purchase-behavior-tracker"));
    CheckpointConfig config = env.getCheckpointConfig();
    config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    config.setCheckpointInterval(1 * 60 * 60 * 1000);
    env.getConfig().setGlobalJobParameters(parameterTool); // make parameters available in the web interface
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

指定 Flink Job 运行过程中开启 Checkpoint 功能,并且 Checkpoint 数据存储到指定的 HDFS 路径 hdfs://namenode01.td.com/flink-checkpoints/customer-purchase-behavior-tracker 下面,并且 Checkpoint 时间间隔为 1 小时。Flink 处理过程中使用的 TimeCharacteristic,我们使用了 TimeCharacteristic.EventTime,也就是根据事件本身自带的时间来进行处理,用来生成 Watermark 时间戳,对应生成 Watermark 的实现我们使用了 BoundedOutOfOrdernessTimestampExtractor,即设置一个容忍事件乱序的最大时间长度,实现代码如下所示:

  private static class CustomWatermarkExtractor extends BoundedOutOfOrdernessTimestampExtractor<UserEvent> {

    public CustomWatermarkExtractor(Time maxOutOfOrderness) {
      super(maxOutOfOrderness);
    }
    @Override
    public long extractTimestamp(UserEvent element) {
      return element.getEventTimestamp();
    }

  }
  • 创建用户行为事件Stream

创建一个用来处理用户在 App 上操作行为事件的 Stream,并且使用 map 进行转换,使用 keyBy 来对 Stream 进行分区,实现代码如下所示:

    // create customer user event stream
    final FlinkKafkaConsumer010 kafkaUserEventSource = new FlinkKafkaConsumer010<>(
        parameterTool.getRequired("input-event-topic"),
        new SimpleStringSchema(), parameterTool.getProperties());

    // (userEvent, userId)
    final KeyedStream<UserEvent, String> customerUserEventStream = env
        .addSource(kafkaUserEventSource)
        .map(new MapFunction<String, UserEvent>() {
          @Override
          public UserEvent map(String s) throws Exception {
            return UserEvent.buildEvent(s);
          }
        })
        .assignTimestampsAndWatermarks(new CustomWatermarkExtractor(Time.hours(24)))
        .keyBy(new KeySelector<UserEvent, String>() {
          @Override
          public String getKey(UserEvent userEvent) throws Exception {
            return userEvent.getUserId();
          }
        });

上面从 Kafka 的 Topic 中读取的事件都是 JSON 格式字符串,我们调用 map 将其转换成 UserEvent 对象,继续调用 assignTimestampsAndWatermarks() 方法设置 Watermark,调用 keyBy() 方法设置根据用户 ID(userId)来对 Stream 中的数据记录进行分区,即属于同一个用户的操作行为事件会发送到同一个下游的 Task 中进行处理,这样可以在 Task 中完整地保存某个用户相关的状态信息,从而等到 PURCHASE 类型的购物操作事件到达后进行一次计算,如果满足配置条件则处理缓存的事件并输出最终结果。

  • 创建配置事件Stream

创建一个用来动态读取 Kafka Topic 中配置的 Broadcast Stream,它是基于 Flink 的 Broadcast State 特性,实现代码如下所示:

    // create dynamic configuration event stream
    final FlinkKafkaConsumer010 kafkaConfigEventSource = new FlinkKafkaConsumer010<>(
        parameterTool.getRequired("input-config-topic"),
        new SimpleStringSchema(), parameterTool.getProperties());

    final BroadcastStream<Config> configBroadcastStream = env
        .addSource(kafkaConfigEventSource)
        .map(new MapFunction<String, Config>() {
          @Override
          public Config map(String value) throws Exception {
            return Config.buildConfig(value);
          }
        })
        .broadcast(configStateDescriptor);

上面代码中,最后一行调用了 broadcast() 方法,用来指定要广播的状态变量,它在 Flink 程序运行时会发送到下游每个 Task 中,供 Task 读取并使用对应配置信息,下游 Task 可以根据该状态变量就可以获取到对应的配置值。参数值 configStateDescriptor 是一个 MapStateDescriptor 类型的对象,定义并初始化,代码如下所示:

  private static final MapStateDescriptor<String, Config> configStateDescriptor =
      new MapStateDescriptor<>(
          "configBroadcastState",
          BasicTypeInfo.STRING_TYPE_INFO,
          TypeInformation.of(new TypeHint<Config>() {}));

它使用事件中的 channel(渠道)字段作为 Key,也就是不同渠道对应的配置是不同的,实现了对渠道配置的灵活性。而对应的 Value 则是我们定义的 Config,该类中定了如下几个属性:

  private String channel;
  private String registerDate;
  private int historyPurchaseTimes;
  private int maxPurchasePathLength;

具体含义可以从属性名称命名得知,广播后下游的每个 Task 都可以读取到这些配置属性值。

  • 连接两个Stream并实现计算处理

我们需要把最终的计算结果保存到一个输出的 Kafka Topic 中,所以先创建一个 FlinkKafkaProducer010,代码如下所示:

    final FlinkKafkaProducer010 kafkaProducer = new FlinkKafkaProducer010<>(
        parameterTool.getRequired("output-topic"),
        new EvaluatedResultSchema(),
        parameterTool.getProperties());

然后,再调用customerUserEventStream的connect()方法连接到configBroadcastStream,从而获取到configBroadcastStream中对应的配置信息,进而处理实际业务逻辑,代码如下所示:

    // connect above 2 streams
    DataStream<EvaluatedResult> connectedStream = customerUserEventStream
        .connect(configBroadcastStream)
        .process(new ConnectedBroadcastProcessFuntion());
    connectedStream.addSink(kafkaProducer);
    env.execute("UserPurchaseBehaviorTracker");

用户操作行为事件 Stream 调用 connect() 方法,参数是 Broadcast Stream,就可以生成一个新的 BroadcastConnectedStream 类型的 Stream,再调用 process() 方法,并增加对该 Stream 中数据记录处理的逻辑。

对BroadcastConnectedStream进行处理

BroadcastConnectedStream 调用 process() 方法,参数类型为 KeyedBroadcastProcessFunction 或者 BroadcastProcessFunction,我们这里实现类为 ConnectedBroadcastProcessFuntion,它继承自 KeyedBroadcastProcessFunction 抽象类。通过前面 Broadcast State API 部分,我们已经了解到,需要实现 processBroadcastElement() 和 processElement() 这两个处理方法,一个是处理 Broadcast Stream,另一个是处理用户操作行为事件 Stream。我们首先在 ConnectedBroadcastProcessFuntion 中定义了一个用来存储用户操作行为事件的状态变量,代码如下:

    // (channel, Map<uid, UserEventContainer>)
    private final MapStateDescriptor<String, Map<String, UserEventContainer>> userMapStateDesc =
        new MapStateDescriptor<>(
            "userEventContainerState",
            BasicTypeInfo.STRING_TYPE_INFO,
            new MapTypeInfo<>(String.class, UserEventContainer.class));

上面代码中,userMapStateDesc 是一个 Map 结构,Key 是渠道(channel),Value 又是一个包含用户 ID(userId)和 UserEventContainer 的 Map 结构。UserEventContainer 内部封装了一个 List,用来保存属于同一个用户的 UserEvent 列表。
我们先看一下 processBroadcastElement() 方法实现,代码如下所示:

    @Override
    public void processBroadcastElement(Config value, Context ctx, Collector<EvaluatedResult> out)
        throws Exception {
      String channel = value.getChannel();
      BroadcastState<String, Config> state = ctx.getBroadcastState(configStateDescriptor);
      final Config oldConfig = ctx.getBroadcastState(configStateDescriptor).get(channel);
      if(state.contains(channel)) {
        LOG.info("Configured channel exists: channel=" + channel);
        LOG.info("Config detail: oldConfig=" + oldConfig + ", newConfig=" + value);
      } else {
        LOG.info("Config detail: defaultConfig=" + defaultConfig + ", newConfig=" + value);
      }
      // update config value for configKey
      state.put(channel, value);
    }

通过调用 ctx.getBroadcastState(configStateDescriptor),根据上面定义的 MapStateDescriptor 可以获取到对应的 BroadcastState,其中包括渠道(channel)和 Config 对象。上面实现逻辑包含了,如果更新对应配置变更的操作,更新后的配置信息会存储到 BroadcastState 中,它其实就是一个 Map 结构,通过 Key 就可以获取到对应最新的配置 Value(这里 Key 是渠道,Value 是 Config 对象)。
再看一下 processElement() 方法的实现,它的实现才是业务处理最核心的部分,代码如下所示:

    @Override
    public void processElement(UserEvent value, ReadOnlyContext ctx,
        Collector<EvaluatedResult> out) throws Exception {
      String userId = value.getUserId();
      String channel = value.getChannel();

      EventType eventType = EventType.valueOf(value.getEventType());
      Config config = ctx.getBroadcastState(configStateDescriptor).get(channel);
      LOG.info("Read config: channel=" + channel + ", config=" + config);
      if (Objects.isNull(config)) {
        config = defaultConfig;
      }

      final MapState<String, Map<String, UserEventContainer>> state =
          getRuntimeContext().getMapState(userMapStateDesc);

      // collect per-user events to the user map state
      Map<String, UserEventContainer> userEventContainerMap = state.get(channel);
      if (Objects.isNull(userEventContainerMap)) {
        userEventContainerMap = Maps.newHashMap();
        state.put(channel, userEventContainerMap);
      }
      if (!userEventContainerMap.containsKey(userId)) {
        UserEventContainer container = new UserEventContainer();
        container.setUserId(userId);
        userEventContainerMap.put(userId, container);
      }
      userEventContainerMap.get(userId).getUserEvents().add(value);

      // check whether a user purchase event arrives
      // if true, then compute the purchase path length, and prepare to trigger predefined actions
      if (eventType == EventType.PURCHASE) {
        LOG.info("Receive a purchase event: " + value);
        Optional<EvaluatedResult> result = compute(config, userEventContainerMap.get(userId));
        result.ifPresent(r -> out.collect(result.get()));
        // clear evaluated user's events
        state.get(channel).remove(userId);
      }
    }

通过调用 ctx.getBroadcastState(configStateDescriptor).get(channel) 就获取到了 Broadcast Stream 中某个渠道最新的配置 Config 对象,然后就可以在处理事件过程中使用该配置信息。配置信息一旦变更,这里面也会实时地获取到由 processBroadcastElement() 方法处理并更新的配置值。到达的每个用户的操作行为事件,会首先保存到 userMapStateDesc 这个 MapStateDescriptor 类型的状态变量中,不断累积缓存,一旦该用户的一个 PURCHASE 类型的购物事件到达,则调用 compute() 方法计算结果数据,最后结果数据 EvaluatedResult 会被输出到 Sink Operator 对应的 Task 中,保存到 Kafka Topic 中。上面代码中调用了 compute() 方法,具体实现如下所示:

    private Optional<EvaluatedResult> compute(Config config, UserEventContainer container) {
      Optional<EvaluatedResult> result = Optional.empty();
      String channel = config.getChannel();
      int historyPurchaseTimes = config.getHistoryPurchaseTimes();
      int maxPurchasePathLength = config.getMaxPurchasePathLength();

      int purchasePathLen = container.getUserEvents().size();
      if (historyPurchaseTimes < 10 && purchasePathLen > maxPurchasePathLength) {
        // sort by event time
        container.getUserEvents().sort(Comparator.comparingLong(UserEvent::getEventTimestamp));

        final Map<String, Integer> stat = Maps.newHashMap();
        container.getUserEvents()
            .stream()
            .collect(Collectors.groupingBy(UserEvent::getEventType))
            .forEach((eventType, events) -> stat.put(eventType, events.size()));

        final EvaluatedResult evaluatedResult = new EvaluatedResult();
        evaluatedResult.setUserId(container.getUserId());
        evaluatedResult.setChannel(channel);
        evaluatedResult.setEventTypeCounts(stat);
        evaluatedResult.setPurchasePathLength(purchasePathLen);
        LOG.info("Evaluated result: " + evaluatedResult.toJSONString());
        result = Optional.of(evaluatedResult);
      }
      return result;
    }

上面代码使用配置对象 Config 来判断是否需要输出该用户对应的计算结果,如果是,则计算购物路径长度,并统计该用户操作行为事件类型的个数。

提交运行 Flink Job

我们需要创建对应 Topic,创建命令参考如下:

./bin/kafka-topics.sh --zookeeper 172.23.4.138:2181,172.23.4.139:2181,172.23.4.140:2181/kafka --create --topic user_events --replication-factor 1 --partitions 1
./bin/kafka-topics.sh --zookeeper 172.23.4.138:2181,172.23.4.139:2181,172.23.4.140:2181/kafka --create --topic app_config --replication-factor 1 --partitions 1
./bin/kafka-topics.sh --zookeeper 172.23.4.138:2181,172.23.4.139:2181,172.23.4.140:2181/kafka --create --topic action_result --replication-factor 1 --partitions 1

上面程序开发完成后,需要进行编译打包,并提交 Flink Job 到 Flink 集群,执行如下命令:

bin/flink run -d -c org.shirdrn.flink.broadcaststate.UserPurchaseBehaviorTracker ~/flink-app-jobs.jar --input-event-topic user_events --input-config-topic app_config --output-topic action_result --bootstrap.servers 172.23.4.138:9092 --zookeeper.connect zk01.td.com:2181,zk02.td.com:2181,zk03.td.com:2181/kafka --group.id customer-purchase-behavior-tracker

Flink Job 正常运行后,我们向 Kafka 的 app_config 这个 Topic 中模拟发送配置事件记录:

./bin/kafka-console-producer.sh --topic app_config --broker-list 172.23.4.138:9092

输入如下配置事件 JSON 字符串:

{"channel":"APP","registerDate":"2018-01-01","historyPurchaseTimes":0,"maxPurchasePathLength":6}

向 Kafka 的 user_events 这个 Topic 中模拟发送用户操作行为事件记录:

./bin/kafka-console-producer.sh --topic app_config --broker-list 172.23.4.138:9092

分别依次输入如下用户的每个操作行为事件 JSON 字符串:

{"userId":"a9b83681ba4df17a30abcf085ce80a9b","channel":"APP","eventType":"VIEW_PRODUCT","eventTime":"2018-06-12_08:45:24","data":{"productId":126}}
{"userId":"a9b83681ba4df17a30abcf085ce80a9b","channel":"APP","eventType":"VIEW_PRODUCT","eventTime":"2018-06-12_08:57:32","data":{"productId":273}}
{"userId":"a9b83681ba4df17a30abcf085ce80a9b","channel":"APP","eventType":"VIEW_PRODUCT","eventTime":"2018-06-12_09:21:08","data":{"productId":126}}
{"userId":"a9b83681ba4df17a30abcf085ce80a9b","channel":"APP","eventType":"VIEW_PRODUCT","eventTime":"2018-06-12_09:21:49","data":{"productId":103}}
{"userId":"a9b83681ba4df17a30abcf085ce80a9b","channel":"APP","eventType":"VIEW_PRODUCT","eventTime":"2018-06-12_09:21:59","data":{"productId":157}}
{"userId":"a9b83681ba4df17a30abcf085ce80a9b","channel":"APP","eventType":"VIEW_PRODUCT","eventTime":"2018-06-12_09:27:11","data":{"productId":126}}
{"userId":"a9b83681ba4df17a30abcf085ce80a9b","channel":"APP","eventType":"ADD_TO_CART","eventTime":"2018-06-12_09:43:18","data":{"productId":126}}
{"userId":"a9b83681ba4df17a30abcf085ce80a9b","channel":"APP","eventType":"VIEW_PRODUCT","eventTime":"2018-06-12_09:27:11","data":{"productId":126}}
{"userId":"a9b83681ba4df17a30abcf085ce80a9b","channel":"APP","eventType":"PURCHASE","eventTime":"2018-06-12_09:30:28","data":{"productId":126,"price":299.00,"amount":260.00}}

可以看到,输入每个事件,都会在 Task 中接收到并处理。在输入到最后一个用户购买事件时,触发了计算并输出结果,可以在另一个输出 Kafka Topic action_result 中看到结果,如下所示:

{"userId":"a9b83681ba4df17a30abcf085ce80a9b","channel":"APP","purchasePathLength":9,"eventTypeCounts":{"ADD_TO_CART":1,"PURCHASE":1,"VIEW_PRODUCT":7}}

如果我们将前面的配置内容,再改成如下内容:

{"channel":"APP","registerDate":"2018-01-01","historyPurchaseTimes":0,"maxPurchasePathLength":20}

同样输入上述用户操作行为事件记录,由于 maxPurchasePathLength=20,所以没有触发对应结果计算和输出,因为用户的 purchasePathLength=9,可见配置动态变更生效。

参考链接

Creative Commons License

本文基于署名-非商业性使用-相同方式共享 4.0许可协议发布,欢迎转载、使用、重新发布,但务必保留文章署名时延军(包含链接:http://shiyanjun.cn),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。如有任何疑问,请与我联系

评论(22): “Flink 使用 Broadcast State 实现流处理配置实时更新

  1. 博主厉害,正好需要用到flink的配置更新方面的知识。

    还想请教一下关于用户定义状态保存方面的文章。
    比如下面两种场景:
    A. 用户定义2个状态:一个状态统计最近5分钟的数据,另一个状态统计从当前时刻向前1天的状态数据,而这些状态数据相当于是将1天内所有5分钟状态数据的汇总,不知道是否能够实现?
    B. 用户定义状态在flink集群重启时,如何重新读取回来,使用BackendState可以不?

    • A、你这个统计5分钟数据的需求,是不是要使用Tumbling Window就可以实现。每5分钟统计的结果,你不打算写到外部存储系统中去吗,在外部去做1天内的汇总会不会更好呢。
      B、用Checkpoint或Savepoint,可以保存并恢复状态的。

      • 多谢博主,A应该没啥问题。

        B问题中不打算使用checkpoint的原因是数据存储在mysql数据库的,而checkpoint机制对mysql不友好。比如说某个id1和ip1的对应关系写入到mysql了,checkpoint回去数据的时候,正好id1和ip1对应关系那会儿还没生成,那么从checkpoint那会儿重新处理流数据时,生成了新的id2和ip1对应关系,这个时候ip1对应到了id2,就感觉前后数据不一致了,不知道您有没有遇到过类似的问题?

  2. 博主好,需要实时统计最近10分钟的购买情况。购买情况也是可配置的。就是需要多使用一个时间窗口功能,要如何实现呢?谢谢。

  3. 博主好,非常感谢提供实时流配置的解决方案,帮了大忙。
    有一个小疑问,文中的例子是不是没有窗口的设置呢?那么wartermark在这种情况下起到什么作用呢?

  4. 广播状态是否可以在做Checkpoint的时候保存,失败重启的时候恢复。看博主的代码广播状态是没有失败恢复的呀。

  5. 博主你好,为啥我仿照你的这个例子中的userMapStateDesc写的一个mapstate不起作用,每次只能取出最新值而不能进行缓存数据呢?

    • 仔细看下,是不是在处理是从 userMapStateDesc 中取出,而没有正确放进去。这个结构有一点点复杂:MapStateDescriptor<String, Map>,不注意可能会放错。

  6. 博主你好,比如有个动态监控规则:统计每1分钟,用户支付的订单金额超过 100 元 的用户要告警。但是规则是动态的,有可能需要动态增加类似的规则,或者 1分钟,100元 的数值是可以根据需要动态变化的。用 broadcast 要怎么实现呢?

    • 我查阅了一些网站,说可以用 自定义 window 或者 自定义 state,但是我一点头绪都没有

    • 你说的动态规则,和我这里说的动态配置是一样的呀,规则在外Flink程序外部系统维护,一旦发生变更,就通过BroadcastStream去更新Flink Streaming Job实例内存中hold的规则数据结构,从而是规则生效。

  7. 我需要统计一分钟内的数据做监控报警 想通过broadcast来广播报警规则,这个规则我是想在ProcessWindowFunction 中使用这个规则 找了一圈没有找打相关方法,想问问大神这个方案是否可行,如果可行大致的实现思路麻烦指点下

    • 你可以使用 Flink 的 Window 功能来实现,不过在 ProcessWindowFunction 可能拿不到一些你想要的其他配置信息。

  8. 顺便咨询一下博主,我考虑在processfunction中注册定时器,来替代timewindow;connect上broadcaststream从而实现类动态timewindow。在你看来,这个思路是否可行呢?

郑威进行回复 取消回复

电子邮件地址不会被公开。 必填项已用*标注

您可以使用这些HTML标签和属性: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>