摘要

本文介绍原子操作,包括乱序执行、SC=DRF、Memory Model等

乱序执行

同一线程中, 没有依赖关系的指令会被乱序执行

1
2
3
4
5
x = 1;
y = 2; //乱序执行

x = 1;
y = x + z; //顺序执行

原因1: 编译器优化。 编译器优化假设程序是单线程的。
原因2: 处理器乱序执行。 乱序流水线,提高处理器的效率。
原因3:存储系统, 处理器写指令要先写store buffer, 多核利用一致性协议来保障写一致性
原因4: On-chip Network片上网络, 多核数据传输

带来的问题

1
2
3
4
5
6
7
//线程1
x = 1;
if (y == 0) {
//进入临界期
} else {
//等待并重试
}
1
2
3
4
5
6
7
//线程2
y = 1;
if (x == 0) {
//进入临界区
} else {
//等待并重试
}

实际上并不能保证两个线程只有一个线程进入临界区
实际上编译器会做这样的处理:

1
2
3
4
5
6
7
register1 = (y == 0);
x = 1;
if (register1) {
//进入临界区
} else {
//等待并重试
}

另一方面, stor buffer也会导致乱序
x = 1y = 1会先写入store buffer,还没有同步到l2 cache中,导致y == 0判断时会从l2 cache中读

解决方法

通过锁或者原子操作进行同步操作

  1. 当临界区包含多条指令,用锁
  2. 当临界区只包含一个整数或指针时, 用原子操作

SC for Data-Race-Free Programs SC-DRF

如果程序没有竞争, 那么程序运行起来就好像是顺序一致性
C++11标准承诺SC-DRF

例子1

情况一:全局变量a, 类型是struct {char c; char d;}
情况二: 全局变量a,类型是struct {int c; int d:7;}

1
2
3
4
//线程1
s.c = 1;
//线程2
s.d = 2;

C++访问不同变量的时候没有竞争。 所有第一种情况是两个不同的变量,没有竞争。
但是第二种情况,这里c和d是位字段, 即int c:9表示的是c是占用了9个bit位的int类型变量,所以上面的情况可以理解为,c,d共用了16位,占用了半个int,所以会发生竞争。

例子2

conditional lock
安全的, 需要编译器优化正确处理

1
2
3
4
5
6
7
8
9
10
11
if (cond) {
mutex1.lock();
}
...
if (cond) {
x = 1;
}
···
if (cond) {
mutex1.unlock();
}

Memory Model

  1. memory_order_relaxed:
    用于load或store, 操作是原子的,但是没有顺序的保证,能够乱序执行
  2. memory_order_release:
    用于store,表示release语义.
    保证当前线程中,所有之前的内存读写操作都不会在该存储操作之后执行。
  3. memory_order_acquire:
    用于load,表示acquire语义
    保证在当前线程中,所有后续的内存读写操作都不会在该加载操作之前执行。
  4. memory_order_acq_rel:
    用于load或store, 对于store,表示release语义,对于load,表示acquire语义
  5. memory_order_seq_cst:
    用于load或者store,表示sc语义,Sequentially Consistent
    最强的内存序,提供全局顺序一致性
    适用于严格同步的场景
  6. memory_order_consume:
    用于load,表示consume语义,尽量不要使用

memory_order_relaxed

用于load或store,操作是原子的,没有顺序的保障,能够乱序执行。
不会在不同线程中引入同步或者顺序的约束,所以比其他的性能更好

count

1
2
3
4
5
6
7
//线程1...N
while(...) {
...
if (...) {
scount.fetch_add(1, memory_order_relaxed);
}
}
1
2
3
4
5
6
7
8
//主线程
int main() {
std::atomic<int> counter(0);
launch_workers();
...
join_workers();
std::cout<< cout.load(memory_order_relaxed) << std::endl;
}

memory_order_acquire

用于load,是一种同步机制,确保在当前线程中对原子对象的加载操作(load操作) 在其他内存操作(load or store)之前完成

