Java中的队列_java实现简单队列queue

Java中的队列_java实现简单队列queue目录参考Deque从初学者的角度,认真地学习Java中队列的使用和设计

从初学者的角度,认真地学习Java中队列的使用和设计。

参考

  • java doc
  • DelayQueue详解

Queue

继承Collection接口
在这里插入图片描述

Deque

一个支持两端插入和删除的线性集合,此接口支持容量受限和不受限的双端队列(大多数实现容量不受限)。

该接口定义了访问两端元素的方法,主要是插入、删除、检查元素方法。这些方法主要有两种形式,一种在操作失败时引发异常,一种在操作失败时返回特殊值(null 或者false)。这里着重提一下插入操作,只有当队列容量受限时,插入操作才可能失败。

12个方法如下
在这里插入图片描述
该接口扩展了Queue接口。 当双端队列被用作队列时,将导致FIFO(先进先出)行为。 元素在双端队列的末尾添加,并从开头删除。 从Queue接口继承的方法与Deque方法完全等效,如下表所示:
在这里插入图片描述
双端队列也可以用作LIFO(后进先出)堆栈。 此接口应优先于旧版Stack类使用。 当双端队列被用作堆栈时,元素从双端队列的开始处被压入并弹出。 堆栈方法等同于Deque方法如下表所示:
在这里插入图片描述
强烈建议不要在队列中插入null ,因为null是队列中某些方法的返回值,具有特殊意义,比如队列中没有元素了。

BlockingQueue

BlockingQueue支持队列访问时阻塞(如果检索队列时队列已空,等待其有元素后再返回;如果存放元素时队列已满,等待队列有空间存放元素后再返回)。

接口内部的方法主要可以分为以下4类
在这里插入图片描述
该接口的方法是线程安全的,因为实现类内部都使用了锁机制.

ArrayBlockingQueue

数组支持的有界阻塞队列。该队列对元素FIFO(先进先出)进行排序。队列的开头是已在队列中停留最长时间的元素。队列的尾部是最短时间位于队列中的元素。新元素插入到队列的尾部,并且队列检索操作在队列的开头获取元素。

这是经典的“有界缓冲区”,其中固定大小的数组包含由生产者插入并由消费者提取的元素。创建后,容量将无法更改。试图将一个元素放入一个完整的队列将导致操作阻塞(put方法)。试图从空队列中取出一个元素的尝试也会类似地阻塞(take方法)。

此类支持给予等待的生产者和使用者线程一个可选的公平性策略。默认情况下,不保证此排序(公平性策略为false)。但是,将公平性设置为true构造的队列将按FIFO顺序授予线程访问权限。公平通常会降低吞吐量,但会减少可变性并避免饥饿

分析一段代码

public void test() throws InterruptedException { 
   
        ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<String>(4);
        new Thread(new Runnable() { 
   
            @Override
            public void run() { 
   
                try { 
   
                    blockingQueue.put("1");
                    blockingQueue.put("2");
                    blockingQueue.put("3");
                    blockingQueue.put("4");
                    blockingQueue.put("5");
                    blockingQueue.put("6");
                }catch (Exception e){ 
   
                }
            }
        },"thread1").start();
        
        
        Thread.sleep(2*1000);
        //使thread1执行到put(5)时阻塞住
        //此时数组元素为[1,2,3,4]
        new Thread(new Runnable() { 
   
            @Override
            public void run() { 
   
                try { 
   
                    //移除[1,2,3,4]中的1,2
                    blockingQueue.take();
                    blockingQueue.take();
                    //接着thread1执行put(5),put(6)
                    //数组元素此时为[5,6,3,4]
                }catch (Exception e){ 
   
                }
            }
        },"thread2").start();
        
        while (true){ 
   
        }
    }

在这里插入图片描述
内部维护了一个putIndex, takeIndex,表示此次put(take)元素的位置(从0开始), 到达数组元素size后,重置为0,重新来算。

所以FIFO,元素出队列后,并没有随后的元素都往前移动一位,而是将take的指针takeIndex往后移动一位,逻辑上等价于元素都往前移动一位。

LinkedBlockingQueue

链表组成的有界阻塞队列,内部使用两个可冲入锁, put操作可重入锁, take操作可冲入锁, 以及两个锁上相应的Condition

