百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术教程 > 正文

Disruptor框架源码阅读-如何不重复消费

csdh11 2025-03-28 16:01 13 浏览

RingBuffer 如何保证数据不丢失

由于ringbuffer是一个环形的队列,那么生产者和消费者在遍历这个队列的时候,如何制衡呢?

1、生产快,消费慢,数据丢失?

生产者速度过快,导致一个对象还没消费完,就循环生产了一个新的对象要加入ringbuffer,导致消费不完整,造成数据丢失?

我们注意到,在我们获取生产者下一个位置的时候,是通过ringbuffer的next方法,而这个next方式是调用了sequencer的next方法

这个对象,在我们创建disruptor对象的时候,创建的

所以这个ringbuffer就是disruptor中的sequencer对象,那么在进行获取next的时候,这里是如何获取下一个的呢?是否会对这个生产获取下一个序列进行相应的等待策略,避免产生相应的干扰!!!

这个各位看官还需多看看里面的代码以及封装(特别是封装,真是九转十八弯),多熟悉,我这绕着绕着很容易就绕晕了,刚开始也是云里雾里。

EventProcessor接口概览

EventProcessor顾名思义,就是事件处理器(handle和process都可以翻译为“处理”,但是process侧重于机器的处理,而handle侧重于有人工的处理,所以使用handle表示用户逻辑的处理,使用process表示机器的处理),这个接口有两个实现类,分别是WorkProcessor和BatchEventProcessor,它们对应的逻辑处理消费者分别是EventHandler和WorkHandler。下面是EventProcessor的UML类图及EventHandler和EventProcessor的接口定义。



/**
* Callback interface to be implemented for processing events as they become available in the {@link RingBuffer}
*
* @param  event implementation storing the data for sharing during exchange or parallel coordination of an event.
* @see BatchEventProcessor#setExceptionHandler(ExceptionHandler) if you want to handle exceptions propagated out of the handler.
* 处理事件的回调接口
*/
public interface EventHandler
{
 /**
 * Called when a publisher has published an event to the {@link RingBuffer}
 *
 * @param event published to the {@link RingBuffer}
 * @param sequence of the event being processed
 * @param endOfBatch flag to indicate if this is the last event in a batch from the {@link RingBuffer}
 * @throws Exception if the EventHandler would like the exception handled further up the chain.
 */
 void onEvent(T event, long sequence, boolean endOfBatch) throws Exception;
}
/**
* EventProcessors waitFor events to become available for consumption from the {@link RingBuffer}
* 

* An EventProcessor will generally be associated with a Thread for execution. * 事件执行器,等待RingBuffer有可用消费事件。一个事件处理器关联一个执行线程 */ public interface EventProcessor extends Runnable { /** * Get a reference to the {@link Sequence} being used by this {@link EventProcessor}. * * @return reference to the {@link Sequence} for this {@link EventProcessor} */ Sequence getSequence(); /** * Signal that this EventProcessor should stop when it has finished consuming at the next clean break. * It will call {@link SequenceBarrier#alert()} to notify the thread to check status. */ void halt(); boolean isRunning(); }

EventProcessor接口继承了Runnable接口,主要有两种实现:单线程批量处理BatchEventProcessor和多线程处理WorkProcessor

在使用Disruptor帮助类构建消费者时,使用handleEventsWith方法传入多个EventHandler,内部使用多个BatchEventProcessor关联多个线程执行。这种情况类似JMS中的发布订阅模式,同一事件会被多个消费者并行消费。适用于同一事件触发多种操作。

而使用Disruptor的
handleEventsWithWorkerPool传入多个WorkHandler时,内部使用多个WorkProcessor关联多个线程执行。这种情况类似JMS的
点对点模式,同一事件会被一组消费者其中之一消费。适用于提升消费者并行处理能力。

消费技术实现

我们先回顾下Disruptor消费者的两个特点:消费者依赖图(即下文所谓的“消费链”)和事件多播。

假设现在有A,B,C,D四个消费者,它们都能组成什么样的形式呢?从众多的排列组合中,我挑了4组比较有代表性的消费链形式。



image.png

  • 第1组中,消费者A消费按成后,B、C、D可同时消费;
  • 第2组中,消费者A、B、C、D顺序消费;
  • 第3组中,消费者A、B顺序消费后,C、D同时消费;
  • 第4组中,消费者A在消费完成后,B和C可以同时消费,但是必须在都消费完成后,D才能消费。

标号为1、3、4的消费链都使用了事件多播,可见事件多播属于消费链的一种组合形式。注意,在上面4种组合中,每个组合的每一水平行,都属于一个消费者组。

这些还只是较为简单的消费链组成,实际中消费链可能会更复杂。

那么在Disruptor内部是怎么实现消费链的呢?

我们可以先思考下。如果想把独立的消费者组成消费链,那么后方的消费者(组)必然要知道在它前方的消费者(组)的处理情况,否则就做不到顺序消费。同时,消费者也要了解生产者的位置,来判断是否有可用事件。之前我们分析生产者代码的时候,已经讲过,生产者为了不覆盖没有消费完全的事件,必须知道最慢消费者的处理情况

