百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术教程 > 正文

Flink 高级应用模式之应用模式的动态更新

csdh11 2024-11-30 14:14 4 浏览

文章来源:加米谷大数据

我们特意略过了关于如何初始化应用的规则,以及在运行时有哪些方法来更新这些规则的细节内容。在这篇文章中我们将具体介绍这些细节。你将学习如何将第一部分中描述的数据分区方法与动态配置结合起来使用。只要共同使用这两种模式,调整很多业务逻辑时就不用再重新编译代码和重新部署 Flink 作业了。

规则广播

首先我们来看一下先前定义的数据处理管道:

DataStream<Alert> alerts =
    transactions
        .process(new DynamicKeyFunction())
        .keyBy((keyed) -> keyed.getKey());
        .process(new DynamicAlertFunction())

DynamicKeyFunction 提供动态数据分区,而 DynamicAlertFunction 负责执行处理事务的主要逻辑,并根据已定义的规则发送警报消息。

本系列的第一篇文章简化了用例,并假定应用的规则集已预先初始化,可以通过 DynamicKeyFunction 中的 List访问。

public class DynamicKeyFunction
    extends ProcessFunction<Transaction, Keyed<Transaction, String, Integer>> {

  /* Simplified */
  List<Rule> rules = /* Rules that are initialized somehow.*/;
  ...
}

显然,在初始化阶段就可以直接在 Flink 作业的代码内部向这个列表添加规则(创建一个 List 对象,使用它的 add 方法)。这样做的主要缺点是每次修改规则后都需要重新编译作业。在现实的欺诈检测系统中规则会经常更改,因此从业务和运营角度来看,这种方法是不可接受的。我们需要另一种方式。

接下来是在上篇文章中引入的规则定义示例:

图 1:规则定义


上一篇文章提到,DynamicKeyFunction 使用 groupingKeyNames 来提取消息键。该规则第二部分中的参数由 DynamicAlertFunction 使用:它们定义所执行操作的实际逻辑及其参数(例如警报触发阈值)。这意味着在 DynamicKeyFunction 和 DynamicAlertFunction 中必须存在相同的规则。为了获得这个结果,我们将使用 Apache Flink 的数据分发广播机制。

下图展示了我们正在构建系统的最终作业图:

图 2:欺诈检测 Flink 作业的作业图


事务处理管道的主要模块有:

  • 事务源(Transaction Source),它并行消费来自 Kafka 分区的事务消息。
  • 动态键函数(Dynamic Key Function),使用一个动态键执行数据强化(enrichment)。后续的 keyBy 对这个动态键进行哈希处理,并在随后的运算符的所有并行实例之间对数据进行分区操作。
  • 动态警报函数 (Dynamic Alert Function),可生成一个数据窗口并基于该窗口创建警报。

Apache Flink 内部的数据交换

上面的作业图还指出了运算符之间的各种数据交换模式。为了解广播模式的工作机制,我们先走一小段弯路,讨论 Apache Flink 的分布式运行时中存在哪些消息传播方法。

  • 事务源之后的FORWARD连接意味着事务源运算符的一个并行实例消费的所有数据,都将精确传输到后续 DynamicKeyFunction 运算符的一个实例上。它还指出两个连接的运算符(在上述情况下为 12)并行度相同。此通信模式如图 3 所示。橙色圆圈表示事务,虚线矩形表示相联运算符的并行实例。

图 3:跨运算符实例传递的 FORWARD 消息


  • DynamicKeyFunction 和 DynamicAlertFunction 之间的HASH连接意味着,对于每个消息都将计算一个哈希码,并且消息将在下一个运算符的可用并行实例之间平均分配。需要使用 keyBy 从 Flink 显式“请求”这样的连接。

图 4:在运算符实例之间传递的哈希消息(通过keyBy)


  • REBALANCE分布是由对 rebalance() 的显式调用或并行度的更改(对于图 2 中的作业图而言,为 12->1)引起的。调用 rebalance() 会使数据以循环方式重新分区,并且在某些情况下可以帮助减轻数据偏斜。

图 5:跨运算符实例传递的 REBALANCE 消息


图 2 中的欺诈检测作业图包含一个附加数据源:规则源(Rules Source)。它还从 Kafka 消费。规则通过BROADCAST通道“混合”到主处理数据流中。在运算符之间传输数据的其他方法(例如 forward、hash 或 rebalance),会让每个消息只可在接收的运算符的并行实例之一中处理;相比之下,broadcast 会让每个消息在 broadcast stream 连接的运算符的所有并行实例的输入上可用。这使得 broadcast 方法适用于多种需要影响所有消息处理的任务,而无需考虑它们的键或源分区。

