01| Mutex介绍

实现机制

临界区: 一个被共享的资源,或者一个整体的一组共享资源

使用互斥锁,限定临界区只能同时由一个线程持有

同步原语:可以看做解决并发问题的一个基础数据结构

  • 使用场景
    • 共享资源,Mutex、RwMutex
    • 任务边盖, WaitGroup、Channel
    • 消息传递, Channel

基本使用

接口Locker

1
2
3
4
type Locker interface {
    Lock()
    Unlock()
}

Mutex实现了Locker的方法

Mutex可以嵌入到其他struct中使用,初始化的时候不必初始化Mutex

1
2
3
4
type Counter struct {
    mu sync.Mutex
    Count uint64
}

可以将操作锁和业务逻辑封装成一个方法

1
2
3
4
5
func (c *Counter) Incr() {
    c.mu.Lock()
    c.Counter++
    c.mu.Unlock
}

02| Mutex实现原理

第一版: 初版

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

   // CAS操作,当时还没有抽象出atomic包
    func cas(val *int32, old, new int32) bool
    func semacquire(*int32)
    func semrelease(*int32)
    // 互斥锁的结构,包含两个字段
    type Mutex struct {
        key  int32 // 锁是否被持有的标识, 0:未被持有, 1:被持有,n:被持有且有n-1个等待者
        sema int32 // 信号量专用,用以阻塞/唤醒goroutine
    }
    
    // 保证成功在val上增加delta的值
    func xadd(val *int32, delta int32) (new int32) {
        for {
            v := *val
            if cas(val, v, v+delta) {
                return v + delta
            }
        }
        panic("unreached")
    }
    
    // 请求锁
    func (m *Mutex) Lock() {
        if xadd(&m.key, 1) == 1 { //标识加1,如果等于1,成功获取到锁
            return
        }
        semacquire(&m.sema) // 否则阻塞等待
    }
    
    func (m *Mutex) Unlock() {
        if xadd(&m.key, -1) == 0 { // 将标识减去1,如果等于0,则没有其它等待者
            return
        }
        semrelease(&m.sema) // 唤醒其它阻塞的goroutine
    }    

第二版(2011-06-30) 给新人机会

主要改动是新来的goroutine也有机会获得锁,甚至一个goroutine可以连续获得锁

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
type Mutex struct {
    state int32
    sema  uint32
}


const (
    mutexLocked = 1 << iota // mutex is locked
    mutexWoken
    mutexWaiterShift = iota
)

state是一个复合字段,最低位表示是否被持有,第二低位表示是否有唤醒的goroutine,剩余的位表示等待此锁的goroutine数

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

   func (m *Mutex) Lock() {
        // Fast path: 幸运case,能够直接获取到锁
        if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
            return
        }

        awoke := false
        for {
            old := m.state
            new := old | mutexLocked // 新状态加锁
            if old&mutexLocked != 0 {
                new = old + 1<<mutexWaiterShift //等待者数量加一
            }
            if awoke {
                // goroutine是被唤醒的,
                // 新状态清除唤醒标志
                new &^= mutexWoken
            }
            if atomic.CompareAndSwapInt32(&m.state, old, new) {//设置新状态
                if old&mutexLocked == 0 { // 锁原状态未加锁
                    break
                }
                runtime.Semacquire(&m.sema) // 请求信号量
                awoke = true
            }
        }
    }
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21

   func (m *Mutex) Unlock() {
        // Fast path: drop lock bit.
        new := atomic.AddInt32(&m.state, -mutexLocked) //去掉锁标志
        if (new+mutexLocked)&mutexLocked == 0 { //本来就没有加锁
            panic("sync: unlock of unlocked mutex")
        }
    
        old := new
        for {
            if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 { // 没有等待者,或者有唤醒的waiter,或者锁原来已加锁
                return
            }
            new = (old - 1<<mutexWaiterShift) | mutexWoken // 新状态,准备唤醒goroutine,并设置唤醒标志
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                runtime.Semrelease(&m.sema)
                return
            }
            old = m.state
        }
    }