做到了这些才会有能力去控制消费者组成消费链。下面让我们具体看Disruptor中的实现。

单生产者,多消费者模式。多消费者对于消息不重复消费。

package liuqiang.complex.multi;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.EventHandlerGroup;
import com.lmax.disruptor.dsl.ProducerType;
import liuqiang.complex.common.*;
import java.util.concurrent.Executors;
public class Main3 {
 //单生产者,多消费者模式。多消费者对于消息不重复消费。例如:1线程消费了消息0,则2线程只能从0后面的消息消费,不能对消息0进行消费。
 public static void main(String[] args) throws Exception {
 EventFactory factory = new OrderFactory();
 int ringBufferSize = 1024 * 1024;
 Disruptor disruptor =
 new Disruptor(factory, ringBufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy());
 /*
 * 该方法传入的消费者需要实现WorkHandler接口,方法的内部实现是:先创建WorkPool,然后封装WorkPool为EventHandlerPool返回。
 * 消费者1、2对于消息的消费有时有竞争,保证同一消息只能有一个消费者消费
 */
 disruptor.handleEventsWithWorkerPool(new OrderHandler1("1"), new OrderHandler1("2"));
 disruptor.start();
 RingBuffer ringBuffer = disruptor.getRingBuffer();
 Producer producer = new Producer(ringBuffer);
 //单生产者,生产3条数据
 for (int l = 0; l < 3; l++) {
 producer.onData(l + "");
 }
 //为了保证消费者线程已经启动,留足足够的时间。具体原因详见另一篇博客:disruptor的shutdown失效问题
 Thread.sleep(1000);
 disruptor.shutdown();
 }
}

调用
handleEventsWithWorkerPool形成WorkerPool,并进一步封装成EventHandlerGroup。对于同一条消息,两消费者不重复消费。

可能输出结果如下:

OrderHandler1 1,消费信息:0

OrderHandler1 2,消费信息:1

OrderHandler1 1,消费信息:2

消费者可用序列屏障-SequenceBarrier

我们重点看一下SequenceBarrier,可直译为“序列屏障”。SequenceBarrier的主要作用是协调获取消费者可处理到的最大序号,内部持有着生产者和其依赖的消费者序列。它的接口定义如下。

public interface SequenceBarrier
{
 /**
 * Wait for the given sequence to be available for consumption.
* 等待指定序列可用 * @param sequence to wait for * @return the sequence up to which is available * @throws AlertException if a status change has occurred for the Disruptor * @throws InterruptedException if the thread needs awaking on a condition variable. * @throws TimeoutException * */ long waitFor(long sequence) throws AlertException, InterruptedException, TimeoutException; /** * Get the current cursor value that can be read.
* 获取当前可读游标值 * * @return value of the cursor for entries that have been published. * */ long getCursor(); /** * The current alert status for the barrier.
* 当前的alert状态 * * @return true if in alert otherwise false. */ boolean isAlerted(); /** * Alert the {@link EventProcessor}s of a status change and stay in this status until cleared.
* * 通知消费者状态变化。当调用EventProcessor#halt()将调用此方法。 */ void alert(); /** * Clear the current alert status.
* 清楚alert状态 */ void clearAlert(); /** * Check if an alert has been raised and throw an {@link AlertException} if it has. * 检查是否发生alert,发生将抛出异常 * @throws AlertException if alert has been raised. */ void checkAlert() throws AlertException; } SequenceBarrier实例引用被EventProcessor持有,用于等待并获取可用的消费事件,主要体现在waitFor这个方法。

要实现这个功能,需要3点条件:

  1. 知道生产者的位置。
  2. 因为Disruptor支持消费者链,在不同的消费者组之间,要保证后边的消 费者组只有在前消费者组中的消费者都处理完毕后,才能进行处理。
  3. 暂时没有事件可消费,在等待可用消费时,还需要使用某种等待策略进行等待。

看下SequenceBarrier实现类ProcessingSequenceBarrier的代码是如何实现waitFor方法。

final class ProcessingSequenceBarrier implements SequenceBarrier
{
 private final WaitStrategy waitStrategy; // 等待可用消费时,指定的等待策略
 private final Sequence dependentSequence; // 依赖的上组消费者的序号,如果当前为第一组则为cursorSequence(即生产者发布游标序列),否则使用FixedSequenceGroup封装上组消费者序列
 private volatile boolean alerted = false; // 当触发halt时,将标记alerted为true
 private final Sequence cursorSequence; // AbstractSequencer中的cursor引用,记录当前发布者发布的最新位置
 private final Sequencer sequencer; // MultiProducerSequencer 或 SingleProducerSequencer
 public ProcessingSequenceBarrier(
 final Sequencer sequencer,
 final WaitStrategy waitStrategy,
 final Sequence cursorSequence,
 final Sequence[] dependentSequences)
 {
 this.sequencer = sequencer;
 this.waitStrategy = waitStrategy;
 this.cursorSequence = cursorSequence;
 if (0 == dependentSequences.length) // 依赖的上一组序列长度,第一次是0
 {
 dependentSequence = cursorSequence;
 }
 else // 将上一组序列数组复制成新数组保存,引用不变
 {
 dependentSequence = new FixedSequenceGroup(dependentSequences);
 }
 }
 @Override
 public long waitFor(final long sequence)
 throws AlertException, InterruptedException, TimeoutException
 {
 // 检查是否停止服务
 checkAlert();
 // 获取最大可用序号 sequence为给定序号,一般为当前序号+1,cursorSequence记录生产者最新位置,
 long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);
 if (availableSequence < sequence)
 {
 return availableSequence;
 }
 // 返回已发布最高的序列值,将对每个序号进行校验
 return sequencer.getHighestPublishedSequence(sequence, availableSequence);
 }
 // ... 
}

