目录

Go并发之原子操作

概述

atomic提供了用于实现同步算法的低级原子内存原语。这些功能需要非常小心才能正确使用。除了特殊的低级应用程序外,最好使用通道或sync包的工具来完成同步。通过通信共享内存;不要通过共享内存进行通信。

加操作

AddT函数实现的加法操作是原子等价的,其相当于如下操作:

1
2
*addr += delta
return *addr
示例:使用加操作实现同步
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package main

import (
  "fmt"
  "runtime"
  "sync"
  "sync/atomic"
)

var (
  counter  int32
  wg sync.WaitGroup
)

func main() {
  wg.Add(2) // 设置需要等待的 goroutine 的数量为 2

  go addCounter("Gerald")
  go addCounter("Seligman")

  wg.Wait()
  fmt.Println("Final Counter is: ", counter)
}

func addCounter(whoAmI string) {
  defer wg.Done()

  for count := 0; count < 2;  count++ {
    atomic.AddInt32(&counter, 1)
    runtime.Gosched() // 让出CPU时间片,让调度器安排别的协程执行,并在下次某个时候,从这个地方恢复执行
  }
}

这个例子就是通过sync.AddInt32counter这个变量实现了同步。

交换操作

SwapT函数实现的交换操作与下面是原子等效的:

1
2
3
old = *addr
*addr = new
return old
示例:使用SwapPointer
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package main

import (
	"log"
	"sync/atomic"
	"time"
	"unsafe"
)

var (
	data    *int
	dataPtr = (*unsafe.Pointer)(unsafe.Pointer(&data))
)

func changeData(target int) {
	new_dataPtr := unsafe.Pointer(&target)
	atomic.SwapPointer(dataPtr, new_dataPtr)
}

func main() {
	for i := 0; i < 100000; i++ {
		go changeData(i)
	}
	time.Sleep(time.Second * 2)
	log.Println(*(*int)(atomic.LoadPointer(dataPtr)))
}

我们可以通过执行go run -race main.go来判断是否发生了data race。当然,我们上面这个例子通过使用了原子操作从而避免data race的发生的。但是可以发现,整个交换的过程是十分的繁琐的。

atomic-opeartions.png
SwapPointer执行过程

交换且比较操作

CompareAndSwapT函数实现的比较和交换操作与下面代码是原子等效的:

1
2
3
4
5
if *addr == old {
  *addr = new
  return true
}
return false

这个操作,我们常称之为CAS操作,会先比较传入的地址的值是否为old,如果是的话,就尝试赋新值,如果不是的话,返回False,若赋值成功,则返回true。包含如下函数:

1
2
3
4
5
6
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)
func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)
func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool)
func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool)
func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)

源码分析

sync/atomic包中的源码除了Value之外其他的函数都是没有直接的源码的,需要去runtime/internal/atomic中找寻,这里为CAS函数为例,其他的都是大同小异,我们这里以amd64的源码为例进行分析:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
// bool Cas(int32 *val, int32 old, int32 new)
// Atomically:
//	if(*val == old){
//		*val = new;
//		return 1;
//	} else
//		return 0;
TEXT ·Cas(SB),NOSPLIT,$0-17
	MOVQ	ptr+0(FP), BX
	MOVL	old+8(FP), AX
	MOVL	new+12(FP), CX
	LOCK
	CMPXCHGL	CX, 0(BX)
	SETEQ	ret+16(FP)
	RET

我们看这个具体汇编代码就可以发现,使用了LOCK指令来保证操作的原子性。

示例

示例:使用CAS实现一个lock-free栈
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package main

import (
	"sync/atomic"
	"unsafe"
)

// LFStack 无锁栈
// 使用链表实现
type LFStack struct {
	head unsafe.Pointer // 栈顶
}

// Node 节点
type Node struct {
	val  int32
	next unsafe.Pointer
}

// NewLFStack NewLFStack
func NewLFStack() *LFStack {
	n := unsafe.Pointer(&Node{})
	return &LFStack{head: n}
}

// Push 入栈
func (s *LFStack) Push(v int32) {
	n := &Node{val: v}

	for {
		// 先取出栈顶
		old := atomic.LoadPointer(&s.head)
		n.next = old
		if atomic.CompareAndSwapPointer(&s.head, old, unsafe.Pointer(n)) {
			return
		}
	}
}

// Pop 出栈,没有数据时返回 nil
func (s *LFStack) Pop() int32 {
	for {
		// 先取出栈顶
		old := atomic.LoadPointer(&s.head)
		if old == nil {
			return 0
		}

		oldNode := (*Node)(old)
		// 取出下一个节点
		next := atomic.LoadPointer(&oldNode.next)
		// 重置栈顶
		if atomic.CompareAndSwapPointer(&s.head, old, next) {
			return oldNode.val
		}
	}
}

值得注意的是,lock-free的算法的性能不一定比加锁的性能要更好,但是lock-free的算法有一个比加锁更好的点,在于lock-free算法不会导致死锁

性能对比

lock设计的代码
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package test

import "sync"

type LockStack struct {
	head *LockNode
	mu   sync.Mutex
}

type LockNode struct {
	val  int32
	next *LockNode
}

func NewLockStack() *LockStack {
	return &LockStack{}
}

// Push 入栈
func (s *LockStack) Push(v int32) {
	n := &LockNode{val: v}
	s.mu.Lock()
	if s.head == nil{
		s.head = n
	}else{
		n.next = s.head
		s.head = n
	}
	s.mu.Unlock()
}

// Pop 出栈
func (s *LockStack)Pop() int32{
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.head == nil{ // 栈为空
		return 0
	}else{
		tmp := s.head
		s.head = s.head.next
		return tmp.val
	}
}
benchmark代码
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package test

import "testing"

func BenchmarkStack(b *testing.B) {
	b.RunParallel(func(p *testing.PB) {
		s := NewLFStack()
		for p.Next() {
			s.Push(1)
			s.Pop()
		}
	})
}

func BenchmarkLockStack(b *testing.B) {
	b.RunParallel(func(p *testing.PB) {
		s := NewLockStack()
		for p.Next() {
			s.Push(1)
			s.Pop()
		}
	})
}

func BenchmarkSingleThreadStack(b *testing.B) {
	b.Run("lock-free stack", func(b *testing.B) {
		b.ResetTimer()
		s := NewLFStack()
		s.Push(1)
		s.Pop()
	})
	b.Run("lock stack", func(b *testing.B) {
		b.ResetTimer()
		s := NewLockStack()
		s.Push(1)
		s.Pop()
	})
}
结果

运行测试结果如下:

1
2
3
4
5
6
7
8
goos: linux
goarch: amd64
pkg: test
cpu: Intel(R) Xeon(R) Silver 4314 CPU @ 2.40GHz
BenchmarkStack-64                	41927746	        32.49 ns/op	      16 B/op	       1 allocs/op
BenchmarkLockStack-64            	36145209	        31.83 ns/op	      16 B/op	       1 allocs/op
BenchmarkSingleThreadStack/lock-free_stack-64         	1000000000	         0.0000051 ns/op	       0 B/op	       0 allocs/op
BenchmarkSingleThreadStack/lock_stack-64              	1000000000	         0.0000051 ns/op	       0 B/op	       0 allocs/op

可以看到,lock-free算法下的性能,并没有比直接是用lock的来的好。

参考文献

  1. Go 101
  2. Go并发编程(五) 深入理解 sync/atomic
  3. do-lock-free-algorithms-really-perform-better-than-their-lock-full-counterparts