第三版(2015-02) 多给些机会

加入自旋等待:新来的gorutine或者被唤醒的goroutine首次获取不到锁是会通过自旋尝试几次检查所是否已经被释放

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40

   func (m *Mutex) Lock() {
        // Fast path: 幸运之路,正好获取到锁
        if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
            return
        }

        awoke := false
        iter := 0
        for { // 不管是新来的请求锁的goroutine, 还是被唤醒的goroutine,都不断尝试请求锁
            old := m.state // 先保存当前锁的状态
            new := old | mutexLocked // 新状态设置加锁标志
            if old&mutexLocked != 0 { // 锁还没被释放
                if runtime_canSpin(iter) { // 还可以自旋
                    if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                        atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                        awoke = true
                    }
                    runtime_doSpin()
                    iter++
                    continue // 自旋,再次尝试请求锁
                }
                new = old + 1<<mutexWaiterShift
            }
            if awoke { // 唤醒状态
                if new&mutexWoken == 0 {
                    panic("sync: inconsistent mutex state")
                }
                new &^= mutexWoken // 新状态清除唤醒标记
            }
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                if old&mutexLocked == 0 { // 旧状态锁已释放,新状态成功持有了锁,直接返回
                    break
                }
                runtime_Semacquire(&m.sema) // 阻塞等待
                awoke = true // 被唤醒
                iter = 0
            }
        }
    }

第四版 解决饥饿

因为新来的gorotine也参与竞争,可能每次都被新来的gorotine抢到锁,极端情况下等待中的goroutine可能会一直拿不到锁,产生饥饿问题

不公平的等待时间限制在1毫秒

state:

  • mutexLocker, 1bit, 表示是否被持有,
  • mutexWoken, 1bit, 表示是否有唤醒的goroutine,
  • mutexStarving, 1bit, 表示是否有饥饿问题
  • mutexWaiters, 剩余的位, 表示等待此锁的goroutine数

03| Mutex易错场景盘点

1. Lock/Unlock未成对出现

2. Copy已使用的Mutex

因为Mutex是个有状态的对象,state字段记录了锁的状态,如果复制Mutex给一个新的变量,那么新的变量一开始就可能被加锁了

可以使用go vet检测

3. 重入

Mutex不支持重入,因为没有记录归属哪个goroutine

自定义实现方式

  • Lock时记录对应的goroutine id
  • 由goroutine提供一个token,用来表示自己

4. 死锁

四个必要条件

  • 互斥
  • 持有和等待
  • 不可剥夺
  • 环路等待

04| Mutex拓展额外功能

增加TryLock方法

通过unsafe.Pointer(&m.Mutex)拿到state值,判断是否可以获取锁

获取等待者的数量

atomic.LoadInt32((*int32)(unsafe.Pointer(&m.Mutex))) >> mutexWaiterShift

实现线程安全队列

05 | RWMutex: 读写锁的实现原理和避坑指南

读写优先级

  • read-preferring:reader优先获取锁
  • write-prefering:如果有write在等待锁,会阻止后面新来的reader获取到锁
  • 不设定优先级

Golang使用了write-preferring方案

内部结构

1
2
3
4
5
6
7
8
9
type RWMutex struct {
  w           Mutex   // 互斥锁解决多个writer的竞争
  writerSem   uint32  // writer信号量
  readerSem   uint32  // reader信号量
  readerCount int32   // reader的数量, 为负数时表示有writer在等待
  readerWait  int32   // writer需要等待reader的数量
}

const rwmutexMaxReaders = 1 << 30  // 最大reader数量

Rlock/RUnock实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18