相关推荐

探索Java项目中日志系统最佳实践:从入门到精通

探索Java项目中日志系统最佳实践:从入门到精通在现代软件开发中,日志系统如同一位默默无闻却至关重要的管家,它记录了程序运行中的各种事件,为我们排查问题、监控性能和优化系统提供了宝贵的依据。在Java...

用了这么多年的java日志框架,你真的弄懂了吗?

在项目开发过程中,有一个必不可少的环节就是记录日志,相信只要是个程序员都用过,可是咱们自问下,用了这么多年的日志框架,你确定自己真弄懂了日志框架的来龙去脉嘛?下面笔者就详细聊聊java中常用日志框架的...

物理老师教你学Java语言(中篇)(物理专业学编程)

第四章物质的基本结构——类与对象...

一文搞定!Spring Boot3 定时任务操作全攻略

各位互联网大厂的后端开发小伙伴们,在使用SpringBoot3开发项目时,你是否遇到过定时任务实现的难题呢?比如任务调度时间不准确,代码报错却找不到方向,是不是特别头疼?如今,随着互联网业务规模...

你还不懂java的日志系统吗 ?(java的日志类)

一、背景在java的开发中,使用最多也绕不过去的一个话题就是日志,在程序中除了业务代码外,使用最多的就是打印日志。经常听到的这样一句话就是“打个日志调试下”,没错在日常的开发、调试过程中打印日志是常干...

谈谈枚举的新用法--java(java枚举的作用与好处)

问题的由来前段时间改游戏buff功能,干了一件愚蠢的事情,那就是把枚举和运算集合在一起,然后运行一段时间后buff就出现各种问题,我当时懵逼了!事情是这样的,做过游戏的都知道,buff,需要分类型,且...

你还不懂java的日志系统吗(javaw 日志)

一、背景在java的开发中,使用最多也绕不过去的一个话题就是日志,在程序中除了业务代码外,使用最多的就是打印日志。经常听到的这样一句话就是“打个日志调试下”,没错在日常的开发、调试过程中打印日志是常干...

Java 8之后的那些新特性(三):Java System Logger

去年12月份log4j日志框架的一个漏洞,给Java整个行业造成了非常大的影响。这个事情也顺带把log4j这个日志框架推到了争议的最前线。在Java领域,log4j可能相对比较流行。而在log4j之外...

Java开发中的日志管理:让程序“开口说话”

Java开发中的日志管理:让程序“开口说话”日志是程序员的朋友,也是程序的“嘴巴”。它能让程序在运行过程中“开口说话”,告诉我们它的状态、行为以及遇到的问题。在Java开发中,良好的日志管理不仅能帮助...

吊打面试官(十二)--Java语言中ArrayList类一文全掌握

导读...

OS X 效率启动器 Alfred 详解与使用技巧

问:为什么要在Mac上使用效率启动器类应用?答:在非特殊专业用户的环境下,(每天)用户一般可以在系统中进行上百次操作,可以是点击,也可以是拖拽,但这些只是过程,而我们的真正目的是想获得结果,也就是...

Java中 高级的异常处理(java中异常处理的两种方式)

介绍异常处理是软件开发的一个关键方面,尤其是在Java中,这种语言以其稳健性和平台独立性而闻名。正确的异常处理不仅可以防止应用程序崩溃,还有助于调试并向用户提供有意义的反馈。...

【性能调优】全方位教你定位慢SQL,方法介绍下!

1.使用数据库自带工具...

全面了解mysql锁机制(InnoDB)与问题排查

MySQL/InnoDB的加锁,一直是一个常见的话题。例如,数据库如果有高并发请求,如何保证数据完整性?产生死锁问题如何排查并解决?下面是不同锁等级的区别表级锁:开销小,加锁快;不会出现死锁;锁定粒度...

看懂这篇文章,你就懂了数据库死锁产生的场景和解决方法

一、什么是死锁加锁(Locking)是数据库在并发访问时保证数据一致性和完整性的主要机制。任何事务都需要获得相应对象上的锁才能访问数据,读取数据的事务通常只需要获得读锁(共享锁),修改数据的事务需要获...