rust语言基础学习: 并发中的原子操作和rus标准库中的原子类型

rust语言基础学习: 并发中的原子操作和rus标准库中的原子类型

2020-07-29
Rust

原子类型和原子操作 #

原子操作(atomic operation)是指不可分割且不可中断的一个或一系列操作,在并发编程中需要由CPU层面做出一些保证,让一系列操作成为原子操作。 一个原子操作从开始到结束可以是一个操作步骤,也可以包含多个操作步骤,这些步骤的顺序不可以被打乱,执行过程也不会被其他机制打断。

很多编程语言都对原子操作提供支持,例如Java的java.util.concurrent.atomic提供了很多原子类型,Go语言的sync/atomic包提供了对原子操作的支持。

我们从Go语言的sync/atomic包提供的函数来看一下原子类型一般会提供哪些原子操作:

 1func AddInt32(addr *int32, delta int32) (new int32)
 2func AddInt64(addr *int64, delta int64) (new int64)
 3func AddUint32(addr *uint32, delta uint32) (new uint32)
 4func AddUint64(addr *uint64, delta uint64) (new uint64)
 5func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)
 6func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
 7func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)
 8func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)
 9func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool)
10func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool)
11func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)
12func LoadInt32(addr *int32) (val int32)
13func LoadInt64(addr *int64) (val int64)
14func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer)
15func LoadUint32(addr *uint32) (val uint32)
16func LoadUint64(addr *uint64) (val uint64)
17func LoadUintptr(addr *uintptr) (val uintptr)
18func StoreInt32(addr *int32, val int32)
19func StoreInt64(addr *int64, val int64)
20func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)
21func StoreUint32(addr *uint32, val uint32)
22func StoreUint64(addr *uint64, val uint64)
23func StoreUintptr(addr *uintptr, val uintptr)
24func SwapInt32(addr *int32, new int32) (old int32)
25func SwapInt64(addr *int64, new int64) (old int64)
26func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)
27func SwapUint32(addr *uint32, new uint32) (old uint32)
28func SwapUint64(addr *uint64, new uint64) (old uint64)
29func SwapUintptr(addr *uintptr, new uintptr) (old uintptr)
30...

上面是Go的sync/atomic包提供的函数,可以归纳为以下5类操作:

  • Add - 对原子类型进行加(或减)运算
  • CAS(CompareAndSwap) - 比较,如果相等则进行交换
  • Load - 从原子类型内部读取值
  • Store - 向原子类型内部写入值
  • Swap - 交换

原子操作作为一个并发原语是实现其他并发原语的基础。在Rust中也是一样的,今天我们来学习Rust是如何通过其在标准库中内置的原子类型支持这些操作的。

Rust中的原子类型 #

Rust中的原子类型位于std::sync::atomic module中。

这个module的文档中对原子类型有如下描述: Rust中的原子类型在线程之间提供原始的共享内存通信,并且是其他并发类型的构建基础。

std::sync::atomic module目前共提供了以下12种原子类型:

  • AtomicBool
  • AtomicI8
  • AtomicI16
  • AtomicI32
  • AtomicI64
  • AtomicIsize
  • AtomicPtr
  • AtomicU8
  • AtomicU16
  • AtomicU32
  • AtomicU64
  • AtomicUsize

原子类型与普通的类型基本上没有太多的区别,例如AtomicBool和bool,只是一个可以在多线程中使用,另一个则更适用于单线程下使用。

AtomicI32为例,它的定义是一个结构体,有以下原子操作相关的方法:

  • pub fn fetch_add(&self, val: i32, order: Ordering) -> i32 - 对原子类型进行加(或减)运算
  • pub fn compare_and_swap(&self, current: i32, new: i32, order: Ordering) -> i32 - CAS(rust 1.50废弃, 由compare_exchange替代)
  • pub fn compare_exchange(&self, current: i32, new: i32, success: Ordering, failure: Ordering) -> Result<i32, i32> - CAS
  • pub fn load(&self, order: Ordering) -> i32 - 从原子类型内部读取值
  • pub fn store(&self, val: i32, order: Ordering) - 向原子类型内部写入值
  • pub fn swap(&self, val: i32, order: Ordering) -> i32 - 交换

可以看到每个方法都有一个Ordering类型的参数,Ordering是一个枚举,表示该操作的内存屏障的强度。

1pub enum Ordering {
2    Relaxed,
3    Release,
4    Acquire,
5    AcqRel,
6    SeqCst,
7}

通过这个Ordering枚举类型的参数,开发者可以自己定制底层的Memory Ordering。

什么是Memory Ordering, 摘录维基百科中的定义:

**Memory Ordering(内存排序)**是指CPU访问主存时的顺序。可以是编译器在编译时产生,也可以是CPU在运行时产生。反映了内存操作重排序,乱序执行,从而充分利用不同内存的总线带宽。现代处理器大都是乱序执行。因此需要内存屏障以确保多线程的同步。

关于对Memory Ordering的理解,有两个线程都要操作AtomicI32类型,假设AtomicI32类型数据初始值是0,一个线程执行读操作,另一个线程执行写操作要将数据写为10。假设写操作执行完成后,读线程再执行读操作就一定能读到数据10吗? 答案是不确定的,由于不同编译器的实现和CPU的优化策略,可能会出现虽然写线程执行完写操作了,但最新的数据还存在CPU的寄存器中,还没有同步到内存中。为了确保寄存器到内存中的数据同步,就需要Memory Ordering了。 Release可以理解为将寄存器的值同步到内存,Acquire是忽略当前寄存器中存的值,而直接去内存中读取最新的值。 例如当我们调用原子类型的store方法时提供的Ordering是release,在调用原子类型的load方法时提供的Ordering是Acquire就可以保证执行读操作的线程一定会读到寄存器里最新的值。