func (rw *RWMutex) RLock() {
    if atomic.AddInt32(&rw.readerCount, 1) < 0 {
            // rw.readerCount是负值的时候,意味着此时有writer等待请求锁,因为writer优先级高,所以把后来的reader阻塞休眠
        runtime_SemacquireMutex(&rw.readerSem, false, 0)
    }
}
func (rw *RWMutex) RUnlock() {
    if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
        rw.rUnlockSlow(r) // 有等待的writer
    }
}
func (rw *RWMutex) rUnlockSlow(r int32) {
    if atomic.AddInt32(&rw.readerWait, -1) == 0 {
        // 最后一个reader了,writer终于有机会获得锁了
        runtime_Semrelease(&rw.writerSem, false, 1)
    }
}

RLock上锁:

  • readerCount+1
  • 如果readerCount小于0表示有writer在请求锁,于是reader阻塞休眠

RUnlock解锁:

  • readerCount-1
  • 如果readerCount<0, readerWait-1
  • 如果readerWait ==0, 表示没有要等的reader了,唤醒writer

Lock的实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11

func (rw *RWMutex) Lock() {
    // 首先解决其他writer竞争问题
    rw.w.Lock()
    // 反转readerCount,告诉reader有writer竞争锁
    r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
    // 如果当前有reader持有锁,那么需要等待
    if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
        runtime_SemacquireMutex(&rw.writerSem, false, 0)
    }
}
  • 上锁
  • 反转readerCount
  • 如果readerCount != 0 , readWait加上readerCount
  • 如果reaWait !=0, 表示要等待reader

Unlock

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12

func (rw *RWMutex) Unlock() {
    // 告诉reader没有活跃的writer了
    r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
    
    // 唤醒阻塞的reader们
    for i := 0; i < int(r); i++ {
        runtime_Semrelease(&rw.readerSem, false, 0)
    }
    // 释放内部的互斥锁
    rw.w.Unlock()
}
  • readerCount 加上 rwmutexMaxReaders
  • 唤醒所有的reader
  • 释放锁

06| WaitGroup: 协同等待,任务编排利器

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22

type WaitGroup struct {
    // 避免复制使用的一个技巧,可以告诉vet工具违反了复制使用的规则
    noCopy noCopy
    // 64bit(8bytes)的值分成两段,高32bit是计数值,低32bit是waiter的计数
    // 另外32bit是用作信号量的
    // 因为64bit值的原子操作需要64bit对齐,但是32bit编译器不支持,所以数组中的元素在不同的架构中不一样,具体处理看下面的方法
    // 总之,会找到对齐的那64bit作为state,其余的32bit做信号量
    state1 [3]uint32
}


// 得到state的地址和信号量的地址
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
    if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
        // 如果地址是64bit对齐的,数组前两个元素做state,后一个元素做信号量
        return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
    } else {
        // 如果地址是32bit对齐的,数组后两个元素用来做state,它可以用来做64bit的原子操作,第一个元素32bit用来做信号量
        return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
    }
}

64位对齐的state的情况

71b5fad6284140986d04c0b6f87aedea.jpeg

Add和Done实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func (wg *WaitGroup) Add(delta int) {
    statep, semap := wg.state()
    // 高32bit是计数值v,所以把delta左移32,增加到计数上
    state := atomic.AddUint64(statep, uint64(delta)<<32)
    v := int32(state >> 32) // 当前计数值
    w := uint32(state) // waiter count

    if v > 0 || w == 0 {
        return
    }

    // 如果计数值v为0并且waiter的数量w不为0,那么state的值就是waiter的数量
    // 将waiter的数量设置为0,因为计数值v也是0,所以它们俩的组合*statep直接设置为0即可。此时需要并唤醒所有的waiter
    *statep = 0
    for ; w != 0; w-- {
        runtime_Semrelease(semap, false, 0)
    }
}


// Done方法实际就是计数器减1
func (wg *WaitGroup) Done() {
    wg.Add(-1)
}

Wait实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21