图 6:跨运算符实例传递的 BROADCAST 消息


注意:实际上 Flink 中有一些更特殊的数据分区方案,我们在这里没有提到。如果你想了解更多信息,请参阅 Flink有关流分区的文档。

广播状态模式

为了使用规则源,我们需要将其“连接”到主数据流:

// Streams setup
DataStream<Transaction> transactions = [...]
DataStream<Rule> rulesUpdateStream = [...]

BroadcastStream<Rule> rulesStream = rulesUpdateStream.broadcast(RULES_STATE_DESCRIPTOR);

// Processing pipeline setup
 DataStream<Alert> alerts =
     transactions
         .connect(rulesStream)
         .process(new DynamicKeyFunction())
         .keyBy((keyed) -> keyed.getKey())
         .connect(rulesStream)
         .process(new DynamicAlertFunction())

如你所见,可以调用 broadcast 方法并指定状态描述符,从任何常规流中创建广播流。Flink 假定在处理主数据流的事件时需要存储和检索广播的数据,因此总是从该状态描述符自动创建相应的广播状态(broadcast state)。这与其他的 Apache Flink 状态类型是不一样的,其他类型中你需要在处理函数的 open() 方法中对其进行初始化。另请注意,广播状态始终具有键值格式(MapState)。

public static final MapStateDescriptor<Integer, Rule> RULES_STATE_DESCRIPTOR =
        new MapStateDescriptor<>("rules", Integer.class, Rule.class);

连接到 rulesStream 会导致处理函数的签名发生某些变化。上一篇文章在这里做了一点简化,用的是 ProcessFunction。但是,DynamicKeyFunction 实际上是一个 BroadcastProcessFunction。

public abstract class BroadcastProcessFunction<IN1, IN2, OUT> {

    public abstract void processElement(IN1 value,
                                        ReadOnlyContext ctx,
                                        Collector<OUT> out) throws Exception;

    public abstract void processBroadcastElement(IN2 value,
                                                 Context ctx,
                                                 Collector<OUT> out) throws Exception;

}

这里的区别在于增加了 processBroadcastElement 方法,规则流的消息将通过该方法到达。下面新版本的 DynamicKeyFunction 允许在运行时通过这个流,修改数据分配键的列表:

public class DynamicKeyFunction
    extends BroadcastProcessFunction<Transaction, Rule, Keyed<Transaction, String, Integer>> {


  @Override
  public void processBroadcastElement(Rule rule,
                                     Context ctx,
                                     Collector<Keyed<Transaction, String, Integer>> out) {
    BroadcastState<Integer, Rule> broadcastState = ctx.getBroadcastState(RULES_STATE_DESCRIPTOR);
    broadcastState.put(rule.getRuleId(), rule);
  }

  @Override
  public void processElement(Transaction event,
                           ReadOnlyContext ctx,
                           Collector<Keyed<Transaction, String, Integer>> out){
    ReadOnlyBroadcastState<Integer, Rule> rulesState =
                                  ctx.getBroadcastState(RULES_STATE_DESCRIPTOR);
    for (Map.Entry<Integer, Rule> entry : rulesState.immutableEntries()) {
        final Rule rule = entry.getValue();
        out.collect(
          new Keyed<>(
            event, KeysExtractor.getKey(rule.getGroupingKeyNames(), event), rule.getRuleId()));
    }
  }
}

在上面的代码中,processElement() 接收事务,而 processBroadcastElement() 接收规则更新。创建新规则后将按图 6 所示分配,并使用 processBroadcastState 将其保存在运算符的所有并行实例中。我们使用规则的 ID 作为存储和引用各个规则的键。我们不再迭代硬编码的 List,而是迭代动态更新的广播状态的条目。

在将规则存储在广播 MapState 中时,DynamicAlertFunction 遵循相同的逻辑。如第一部分中所述,processElement 输入中的每个消息会由一个特定规则处理,并通过 DynamicKeyFunction 带有相应 ID 的“预标记”。我们需要做的就是使用提供的 ID 从 BroadcastState 中检索相应规则的定义,并根据该规则所需的逻辑对其进行处理。在这一阶段,我们还将消息添加到内部函数状态,以便在所需的数据时间窗口上执行计算。我们将在欺诈检测系列的最后一篇文章中探讨如何做到这一点。

小结

