原子类型和原子操作

原子操作(atomic operation)是指不可分割且不可中断的一个或一系列操作,在并发编程中需要由CPU层面做出一些保证,让一系列操作成为原子操作。 一个原子操作从开始到结束可以是一个操作步骤,也可以包含多个操作步骤,这些步骤的顺序不可以被打乱,执行过程也不会被其他机制打断。

很多编程语言都对原子操作提供支持,例如Java的java.util.concurrent.atomic提供了很多原子类型,Go语言的sync/atomic包提供了对原子操作的支持。

我们从Go语言的sync/atomic包提供的函数来看一下原子类型一般会提供哪些原子操作:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func AddInt32(addr *int32, delta int32) (new int32)
func AddInt64(addr *int64, delta int64) (new int64)
func AddUint32(addr *uint32, delta uint32) (new uint32)
func AddUint64(addr *uint64, delta uint64) (new uint64)
func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)
func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)
func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool)
func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool)
func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)
func LoadInt32(addr *int32) (val int32)
func LoadInt64(addr *int64) (val int64)
func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer)
func LoadUint32(addr *uint32) (val uint32)
func LoadUint64(addr *uint64) (val uint64)
func LoadUintptr(addr *uintptr) (val uintptr)
func StoreInt32(addr *int32, val int32)
func StoreInt64(addr *int64, val int64)
func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)
func StoreUint32(addr *uint32, val uint32)
func StoreUint64(addr *uint64, val uint64)
func StoreUintptr(addr *uintptr, val uintptr)
func SwapInt32(addr *int32, new int32) (old int32)
func SwapInt64(addr *int64, new int64) (old int64)
func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)
func SwapUint32(addr *uint32, new uint32) (old uint32)
func SwapUint64(addr *uint64, new uint64) (old uint64)
func SwapUintptr(addr *uintptr, new uintptr) (old uintptr)
...

上面是Go的sync/atomic包提供的函数,可以归纳为以下5类操作:

  • Add - 对原子类型进行加(或减)运算
  • CAS(CompareAndSwap) - 比较,如果相等则进行交换
  • Load - 从原子类型内部读取值
  • Store - 向原子类型内部写入值
  • Swap - 交换

原子操作作为一个并发原语是实现其他并发原语的基础。在Rust中也是一样的,今天我们来学习Rust是如何通过其在标准库中内置的原子类型支持这些操作的。

Rust中的原子类型

Rust中的原子类型位于std::sync::atomic module中。

这个module的文档中对原子类型有如下描述: Rust中的原子类型在线程之间提供原始的共享内存通信,并且是其他并发类型的构建基础。

std::sync::atomic module目前共提供了以下12种原子类型:

  • AtomicBool
  • AtomicI8
  • AtomicI16
  • AtomicI32
  • AtomicI64
  • AtomicIsize
  • AtomicPtr
  • AtomicU8
  • AtomicU16
  • AtomicU32
  • AtomicU64
  • AtomicUsize

原子类型与普通的类型基本上没有太多的区别,例如AtomicBool和bool,只是一个可以在多线程中使用,另一个则更适用于单线程下使用。

AtomicI32为例,它的定义是一个结构体,有以下原子操作相关的方法:

  • pub fn fetch_add(&self, val: i32, order: Ordering) -> i32 - 对原子类型进行加(或减)运算
  • pub fn compare_and_swap(&self, current: i32, new: i32, order: Ordering) -> i32 - CAS(rust 1.50废弃, 由compare_exchange替代)
  • pub fn compare_exchange(&self, current: i32, new: i32, success: Ordering, failure: Ordering) -> Result<i32, i32> - CAS
  • pub fn load(&self, order: Ordering) -> i32 - 从原子类型内部读取值
  • pub fn store(&self, val: i32, order: Ordering) - 向原子类型内部写入值
  • pub fn swap(&self, val: i32, order: Ordering) -> i32 - 交换

可以看到每个方法都有一个Ordering类型的参数,Ordering是一个枚举,表示该操作的内存屏障的强度。

1
2
3
4
5
6
7
pub enum Ordering {
    Relaxed,
    Release,
    Acquire,
    AcqRel,
    SeqCst,
}

通过这个Ordering枚举类型的参数,开发者可以自己定制底层的Memory Ordering。

什么是Memory Ordering, 摘录维基百科中的定义:

**Memory Ordering(内存排序)**是指CPU访问主存时的顺序。可以是编译器在编译时产生,也可以是CPU在运行时产生。反映了内存操作重排序,乱序执行,从而充分利用不同内存的总线带宽。现代处理器大都是乱序执行。因此需要内存屏障以确保多线程的同步。

关于对Memory Ordering的理解,有两个线程都要操作AtomicI32类型,假设AtomicI32类型数据初始值是0,一个线程执行读操作,另一个线程执行写操作要将数据写为10。假设写操作执行完成后,读线程再执行读操作就一定能读到数据10吗? 答案是不确定的,由于不同编译器的实现和CPU的优化策略,可能会出现虽然写线程执行完写操作了,但最新的数据还存在CPU的寄存器中,还没有同步到内存中。为了确保寄存器到内存中的数据同步,就需要Memory Ordering了。 Release可以理解为将寄存器的值同步到内存,Acquire是忽略当前寄存器中存的值,而直接去内存中读取最新的值。 例如当我们调用原子类型的store方法时提供的Ordering是release,在调用原子类型的load方法时提供的Ordering是Acquire就可以保证执行读操作的线程一定会读到寄存器里最新的值。