func (wg *WaitGroup) Wait() {
    statep, semap := wg.state()
    
    for {
        state := atomic.LoadUint64(statep)
        v := int32(state >> 32) // 当前计数值
        w := uint32(state) // waiter的数量
        if v == 0 {
            // 如果计数值为0, 调用这个方法的goroutine不必再等待,继续执行它后面的逻辑即可
            return
        }
        // 否则把waiter数量加1。期间可能有并发调用Wait的情况,所以最外层使用了一个for循环
        if atomic.CompareAndSwapUint64(statep, state, state+1) {
            // 阻塞休眠等待
            runtime_Semacquire(semap)
            // 被唤醒,不再阻塞,返回
            return
        }
    }
}

noCopy: 辅助vet检查

1
2
3
4
5
6

type noCopy struct{}

// Lock is a no-op used by -copylocks checker from `go vet`.
func (*noCopy) Lock()   {}
func (*noCopy) Unlock() {}

noCopy类型是用来帮助vet检查用的类型,用来检查对象在第一次使用之后是否被复制使用

07 | Cond: 条件变量实现机制

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
type Cond struct {
    noCopy noCopy
    L Locker
    notify notifyList
    checker copyChecker
}

func (c *Cond) Wait() {
    c.chcker.check()
    t := runtime_notifyListAdd(&c.notify)
    c.L.Unlock()
    runtime_notifyListWait(&c.notify, t)
    c.L.Lock()
}

func (c *Cond) Signal() {
    c.checker.check()
    runtime_notifyListNotifyOne(&c.notify)
}

func (c *Cond) BroadCast() {
    c.checker.check()
    runtime_notifyListNotifyAll(&c.notify)
}

08| Once: 简约而不简单的并发原语

实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
type Once struct {
    done uint32
    m Mutex
}

func (o *Once) Do(f func()) {
    if atomic.LoadUint32(&o.done) == 0 {
        o.doSlow(f)
    }
}

func(o *Once) doSlow(f func()) {
    o.m.Lock()
    defer o.m.Unlock()
    if o.done == 0 {
        defer atomic.StoreUint32(&o.done, 1)
        f()
    }
}

09 | map: 如何实现线程安全的map类型

map不能并发读写,写的时候需要加锁

方案1:使用RWLock读写锁

方案2: 在方法1基础上,对数据分片,使用多个map

方案3:官方实现

10 | Pool

Get方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57

func (p *Pool) Get() interface{} {
    // 把当前goroutine固定在当前的P上
    l, pid := p.pin()
    x := l.private // 优先从local的private字段取,快速
    l.private = nil
    if x == nil {
        // 从当前的local.shared弹出一个,注意是从head读取并移除
        x, _ = l.shared.popHead()
        if x == nil { // 如果没有,则去偷一个
            x = p.getSlow(pid) 
        }
    }
    runtime_procUnpin()
    // 如果没有获取到,尝试使用New函数生成一个新的
    if x == nil && p.New != nil {
        x = p.New()
    }
    return x
}


func (p *Pool) getSlow(pid int) interface{} {

    size := atomic.LoadUintptr(&p.localSize)
    locals := p.local                       
    // 从其它proc中尝试偷取一个元素
    for i := 0; i < int(size); i++ {
        l := indexLocal(locals, (pid+i+1)%int(size))
        if x, _ := l.shared.popTail(); x != nil {
            return x
        }
    }

    // 如果其它proc也没有可用元素,那么尝试从vintim中获取
    size = atomic.LoadUintptr(&p.victimSize)
    if uintptr(pid) >= size {
        return nil
    }
    locals = p.victim
    l := indexLocal(locals, pid)
    if x := l.private; x != nil { // 同样的逻辑,先从vintim中的local private获取
        l.private = nil
        return x
    }
    for i := 0; i < int(size); i++ { // 从vintim其它proc尝试偷取
        l := indexLocal(locals, (pid+i)%int(size))
        if x, _ := l.shared.popTail(); x != nil {
            return x
        }
    }

    // 如果victim中都没有,则把这个victim标记为空,以后的查找可以快速跳过了
    atomic.StoreUintptr(&p.victimSize, 0)

    return nil
}

