豺独 发表于 前天 00:15

[数据结构/Java] 数据结构之循环队列

1 概述:循环队列

循环队列


[*]循环队列: 一种先进先出(FIFO)的数据结构——它通过将【顺序队列】的末尾连接到开头,形成一个【环状结构】,从而解决了【顺序队列】的【虚假满状态问题】。




[*]【队列】:一种先进先出(First In First Out)的线性表,简称FIFO。允许插入的一端称为【队尾(head)】,允许删除的一端称为【队头(tail)】。


[*]循环队列在大数据领域应用场景较为丰富。


[*]Hadoop MapReduce 在 Shuffle 过程中的环形缓冲区
别名:循环缓冲队列,Circular Buffer Queue
在 Shuffle 过程中,【环形缓冲区】主要用于存储来自 Mapper 节点传输的数据。


[*]实时计算中,缓存单个设备的最近N秒的状态数据
[*]...


[*]环形缓冲区(Ring Buffer,也称为循环缓冲区或 Circular Buffer Queue)是【无锁队列】实现中一种【高效的数据结构】,特别适合【高性能并发场景】,如线程池任务调度、实时系统、服务器请求处理等。


[*]环形缓冲区是一个逻辑上通过【头指针】(head)和【尾指针】(tail)形成循环队列的数据结构。
换言之,它由一个【固定大小】的【列表】和两个【指针】组成。一个指针 head 用于指向队列的头部,另一个指针 tail 则用于指向队列的尾部。
当指针到达数组末尾时,自动“绕回”到开头(通过模运算)。它常用于【无锁队列】,因为:


[*]固定内存:避免动态分配,减少内存碎片。
[*]缓存友好:连续内存布局提高缓存命中率。
[*]高效操作:头尾指针通过原子操作更新,支持并发访问。
核心组件


[*]缓冲区:固定大小的数组或链表。
[*]头指针(head):指向下一个读取位置(消费者使用)。
[*]尾指针(tail):指向下一个写入位置(生产者使用)。
[*]大小计数器(size):跟踪队列中元素数量(可选,视场景)。
[*]原子操作:使用 std::atomic(C/C++) / AtomicXxx(Java) 等锁机制,确保 head 和 tail 的线程安全更新。
不同策略下的循环队列


[*]有锁的循环队列


[*]覆盖策略(满时入队覆盖队首)
[*]阻塞策略(满/空时阻塞等待)
[*]非阻塞策略(满/空时返回 null / 抛异常)


[*]无锁的循环队列
基于 ConcurrentLinkedDeque 实现线程安全的循环队列的思路


[*]ConcurrentLinkedDeque 是 Java 并发包中提供的非阻塞、线程安全的【双端队列】,基于【无锁】(CAS)机制实现。
java.util.concurrent.ConcurrentLinkedDeque


[*]要基于 ConcurrentLinkedDeque 实现线程安全的循环队列。
核心思路是:利用 ConcurrentLinkedDeque 的并发安全特性封装队列操作,通过固定容量限制实现“【循环】”(队列满时入队会覆盖/阻塞/抛异常,队空时出队会阻塞/抛异常)。


[*]核心设计要点


[*]固定容量:循环队列的核心是容量固定,满后入队需遵循循环规则(覆盖旧元素/阻塞/拒绝)。
[*]线程安全:复用ConcurrentLinkedDeque的并发安全特性,避免手动加锁。
[*]循环逻辑:入队时若队列满,根据策略处理(如覆盖队首、阻塞等待、抛异常);出队时若空,同理。
循环队列的特点


[*]优势:


[*]无动态分配,性能高。
[*]内存布局连续,缓存命中率高。
[*]适合固定容量的高性能场景。


[*]挑战:


[*]固定容量,可能溢出或不足。
[*]MPMC 场景下【头尾指针竞争】,可能导致 CAS 重试。
[*]需要仔细处理【内存序】和【数据一致性】。
2 覆盖式循环队列

此版本,项目亲测。
2.1 实现思路


