找回密码
 立即注册
首页 业界区 业界 Disruptor—3.核心源码实现分析

Disruptor—3.核心源码实现分析

材部 4 天前
大纲
1.Disruptor的生产者源码分析
2.Disruptor的消费者源码分析
3.Disruptor的WaitStrategy等待策略分析
4.Disruptor的高性能原因
5.Disruptor高性能之数据结构(内存预加载机制)
6.Disruptor高性能之内核(使用单线程写)
7.Disruptor高性能之系统内存优化(内存屏障)
8.Disruptor高性能之系统缓存优化(消除伪共享)
9.Disruptor高性能之序号获取优化(自旋 + CAS)
 
1.Disruptor的生产者源码分析
(1)通过Sequence序号发布消息
(2)通过Translator事件转换器发布消息
 
(1)通过Sequence序号发布消息
生产者可以先从RingBuffer中获取一个可用的Sequence序号,然后再根据该Sequence序号从RingBuffer的环形数组中获取对应的元素,接着对该元素进行赋值替换,最后调用RingBuffer的publish()方法设置当前生产者的Sequence序号来完成事件消息的发布。
  1. //注意:这里使用的版本是3.4.4
  2. //单生产者单消费者的使用示例
  3. public class Main {
  4.     public static void main(String[] args) {
  5.         //参数准备
  6.         OrderEventFactory orderEventFactory = new OrderEventFactory();
  7.         int ringBufferSize = 4;
  8.         ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
  9.   
  10.         //参数一:eventFactory,消息(Event)工厂对象
  11.         //参数二:ringBufferSize,容器的长度
  12.         //参数三:executor,线程池(建议使用自定义线程池),RejectedExecutionHandler
  13.         //参数四:ProducerType,单生产者还是多生产者
  14.         //参数五:waitStrategy,等待策略
  15.         //1.实例化Disruptor对象
  16.         Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(
  17.             orderEventFactory,
  18.             ringBufferSize,
  19.             executor,
  20.             ProducerType.SINGLE,
  21.             new BlockingWaitStrategy()
  22.         );
  23.   
  24.         //2.添加Event处理器,用于处理事件
  25.         //也就是构建Disruptor与消费者的一个关联关系
  26.         disruptor.handleEventsWith(new OrderEventHandler());
  27.   
  28.         //3.启动Disruptor
  29.         disruptor.start();
  30.   
  31.         //4.获取实际存储数据的容器: RingBuffer
  32.         RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
  33.         OrderEventProducer producer = new OrderEventProducer(ringBuffer);
  34.         ByteBuffer bb = ByteBuffer.allocate(8);
  35.         for (long i = 0; i < 5; i++) {
  36.             bb.putLong(0, i);
  37.             //向容器中投递数据
  38.             producer.sendData(bb);
  39.         }
  40.         disruptor.shutdown();
  41.         executor.shutdown();
  42.     }
  43. }
  44. public class OrderEventProducer {
  45.     private RingBuffer<OrderEvent> ringBuffer;
  46.    
  47.     public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {
  48.         this.ringBuffer = ringBuffer;
  49.     }
  50.    
  51.     public void sendData(ByteBuffer data) {
  52.         //1.在生产者发送消息时, 首先需要从ringBuffer里获取一个可用的序号
  53.         long sequence = ringBuffer.next();
  54.         try {
  55.             //2.根据这个序号, 找到具体的"OrderEvent"元素
  56.             //注意:此时获取的OrderEvent对象是一个没有被赋值的"空对象"
  57.             OrderEvent event = ringBuffer.get(sequence);
  58.             //3.进行实际的赋值处理
  59.             event.setValue(data.getLong(0));
  60.         } finally {
  61.             //4.提交发布操作
  62.             ringBuffer.publish(sequence);
  63.         }
  64.     }
  65. }
  66. public class OrderEventHandler implements EventHandler<OrderEvent> {
  67.     public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {
  68.         Thread.sleep(1000);
  69.         System.err.println("消费者: " + event.getValue());
  70.     }
  71. }