Put方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
func (p *Pool) Put(x interface{}) {
    if x == nil { // nil值直接丢弃
        return
    }
    l, _ := p.pin()
    if l.private == nil { // 如果本地private没有值,直接设置这个值即可
        l.private = x
        x = nil
    }
    if x != nil { // 否则加入到本地队列中
        l.shared.pushHead(x)
    }
    runtime_procUnpin()
}

可能的问题

  • 内存泄漏:回收过大的对象,如slice cap变得很大后仍Put回去。 可以不回收大于一定大小的对象
  • 内存浪费:buffer比较大,实际使用只需要较小的buffer。 可以将buffer分层

其他

net/rpc Server端Requst和Reponse使用链表结构保留和复用对象

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
type Request struct {
	ServiceMethod string   // format: "Service.Method"
	Seq           uint64   // sequence number chosen by client
	next          *Request // for free list in Server
}

func (server *Server) getRequest() *Request {
	server.reqLock.Lock()
	req := server.freeReq
	if req == nil {
		req = new(Request)
	} else {
		server.freeReq = req.next
		*req = Request{}  // 清空对象而非新建一个对象
	}
	server.reqLock.Unlock()
	return req
}

func (server *Server) freeRequest(req *Request) {
	server.reqLock.Lock()
	req.next = server.freeReq
	server.freeReq = req
	server.reqLock.Unlock()
}

11 | Context: 上下文

使用场景

  • 上下文信息传递
  • 控制子goroutine的运行
  • 超时控制
  • 调用取消

基本实现

interface

1
2
3
4
5
6
type Context interface {
    Deadline() (deadline time.Time, ok bool)
    Done() <- chan struct {}
    Err() error
    Value(key interface{}) interface{}
}

如果Done没有被close, Err返回nil, 否则返回Done被close的原因

使用惯例

  • 一般将Context放在第一个函数参数位置
  • 不把nil当做Context参数的值,使用context.Background()
  • Context只用来临时上下文透传,不能持久化到缓存或者数据库
  • key的类型应该是自己定义的类型,否则容易在包之间使用时产生冲突(具体视情况而定)

基础实例

Background和todo 均是emptyCtx的实例

拓展类型

WithValue
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
type valueCtx struct {
    Context
    key, val interface()
}

func (c *valueCtx) Value(key interface{}) interface{} {
	if c.key == key {
		return c.val
	}
	return c.Context.Value(key)
}

valueCtx持有一个键值对,Value方法优先比较自己的key, 不匹配则一次调用父ctx的Value方法

WithCancel
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
    c := newCancelCtx(parent)
    propagateCancel(parent, &c)// 把c朝上传播
    return &c, func() { c.cancel(true, Canceled) }
}

// newCancelCtx returns an initialized cancelCtx.
func newCancelCtx(parent Context) cancelCtx {
    return cancelCtx{Context: parent}
}

propagateCancel会顺着parent找是否有cancelCtx

  • 如果parent有cancelCtx则加入到cancelCtx的child, cancelCtx取消的时候cancel它
  • 如果没有则启动一个goroutine来监听parent是否close

cancel会向下传递,如果子孙context也是cancelCtx,也会被cancle

WithTimeout、WithDeadline

WithTimeout其实和WithDeadline一样

1
2
3
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
return WithDeadline(parent, time.Now().Add(timeout))
}

如果deadline ctx 截止时间晚于parent的,则返回canelCtx,因为parent取消时会同时取消child ctx