[*]基于 Java JDK 的 并发双端队列(java.util.concurrent.ConcurrentLinkedDeque)实现覆盖式循环队列。
容量固定,满时入队覆盖队首元素 (即: 新元素从队尾进入,满时覆盖队首元素(第n个元素向前覆盖第n-1个元素,n>=2))
2.2 源码实现(Java)

CoverStrategyCircularQueue

import com.alibaba.fastjson.JSON;

import javax.annotation.Nullable;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;

/**
* 基于 ConcurrentLinkedDeque 的循环队列(覆盖策略)
* 容量固定,满时入队覆盖队首元素 (即: 新元素从队尾进入,满时覆盖队首元素(第n个元素向前覆盖第n-1个元素,n>=2))
* @param <E> 队列元素类型
*/
public class CoverStrategyCircularQueue<E> implements Serializable {
    // 并发安全的双端队列(底层存储)
    private final ConcurrentLinkedDeque<E> deque;

    // 队列最大容量(原子类保证并发下计数准确)
    private final int capacity;

    // 当前元素数量(原子操作,避免并发计数错误)
    private final AtomicInteger size = new AtomicInteger(0);

    /**
   * 构造循环队列
   * @param capacity 最大容量(必须>0)
   */
    public CoverStrategyCircularQueue(int capacity) {
      if (capacity <= 0) {
            throw new IllegalArgumentException("The capacity must be greater than 0");//容量必须大于0
      }
      this.capacity = capacity;
      this.deque = new ConcurrentLinkedDeque<>();
    }

    /**
   * 基于 list , 构造循环队列
   * @param capacity
   * @param list 允许为 null
   */
    public CoverStrategyCircularQueue(int capacity, @Nullable List<E> list) {
      this(capacity);
      if(list != null && list.size() != 0){
            for (E element : list) {
                offer(element);
            }
      }
    }

    /**
   * 入队:新元素从队尾进入,满时覆盖队首元素(第n个元素向前覆盖第n-1个元素,n>=2)
   * @param element 待入队元素
   * @return 成功入队返回true(覆盖时也返回true)
   */
    public boolean offer(E element) {
      if (element == null) {
            throw new NullPointerException("The element not be null");//元素不能为null
      }
      // 循环核心:满了先移除队首,再入队
      while (size.get() >= capacity) {
            // 移除队首(CAS保证原子性,避免并发下重复移除)
            if (deque.pollFirst() != null) {
                size.decrementAndGet();
            }
      }
      // 入队尾
      deque.offerLast(element);
      size.incrementAndGet();
      return true;
    }

    /**
   * 出队:空时返回null
   * @return 队首元素,空则返回null
   */
    public E poll() {
      E element = deque.pollFirst();
      if (element != null) {
            size.decrementAndGet();
      }
      return element;
    }

    /**
   * 查看队首元素(不移除)
   * @return 队首元素,空则返回null
   */
    public E peekFirst() {
      return deque.peekFirst();
    }

    /**
   * 查看队尾元素(不移除)
   * @return 队尾元素,空则返回 null
   */
    public E peekLast() {
      return deque.peekLast();
    }

    /**
   * 获取倒数第 index 个元素(不删除元素)
   * @note 从队尾向队首遍历
   * @param index 倒数第 index 个元素 , index ∈
   * @return
   */
    public E getLast(Integer index) {
      Iterator<E> iterator = deque.descendingIterator();
      E result = null;
      int cursor = 0;
      while( iterator.hasNext() && cursor < index) {
            result = iterator.next();
      }
      return result;
    }

    /**
   * 获取正数第 index 个元素(不删除元素)
   * @note 从队首向队尾遍历
   * @param index 正数第 index 个元素 , index ∈
   * @return
   */
    public E get(Integer index) {
      Iterator<E> iterator = deque.iterator();
      E result = null;
      int cursor = 0;
      while( iterator.hasNext() && cursor < index) {
            result = iterator.next();
      }
      return result;
    }

    /**
   * 获取当前元素数量
   * @return 元素个数
   */
    public int size() {
      return size.get();
    }

    /**
   * 判断队列是否已满
   * @return 满则true
   */
    public boolean isFull() {
      return size.get() >= capacity;
    }