本文,我们继续研究了使用 Apache Flink 构建的欺诈检测系统的用例。我们研究了在并行运算符实例之间分配数据的各种方式,而最重要的是探讨了广播状态。我们演示了如何通过广播状态模式提供的功能来配合和增强动态分区(本系列第一部分中介绍的一种模式)。在运行时发送动态更新的能力是 Apache Flink 的强大功能,适用于其他多种用例,例如控制状态(清除 / 插入 / 修复)、运行 A/B 实验或执行 ML 模型系数的更新等。

相关推荐

Micheal Nielsen&#39;s神经网络学习之二

依然是跟着MichaelNielsen的神经网络学习,基于前一篇的学习,已经大概明白了神经网络的基本结构和BP算法,也能通过神经网络训练数字识别功能,之后我试验了一下使用神经网络训练之前的文本分类,...

CocoaPods + XCTest进行单元测试 c单元测试工具

在使用XCTest进行单元测试时,我们经常会遇到一些CocoaPods中的开源框架的调用,比如“Realm”或“Alamofire”在测试的时候,如果配置不当,会导致“frameworknotfo...

Java基础知识回顾第四篇 java基础讲解

1、&和&&的区别作为逻辑运算符:&(不管左边是什么,右边都参与运算),&&(如果左边为false,右边则不参与运算,短路)另外&可作为位运算符...

项目中的流程及类似业务的设计模式总结

说到业务流程,可能是我做过的项目中涉及业务最多的一个方面了。除了在流程设计之外,在一些考核系统、产业审批、还有很多地方,都用到相似的设计思路,在此一并总结一下。再说到模式,并不是因为流行才用这个词,而...

联想三款显示器首批获得 Eyesafe Certified 2.0 认证

IT之家7月31日消息,据外媒报道,三款全新联想显示器是全球首批满足EyesafeCertified2.0的设备。据报道,联想获得EyesafeCertified2.0认证的显...

maven的生命周期,插件介绍(二) 一个典型的maven构建生命周期

1.maven生命周期一个完整的项目构建过程通常包括清理、编译、测试、打包、集成测试、验证、部署等步骤,Maven从中抽取了一套完善的、易扩展的生命周期。Maven的生命周期是抽象的,其中的具体任务都...

多线程(3)-基于Object的线程等待与唤醒

概述在使用synchronized进行线程同步中介绍了依赖对象锁定线程,本篇文章介绍如何依赖对象协调线程。同synchronized悲观锁一样,线程本身不能等待与唤醒,也是需要对象才能完成等待与唤醒的...

jquery mobile + 百度地图 + phonegap 写的一个&quot;校园助手&quot;的app

1jquerymobile+百度地图+phonegap写的一个"校园助手"的app,使用的是基于Flat-UI的jQueryMobile,请参考:https://github.com/...

Apache 服务启动不了 apache系统服务启动不了

{我是新手,从未遇到此问题,请各位大大勿喷}事由:今天早上上班突然发现公司网站出现问题。经过排查,发现是Apache出现问题。首先检查配置文件没有出问题后,启动服务发现Apache服务能启动,但是没法...

健康债和技术债都不能欠 公众号: 我是攻城师(woshigcs)

在Solr4.4之后,Solr提供了SolrCloud分布式集群的模式,它带来的主要好处是:(1)大数据量下更高的性能(2)更好扩展性(3)更高的可靠性(4)更简单易用什么时候应该使用Sol...

Eye Experience怎么用?HTC告诉你 eyebeam怎么用

IT之家(www.ithome.com):EyeExperience怎么用?HTC告诉你HTC上周除了发布HTCDesireEYE自拍机和HTCRE管状运动相机之外,还发布了一系列新的智能手机...

Android系统应用隐藏和应用禁止卸载

1、应用隐藏与禁用Android设置中的应用管理器提供了一个功能,就是【应用停用】功能,这是针对某些系统应用的。当应用停用之后,应用的图标会被隐藏,但apk还是存在,不会删除,核心接口就是Packag...

计算机软件技术分享--赠人玫瑰,手遗余香

一、Netty介绍Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。也就是说,Netty...

Gecco爬虫框架的线程和队列模型 爬虫通用框架

简述爬虫在抓取一个页面后一般有两个任务,一个是解析页面内容,一个是将需要继续抓取的url放入队列继续抓取。因此,当爬取的网页很多的情况下,待抓取url的管理也是爬虫框架需要解决的问题。本文主要说的是g...

一点感悟(一) 初识 初读感知的意思

时间过得很快,在IT业已从业了两年多。人这一辈子到底需要什么,在路边看着人来人往,大部分人脸上都是很匆忙。上海真是一个魔都,它有魅力,有底蕴,但是一个外地人在这里扎根置业,真的是举全家之力,还贷3...