否则会返回timerCtx timerCtx的Donw被Close掉主要有某个事件触发

  • 截止时间到了
  • parent ctx 取消
  • 自身的cancel函数被调用
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
    // 如果parent的截止时间更早,直接返回一个cancelCtx即可
    if cur, ok := parent.Deadline(); ok && cur.Before(d) {
        return WithCancel(parent)
    }
    c := &timerCtx{
        cancelCtx: newCancelCtx(parent),
        deadline:  d,
    }
    propagateCancel(parent, c) // 同cancelCtx的处理逻辑
    dur := time.Until(d)
    if dur <= 0 { //当前时间已经超过了截止时间,直接cancel
        c.cancel(true, DeadlineExceeded)
        return c, func() { c.cancel(false, Canceled) }
    }
    c.mu.Lock()
    defer c.mu.Unlock()
    if c.err == nil {
        // 设置一个定时器,到截止时间后取消
        c.timer = time.AfterFunc(dur, func() {
            c.cancel(true, DeadlineExceeded)
        })
    }
    return c, func() { c.cancel(true, Canceled) }
}

12| atomic: 原则操作

基本方法

  • Add
  • CompareAndSwap
  • Swap
  • Load
  • Store

Value类型

原子地存取对象

13| Channel: 另辟蹊径解决并发问题

5108954ea36559860e5e5aaa42b2f998.jpeg

基本类型

  • 双向chan: chan int
  • 只能接收:chan <- int
  • 只能发送:<-chan int

实现原理

chan数据结构

  • 循环队列
  • 链表

核心方法

  • send
  • recieve
  • close

场景错误

  • 关闭为nil的chan
  • 关闭已经close的chan
  • send已经close的chan

Channel|14 典型应用

使用反射操作Channel

应用场景

消息交流

数据传递

信号通知

任务编排

orDone模式
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
func or(channels ...<-chan interface{}) <-chan interface{} {
	switch len(channels) {
	case 0:
		return nil
	case 1:
		return channels[0]
	}
	orDone := make(chan interface{})
	go func() {
		defer close(orDone)
		var cases []reflect.SelectCase
		for _, c := range channels {
			cases = append(cases, reflect.SelectCase{
				Dir: reflect.SelectRecv,
				Chan: reflect.ValueOf(c),
			})
		}
		reflect.Select(cases)
	}()
	return orDone

}
扇入模式
扇出模式
Stream流
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
func asStream(done <-chan struct{}, values ...interface{}) <-chan interface{} {
	s := make(chan interface{})
	go func(){
		defer close(s)
		for _, v := range values {
			select {
			case <-done:
				return
			case s <- v:
			}
		}
	}()
	return s
}
map-reduce模式
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func mapChan(in <-chan interface{}, fn func(interface{}) interface{}) <-chan interface{} {
	out := make(chan interface{})
	if in == nil {
		close(out)
		return out
	}
	go func() {
		defer close(out)
		for v := range in {
			out <- fn(v)
		}
	}()
	return out
}

func reduce(in <-chan interface{}, fn func(r, v interface{}) interface{}) interface{} {
	if in == nil {
		return nil
	}
	out := <-in
	for v := range in {
		out = fn(out, v)
	}
	return out
}

15| Go如何保证并发读写顺序

happens-before

概念

  • 如果事件e1 happens before 事件e2, 那么可以说e2 happends after e1
  • 如果事件e1 不happens before e2,也不happens after e2,那么e1和e2同时发生

goroutine内部

程序执行顺序和代码指定的顺序一样,即使有CPU或编译器的读写重排,从行为上来看也和代码指定的顺序一样,即内部重排对读写顺序没有影响

goroutine之间

假设对变量v写操作记为w,读操作记为r, 那么要保证r能观察到w的结果,则应该:

  • w happens before r
  • 其他的w’不会和w、r同时发生,也不会再w和r之间发生

实例

init函数

  • 初始化是在单一的goroutine执行的
  • 如果包p导入了包q,那么q的init函数happens before于p的任何初始化代码
  • main函数在导入的包的init函数之后执行
  • 容一个文件中的变量是按顺序逐个初始化的,除非有依赖关系
  • 同一个包下的多个文件是按文件名排序进行初始化的

16 |Semaphore信号量

