CAS原理:
CAS原语有三个参数,内存地址,期望值,新值。如果内存地址的值==期望值,表示该值未修改,此时可以修改成新值。否则表示修改失败,返回false,由用户决定后续操作。
缺点:
使用CAS会造成ABA问题
int compare_and_swap(int* reg, int oldval, int newval) { ATOMIC(); int old_reg_val = *reg; if (old_reg_val == oldval) *reg = newval; END_ATOMIC(); return old_reg_val; }
bool compare_and_swap(int *accum, int *dest, int newval) { if (*accum == *dest) { *dest = newval; return true; } else { *accum = *dest; return false; } }
在实际的C/C++程序中,CAS的各种实现版本如下:
1)GCC的CAS
GCC4.1+版本中支持CAS的原子操作(完整的原子操作可参看 GCC Atomic Builtins)
1
2
|
bool
__sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...)
type __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...)
|
2)Windows的CAS
在Windows下,你可以使用下面的Windows API来完成CAS:(完整的Windows原子操作可参看MSDN的InterLocked Functions)
1
2
3
|
InterlockedCompareExchange ( __inout
LONG
volatile
*Target,
__in
LONG
Exchange,
__in
LONG
Comperand);
|
3) C++11中的CAS
C++11中的STL中的atomic类的函数可以让你跨平台。(完整的C++11的原子操作可参看 Atomic Operation Library)
1
2
3
4
5
6
|
template
<
class
T >
bool
atomic_compare_exchange_weak( std::atomic* obj,
T* expected, T desired );
template
<
class
T >
bool
atomic_compare_exchange_weak(
volatile
std::atomic* obj,
T* expected, T desired );
|
其声明如下:
type __sync_fetch_and_sub (type * ptr, type value, ...)
type __sync_fetch_and_or (type * ptr, type value, ...)
type __sync_fetch_and_and (type * ptr, type value, ...)
type __sync_fetch_and_xor (type * ptr, type value, ...)
type __sync_fetch_and_nand (type * ptr, type value, ...)
type __sync_add_and_fetch (type * ptr, type value, ...)
type __sync_sub_and_fetch (type * ptr, type value, ...)
type __sync_or_and_fetch (type * ptr, type value, ...)
type __sync_and_and_fetch (type * ptr, type value, ...)
type __sync_xor_and_fetch (type * ptr, type value, ...)
type __sync_nand_and_fetch (type * ptr, type value, ...)
这两组函数的区别在于第一组返回更新前的值,第二组返回更新后的值。
#include <iostream> #include <cstdio> #include <vector> #include <pthread.h> #include <unistd.h> using namespace std; static volatile int cnt=0; int mutex = 0, lock = 0, unlock=1; void* addelem(void* arg); bool compare_and_swap(int *currvalue, int oldvalue, int newvalue); bool compare_and_swap(int *currvalue, int oldvalue, int newvalue) { if(*currvalue == oldvalue) { *currvalue = newvalue; return true; } return false; } int main(int argc, char *argv[]) { vector<int> vec; pthread_t pid[5]; for (int i = 0; i < 5; ++i) { pthread_create(&pid[i], NULL, addelem, NULL); } for (int i = 0; i < 5; ++i) { pthread_join(pid[i], NULL); } return 0; } void* addelem(void* arg) { int i =0; while(i++ < 100) { while(!compare_and_swap(&mutex,lock,1)) { usleep(); } ++cnt; cout << cnt << endl; compare_and_swap(&mutex,unlock,0); } return NULL; }
#include <iostream> #include <cstdio> #include <vector> #include <pthread.h> #include <unistd.h> using namespace std; #define Lock(lock) do{ \ while(!(__sync_bool_compare_and_swap(lock, 0, 1))) \ { \ sched_yield(); \ } \ }while(0); #define UnLock(lock) do{ \ *(lock) = 0; \ }while(0); #define TryLock(lock) { \ if(__sync_bool_compare_and_swap(lock, 0, 1)?0:-1); \ } static volatile int cnt=0; int lock = 0; void* addelem(void* arg); int main(int argc, char *argv[]) { vector<int> vec; pthread_t pid[5]; for (int i = 0; i < 5; ++i) { pthread_create(&pid[i], NULL, addelem, NULL); } for (int i = 0; i < 5; ++i) { pthread_join(pid[i], NULL); } return 0; } void* addelem(void* arg) { int i =0; while(i++ < 100) { Lock(&lock); ++cnt; cout << cnt << endl; UnLock(&lock); } return NULL; }
注:sched_yield() 是出让cpu相当于sleep()函数
设计无锁并发栈
既然了解了 CAS,现在就来设计一个并发堆栈。这个堆栈没有锁;这种无锁的并发数据结构也称为非阻塞数据结构。清单 12 给出代码接口。
template <typename T> class Stack { typedef struct Node { T data; Node* next; Node(const T& d) : data(d), next(0) { } } Node; Node *top; public: Stack( ) : top(0) { } void push(const T& data); T pop( ) throw (…); }; |
void Stack<T>::push(const T& data) { Node *n = new Node(data); while (1) { n->next = top; if (__sync_bool_compare_and_swap(&top, n->next, n)) { // CAS break; } } } |
从单一线程的角度来看,创建了一个新节点,它的 next 指针指向堆栈的顶部。接下来,调用 CAS 内置函数,把新的节点复制到 top 位置。 从多个线程的角度来看,完全可能有两个或更多线程同时试图把数据压入堆栈。假设线程 A 试图把 20 压入堆栈,线程 B 试图压入 30,而线程 A 先获得了时间片。但是,在 n->next = top
指令结束之后,调度程序暂停了线程 A。现在,线程 B 获得了时间片(它很幸运),它能够完成 CAS,把 30 压入堆栈后结束。接下来,线程 A 恢复执行,显然对于这个线程 *top
和 n->next
不匹配,因为线程 B 修改了 top 位置的内容。因此,代码回到循环的开头,指向正确的 top 指针(线程 B 修改后的),调用 CAS,把 20 压入堆栈后结束。整个过程没有使用任何锁。
T Stack<T>::pop( ) { while (1) { Node* result = top; if (result == NULL) throw std::string(“Cannot pop from empty stack”); if (top && __sync_bool_compare_and_swap(&top, result, result->next)) { // CAS return result->data; } } } |
无锁队列的链表实现
下面的东西主要来自John D. Valois 1994年10月在拉斯维加斯的并行和分布系统系统国际大会上的一篇论文——《Implementing Lock-Free Queues》。
我们先来看一下进队列用CAS实现的方式:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
EnQueue(x)
//进队列
{
//准备新加入的结点数据
q =
new
record();
q->value = x;
q->next = NULL;
do
{
p = tail;
//取链表尾指针的快照
}
while
( CAS(p->next, NULL, q) != TRUE);
//如果没有把结点链在尾指针上,再试
CAS(tail, p, q);
//置尾结点
}
|
我们可以看到,程序中的那个 do- while 的 Re-Try-Loop。就是说,很有可能我在准备在队列尾加入结点时,别的线程已经加成功了,于是tail指针就变了,于是我的CAS返回了false,于是程序再试,直到试成功为止。这个很像我们的抢电话热线的不停重播的情况。
你会看到,为什么我们的“置尾结点”的操作(第12行)不判断是否成功,因为:
- 如果有一个线程T1,它的while中的CAS如果成功的话,那么其它所有的 随后线程的CAS都会失败,然后就会再循环,
- 此时,如果T1 线程还没有更新tail指针,其它的线程继续失败,因为tail->next不是NULL了。
- 直到T1线程更新完tail指针,于是其它的线程中的某个线程就可以得到新的tail指针,继续往下走了。
这里有一个潜在的问题——如果T1线程在用CAS更新tail指针的之前,线程停掉或是挂掉了,那么其它线程就进入死循环了。下面是改良版的EnQueue()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
EnQueue(x)
//进队列改良版
{
q =
new
record();
q->value = x;
q->next = NULL;
p = tail;
oldp = p
do
{
while
(p->next != NULL)
p = p->next;
}
while
( CAS(p.next, NULL, q) != TRUE);
//如果没有把结点链在尾上,再试
CAS(tail, oldp, q);
//置尾结点
}
|
我们让每个线程,自己fetch 指针 p 到链表尾。但是这样的fetch会很影响性能。而通实际情况看下来,99.9%的情况不会有线程停转的情况,所以,更好的做法是,你可以接合上述的这两个版本,如果retry的次数超了一个值的话(比如说3次),那么,就自己fetch指针。
好了,我们解决了EnQueue,我们再来看看DeQueue的代码:(很简单,我就不解释了)
1
2
3
4
5
6
7
8
9
10
|
DeQueue()
//出队列
{
do
{
p = head;
if
(p->next == NULL){
return
ERR_EMPTY_QUEUE;
}
while
( CAS(head, p, p->next) != TRUE );
return
p->next->value;
}
|
我们可以看到,DeQueue的代码操作的是 head->next,而不是head本身。这样考虑是因为一个边界条件,我们需要一个dummy的头指针来解决链表中如果只有一个素,head和tail都指向同一个结点的问题,这样EnQueue和DeQueue要互相排斥了。
注:上图的tail正处于更新之前的装态。
参考地址:
今天的文章 CAS无锁操作分享到此就结束了,感谢您的阅读。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/bian-cheng-ji-chu/99620.html