概述
包atomic
提供了用于实现同步算法的低级原子内存原语。这些功能需要非常小心才能正确使用。除了特殊的低级应用程序外,最好使用通道或sync
包的工具来完成同步。通过通信共享内存;不要通过共享内存进行通信。
加操作
由 AddT
函数实现的加法操作是原子等价的,其相当于如下操作:
1
2
|
*addr += delta
return *addr
|
示例:使用加操作实现同步
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
package main
import (
"fmt"
"runtime"
"sync"
"sync/atomic"
)
var (
counter int32
wg sync.WaitGroup
)
func main() {
wg.Add(2) // 设置需要等待的 goroutine 的数量为 2
go addCounter("Gerald")
go addCounter("Seligman")
wg.Wait()
fmt.Println("Final Counter is: ", counter)
}
func addCounter(whoAmI string) {
defer wg.Done()
for count := 0; count < 2; count++ {
atomic.AddInt32(&counter, 1)
runtime.Gosched() // 让出CPU时间片,让调度器安排别的协程执行,并在下次某个时候,从这个地方恢复执行
}
}
|
这个例子就是通过sync.AddInt32
对counter
这个变量实现了同步。
交换操作
由SwapT
函数实现的交换操作与下面是原子等效的:
1
2
3
|
old = *addr
*addr = new
return old
|
示例:使用SwapPointer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
package main
import (
"log"
"sync/atomic"
"time"
"unsafe"
)
var (
data *int
dataPtr = (*unsafe.Pointer)(unsafe.Pointer(&data))
)
func changeData(target int) {
new_dataPtr := unsafe.Pointer(&target)
atomic.SwapPointer(dataPtr, new_dataPtr)
}
func main() {
for i := 0; i < 100000; i++ {
go changeData(i)
}
time.Sleep(time.Second * 2)
log.Println(*(*int)(atomic.LoadPointer(dataPtr)))
}
|
我们可以通过执行go run -race main.go
来判断是否发生了data race。当然,我们上面这个例子通过使用了原子操作从而避免data race的发生的。但是可以发现,整个交换的过程是十分的繁琐的。
交换且比较操作
由CompareAndSwapT
函数实现的比较和交换操作与下面代码是原子等效的:
1
2
3
4
5
|
if *addr == old {
*addr = new
return true
}
return false
|
这个操作,我们常称之为CAS操作,会先比较传入的地址的值是否为old
,如果是的话,就尝试赋新值,如果不是的话,返回False
,若赋值成功,则返回true
。包含如下函数:
1
2
3
4
5
6
|
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)
func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)
func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool)
func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool)
func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)
|
源码分析
在sync/atomic
包中的源码除了Value
之外其他的函数都是没有直接的源码的,需要去runtime/internal/atomic
中找寻,这里为CAS
函数为例,其他的都是大同小异,我们这里以amd64的源码为例进行分析:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
// bool Cas(int32 *val, int32 old, int32 new)
// Atomically:
// if(*val == old){
// *val = new;
// return 1;
// } else
// return 0;
TEXT ·Cas(SB),NOSPLIT,$0-17
MOVQ ptr+0(FP), BX
MOVL old+8(FP), AX
MOVL new+12(FP), CX
LOCK
CMPXCHGL CX, 0(BX)
SETEQ ret+16(FP)
RET
|
我们看这个具体汇编代码就可以发现,使用了LOCK
指令来保证操作的原子性。
示例
示例:使用CAS实现一个lock-free栈
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
package main
import (
"sync/atomic"
"unsafe"
)
// LFStack 无锁栈
// 使用链表实现
type LFStack struct {
head unsafe.Pointer // 栈顶
}
// Node 节点
type Node struct {
val int32
next unsafe.Pointer
}
// NewLFStack NewLFStack
func NewLFStack() *LFStack {
n := unsafe.Pointer(&Node{})
return &LFStack{head: n}
}
// Push 入栈
func (s *LFStack) Push(v int32) {
n := &Node{val: v}
for {
// 先取出栈顶
old := atomic.LoadPointer(&s.head)
n.next = old
if atomic.CompareAndSwapPointer(&s.head, old, unsafe.Pointer(n)) {
return
}
}
}
// Pop 出栈,没有数据时返回 nil
func (s *LFStack) Pop() int32 {
for {
// 先取出栈顶
old := atomic.LoadPointer(&s.head)
if old == nil {
return 0
}
oldNode := (*Node)(old)
// 取出下一个节点
next := atomic.LoadPointer(&oldNode.next)
// 重置栈顶
if atomic.CompareAndSwapPointer(&s.head, old, next) {
return oldNode.val
}
}
}
|
值得注意的是,lock-free的算法的性能不一定比加锁的性能要更好,但是lock-free的算法有一个比加锁更好的点,在于lock-free算法不会导致死锁。
性能对比
lock设计的代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
package test
import "sync"
type LockStack struct {
head *LockNode
mu sync.Mutex
}
type LockNode struct {
val int32
next *LockNode
}
func NewLockStack() *LockStack {
return &LockStack{}
}
// Push 入栈
func (s *LockStack) Push(v int32) {
n := &LockNode{val: v}
s.mu.Lock()
if s.head == nil{
s.head = n
}else{
n.next = s.head
s.head = n
}
s.mu.Unlock()
}
// Pop 出栈
func (s *LockStack)Pop() int32{
s.mu.Lock()
defer s.mu.Unlock()
if s.head == nil{ // 栈为空
return 0
}else{
tmp := s.head
s.head = s.head.next
return tmp.val
}
}
|
benchmark代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
package test
import "testing"
func BenchmarkStack(b *testing.B) {
b.RunParallel(func(p *testing.PB) {
s := NewLFStack()
for p.Next() {
s.Push(1)
s.Pop()
}
})
}
func BenchmarkLockStack(b *testing.B) {
b.RunParallel(func(p *testing.PB) {
s := NewLockStack()
for p.Next() {
s.Push(1)
s.Pop()
}
})
}
func BenchmarkSingleThreadStack(b *testing.B) {
b.Run("lock-free stack", func(b *testing.B) {
b.ResetTimer()
s := NewLFStack()
s.Push(1)
s.Pop()
})
b.Run("lock stack", func(b *testing.B) {
b.ResetTimer()
s := NewLockStack()
s.Push(1)
s.Pop()
})
}
|
结果
运行测试结果如下:
1
2
3
4
5
6
7
8
|
goos: linux
goarch: amd64
pkg: test
cpu: Intel(R) Xeon(R) Silver 4314 CPU @ 2.40GHz
BenchmarkStack-64 41927746 32.49 ns/op 16 B/op 1 allocs/op
BenchmarkLockStack-64 36145209 31.83 ns/op 16 B/op 1 allocs/op
BenchmarkSingleThreadStack/lock-free_stack-64 1000000000 0.0000051 ns/op 0 B/op 0 allocs/op
BenchmarkSingleThreadStack/lock_stack-64 1000000000 0.0000051 ns/op 0 B/op 0 allocs/op
|
可以看到,lock-free算法下的性能,并没有比直接是用lock的来的好。
参考文献
- Go 101
- Go并发编程(五) 深入理解 sync/atomic
- do-lock-free-algorithms-really-perform-better-than-their-lock-full-counterparts