复制代码
  1. //多生产者多消费者的使用示例
  2. public class Main {
  3.     public static void main(String[] args) throws InterruptedException {
  4.         //1.创建RingBuffer
  5.         RingBuffer<Order> ringBuffer = RingBuffer.create(
  6.             ProducerType.MULTI,//多生产者
  7.             new EventFactory<Order>() {
  8.                 public Order newInstance() {
  9.                     return new Order();
  10.                 }
  11.             },
  12.             1024 * 1024,
  13.             new YieldingWaitStrategy()
  14.         );
  15.         //2.通过ringBuffer创建一个屏障
  16.         SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
  17.         //3.创建消费者数组,每个消费者Consumer都需要实现WorkHandler接口
  18.         Consumer[] consumers = new Consumer[10];
  19.         for (int i = 0; i < consumers.length; i++) {
  20.             consumers[i] = new Consumer("C" + i);
  21.         }
  22.         //4.构建多消费者工作池WorkerPool,因为多消费者模式下需要使用WorkerPool
  23.         WorkerPool<Order> workerPool = new WorkerPool<Order>(
  24.             ringBuffer,
  25.             sequenceBarrier,
  26.             new EventExceptionHandler(),
  27.             consumers
  28.         );
  29.         //5.设置多个消费者的sequence序号,用于单独统计每个消费者的消费进度, 并且设置到RingBuffer中
  30.         ringBuffer.addGatingSequences(workerPool.getWorkerSequences());
  31.         //6.启动workerPool
  32.         workerPool.start(Executors.newFixedThreadPool(5));
  33.         final CountDownLatch latch = new CountDownLatch(1);
  34.         for (int i = 0; i < 100; i++) {
  35.             final Producer producer = new Producer(ringBuffer);
  36.             new Thread(new Runnable() {
  37.                 public void run() {
  38.                     try {
  39.                         latch.await();
  40.                     } catch (Exception e) {
  41.                         e.printStackTrace();
  42.                     }
  43.                     for (int j = 0; j < 100; j++) {
  44.                         producer.sendData(UUID.randomUUID().toString());
  45.                     }
  46.                 }
  47.             }).start();
  48.         }
  49.         Thread.sleep(2000);
  50.         System.err.println("----------线程创建完毕,开始生产数据----------");
  51.         latch.countDown();
  52.         Thread.sleep(10000);
  53.         System.err.println("任务总数:" + consumers[2].getCount());
  54.     }
  55. }
  56. public class Producer {
  57.     private RingBuffer<Order> ringBuffer;
  58.    
  59.     public Producer(RingBuffer<Order> ringBuffer) {
  60.         this.ringBuffer = ringBuffer;
  61.     }
  62.     public void sendData(String uuid) {
  63.         //1.在生产者发送消息时, 首先需要从ringBuffer里获取一个可用的序号
  64.         long sequence = ringBuffer.next();
  65.         try {
  66.             //2.根据这个序号, 找到具体的"Order"元素
  67.             //注意:此时获取的Order对象是一个没有被赋值的"空对象"
  68.             Order order = ringBuffer.get(sequence);
  69.             //3.进行实际的赋值处理
  70.             order.setId(uuid);
  71.         } finally {
  72.             //4.提交发布操作
  73.             ringBuffer.publish(sequence);
  74.         }
  75.     }
  76. }
  77. public class Consumer implements WorkHandler<Order> {
  78.     private static AtomicInteger count = new AtomicInteger(0);
  79.     private String consumerId;
  80.     private Random random = new Random();
  81.     public Consumer(String consumerId) {
  82.         this.consumerId = consumerId;
  83.     }
  84.     public void onEvent(Order event) throws Exception {
  85.         Thread.sleep(1 * random.nextInt(5));
  86.         System.err.println("当前消费者: " + this.consumerId + ", 消费信息ID: " + event.getId());
  87.         count.incrementAndGet();
  88.     }
  89.     public int getCount() {
  90.         return count.get();
  91.     }
  92. }
复制代码
其中,RingBuffer的publish(sequence)方法会调用Sequencer接口的publish()方法来设置当前生产者的Sequence序号。
[code]abstract class RingBufferPad {    protected long p1, p2, p3, p4, p5, p6, p7;}abstract class RingBufferFields extends RingBufferPad {    ...    private static final Unsafe UNSAFE = Util.getUnsafe();    private final long indexMask;    //环形数组存储事件消息    private final Object[] entries;    protected final int bufferSize;    //RingBuffer的sequencer属性代表了当前线程对应的生产者    protected final Sequencer sequencer;        RingBufferFields(EventFactory eventFactory, Sequencer sequencer) {        this.sequencer = sequencer;        this.bufferSize = sequencer.getBufferSize();        if (bufferSize < 1) {            throw new IllegalArgumentException("bufferSize must not be less than 1");        }        if (Integer.bitCount(bufferSize) != 1) {            throw new IllegalArgumentException("bufferSize must be a power of 2");        }        this.indexMask = bufferSize - 1;        //初始化数组        this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];        //内存预加载        fill(eventFactory);    }        private void fill(EventFactory eventFactory) {        for (int i = 0; i < bufferSize; i++) {            entries[BUFFER_PAD + i] = eventFactory.newInstance();        }    }        protected final E elementAt(long sequence) {        return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask)
您需要登录后才可以回帖 登录 | 立即注册