有了前面对Memory Ordering的理解,再来看一下Rust中Ordering这个枚举的枚举值分别代表什么:

  • Relaxed - 没有内存顺序约束,只有原子操作。Relaxed访问是最宽松的原则。它对编译器和CPU不做任何限制。可以被随意重排,可以乱序执行。
  • Release - Release和Acquire是在不同的线程间对同一个原子类型对象的进行store和load操作时配合使用。当一个线程使用store+Release, 对于当前线程,任何读取或写入操作都不能被乱序排在这个store之后。 而对于其他线程使用load+Acquire时, 它们看到的将是修改后的结果。
  • Acquire - 当一个线程使用load+Acquire读取数据时,对于当前线程,任何读取或写入操作都不能被乱序排在这个读取之前。而对于其他线程使用了store+release来修改数据时,修改的值对当前这个线程是可见的。
  • AcqRel - 同时具有Acquire和Release的效果。对于load,它使用的是Acquire命令。对于store,它使用的是Release命令。AcqRel一般用在fetch_add上。
  • SeqCst - 类似Acquire/Release/AcqRel(分别用于加载、存储和带存储的加载操作),并额外保证所有线程以相同的顺序看到所有顺序一致的操作,是最严格的Ordering。

自旋锁的例子 #

因为原子类型都实现了Sync trait,所以原子类型的变量在线程之间共享是安全的,但因为它们本身没有提供共享机制,因此比较常见的用法是将其放在原子引用计数智能指针Arc中。 下面是官方文档中一个简单的自旋锁的例子:

例1:

 1use std::sync::atomic::{AtomicUsize, Ordering};
 2use std::sync::Arc;
 3use std::thread;
 4
 5fn main() {
 6    let spinlock = Arc::new(AtomicUsize::new(1));
 7
 8    let spinlock_clone = spinlock.clone();
 9    let thread = thread::spawn(move || {
10        spinlock_clone.store(0, Ordering::SeqCst);
11    });
12
13    // Wait for the other thread to release the lock
14    while spinlock.load(Ordering::SeqCst) != 0 {}
15
16    if let Err(panic) = thread.join() {
17        println!("Thread had an error: {:?}", panic);
18    }
19}

注意,自旋锁是指当一个线程尝试去获取某一把锁的时候,如果这个锁此时已经被其他线程获取,那么此线程就无法获取到这把锁,该线程将会等待,间隔一段时间后会再次尝试获取。 自旋锁实际上是通过CPU空转(spin)忙等待(budy wait),例如上面代码中的while循环,来等待某个临界区可用的一种锁。

使用自旋锁可以的减少线程的阻塞,适用于对锁的竞争不激烈,且占用锁时间非常短的场景。 但是如果锁的竞争激烈,或者持有锁的线程需要长时间占用锁,受保护的临界区过大,线程自旋的就消耗大于线程阻塞挂起操作的消耗,自旋操作会一直占用CPU做无用功,就会造成CPU浪费,其他需要CPU的线程反而不能获得CPU,系统性能会急剧下降。

例1中是是自旋锁功能的实现,并且使用的内存排序是Ordering::SeqCst,下面我们尝试实现一个自选锁,并以一个struct封装。

例2:

 1use std::sync::{
 2    atomic::{AtomicBool, Ordering},
 3    Arc,
 4};
 5use std::thread;
 6use std::time::Duration;
 7
 8struct SpinLock {
 9    lock: AtomicBool,
10}
11
12impl SpinLock {
13    pub fn new() -> Self {
14        Self {
15            lock: AtomicBool::new(false),
16        }
17    }
18
19    pub fn lock(&self) {
20        while self
21            .lock
22            .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
23            .is_err()
24        // 尝试加锁, 如果加锁失败则一直自旋
25        {
26            // CAS的消耗比较大, 当加锁失败时, 通过简单load读取锁的状态, 只要读取到锁被释放时才会再去尝试CAS加锁
27            while self.lock.load(Ordering::Relaxed) {}
28        }
29    }
30
31    pub fn unlock(&self) {
32        // 解锁
33        self.lock.store(false, Ordering::Release);
34    }
35}
36
37fn main() {
38    let spinlock = Arc::new(SpinLock::new());
39
40    let spinlock1 = spinlock.clone();
41
42    let thread = thread::spawn(move || {
43        spinlock1.lock();
44        thread::sleep(Duration::from_millis(100));
45        println!("do something1!");
46        spinlock1.unlock();
47    });
48    thread.join().unwrap();
49
50    spinlock.lock();
51    println!("do something2!");
52    spinlock.unlock();
53}

上面我们实现的自旋锁,本质就是一个原子类型AtomicBool,它的初始值为false。

当执行lock方法进行加锁操作时,我们利用了原子操作CAS的特性,如果compare_exchange失败,则尝试加锁的线程会卡在这个while循环中自旋。 这里有一个性能上的小优化,因为执行CAS消耗代价比较大,所以在CAS失败时,再不断通过简单load读取锁的状态, 只有读取到锁被释放时才会再去尝试CAS加锁。这样效率更好一些。

当执行unlock方法时,直接将AtomicBool设置(store)为false,采用的Memory Ordering是Release,会将寄存器中的值与内存中的值同步,内存中就为false。 此时,如果有线程卡在lock方法while循环处自旋,CAS操作(compare_exchange)采用的Memory Ordering是Acquire,将会忽略其自己当前寄存器中的值,从内存中读取到新的值为false,CAS将执行成功,也就是加锁成功。

参考 #

© 2024 青蛙小白