大纲
1.Disruptor的生产者源码分析
2.Disruptor的消费者源码分析
3.Disruptor的WaitStrategy等待策略分析
4.Disruptor的高性能原因
5.Disruptor高性能之数据结构(内存预加载机制)
6.Disruptor高性能之内核(使用单线程写)
7.Disruptor高性能之系统内存优化(内存屏障)
8.Disruptor高性能之系统缓存优化(消除伪共享)
9.Disruptor高性能之序号获取优化(自旋 + CAS)
1.Disruptor的生产者源码分析
(1)通过Sequence序号发布消息
(2)通过Translator事件转换器发布消息
(1)通过Sequence序号发布消息
生产者可以先从RingBuffer中获取一个可用的Sequence序号,然后再根据该Sequence序号从RingBuffer的环形数组中获取对应的元素,接着对该元素进行赋值替换,最后调用RingBuffer的publish()方法设置当前生产者的Sequence序号来完成事件消息的发布。- //注意:这里使用的版本是3.4.4
- //单生产者单消费者的使用示例
- public class Main {
- public static void main(String[] args) {
- //参数准备
- OrderEventFactory orderEventFactory = new OrderEventFactory();
- int ringBufferSize = 4;
- ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
-
- //参数一:eventFactory,消息(Event)工厂对象
- //参数二:ringBufferSize,容器的长度
- //参数三:executor,线程池(建议使用自定义线程池),RejectedExecutionHandler
- //参数四:ProducerType,单生产者还是多生产者
- //参数五:waitStrategy,等待策略
- //1.实例化Disruptor对象
- Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(
- orderEventFactory,
- ringBufferSize,
- executor,
- ProducerType.SINGLE,
- new BlockingWaitStrategy()
- );
-
- //2.添加Event处理器,用于处理事件
- //也就是构建Disruptor与消费者的一个关联关系
- disruptor.handleEventsWith(new OrderEventHandler());
-
- //3.启动Disruptor
- disruptor.start();
-
- //4.获取实际存储数据的容器: RingBuffer
- RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
- OrderEventProducer producer = new OrderEventProducer(ringBuffer);
- ByteBuffer bb = ByteBuffer.allocate(8);
- for (long i = 0; i < 5; i++) {
- bb.putLong(0, i);
- //向容器中投递数据
- producer.sendData(bb);
- }
- disruptor.shutdown();
- executor.shutdown();
- }
- }
- public class OrderEventProducer {
- private RingBuffer<OrderEvent> ringBuffer;
-
- public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {
- this.ringBuffer = ringBuffer;
- }
-
- public void sendData(ByteBuffer data) {
- //1.在生产者发送消息时, 首先需要从ringBuffer里获取一个可用的序号
- long sequence = ringBuffer.next();
- try {
- //2.根据这个序号, 找到具体的"OrderEvent"元素
- //注意:此时获取的OrderEvent对象是一个没有被赋值的"空对象"
- OrderEvent event = ringBuffer.get(sequence);
- //3.进行实际的赋值处理
- event.setValue(data.getLong(0));
- } finally {
- //4.提交发布操作
- ringBuffer.publish(sequence);
- }
- }
- }
- public class OrderEventHandler implements EventHandler<OrderEvent> {
- public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {
- Thread.sleep(1000);
- System.err.println("消费者: " + event.getValue());
- }
- }
复制代码- //多生产者多消费者的使用示例
- public class Main {
- public static void main(String[] args) throws InterruptedException {
- //1.创建RingBuffer
- RingBuffer<Order> ringBuffer = RingBuffer.create(
- ProducerType.MULTI,//多生产者
- new EventFactory<Order>() {
- public Order newInstance() {
- return new Order();
- }
- },
- 1024 * 1024,
- new YieldingWaitStrategy()
- );
- //2.通过ringBuffer创建一个屏障
- SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
- //3.创建消费者数组,每个消费者Consumer都需要实现WorkHandler接口
- Consumer[] consumers = new Consumer[10];
- for (int i = 0; i < consumers.length; i++) {
- consumers[i] = new Consumer("C" + i);
- }
- //4.构建多消费者工作池WorkerPool,因为多消费者模式下需要使用WorkerPool
- WorkerPool<Order> workerPool = new WorkerPool<Order>(
- ringBuffer,
- sequenceBarrier,
- new EventExceptionHandler(),
- consumers
- );
- //5.设置多个消费者的sequence序号,用于单独统计每个消费者的消费进度, 并且设置到RingBuffer中
- ringBuffer.addGatingSequences(workerPool.getWorkerSequences());
- //6.启动workerPool
- workerPool.start(Executors.newFixedThreadPool(5));
- final CountDownLatch latch = new CountDownLatch(1);
- for (int i = 0; i < 100; i++) {
- final Producer producer = new Producer(ringBuffer);
- new Thread(new Runnable() {
- public void run() {
- try {
- latch.await();
- } catch (Exception e) {
- e.printStackTrace();
- }
- for (int j = 0; j < 100; j++) {
- producer.sendData(UUID.randomUUID().toString());
- }
- }
- }).start();
- }
- Thread.sleep(2000);
- System.err.println("----------线程创建完毕,开始生产数据----------");
- latch.countDown();
- Thread.sleep(10000);
- System.err.println("任务总数:" + consumers[2].getCount());
- }
- }
- public class Producer {
- private RingBuffer<Order> ringBuffer;
-
- public Producer(RingBuffer<Order> ringBuffer) {
- this.ringBuffer = ringBuffer;
- }
- public void sendData(String uuid) {
- //1.在生产者发送消息时, 首先需要从ringBuffer里获取一个可用的序号
- long sequence = ringBuffer.next();
- try {
- //2.根据这个序号, 找到具体的"Order"元素
- //注意:此时获取的Order对象是一个没有被赋值的"空对象"
- Order order = ringBuffer.get(sequence);
- //3.进行实际的赋值处理
- order.setId(uuid);
- } finally {
- //4.提交发布操作
- ringBuffer.publish(sequence);
- }
- }
- }
- public class Consumer implements WorkHandler<Order> {
- private static AtomicInteger count = new AtomicInteger(0);
- private String consumerId;
- private Random random = new Random();
- public Consumer(String consumerId) {
- this.consumerId = consumerId;
- }
- public void onEvent(Order event) throws Exception {
- Thread.sleep(1 * random.nextInt(5));
- System.err.println("当前消费者: " + this.consumerId + ", 消费信息ID: " + event.getId());
- count.incrementAndGet();
- }
- public int getCount() {
- return count.get();
- }
- }
复制代码 其中,RingBuffer的publish(sequence)方法会调用Sequencer接口的publish()方法来设置当前生产者的Sequence序号。
[code]abstract class RingBufferPad { protected long p1, p2, p3, p4, p5, p6, p7;}abstract class RingBufferFields extends RingBufferPad { ... private static final Unsafe UNSAFE = Util.getUnsafe(); private final long indexMask; //环形数组存储事件消息 private final Object[] entries; protected final int bufferSize; //RingBuffer的sequencer属性代表了当前线程对应的生产者 protected final Sequencer sequencer; RingBufferFields(EventFactory eventFactory, Sequencer sequencer) { this.sequencer = sequencer; this.bufferSize = sequencer.getBufferSize(); if (bufferSize < 1) { throw new IllegalArgumentException("bufferSize must not be less than 1"); } if (Integer.bitCount(bufferSize) != 1) { throw new IllegalArgumentException("bufferSize must be a power of 2"); } this.indexMask = bufferSize - 1; //初始化数组 this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD]; //内存预加载 fill(eventFactory); } private void fill(EventFactory eventFactory) { for (int i = 0; i < bufferSize; i++) { entries[BUFFER_PAD + i] = eventFactory.newInstance(); } } protected final E elementAt(long sequence) { return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) |