    /**
   * 判断队列是否为空
   * @return 空则true
   */
    public boolean isEmpty() {
      return size.get() == 0;
    }

    /**
   * 清空队列
   */
    public void clear() {
      deque.clear();
      size.set(0);
    }

    /**
   * 获取底层的队列
   * @note 原则上,不允许获取底层队列,但为了方便特殊场景下的数据操纵、及验证测试,故此处提供该方法
   * @return
   */
    public ConcurrentLinkedDeque getDeque() {
      return deque;
    }

    /**
   * 转 List
   * @return
   */
    public List<E> toList(){
      List<E> list = new ArrayList<>();
      if( this.deque == null || this.deque.isEmpty() ){
            return list;
      }
      for (E element : this.deque) {
            list.add(element);
      }
      return list;
    }

    @Override
    public String toString() {
      return "CoverStrategyCircularQueue{" +
                "deque=" + JSON.toJSONString(deque) +
                ", capacity=" + capacity +
                ", size=" + size +
                '}';
    }
}CircularQueueTest

public class CircularQueueTest {
    private final static Logger log = LoggerFactory.getLogger(CircularQueueTest.class);

    @Test
    public void CoverStrategyCircularQueueTest(){
      int length = 5;
      CoverStrategyCircularQueue<String> queue = new CoverStrategyCircularQueue<>(length);

      //入队
      for (int i = 0; i < length + 3; i++) {
            queue.offer( (new Integer(i+1)).toString() );
      }
      log.info("queue:{}", queue);//

      //出队
      queue.poll();
      queue.poll();
      queue.poll();
      log.info("queue:{}", queue);//
    }
}3 阻塞式循环队列(等待策略)


[*]满时入队阻塞,空时出队阻塞,适合生产-消费模型,需结合 Lock和 Condition实现。


[*]ReentrantLock : 可重入的互斥锁,又被称为“独占锁”
[*]Condition : Condition是在java 1.5中才出现的,它用来替代传统的Object的wait()、notify()实现线程间的协作,相比使用Object的wait()、notify(),使用Condition的await()、signal()这种方式实现线程间协作更加安全和高效。因此,通常来说比较推荐使用Condition。
源码实现(Java)

import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
* 基于 ConcurrentLinkedDeque 的阻塞循环队列
* 满时入队阻塞,空时出队阻塞
* @param <E> 元素类型
*/
public class BlockingCircularQueue<E> {
    private final ConcurrentLinkedDeque<E> deque;
    private final int capacity;
    // 锁 + 条件变量(空/满)
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();

    public BlockingCircularQueue(int capacity) {
      if (capacity <= 0) {
            throw new IllegalArgumentException("容量必须大于0");
      }
      this.capacity = capacity;
      this.deque = new ConcurrentLinkedDeque<>();
    }

    /**
   * 阻塞入队:满则等待
   * @param element 元素
   * @throws InterruptedException 中断异常
   */
    public void put(E element) throws InterruptedException {
      if (element == null) {
            throw new NullPointerException("元素不能为null");
      }
      lock.lockInterruptibly();
      try {
            // 满则等待
            while (deque.size() >= capacity) {
                notFull.await();
            }
            deque.offerLast(element);
            // 唤醒出队阻塞的线程
            notEmpty.signal();
      } finally {
            lock.unlock();
      }
    }

    /**
   * 阻塞出队:空则等待
   * @return 队首元素
   * @throws InterruptedException 中断异常
   */
    public E take() throws InterruptedException {
      lock.lockInterruptibly();
      try {
            // 空则等待
            while (deque.isEmpty()) {
                notEmpty.await();
            }
            E element = deque.pollFirst();
            // 唤醒入队阻塞的线程
            notFull.signal();
            return element;
      } finally {
            lock.unlock();
      }
    }

    /**
   * 非阻塞入队:满则返回false
   * @param element 元素
   * @return 成功则true
   */
    public boolean offer(E element) {
      if (element == null) {
            throw new NullPointerException("元素不能为null");
      }
      lock.lock();
      try {
            if (deque.size() >= capacity) {
                return false;
            }
            deque.offerLast(element);
            notEmpty.signal();
            return true;
      } finally {
            lock.unlock();
      }
    }

