CAS无锁操作

CAS无锁操作CAS 原理 CAS 原语有三个参数 内存地址 期望值 新值

CAS原理:

CAS原语有三个参数,内存地址,期望值,新值。如果内存地址的值==期望值,表示该值未修改,此时可以修改成新值。否则表示修改失败,返回false,由用户决定后续操作。

缺点:

使用CAS会造成ABA问题

ABA 问题
thread1意图对val=1进行操作变成2,cas(*val,1,2)。
thread1先读取val=1;thread1被 抢占,让thread2运行。
thread2 修改val=3,又修改回1。
thread1继续执行,发现期望值与“原值”(其实被修改过了)相同,完成CAS操作。

解决方案
ABAʹ:添加额外的标记用来指示是否被修改。

维基百科给出定义:
int compare_and_swap(int* reg, int oldval, int newval) { ATOMIC(); int old_reg_val = *reg; if (old_reg_val == oldval) *reg = newval; END_ATOMIC(); return old_reg_val; }

bool compare_and_swap(int *accum, int *dest, int newval) { if (*accum == *dest) { *dest = newval; return true; } else { *accum = *dest; return false; } }

在实际的C/C++程序中,CAS的各种实现版本如下:

1)GCC的CAS

GCC4.1+版本中支持CAS的原子操作(完整的原子操作可参看 GCC Atomic Builtins)

1
2
bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...)
type __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...)

2)Windows的CAS

在Windows下,你可以使用下面的Windows API来完成CAS:(完整的Windows原子操作可参看MSDN的InterLocked Functions)

1
2
3
InterlockedCompareExchange ( __inout LONG volatile *Target,
                                 __in LONG Exchange,
                                 __in LONG Comperand);

3) C++11中的CAS

C++11中的STL中的atomic类的函数可以让你跨平台。(完整的C++11的原子操作可参看 Atomic Operation Library)

1
2
3
4
5
6
template < class T >
bool atomic_compare_exchange_weak( std::atomic* obj,
                                    T* expected, T desired );
template < class T >
bool atomic_compare_exchange_weak( volatile std::atomic* obj,
                                    T* expected, T desired );
gcc从4.1.2提供了__sync_*系列的built-in函数,用于提供加减和逻辑运算的原子操作。
其声明如下:
type __sync_fetch_and_add (type  * ptr, type value, ...)
type __sync_fetch_and_sub (type 
* ptr, type value, ...)
type __sync_fetch_and_or (type 
* ptr, type value, ...)
type __sync_fetch_and_and (type 
* ptr, type value, ...)
type __sync_fetch_and_xor (type 
* ptr, type value, ...)
type __sync_fetch_and_nand (type 
* ptr, type value, ...)

type __sync_add_and_fetch (type 
* ptr, type value, ...)
type __sync_sub_and_fetch (type 
* ptr, type value, ...)
type __sync_or_and_fetch (type 
* ptr, type value, ...)
type __sync_and_and_fetch (type 
* ptr, type value, ...)
type __sync_xor_and_fetch (type 
* ptr, type value, ...)
type __sync_nand_and_fetch (type 
* ptr, type value, ...)

这两组函数的区别在于第一组返回更新前的值,第二组返回更新后的值。
编程实例:
#include <iostream> #include <cstdio> #include <vector> #include <pthread.h> #include <unistd.h> using namespace std; static volatile int cnt=0; int mutex = 0, lock = 0, unlock=1; void* addelem(void* arg); bool compare_and_swap(int *currvalue, int oldvalue, int newvalue); bool compare_and_swap(int *currvalue, int oldvalue, int newvalue) { if(*currvalue == oldvalue) { *currvalue = newvalue; return true; } return false; } int main(int argc, char *argv[]) { vector<int> vec; pthread_t pid[5]; for (int i = 0; i < 5; ++i) { pthread_create(&pid[i], NULL, addelem, NULL); } for (int i = 0; i < 5; ++i) { pthread_join(pid[i], NULL); } return 0; } void* addelem(void* arg) { int i =0; while(i++ < 100) { while(!compare_and_swap(&mutex,lock,1)) { usleep(); } ++cnt; cout << cnt << endl; compare_and_swap(&mutex,unlock,0); } return NULL; } 

简化版本:
#include <iostream> #include <cstdio> #include <vector> #include <pthread.h> #include <unistd.h> using namespace std; #define Lock(lock) do{ \ while(!(__sync_bool_compare_and_swap(lock, 0, 1))) \ { \ sched_yield(); \ } \ }while(0); #define UnLock(lock) do{ \ *(lock) = 0; \ }while(0); #define TryLock(lock) { \ if(__sync_bool_compare_and_swap(lock, 0, 1)?0:-1); \ } static volatile int cnt=0; int lock = 0; void* addelem(void* arg); int main(int argc, char *argv[]) { vector<int> vec; pthread_t pid[5]; for (int i = 0; i < 5; ++i) { pthread_create(&pid[i], NULL, addelem, NULL); } for (int i = 0; i < 5; ++i) { pthread_join(pid[i], NULL); } return 0; } void* addelem(void* arg) { int i =0; while(i++ < 100) { Lock(&lock); ++cnt; cout << cnt << endl; UnLock(&lock); } return NULL; } 
注:sched_yield() 是出让cpu相当于sleep()函数

设计无锁并发栈