确保:

  1. 当前线程中,所有后续的内存读写操作都不会在该原子操作之前执行。 (阻止编译器或CPU的乱序)
  2. 如果其他线程使用了 memory_order_release 或更强的内存序(如 memory_order_seq_cst)来存储同一个原子对象,那么在当前线程中使用 memory_order_acquire 进行加载操作时,当前线程可以看到之前所有对共享内存的修改。

simple flag setting

1
2
3
4
5
6
while(!stop.load(memory_order_relaxed)) {
...
if (...) {
dirty.store(true, memory_order_relaxed);
}
}
1
2
3
4
5
6
7
8
9
10
11
int main() {
std::atomic<bool> dirty(false);
std::atomic<bool> stop;
launch_workers();
...
stop = true;
join_workers();
if(dirty.load(memory_order_acquire)) {
clean_up_dirty_stuff();
}
}

dirty.load(memory_order_acquire)防止clean_up执行这之前
stop = true, 默认是memory_order_seq_cst,全局顺序一致,保证…部分的代码一定在此之前完成

memory_order_acq_rel

适用于load和store, 保证当前线程中,所有之前的内存读写操作在存储操作之前执行,所有后续的内存操作在加载操作之后执行。
即对于store来说,读写要在这之前,即release语义
对于load来说,读写要在这之后,即acquire语义

Reference Counting

1
2
control_block_ptr = other->control_block_ptr;
control_block_ptr->refs.fecth_add(1, memory_order_relaxed);
1
2
3
4
5
some code to use the shared_ptr before refcnt decrease; //A
if (control_block_ptr->refs.fetch_sub(1, memory_order_acq_rel) == 1) { //B
some code to use the shared_ptr befor deletion;
delete shared_ptr;
}

A处只能是memory_order_acq_rel,假设此处是relaxed,那么A和B就可以乱序
假设此时有两个线程进行析构,并且发生乱序执行,B将引用计数从2减到1,然后可能会有分支条件是引用计数到0 再去销毁shared_ptr
此时线程是被阻塞的
此时如果有另一个线程,先执行A,再执行B,此时就把引用计数减1变成0了,然后把shared_ptr销毁了
如果进程1再继续执行,程序就会挂掉
其实就是说明一件事,读写必须要在store之前,要有release语义,不然计数器为0了再去读写就出问题了
acquire语义也很好理解,delete语义显然不能提前到fetch_sub之前执行。
简单来说,fetch_sub是既读又写,所以需要acquire和release

Singleton常见错误

1
2
3
4
5
6
7
8
9
10
MyClass *p = nullptr;
MyClass * get_instance() {
if (p == nullptr) {
mutex1.lock();
if (p == nullptr) {
p = new MyClass();
}
}
mutex1.unlock();
}

这里p = new MyClass();实际上不是原子的,视为 p = malloc(sizeof(MyClass)); new(p) MyClass();
这样,多线程环境下,线程1在malloc的时候,线程2发现p!=nullptr,然后直接返回p导致出错

Singleton正确写法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//弱一致性
atomic<MyClass *> p = nullptr;
MyClass* get_instance() {
auto tmp = p.load(memoery_order_acquire);
if (tmp == nullptr) {
mutex1.lock();
tmp = p.load(memory_order_relaxed);
if(tmp == nullptr) {
tmp = new MyClass();
p.store(tmp, memory_order_release);
}
mutex1.unlock();
}
return tmp;
}

实际上就是通过memory model来控制乱序

1
2
3
4
5
6
7
8
9
10
11
12
//强一致性
atomic<MyClass *> p = nullptr;
MyClass* get_instance() {
if (p == nullptr) {
mutex1.lock();
if (p == nullptr) {
p = new MyClass();
}
mutex1.unlock();
}
return p;
}

默认是Sequentially Consistent,强一致性
更好的写法: static保证线程安全, 但是C++11之后才能保证

1
2
3
4
5
6
class MyClass {...};

MyClass& get_instance() {
static MyClass instance(...);
return instance;
}