rust语言基础学习: 并发中的原子操作和rus标准库中的原子类型
2020-07-29
原子类型和原子操作 #
原子操作(atomic operation)是指不可分割且不可中断的一个或一系列操作,在并发编程中需要由CPU层面做出一些保证,让一系列操作成为原子操作。 一个原子操作从开始到结束可以是一个操作步骤,也可以包含多个操作步骤,这些步骤的顺序不可以被打乱,执行过程也不会被其他机制打断。
很多编程语言都对原子操作提供支持,例如Java的java.util.concurrent.atomic
提供了很多原子类型,Go语言的sync/atomic
包提供了对原子操作的支持。
我们从Go语言的sync/atomic
包提供的函数来看一下原子类型一般会提供哪些原子操作:
1func AddInt32(addr *int32, delta int32) (new int32)
2func AddInt64(addr *int64, delta int64) (new int64)
3func AddUint32(addr *uint32, delta uint32) (new uint32)
4func AddUint64(addr *uint64, delta uint64) (new uint64)
5func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)
6func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
7func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)
8func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)
9func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool)
10func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool)
11func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)
12func LoadInt32(addr *int32) (val int32)
13func LoadInt64(addr *int64) (val int64)
14func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer)
15func LoadUint32(addr *uint32) (val uint32)
16func LoadUint64(addr *uint64) (val uint64)
17func LoadUintptr(addr *uintptr) (val uintptr)
18func StoreInt32(addr *int32, val int32)
19func StoreInt64(addr *int64, val int64)
20func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)
21func StoreUint32(addr *uint32, val uint32)
22func StoreUint64(addr *uint64, val uint64)
23func StoreUintptr(addr *uintptr, val uintptr)
24func SwapInt32(addr *int32, new int32) (old int32)
25func SwapInt64(addr *int64, new int64) (old int64)
26func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)
27func SwapUint32(addr *uint32, new uint32) (old uint32)
28func SwapUint64(addr *uint64, new uint64) (old uint64)
29func SwapUintptr(addr *uintptr, new uintptr) (old uintptr)
30...
上面是Go的sync/atomic
包提供的函数,可以归纳为以下5类操作:
Add
- 对原子类型进行加(或减)运算CAS
(CompareAndSwap
) - 比较,如果相等则进行交换Load
- 从原子类型内部读取值Store
- 向原子类型内部写入值Swap
- 交换
原子操作作为一个并发原语是实现其他并发原语的基础。在Rust中也是一样的,今天我们来学习Rust是如何通过其在标准库中内置的原子类型支持这些操作的。
Rust中的原子类型 #
Rust中的原子类型位于std::sync::atomic
module中。
这个module的文档中对原子类型有如下描述: Rust中的原子类型在线程之间提供原始的共享内存通信,并且是其他并发类型的构建基础。
std::sync::atomic
module目前共提供了以下12种原子类型:
- AtomicBool
- AtomicI8
- AtomicI16
- AtomicI32
- AtomicI64
- AtomicIsize
- AtomicPtr
- AtomicU8
- AtomicU16
- AtomicU32
- AtomicU64
- AtomicUsize
原子类型与普通的类型基本上没有太多的区别,例如AtomicBool和bool,只是一个可以在多线程中使用,另一个则更适用于单线程下使用。
以AtomicI32
为例,它的定义是一个结构体,有以下原子操作相关的方法:
pub fn fetch_add(&self, val: i32, order: Ordering) -> i32
- 对原子类型进行加(或减)运算pub fn compare_and_swap(&self, current: i32, new: i32, order: Ordering) -> i32
- CAS(rust 1.50废弃, 由compare_exchange替代)pub fn compare_exchange(&self, current: i32, new: i32, success: Ordering, failure: Ordering) -> Result<i32, i32>
- CASpub fn load(&self, order: Ordering) -> i32
- 从原子类型内部读取值pub fn store(&self, val: i32, order: Ordering)
- 向原子类型内部写入值pub fn swap(&self, val: i32, order: Ordering) -> i32
- 交换
可以看到每个方法都有一个Ordering
类型的参数,Ordering是一个枚举,表示该操作的内存屏障的强度。
1pub enum Ordering {
2 Relaxed,
3 Release,
4 Acquire,
5 AcqRel,
6 SeqCst,
7}
通过这个Ordering
枚举类型的参数,开发者可以自己定制底层的Memory Ordering。
什么是Memory Ordering, 摘录维基百科中的定义:
**Memory Ordering(内存排序)**是指CPU访问主存时的顺序。可以是编译器在编译时产生,也可以是CPU在运行时产生。反映了内存操作重排序,乱序执行,从而充分利用不同内存的总线带宽。现代处理器大都是乱序执行。因此需要内存屏障以确保多线程的同步。
关于对Memory Ordering的理解,有两个线程都要操作AtomicI32类型,假设AtomicI32类型数据初始值是0,一个线程执行读操作,另一个线程执行写操作要将数据写为10。假设写操作执行完成后,读线程再执行读操作就一定能读到数据10吗? 答案是不确定的,由于不同编译器的实现和CPU的优化策略,可能会出现虽然写线程执行完写操作了,但最新的数据还存在CPU的寄存器中,还没有同步到内存中。为了确保寄存器到内存中的数据同步,就需要Memory Ordering了。 Release可以理解为将寄存器的值同步到内存,Acquire是忽略当前寄存器中存的值,而直接去内存中读取最新的值。 例如当我们调用原子类型的store方法时提供的Ordering是release,在调用原子类型的load方法时提供的Ordering是Acquire就可以保证执行读操作的线程一定会读到寄存器里最新的值。
有了前面对Memory Ordering的理解,再来看一下Rust中Ordering这个枚举的枚举值分别代表什么:
- Relaxed - 没有内存顺序约束,只有原子操作。Relaxed访问是最宽松的原则。它对编译器和CPU不做任何限制。可以被随意重排,可以乱序执行。
- Release - Release和Acquire是在不同的线程间对同一个原子类型对象的进行store和load操作时配合使用。当一个线程使用store+Release, 对于当前线程,任何读取或写入操作都不能被乱序排在这个store之后。 而对于其他线程使用load+Acquire时, 它们看到的将是修改后的结果。
- Acquire - 当一个线程使用load+Acquire读取数据时,对于当前线程,任何读取或写入操作都不能被乱序排在这个读取之前。而对于其他线程使用了store+release来修改数据时,修改的值对当前这个线程是可见的。
- AcqRel - 同时具有Acquire和Release的效果。对于load,它使用的是Acquire命令。对于store,它使用的是Release命令。AcqRel一般用在
fetch_add
上。 - SeqCst - 类似Acquire/Release/AcqRel(分别用于加载、存储和带存储的加载操作),并额外保证所有线程以相同的顺序看到所有顺序一致的操作,是最严格的Ordering。
自旋锁的例子 #
因为原子类型都实现了Sync trait,所以原子类型的变量在线程之间共享是安全的,但因为它们本身没有提供共享机制,因此比较常见的用法是将其放在原子引用计数智能指针Arc中。 下面是官方文档中一个简单的自旋锁的例子:
例1:
1use std::sync::atomic::{AtomicUsize, Ordering};
2use std::sync::Arc;
3use std::thread;
4
5fn main() {
6 let spinlock = Arc::new(AtomicUsize::new(1));
7
8 let spinlock_clone = spinlock.clone();
9 let thread = thread::spawn(move || {
10 spinlock_clone.store(0, Ordering::SeqCst);
11 });
12
13 // Wait for the other thread to release the lock
14 while spinlock.load(Ordering::SeqCst) != 0 {}
15
16 if let Err(panic) = thread.join() {
17 println!("Thread had an error: {:?}", panic);
18 }
19}
注意,自旋锁是指当一个线程尝试去获取某一把锁的时候,如果这个锁此时已经被其他线程获取,那么此线程就无法获取到这把锁,该线程将会等待,间隔一段时间后会再次尝试获取。 自旋锁实际上是通过CPU空转(spin)忙等待(budy wait),例如上面代码中的while循环,来等待某个临界区可用的一种锁。
使用自旋锁可以的减少线程的阻塞,适用于对锁的竞争不激烈,且占用锁时间非常短的场景。 但是如果锁的竞争激烈,或者持有锁的线程需要长时间占用锁,受保护的临界区过大,线程自旋的就消耗大于线程阻塞挂起操作的消耗,自旋操作会一直占用CPU做无用功,就会造成CPU浪费,其他需要CPU的线程反而不能获得CPU,系统性能会急剧下降。
例1中是是自旋锁功能的实现,并且使用的内存排序是Ordering::SeqCst
,下面我们尝试实现一个自选锁,并以一个struct封装。
例2:
1use std::sync::{
2 atomic::{AtomicBool, Ordering},
3 Arc,
4};
5use std::thread;
6use std::time::Duration;
7
8struct SpinLock {
9 lock: AtomicBool,
10}
11
12impl SpinLock {
13 pub fn new() -> Self {
14 Self {
15 lock: AtomicBool::new(false),
16 }
17 }
18
19 pub fn lock(&self) {
20 while self
21 .lock
22 .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
23 .is_err()
24 // 尝试加锁, 如果加锁失败则一直自旋
25 {
26 // CAS的消耗比较大, 当加锁失败时, 通过简单load读取锁的状态, 只要读取到锁被释放时才会再去尝试CAS加锁
27 while self.lock.load(Ordering::Relaxed) {}
28 }
29 }
30
31 pub fn unlock(&self) {
32 // 解锁
33 self.lock.store(false, Ordering::Release);
34 }
35}
36
37fn main() {
38 let spinlock = Arc::new(SpinLock::new());
39
40 let spinlock1 = spinlock.clone();
41
42 let thread = thread::spawn(move || {
43 spinlock1.lock();
44 thread::sleep(Duration::from_millis(100));
45 println!("do something1!");
46 spinlock1.unlock();
47 });
48 thread.join().unwrap();
49
50 spinlock.lock();
51 println!("do something2!");
52 spinlock.unlock();
53}
上面我们实现的自旋锁,本质就是一个原子类型AtomicBool,它的初始值为false。
当执行lock方法进行加锁操作时,我们利用了原子操作CAS的特性,如果compare_exchange
失败,则尝试加锁的线程会卡在这个while循环中自旋。
这里有一个性能上的小优化,因为执行CAS消耗代价比较大,所以在CAS失败时,再不断通过简单load读取锁的状态, 只有读取到锁被释放时才会再去尝试CAS加锁。这样效率更好一些。
当执行unlock方法时,直接将AtomicBool设置(store)为false,采用的Memory Ordering是Release,会将寄存器中的值与内存中的值同步,内存中就为false。 此时,如果有线程卡在lock方法while循环处自旋,CAS操作(compare_exchange)采用的Memory Ordering是Acquire,将会忽略其自己当前寄存器中的值,从内存中读取到新的值为false,CAS将执行成功,也就是加锁成功。