    /**
   * 非阻塞出队:空则返回null
   * @return 队首元素
   */
    public E poll() {
      lock.lock();
      try {
            if (deque.isEmpty()) {
                return null;
            }
            E element = deque.pollFirst();
            notFull.signal();
            return element;
      } finally {
            lock.unlock();
      }
    }

    // 辅助方法(size/isEmpty/isFull/clear)
    public int size() {
      lock.lock();
      try {
            return deque.size();
      } finally {
            lock.unlock();
      }
    }

    public boolean isEmpty() {
      return size() == 0;
    }

    public boolean isFull() {
      return size() >= capacity;
    }

    public void clear() {
      lock.lock();
      try {
            deque.clear();
            notFull.signalAll(); // 唤醒所有入队阻塞线程
      } finally {
            lock.unlock();
      }
    }
}关键注意事项


[*]并发计数准确性:


[*]ConcurrentLinkedDeque.size()是O(n)操作,高并发下性能差,因此基础版用AtomicInteger维护计数,阻塞版用锁保护计数。

[*]null元素禁止: ConcurrentLinkedDeque不允许null元素,因此入队时需校验。
[*]循环策略选择:


[*]覆盖策略:适合日志缓存、临时数据存储(允许旧数据被覆盖)。
[*]阻塞策略:适合生产-消费模型(如任务队列,需严格控制容量)。
[*]非阻塞策略:适合快速响应场景(满/空时直接返回,不阻塞)。
使用示例