官方实现

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
type Weighted struct {
    size    int64         // 最大资源数
    cur     int64         // 当前已被使用的资源
    mu      sync.Mutex    // 互斥锁,对字段的保护
    waiters list.List     // 等待队列
}


func (s *Weighted) Acquire(ctx context.Context, n int64) error {
    s.mu.Lock()
        // fast path, 如果有足够的资源,都不考虑ctx.Done的状态,将cur加上n就返回
    if s.size-s.cur >= n && s.waiters.Len() == 0 {
      s.cur += n
      s.mu.Unlock()
      return nil
    }
  
        // 如果是不可能完成的任务,请求的资源数大于能提供的最大的资源数
    if n > s.size {
      s.mu.Unlock()
            // 依赖ctx的状态返回,否则一直等待
      <-ctx.Done()
      return ctx.Err()
    }
  
        // 否则就需要把调用者加入到等待队列中
        // 创建了一个ready chan,以便被通知唤醒
    ready := make(chan struct{})
    w := waiter{n: n, ready: ready}
    elem := s.waiters.PushBack(w)
    s.mu.Unlock()
  

        // 等待
    select {
    case <-ctx.Done(): // context的Done被关闭
      err := ctx.Err()
      s.mu.Lock()
      select {
      case <-ready: // 如果被唤醒了,忽略ctx的状态
        err = nil
      default: 通知waiter
        isFront := s.waiters.Front() == elem
        s.waiters.Remove(elem)
        // 通知其它的waiters,检查是否有足够的资源
        if isFront && s.size > s.cur {
          s.notifyWaiters()
        }
      }
      s.mu.Unlock()
      return err
    case <-ready: // 被唤醒了
      return nil
    }
  }
  


func (s *Weighted) Acquire(ctx context.Context, n int64) error {
    s.mu.Lock()
        // fast path, 如果有足够的资源,都不考虑ctx.Done的状态,将cur加上n就返回
    if s.size-s.cur >= n && s.waiters.Len() == 0 {
      s.cur += n
      s.mu.Unlock()
      return nil
    }
  
        // 如果是不可能完成的任务,请求的资源数大于能提供的最大的资源数
    if n > s.size {
      s.mu.Unlock()
            // 依赖ctx的状态返回,否则一直等待
      <-ctx.Done()
      return ctx.Err()
    }
  
        // 否则就需要把调用者加入到等待队列中
        // 创建了一个ready chan,以便被通知唤醒
    ready := make(chan struct{})
    w := waiter{n: n, ready: ready}
    elem := s.waiters.PushBack(w)
    s.mu.Unlock()
  

        // 等待
    select {
    case <-ctx.Done(): // context的Done被关闭
      err := ctx.Err()
      s.mu.Lock()
      select {
      case <-ready: // 如果被唤醒了,忽略ctx的状态
        err = nil
      default: 通知waiter
        isFront := s.waiters.Front() == elem
        s.waiters.Remove(elem)
        // 通知其它的waiters,检查是否有足够的资源
        if isFront && s.size > s.cur {
          s.notifyWaiters()
        }
      }
      s.mu.Unlock()
      return err
    case <-ready: // 被唤醒了
      return nil
    }
  }
  

func (s *Weighted) Acquire(ctx context.Context, n int64) error {
    s.mu.Lock()
        // fast path, 如果有足够的资源,都不考虑ctx.Done的状态,将cur加上n就返回
    if s.size-s.cur >= n && s.waiters.Len() == 0 {
      s.cur += n
      s.mu.Unlock()
      return nil
    }
  
        // 如果是不可能完成的任务,请求的资源数大于能提供的最大的资源数
    if n > s.size {
      s.mu.Unlock()
            // 依赖ctx的状态返回,否则一直等待
      <-ctx.Done()
      return ctx.Err()
    }
  
        // 否则就需要把调用者加入到等待队列中
        // 创建了一个ready chan,以便被通知唤醒
    ready := make(chan struct{})
    w := waiter{n: n, ready: ready}
    elem := s.waiters.PushBack(w)
    s.mu.Unlock()
  

        // 等待
    select {
    case <-ctx.Done(): // context的Done被关闭
      err := ctx.Err()
      s.mu.Lock()
      select {
      case <-ready: // 如果被唤醒了,忽略ctx的状态
        err = nil
      default: 通知waiter
        isFront := s.waiters.Front() == elem
        s.waiters.Remove(elem)
        // 通知其它的waiters,检查是否有足够的资源
        if isFront && s.size > s.cur {
          s.notifyWaiters()
        }
      }
      s.mu.Unlock()
      return err
    case <-ready: // 被唤醒了
      return nil
    }
  }
  