public void put(E e) throws InterruptedException { 
   
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try { 
   
            
            while (count.get() == capacity) { 
   
                //队列已满,阻塞当前
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                //队列不满,唤醒put操作线程
                //这里是必要的,因为假设上面的线程A notFull.await释放了put锁,
                // 其他的线程B也进入了while死循环,当线程C执行take()后
                // A线程获取到put锁,跳出while循环, count原子性+1, 
                // 线程C仍处在while循环中
                // A signal唤醒C并且A释放put锁后,C才有可能从while中恢复
                notFull.signal();
        } finally { 
   
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }


private void signalNotEmpty() { 
   
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try { 
   
            notEmpty.signal();
        } finally { 
   
            takeLock.unlock();
        }
    }
..................................................
public E take() throws InterruptedException { 
   
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try { 
   
            while (count.get() == 0) { 
   
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally { 
   
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

private void signalNotFull() { 
   
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try { 
   
            notFull.signal();
        } finally { 
   
            putLock.unlock();
        }
    }

PriorityBlockingQueue

无界数组结构优先级队列(无界是doc上描述的,实际上数组长度长度不得超过Integer.MAX_VALUE-8)

使用了最小堆,put元素时,维护一个最小堆;take元素时,移除队首元素,则是堆顶元素(最小值),优先级最高(或者最低,具体看comparable接口方法的实现)的元素首先被移除

参考PriorityBlockingQueue 原理分析

有几个关键方法,put元素构造最小堆的siftUpZComparable方法,如下:

//将元素x置于k位置(k从0算起)
//方法执行完后,维护着一个数组结构的最小堆
private static <T> void siftUpComparable(int k, T x, Object[] array) { 
   
        Comparable<? super T> key = (Comparable<? super T>) x;
        while (k > 0) { 
   
            int parent = (k - 1) >>> 1;
            Object e = array[parent];
            //表示当前元素>=父节点元素,不做堆调整
            if (key.compareTo((T) e) >= 0)
                break;
            //当前元素 < 父节点元素,要做调整
            array[k] = e;
            k = parent;
        } //直到不比父节点小,才跳出循环
        
        //将入参元素置于节点k中
        //1. 若一进来,x元素就>=父节点,则k=入参中的k
        //2. 若while循环执行过,则k就是入参k的某级父节点,将入参元素x设置到其位置上
        array[k] = key;
    }

take元素移除首节点(移除优先级最低的元素), siftDownComparable

//将x元素插入到k位置(k从0算起),向下调整
//执行take时,最终会调用到siftDownComparable(0,x,array, size-1)
private static <T> void siftDownComparable(int k, T x, Object[] array,
                                               int n) { 
   
        if (n > 0) { 
   
            Comparable<? super T> key = (Comparable<? super T>)x;
            //得到叶子节点的上一层下标
            int half = n >>> 1;           // loop while a non-leaf
            while (k < half) { 
   
                int child = (k << 1) + 1; // assume left child is least
                Object c = array[child];
                int right = child + 1;
                if (right < n &&
                    ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
                    c = array[child = right];
                //c表示k节点左右孩子节点中较小的那一个元素
                //child表示k节点左右孩子中较小的元素的下标
                if (key.compareTo((T) c) <= 0)
                    break;
                array[k] = c;
                k = child;
            }
            array[k] = key;
        }
    }

举例:
移除队首元素前,队列是最小堆结构,如下:
在这里插入图片描述
take方法执行后,内部方法siftDownComparable执行前,结构如下:
在这里插入图片描述
执行siftDownComparable(0, 5, [2,7,3,8,9,4], 6)方法,
while循环第一次执行完后,结构如下,此时half=3, array[0]=3,k=2,
在这里插入图片描述
while循环第二次执行后,结构如下,此时half=3, array[2]=4, k = 5
在这里插入图片描述
while第三次执行时,while条件不满足,跳出,执行
array[5]=5,结束,结构如下:
在这里插入图片描述
至此,一个新的最小堆又生成了。

移除下标i的元素,总是等价于将数组最后一个下标元素值A取出来,但是下标引用置为null, 将A插入到i位置,但是这个插入的位置涉及到堆的向下调整,这是本质。


PriorityBlockingQueue构造方法支持入参传递一个集合对象(集合对象自然要实现Comparable接口),则构造方法内部就需要立刻调整元素,形成最小堆,如下

private void heapify() { 
   
        Object[] array = queue;
        int n = size;
        int half = (n >>> 1) - 1;
        //可以看出,从叶子节点的上一层开始,向下调整形成局部最小堆
        // i--,同样的调整操作直至操作到顶部节点,就调整生成了全局的最小堆
        Comparator<? super E> cmp = comparator;
        if (cmp == null) { 
   
            for (int i = half; i >= 0; i--)
                siftDownComparable(i, (E) array[i], array, n);
        }
        else { 
   
            for (int i = half; i >= 0; i--)
                siftDownUsingComparator(i, (E) array[i], array, n, cmp);
        }
    }

DelayQueue

无界延迟性阻塞队列,jdk中的java.util.concurrent.ScheduledThreadPoolExecutor.ScheduledFutureTask就是一个实现DelayQueue接口的子类
在这里插入图片描述
参考 DelayQueue详解

测试用例

@Test
    public void testScheduleTask() throws InterruptedException { 
   
        ScheduledThreadPoolExecutor service = new ScheduledThreadPoolExecutor(5);
        for (int i = 0; i < 10; i++) { 
   
            int finalI = i;
            service.schedule(new Runnable() { 
   
                @Override
                public void run() { 
   
                    //十个任务,每个任务执行时间相差1s
                    System.out.println("i=" + finalI +
                            ", and thread=" +Thread.currentThread().getName() +
                            ",time=" + TestUtils.getTime());
                }
            }, i * 1000, TimeUnit.MILLISECONDS);
        }

        Thread.sleep(20*1000);
    }

在这里插入图片描述

SynchronousQueue

参考 Java 并发 — 阻塞队列之SynchronousQueue源码分析, 非常好的一篇从源码角度分析SynchronousQueue的博客。

LinkedTransferQueue

无界阻塞队列
Java 并发 — 阻塞队列之LinkedTransferQueue源码分析

LinkedBlockingQueue

链表结构的双向阻塞队列
Java 并发 — 阻塞队列之LinkedBlockingDeque源码分析

今天的文章Java中的队列_java实现简单队列queue分享到此就结束了,感谢您的阅读。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/71524.html

(0)
编程小号编程小号

相关推荐

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注