既然了解了 CAS,现在就来设计一个并发堆栈。这个堆栈没有锁;这种无锁的并发数据结构也称为非阻塞数据结构。清单 12 给出代码接口。

单 12. 基于链表的非阻塞堆栈实现 

 template <typename T> class Stack { typedef struct Node { T data; Node* next; Node(const T& d) : data(d), next(0) { } } Node; Node *top; public: Stack( ) : top(0) { } void push(const T& data); T pop( ) throw (…); };

 void Stack<T>::push(const T& data) { Node *n = new Node(data); while (1) { n->next = top; if (__sync_bool_compare_and_swap(&top, n->next, n)) { // CAS break; } } }

        从单一线程的角度来看,创建了一个新节点,它的 next 指针指向堆栈的顶部。接下来,调用 CAS 内置函数,把新的节点复制到 top 位置。 从多个线程的角度来看,完全可能有两个或更多线程同时试图把数据压入堆栈。假设线程 A 试图把 20 压入堆栈,线程 B 试图压入 30,而线程 A 先获得了时间片。但是,在 n->next = top 指令结束之后,调度程序暂停了线程 A。现在,线程 B 获得了时间片(它很幸运),它能够完成 CAS,把 30 压入堆栈后结束。接下来,线程 A 恢复执行,显然对于这个线程 *top 和 n->next 不匹配,因为线程 B 修改了 top 位置的内容。因此,代码回到循环的开头,指向正确的 top 指针(线程 B 修改后的),调用 CAS,把 20 压入堆栈后结束。整个过程没有使用任何锁。

 T Stack<T>::pop( ) { while (1) { Node* result = top; if (result == NULL) throw std::string(“Cannot pop from empty stack”); if (top && __sync_bool_compare_and_swap(&top, result, result->next)) { // CAS return result->data; } } } 
这样,即使线程 B 在线程 A 试图弹出数据的同时修改了堆栈顶,也可以确保不会跳过堆栈中的素。
无锁队列的链表实现

下面的东西主要来自John D. Valois 1994年10月在拉斯维加斯的并行和分布系统系统国际大会上的一篇论文——《Implementing Lock-Free Queues》。

我们先来看一下进队列用CAS实现的方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
EnQueue(x)  //进队列
{
     //准备新加入的结点数据
     q =  new  record();
     q->value = x;
     q->next = NULL;
 
     do  {
         p = tail;  //取链表尾指针的快照
     while ( CAS(p->next, NULL, q) != TRUE);  //如果没有把结点链在尾指针上,再试
 
     CAS(tail, p, q);  //置尾结点
}

我们可以看到,程序中的那个 do- while 的 Re-Try-Loop。就是说,很有可能我在准备在队列尾加入结点时,别的线程已经加成功了,于是tail指针就变了,于是我的CAS返回了false,于是程序再试,直到试成功为止。这个很像我们的抢电话热线的不停重播的情况。

你会看到,为什么我们的“置尾结点”的操作(第12行)不判断是否成功,因为:

  1. 如果有一个线程T1,它的while中的CAS如果成功的话,那么其它所有的 随后线程的CAS都会失败,然后就会再循环,
  2. 此时,如果T1 线程还没有更新tail指针,其它的线程继续失败,因为tail->next不是NULL了。
  3. 直到T1线程更新完tail指针,于是其它的线程中的某个线程就可以得到新的tail指针,继续往下走了。

这里有一个潜在的问题——如果T1线程在用CAS更新tail指针的之前,线程停掉或是挂掉了,那么其它线程就进入死循环了。下面是改良版的EnQueue()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
EnQueue(x)  //进队列改良版
{
     q =  new  record();
     q->value = x;
     q->next = NULL;
 
     p = tail;
     oldp = p
     do  {
         while  (p->next != NULL)
             p = p->next;
     while ( CAS(p.next, NULL, q) != TRUE);  //如果没有把结点链在尾上,再试
 
     CAS(tail, oldp, q);  //置尾结点
}

我们让每个线程,自己fetch 指针 p 到链表尾。但是这样的fetch会很影响性能。而通实际情况看下来,99.9%的情况不会有线程停转的情况,所以,更好的做法是,你可以接合上述的这两个版本,如果retry的次数超了一个值的话(比如说3次),那么,就自己fetch指针。

好了,我们解决了EnQueue,我们再来看看DeQueue的代码:(很简单,我就不解释了)

1
2
3
4
5
6
7
8
9
10
DeQueue()  //出队列
{
     do {
         p = head;
         if  (p->next == NULL){
             return  ERR_EMPTY_QUEUE;
         }
     while ( CAS(head, p, p->next) != TRUE );
     return  p->next->value;
}

我们可以看到,DeQueue的代码操作的是 head->next,而不是head本身。这样考虑是因为一个边界条件,我们需要一个dummy的头指针来解决链表中如果只有一个素,head和tail都指向同一个结点的问题,这样EnQueue和DeQueue要互相排斥了

注:上图的tail正处于更新之前的装态。


参考地址:
http://coolshell.cn/articles/8239.html
http://blog.csdn.net/chen/article/details/

今天的文章 CAS无锁操作分享到此就结束了,感谢您的阅读。
编程小号
上一篇 2025-01-02 15:17
下一篇 2025-01-02 15:11

相关推荐

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/bian-cheng-ji-chu/99620.html