Concurrency Safety in Golang

A concurrency problem exists whenever multiple goroutines access the same data at the same time and at least one of them modifies it. Golang offers two ways to handle concurrency safety: locks and channels. The former uses locking to guarantee that only one operation accesses the data at any moment; the latter serializes operations so that, again, only one operation touches the data at a time. Both approaches solve the problem by constraining how the data is concurrently accessed and modified.
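As a minimal illustration of the two approaches (a sketch written for this comparison, not code used later in this post), the same counter can be protected either with a sync.Mutex or by funnelling every update through a single goroutine that owns the data and receives commands over a channel:

```go
import "sync"

// Lock-based: the mutex guarantees only one goroutine touches count at a time.
type MutexCounter struct {
	mu    sync.Mutex
	count int
}

func (c *MutexCounter) Inc() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.count++
}

// Channel-based: every update is serialized through the one goroutine
// that owns the counter state.
type ChanCounter struct {
	ops chan int
}

func NewChanCounter() *ChanCounter {
	c := &ChanCounter{ops: make(chan int)}
	go func() {
		count := 0
		for delta := range c.ops {
			count += delta // only this goroutine ever reads or writes count
		}
	}()
	return c
}

func (c *ChanCounter) Inc() { c.ops <- 1 }
```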
The official Go documentation gives the following advice on concurrency safety:
Advice
Programs that modify data being simultaneously accessed by multiple goroutines must serialize such access. To serialize access, protect the data with channel operations or other synchronization primitives such as those in the sync and sync/atomic packages. If you must read the rest of this document to understand the behavior of your program, you are being too clever. Don’t be clever.
Below I will walk through Golang's concurrency tools by implementing a concurrency-safe queue.
A Concurrency-Safe Queue

A minimal queue needs at least two operations, Add and Pop, and usually stores its data in a slice or a linked list internally. Both operations write to that underlying slice or list, and neither structure is safe for concurrent use, so when multiple goroutines Add or Pop at the same time there is a data race.
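A hypothetical test against an unsynchronized queue shows the problem; run with `go test -race` it reports a data race, and elements are frequently lost:

```go
import (
	"sync"
	"testing"
)

// unsafeQueue is a deliberately racy queue, used only to demonstrate the problem.
type unsafeQueue struct {
	elements []interface{}
}

func (q *unsafeQueue) Add(ele interface{}) {
	q.elements = append(q.elements, ele) // concurrent appends race on the slice header
}

func TestUnsafeQueue_Race(t *testing.T) {
	q := &unsafeQueue{}
	var wg sync.WaitGroup
	for i := 0; i < 1000; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			q.Add(n)
		}(i)
	}
	wg.Wait()
	// With -race this test reports a data race; without it,
	// len(q.elements) is often less than 1000.
	t.Log(len(q.elements))
}
```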
To make the queue concurrency-safe, Add and Pop must take a lock:
```go
import "sync"

type Queue struct {
	elements []interface{}
	lock     *sync.Mutex
}

func (q *Queue) Size() int {
	return len(q.elements)
}

func (q *Queue) Pop() (ele interface{}) {
	q.lock.Lock()
	defer q.lock.Unlock()

	ele = q.elements[q.Size()-1]
	q.elements = q.elements[:q.Size()-1]
	return
}

func (q *Queue) Add(ele interface{}) {
	q.lock.Lock()
	defer q.lock.Unlock()

	q.elements = append(q.elements, ele)
}
```
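A quick usage sketch (assuming the lock is initialized by hand, since the snippet above has no constructor): one hundred concurrent Adds no longer race, and every element arrives.

```go
import (
	"fmt"
	"sync"
)

func main() {
	q := &Queue{lock: &sync.Mutex{}} // the mutex must be initialized before use

	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			q.Add(n)
		}(i)
	}
	wg.Wait()

	fmt.Println(q.Size()) // always 100: the lock serializes the appends
}
```

Note that this version still panics with an index-out-of-range error if Pop is called on an empty queue, which is one more reason to make the operations block.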
In practice a queue cannot have unlimited capacity, or it will eventually run into memory problems, so we need to bound its size. When the queue is empty, Pop should block until an element becomes available; when the queue is full, Add should block until there is room again. This kind of blocking is what sync.Cond is for: it lets the current goroutine wait on a condition until that condition is signalled, much like Java's built-in condition queues:
| Java      | Golang    |
| --------- | --------- |
| wait      | Wait      |
| notify    | Signal    |
| notifyAll | Broadcast |
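Before applying it to the queue, here is a minimal, self-contained sketch of the Wait/Signal pattern (not part of the original queue code): Wait must be called with the Cond's lock held, and it is always wrapped in a loop that re-checks the condition, because a woken goroutine must verify the condition still holds.

```go
import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var mu sync.Mutex
	cond := sync.NewCond(&mu)
	ready := false

	go func() {
		time.Sleep(100 * time.Millisecond)
		mu.Lock()
		ready = true
		mu.Unlock()
		cond.Signal() // wake one waiting goroutine
	}()

	mu.Lock()
	for !ready { // always re-check the condition in a loop
		cond.Wait() // atomically releases mu, blocks, re-acquires mu when woken
	}
	mu.Unlock()
	fmt.Println("condition met")
}
```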
For the bounded queue we need two sync.Cond conditions, one to signal that the queue is no longer empty and one to signal that it is no longer full. Both must be built on the same lock, which also protects Add and Pop, to avoid races:
```go
import "sync"

type Queue struct {
	elements     []interface{}
	capacity     int
	notEmptyCond *sync.Cond
	notFullCond  *sync.Cond
}

func (q *Queue) Size() int {
	return len(q.elements)
}

func (q *Queue) isFull() bool {
	return q.Size() >= q.capacity
}

func (q *Queue) isEmpty() bool {
	return q.Size() == 0
}

func (q *Queue) Pop() (ele interface{}) {
	q.notEmptyCond.L.Lock()
	defer q.notEmptyCond.L.Unlock()

	// Wait until the queue is no longer empty.
	for q.isEmpty() {
		q.notEmptyCond.Wait()
	}
	ele = q.elements[q.Size()-1]
	q.elements = q.elements[:q.Size()-1]
	q.notFullCond.Signal() // there is room for one more element now
	return
}

func (q *Queue) Add(ele interface{}) {
	q.notEmptyCond.L.Lock() // both conds share the same lock
	defer q.notEmptyCond.L.Unlock()

	// Wait until the queue is no longer full.
	for q.isFull() {
		q.notFullCond.Wait()
	}
	q.elements = append(q.elements, ele)
	q.notEmptyCond.Signal() // there is at least one element to Pop now
}

func NewQueue(capacity int) *Queue {
	var lock sync.Mutex
	notEmptyCond := sync.NewCond(&lock)
	notFullCond := sync.NewCond(&lock)
	return &Queue{
		elements:     make([]interface{}, 0, capacity),
		capacity:     capacity,
		notEmptyCond: notEmptyCond,
		notFullCond:  notFullCond,
	}
}
```
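As a small usage sketch (not from the original post), a producer and a consumer can now run against the bounded queue, blocking each other as it fills and drains:

```go
import "fmt"

func main() {
	q := NewQueue(2)

	go func() {
		for i := 0; i < 5; i++ {
			q.Add(i) // blocks whenever the queue already holds 2 elements
		}
	}()

	for i := 0; i < 5; i++ {
		fmt.Println(q.Pop()) // blocks whenever the queue is empty
	}
}
```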
Cond.Wait() blocks until it receives a signal that the condition it is waiting on has been satisfied. This means a child goroutine can be left waiting forever even after its parent goroutine has exited abnormally. For example:
```go
import (
	"runtime"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
)

func TestQueue_Add(t *testing.T) {
	queue := NewQueue(10)

	expectedNumGoroutine := runtime.NumGoroutine()
	done := make(chan interface{}, 1)
	go func() {
		ele := queue.Pop() // blocks: the queue is still empty
		done <- ele
	}()

	select {
	case <-time.After(100 * time.Millisecond):
		// Pop is still blocked in Wait, so the child goroutine is still alive.
		assert.Equal(t, expectedNumGoroutine+1, runtime.NumGoroutine())
	case <-done:
		assert.Equal(t, expectedNumGoroutine, runtime.NumGoroutine())
	}

	queue.Add(1)
	time.Sleep(time.Millisecond)
	assert.Equal(t, 1, <-done)
	assert.Equal(t, 0, queue.Size())
}
```
To solve this problem we need a Context so that the child goroutine can be cancelled. Here is the complete example:
```go
import (
	"context"
	"sync"

	"github.com/pkg/errors"
)

// waitWithCancel waits on cond but can be interrupted by ctx.
// If ctx can never be cancelled (Done() returns nil), it falls back to a plain Wait.
func waitWithCancel(ctx context.Context, cond *sync.Cond) error {
	if ctx.Done() != nil {
		done := make(chan struct{})
		go func() {
			cond.Wait()
			close(done)
		}()
		select {
		case <-ctx.Done():
			return errors.Wrap(ctx.Err(), "cancel wait")
		case <-done:
			return nil
		}
	} else {
		cond.Wait()
		return nil
	}
}

type Queue struct {
	elements     []interface{}
	capacity     int
	notEmptyCond *sync.Cond
	notFullCond  *sync.Cond
}

func (q *Queue) Size() int {
	return len(q.elements)
}

func (q *Queue) isFull() bool {
	return q.Size() >= q.capacity
}

func (q *Queue) isEmpty() bool {
	return q.Size() == 0
}

func (q *Queue) Pop(ctx context.Context) (ele interface{}, err error) {
	q.notEmptyCond.L.Lock()
	defer func() {
		// If the wait was cancelled, Cond.Wait already released the lock
		// inside waitWithCancel, so it must not be unlocked again here.
		if originalErr := errors.Cause(err); originalErr != context.DeadlineExceeded && originalErr != context.Canceled {
			q.notEmptyCond.L.Unlock()
		}
	}()
	for q.isEmpty() {
		err = waitWithCancel(ctx, q.notEmptyCond)
		if err != nil {
			return
		}
	}
	ele = q.elements[q.Size()-1]
	q.elements = q.elements[:q.Size()-1]
	q.notFullCond.Signal()
	return
}

func (q *Queue) Add(ctx context.Context, ele interface{}) (err error) {
	q.notEmptyCond.L.Lock()
	defer func() {
		if originalErr := errors.Cause(err); originalErr != context.DeadlineExceeded && originalErr != context.Canceled {
			q.notEmptyCond.L.Unlock()
		}
	}()
	for q.isFull() {
		err = waitWithCancel(ctx, q.notFullCond)
		if err != nil {
			return
		}
	}
	q.elements = append(q.elements, ele)
	q.notEmptyCond.Signal()
	return
}

func NewQueue(capacity int) *Queue {
	var lock sync.Mutex
	notEmptyCond := sync.NewCond(&lock)
	notFullCond := sync.NewCond(&lock)
	return &Queue{
		elements:     make([]interface{}, 0, capacity),
		capacity:     capacity,
		notEmptyCond: notEmptyCond,
		notFullCond:  notFullCond,
	}
}
```
```go
func TestQueue_Add(t *testing.T) {
	queue := NewQueue(10)

	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()
	go func() {
		_, err := queue.Pop(ctx)
		assert.Equal(t, context.DeadlineExceeded, errors.Cause(err))
	}()

	time.Sleep(200 * time.Millisecond)

	queue.Add(context.Background(), 1)
	assert.Equal(t, 1, queue.Size())
}
```
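The same mechanism also works with explicit cancellation rather than a timeout. Here is a small sketch against the queue above, where a blocked Pop returns context.Canceled as soon as cancel is called:

```go
import (
	"context"
	"fmt"
	"time"

	"github.com/pkg/errors"
)

func main() {
	queue := NewQueue(10)

	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		time.Sleep(100 * time.Millisecond)
		cancel() // unblocks the Pop below
	}()

	if _, err := queue.Pop(ctx); err != nil {
		fmt.Println(errors.Cause(err)) // context.Canceled
	}
}
```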
References