public class CircularQueueTest {
    public static void main(String[] args) {
      // 基础版(覆盖策略)
      CircularQueue<String> queue = new CircularQueue<>(3);
      queue.offer("A");
      queue.offer("B");
      queue.offer("C");
      queue.offer("D"); // 满,覆盖队首"A"
      System.out.println(queue.poll()); // 输出B
      System.out.println(queue.size()); // 输出3(B/C/D)

      // 阻塞版(生产-消费)
      BlockingCircularQueue<Integer> blockingQueue = new BlockingCircularQueue<>(2);
      // 生产者线程
      new Thread(() -> {
            try {
                blockingQueue.put(1);
                blockingQueue.put(2);
                blockingQueue.put(3); // 满,阻塞
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
      }).start();

      // 消费者线程
      new Thread(() -> {
            try {
                Thread.sleep(1000);
                System.out.println(blockingQueue.take()); // 输出1,唤醒生产者
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
      }).start();
    }
}4 非阻塞循环队列(满/空时返回 null / 抛异常)

实现思路

基于 ConcurrentLinkedDeque 实现非阻塞循环队列


[*]ConcurrentLinkedDeque 是 Java 并发包中提供的非阻塞、线程安全的双端队列,基于无锁(CAS)机制实现。
[*]要基于它封装非阻塞式的循环队列,核心思路是:

[*]限制队列容量,模拟循环队列的“固定长度”特性;
[*]利用双端队列的首尾操作(offer/poll)模拟循环入队/出队;
[*]基于 CAS 保证并发安全,避免阻塞(非阻塞核心);
[*]处理队列满/空时的非阻塞策略(如返回 false/空值,而非等待)。
核心实现要点


[*]容量限制:通过原子变量记录当前元素数,入队前检查是否已满,出队后更新计数;
[*]循环逻辑:无需手动维护头尾指针(ConcurrentLinkedDeque 已封装链表节点的 CAS 操作),仅需在容量满时拒绝入队,空时拒绝出队;
[*]非阻塞特性:所有操作均为非阻塞,失败时立即返回结果,不使用锁或条件变量等待。
源码实现(Java)

NonBlockingCircularQueue

import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;

/**
* 基于 ConcurrentLinkedDeque 实现的非阻塞循环队列
* 特性:固定容量、线程安全、非阻塞(操作失败立即返回,不等待)
* @param <E> 队列元素类型
*/
public class NonBlockingCircularQueue<E> {
    // 底层非阻塞双端队列
    private final ConcurrentLinkedDeque<E> deque;
    // 循环队列的固定容量
    private final int capacity;
    // 原子计数器,记录当前元素数量(保证并发下计数准确)
    private final AtomicInteger size;

    /**
   * 构造函数:初始化循环队列容量
   * @param capacity 队列容量(必须>0)
   */
    public NonBlockingCircularQueue(int capacity) {
      if (capacity <= 0) {
            throw new IllegalArgumentException("容量必须大于0");
      }
      this.capacity = capacity;
      this.deque = new ConcurrentLinkedDeque<>();
      this.size = new AtomicInteger(0);
    }

    /**
   * 非阻塞入队(尾插):队列满则返回false,否则入队并返回true
   * @param e 待入队元素(非null)
   * @return 入队成功返回true,失败(队列满)返回false
   */
    public boolean offer(E e) {
      if (e == null) {
            throw new NullPointerException("元素不能为null");
      }
      // 自旋CAS保证计数准确(避免并发下容量判断错误)
      while (true) {
            int currentSize = size.get();
            // 队列已满,直接返回false(非阻塞核心)
            if (currentSize >= capacity) {
                return false;
            }
            // CAS更新计数:成功则执行入队,失败则重试
            if (size.compareAndSet(currentSize, currentSize + 1)) {
                deque.offerLast(e);
                return true;
            }
            // CAS失败说明有其他线程修改了计数,自旋重试
      }
    }

    /**
   * 非阻塞出队(头出):队列空则返回null,否则出队并返回元素
   * @return 出队元素,队列为空返回null
   */
    public E poll() {
      // 自旋CAS保证计数准确
      while (true) {
            int currentSize = size.get();
            // 队列为空,直接返回null(非阻塞核心)
            if (currentSize <= 0) {
                return null;
            }
            // CAS更新计数:成功则执行出队,失败则重试
            if (size.compareAndSet(currentSize, currentSize - 1)) {
                return deque.pollFirst();
            }
            // CAS失败说明有其他线程修改了计数,自旋重试
      }
    }

    /**
   * 查看队头元素(不删除)
   * @return 队头元素,队列为空返回null
   */
    public E peek() {
      return deque.peekFirst();
    }

    /**
   * 获取当前队列元素数量
   * @return 元素数量
   */
    public int size() {
      return size.get();
    }

    /**
   * 判断队列是否已满
   * @return 满返回true,否则false
   */
    public boolean isFull() {
      return size.get() >= capacity;
    }

    /**
   * 判断队列是否为空
   * @return 空返回true,否则false
   */
    public boolean isEmpty() {
      return size.get() <= 0;
    }

    /**
   * 获取队列容量
   * @return 容量
   */
    public int getCapacity() {
      return capacity;
    }
}关键细节说明


[*]原子计数器(AtomicInteger):

[*]必须用原子变量记录元素数量,避免并发下size()与实际队列元素数不一致(ConcurrentLinkedDeque 的size()是遍历计数,性能差且非原子);
[*]入队/出队时通过compareAndSet(CAS)自旋更新计数,保证计数与实际操作的原子性。

[*]非阻塞特性:

[*]入队时若队列满,直接返回false,不阻塞等待;
[*]出队时若队列空,直接返回null,不阻塞等待;
[*]所有操作基于 CAS 自旋,无锁、无阻塞,适合高并发低延迟场景。

[*]循环逻辑:

[*]无需手动维护循环队列的头尾指针(如数组实现的循环队列),ConcurrentLinkedDeque 已通过链表节点的 CAS 操作实现高效的首尾操作;
[*]“循环”体现在“固定容量+满则拒绝入队”,出队后释放容量可再次入队,模拟循环复用空间。

[*]线程安全:

[*]底层 ConcurrentLinkedDeque 保证了队列操作的线程安全;
[*]原子计数器保证了容量判断的准确性;
[*]CAS 自旋解决了“检查-更新”的竞态条件(如先判断容量,再入队的间隙被其他线程修改容量)。

适用场景与局限性

适用场景


[*]高并发、低延迟的生产-消费场景;
[*]不需要阻塞等待(如生产者可丢弃数据,消费者可跳过空队列);
[*]容量固定,需循环复用队列空间。
局限性


[*]不支持阻塞式操作(如需阻塞等待,应使用ArrayBlockingQueue/LinkedBlockingDeque);
[*]基于链表实现,内存开销略高于数组实现的循环队列;
[*]CAS 自旋在高并发下可能导致 CPU 占用升高(可通过限制自旋次数优化)。
优化方向


[*]自旋次数限制:避免无限自旋,可设置最大自旋次数,超过后返回失败;
[*]批量操作:提供批量入队/出队方法,减少 CAS 次数;
[*]公平性:可选公平策略(如按线程顺序自旋),避免线程饥饿;
[*]元素过期:支持过期元素自动清理,适配缓存场景。
Z 最佳实践

循环队列的性能优化建议


[*]高并发场景下,优先选择覆盖策略,避免【锁竞争】。
[*]若需阻塞策略,可考虑直接使用ArrayBlockingQueue(JDK 原生,性能更优)
[*]避免频繁调用 size()方法,高并发下改用isEmpty() / isFull()替代(O(1) 操作)。
基于循环队列实现滑动窗口


[*] 数据结构之滑动窗口 - 博客园/千千寰宇
方案2:基于【循环队列】实现【滑动窗口】
Hadoop MapReduce 在 Shuffle 过程中环形缓冲区的应用


[*]Hadoop MapReduce 在 Shuffle 过程中的环形缓冲区
别名:循环缓冲队列,Circular Buffer Queue
在 Shuffle 过程中,【环形缓冲区】主要用于存储来自 Mapper 节点传输的数据。


[*]环形缓冲区的工作原理
【环形缓冲区】的工作原理是【基于生产者-消费者模型】的。


[*]在 Shuffle 过程中,Mapper 节点充当生产者的角色,将数据写入【环形缓冲区】;而 Reducer 节点则充当【消费者】的角色,从【环形缓冲区】中读取数据并进行后续的处理。
[*]当 Mapper 节点将数据写入【环形缓冲区】时,tail 指针会递增。
如果 tail 指针追上了 head 指针,表示缓冲区已满,此时 Mapper 节点会等待一段时间,直到 Reducer 节点读取并释放了一些空间,再将数据写入【环形缓冲区】。


[*]当 Reducer 节点从【环形缓冲区】中读取数据时,head 指针会递增。
如果 head 指针追上了 tail 指针,表示缓冲区已空,此时 Reducer 节点会等待一段时间,直到 Mapper 节点写入了更多的数据,再继续读取。


[*]环形缓冲区在 Shuffle 过程中的作用


[*]环形缓冲区在 Shuffle 过程中起到了至关重要的作用。
[*]它将 Mapper 节点产生的数据进行【临时存储】,以便 Reducer 节点能够按照预定的顺序和方式进行读取和处理。
[*]另外,由于 Hadoop 在 Shuffle 过程中使用了磁盘进行大规模的数据传输,而磁盘读写较慢。
因此,【环形缓冲区】通过在【内存】中存储数据,加速了数据的传输和处理过程,提高了整个 Shuffle 过程的效率和性能。


[*]推荐文献


[*]Hadoop:Shuffle 过程中的环形缓冲区 - 极简博客
Y 推荐文献

X 参考文献


[*]环形缓冲区(Ring Buffer,也称为循环缓冲区或 Circular Buffer)是无锁队列实现中一种高效的数据结构 - CSDN
    本文作者:      千千寰宇   
    本文链接:         https://www.cnblogs.com/johnnyzen   
    关于博文:评论和私信会在第一时间回复,或直接私信我。   
    版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA   许可协议。转载请注明出处!
    日常交流:大数据与软件开发-QQ交流群: 774386015      【入群二维码】参见左下角。您的支持、鼓励是博主技术写作的重要动力!   

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
页: [1]
查看完整版本: [数据结构/Java] 数据结构之循环队列