有了前面对Memory Ordering的理解,再来看一下Rust中Ordering这个枚举的枚举值分别代表什么:

  • Relaxed - 没有内存顺序约束,只有原子操作。Relaxed访问是最宽松的原则。它对编译器和CPU不做任何限制。可以被随意重排,可以乱序执行。
  • Release - Release和Acquire是在不同的线程间对同一个原子类型对象的进行store和load操作时配合使用。当一个线程使用store+Release, 对于当前线程,任何读取或写入操作都不能被乱序排在这个store之后。 而对于其他线程使用load+Acquire时, 它们看到的将是修改后的结果。
  • Acquire - 当一个线程使用load+Acquire读取数据时,对于当前线程,任何读取或写入操作都不能被乱序排在这个读取之前。而对于其他线程使用了store+release来修改数据时,修改的值对当前这个线程是可见的。
  • AcqRel - 同时具有Acquire和Release的效果。对于load,它使用的是Acquire命令。对于store,它使用的是Release命令。AcqRel一般用在fetch_add上。
  • SeqCst - 类似Acquire/Release/AcqRel(分别用于加载、存储和带存储的加载操作),并额外保证所有线程以相同的顺序看到所有顺序一致的操作,是最严格的Ordering。

自旋锁的例子

因为原子类型都实现了Sync trait,所以原子类型的变量在线程之间共享是安全的,但因为它们本身没有提供共享机制,因此比较常见的用法是将其放在原子引用计数智能指针Arc中。 下面是官方文档中一个简单的自旋锁的例子:

例1:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

fn main() {
    let spinlock = Arc::new(AtomicUsize::new(1));

    let spinlock_clone = spinlock.clone();
    let thread = thread::spawn(move || {
        spinlock_clone.store(0, Ordering::SeqCst);
    });

    // Wait for the other thread to release the lock
    while spinlock.load(Ordering::SeqCst) != 0 {}

    if let Err(panic) = thread.join() {
        println!("Thread had an error: {:?}", panic);
    }
}

注意,自旋锁是指当一个线程尝试去获取某一把锁的时候,如果这个锁此时已经被其他线程获取,那么此线程就无法获取到这把锁,该线程将会等待,间隔一段时间后会再次尝试获取。 自旋锁实际上是通过CPU空转(spin)忙等待(budy wait),例如上面代码中的while循环,来等待某个临界区可用的一种锁。

使用自旋锁可以的减少线程的阻塞,适用于对锁的竞争不激烈,且占用锁时间非常短的场景。 但是如果锁的竞争激烈,或者持有锁的线程需要长时间占用锁,受保护的临界区过大,线程自旋的就消耗大于线程阻塞挂起操作的消耗,自旋操作会一直占用CPU做无用功,就会造成CPU浪费,其他需要CPU的线程反而不能获得CPU,系统性能会急剧下降。

例1中是是自旋锁功能的实现,并且使用的内存排序是Ordering::SeqCst,下面我们尝试实现一个自选锁,并以一个struct封装。

例2:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};
use std::thread;
use std::time::Duration;

struct SpinLock {
    lock: AtomicBool,
}

impl SpinLock {
    pub fn new() -> Self {
        Self {
            lock: AtomicBool::new(false),
        }
    }

    pub fn lock(&self) {
        while self
            .lock
            .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
            .is_err()
        // 尝试加锁, 如果加锁失败则一直自旋
        {
            // CAS的消耗比较大, 当加锁失败时, 通过简单load读取锁的状态, 只要读取到锁被释放时才会再去尝试CAS加锁
            while self.lock.load(Ordering::Relaxed) {}
        }
    }

    pub fn unlock(&self) {
        // 解锁
        self.lock.store(false, Ordering::Release);
    }
}

fn main() {
    let spinlock = Arc::new(SpinLock::new());

    let spinlock1 = spinlock.clone();

    let thread = thread::spawn(move || {
        spinlock1.lock();
        thread::sleep(Duration::from_millis(100));
        println!("do something1!");
        spinlock1.unlock();
    });
    thread.join().unwrap();

    spinlock.lock();
    println!("do something2!");
    spinlock.unlock();
}

上面我们实现的自旋锁,本质就是一个原子类型AtomicBool,它的初始值为false。

当执行lock方法进行加锁操作时,我们利用了原子操作CAS的特性,如果compare_exchange失败,则尝试加锁的线程会卡在这个while循环中自旋。 这里有一个性能上的小优化,因为执行CAS消耗代价比较大,所以在CAS失败时,再不断通过简单load读取锁的状态, 只有读取到锁被释放时才会再去尝试CAS加锁。这样效率更好一些。

当执行unlock方法时,直接将AtomicBool设置(store)为false,采用的Memory Ordering是Release,会将寄存器中的值与内存中的值同步,内存中就为false。 此时,如果有线程卡在lock方法while循环处自旋,CAS操作(compare_exchange)采用的Memory Ordering是Acquire,将会忽略其自己当前寄存器中的值,从内存中读取到新的值为false,CAS将执行成功,也就是加锁成功。

参考