Channel实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

  // Semaphore 数据结构,并且还实现了Locker接口
  type semaphore struct {
    sync.Locker
    ch chan struct{}
  }
  
  // 创建一个新的信号量
  func NewSemaphore(capacity int) sync.Locker {
    if capacity <= 0 {
      capacity = 1 // 容量为1就变成了一个互斥锁
    }
    return &semaphore{ch: make(chan struct{}, capacity)}
  }
  
  // 请求一个资源
  func (s *semaphore) Lock() {
    s.ch <- struct{}{}
  }
  
  // 释放资源
  func (s *semaphore) Unlock() {
    <-s.ch
  }

17 | SingleFlight和CyclicBarrier

SingleFlight实现原理

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67

  // 代表一个正在处理的请求,或者已经处理完的请求
type call struct {
    wg sync.WaitGroup
  

    // 这个字段代表处理完的值,在waitgroup完成之前只会写一次
        // waitgroup完成之后就读取这个值
    val interface{}
    err error
  
        // 指示当call在处理时是否要忘掉这个key
    forgotten bool
    dups  int
    chans []chan<- Result
  }
  
    // group代表一个singleflight对象
type Group struct {
    mu sync.Mutex       // protects m
    m  map[string]*call // lazily initialized
  }
  
type Result struct {
	val interface{}
	err error
	shared bool
}




func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
	g.mu.Lock()
	if g.m == nil {
		g.m = make(map[string]*call)
	}

	if c, ok := g.m[key]; ok {
		c.dups++
		g.mu.Unlock()
		c.wg.Wait()
		return c.val, c.err, true
	}

	c := new(call)
	c.wg.Add(1)
	g.m[key] = c
	g.mu.Unlock()
	g.doCall(c, key, fn)
	return c.val, c.err, c.dups > 0
}

func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
	c.val, c.err = fn()
	c.wg.Done()

	g.mu.Lock()
	if !c.forgotten {
		delete(g.m, key)
	}

	for _, ch := range c.chns {
		ch <- Result{c.val, c.err, c.dups > 0}
	}
	g.mu.Unlock()
}

CyclicBarrier循环栅栏

允许一组goroutine彼此等待,到达一个共同的执行点后放行

CyclicBarrier更适合用在固定数量的goroutine等待同一个执行点的场景,且可以复用

18| 分组操作:处理一组子任务

ErrGroup

gollback

Hunch

schedgroup

19| 分布式环境 使用etcd的选举、互斥锁和读写锁

Leader选举

互斥锁

1
2

func NewLocker(s *Session, pfx string) sync.Locker

读写锁

20| 分布式环境 使用etcd的队列、栅栏和STM

队列

栅栏

STM(Software Transactioal Memory)

etcd事务操作

1
Txn().If(cond1, cond2, ...).Then(op1, op2, ...,).Else(op1, op2, )

事务分为条件块、成功块、和失败块 条件块用来检测事务是否成功 如果成功就执行Then,否则执行Else

STM

为了简化事务操作,对API进行了封装

1
2
3
4
5
6
7
8
type STM interface {
  Get(key ...string) string
  Put(key, val string, opts ...v3.OpOption)
  Rev(key string) int64
  Del(key string)
}

apply func(STM) error