写作目的
说到无锁,其实就是用cas,不过我在百度上搜java实现无锁队列的文章其实不多,所以自己用cas和volatile实现一下,线程安全那是必须的。
无锁队列
package untils;
import java.lang.reflect.Field;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Data;
import sun.misc.Unsafe;
/**
* Created on 2021-06-23
*/
public class NoLockQueue {
private static final Unsafe unsafe;
//头节点
volatile Node head;
//尾节点
volatile Node tail;
//头节点偏移量
private static final Long headOffset;
//尾节点偏移量
private static final Long tailOffset;
//当前队列的长度
private AtomicInteger length = new AtomicInteger(0);
//队列允许的最大长度
private int maxSize = 0;
static {
try {
//获取成员变量
Field field = Unsafe.class.getDeclaredField("theUnsafe");
//设置为可访问
field.setAccessible(true);
//是静态字段,用null来获取Unsafe实例
unsafe = (Unsafe) field.get(null);
//设置头节点变量在类中的偏移值
headOffset = unsafe.objectFieldOffset(NoLockQueue.class.getDeclaredField("head"));
//设置尾节点变量在类中的偏移值
tailOffset = unsafe.objectFieldOffset(NoLockQueue.class.getDeclaredField("tail"));
} catch (Exception e) {
System.out.println(e.getLocalizedMessage());
throw new Error(e);
}
}
public NoLockQueue() {
this.maxSize = Integer.MAX_VALUE;
//初始化节点
head = tail = new Node();
}
public NoLockQueue(int maxSize) {
this.maxSize = maxSize;
//初始化节点
head = tail = new Node();
}
/**
* 入队
*/
public void enQueue(int value) {
//创建新节点
Node newNode = new Node();
newNode.setValue(value);
while (true) {
//获取尾节点
Node oldTail = this.tail;
if (length.get() < maxSize && oldTail.casNext(null, newNode)) {
System.out.println(Thread.currentThread().getName() + "进队列:" + value);
unsafe.compareAndSwapObject(this, tailOffset, oldTail, newNode);
length.incrementAndGet();
break;
}
}
}
/**
* 出队
*/
public void dequeue() {
while (true) {
//如果没有数据
if (length.get() <= 0) {
continue;
}
//获取头节点
Node oldHead = this.head;
Node oldNext = oldHead.getNext();
if (unsafe.compareAndSwapObject(this, headOffset, head, oldNext)) {
System.out.println(Thread.currentThread().getName() + "出队列:" + head.getValue());
length.decrementAndGet();
break;
}
}
}
}
@Data
class Node {
//Unsafe类
private static final Unsafe unsafe;
//next变量的偏移量
private static final Long nextOffset;
//值
private volatile int value;
//next节点
private volatile Node next;
static {
try {
//获取成员变量
Field field = Unsafe.class.getDeclaredField("theUnsafe");
//设置为可访问
field.setAccessible(true);
//是静态字段,用null来获取Unsafe实例
unsafe = (Unsafe) field.get(null);
//获取state变量在类中的偏移值
nextOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("next"));
} catch (Exception e) {
System.out.println(e.getLocalizedMessage());
throw new Error(e);
}
}
/**
* cas的方式设置next的值
* @param before 期望的值
* @param after 修改的值
* @return
*/
public boolean casNext(Node before, Node after) {
return unsafe.compareAndSwapObject(this, nextOffset, before, after);
}
}
测试类
package untils;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* Created on 2021-06-23
*/
public class Main {
public static void main(String[] args) {
NoLockQueue queue = new NoLockQueue(10);
ExecutorService executorService = Executors.newFixedThreadPool(20);
Random random = new Random();
for (int i = 0; i < 10; i++) {
executorService.submit(new Runnable() {
@Override
public void run() {
queue.enQueue(random.nextInt());
}
});
}
// //判断入队的顺序
// System.out.println("----------------");
// try { TimeUnit.SECONDS.sleep(2); } catch (Exception e) { e.printStackTrace(); } finally { }
// Node p = queue2.head.getNext();
// while (p != null){
// System.out.println(p.getValue());
// p = p.getNext();
// }
//
// System.out.println("----------------");
for (int i = 0; i < 10; i++) {
executorService.submit(new Runnable() {
@Override
public void run() {
queue.dequeue();
}
});
}
try {
TimeUnit.SECONDS.sleep(5);
} catch (Exception e) {
e.printStackTrace();
} finally {
}
executorService.shutdown();
}
}
小插曲
unsafe类的获取
其实当时参考的是AtomicInteger里获取unsafe方法
private static final Unsafe unsafe = Unsafe.getUnsafe();
但是报错了,报错的原因竟然是 双亲委派模型
关于通过Unsafe.getUnsafe()方法拿Unsafe对象抛出SecurityException异常的原因 – 言午12138 – 博客园
那怎么获取unsafe类呢,如下所示,固定格式
//获取成员变量
Field field = Unsafe.class.getDeclaredField("theUnsafe");
//设置为可访问
field.setAccessible(true);
//是静态字段,用null来获取Unsafe实例
Unsafe unsafe = (Unsafe) field.get(null);
打印顺序不对,影响了代码的正确性
/**
* 出队
*/
public void dequeue() {
while (true) {
//如果没有数据
if (length.get() <= 0) {
continue;
}
//获取头节点
Node oldHead = this.head;
Node oldNext = oldHead.getNext();
if (unsafe.compareAndSwapObject(this, headOffset, head, oldNext)) {
//正确的位置
//System.out.println(Thread.currentThread().getName() + "出队列:" + head.getValue());
length.decrementAndGet();
//错误的位置
System.out.println(Thread.currentThread().getName() + "出队列:" + head.getValue());
break;
}
}
}
一开始我在错误的位置打印,发现入队和出队的顺序不一样,后来换了一个位置试了一下,好了。最后还是分析一下为什么吧。
比如此时此刻 队列里有2个元素A和B。现在2个线程按照下面的顺序执行,其实理论上出队顺序是没有问题的,只不过后面的先打印了,给了一种先出队的错觉。
收获
其实JAVA 无锁队列/栈_meiyongdesan的博客-CSDN博客 这个里面使用AtomicReference实现的,主要想用他的cas;但是我感觉有些绕,所以就自己用unsafe类实现cas。
断断续续写了一天才写了一个demo,其实不亏,至少写出来了。
参考
JAVA 无锁队列/栈_meiyongdesan的博客-CSDN博客
关于通过Unsafe.getUnsafe()方法拿Unsafe对象抛出SecurityException异常的原因 – 言午12138 – 博客园
ConcurrentLinkedQueue源码
今天的文章java实现无锁队列分享到此就结束了,感谢您的阅读。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/5651.html