首页 文章详情

golang并发底层实现竟然都是它!!!

光华路程序猿 | 1225 2021-04-12 10:10 0 0 0
UniSMS (合一短信)




《手摸手系列》把go sync包中的并发组件已经写完了,本文作为完结篇,最后再来探讨下go运行时锁的实现。记得在《手摸手Go 并发编程的基建Semaphore》那篇中我们聊过sync.Mutex最终是依赖sema.go中实现的sleepwakeup原语来实现的。如果细心的小伙伴会发现:

type semaRoot struct {
 lock  mutex
 treap *sudog // root of balanced tree of unique waiters.
 nwait uint32 // Number of waiters. Read w/o the lock.
}

semaRoot里也定义了一个mutex。正所谓“医者不能自医”,故而Go针对运行时的并发实现了一个简版的互斥锁mutex。具体怎么 玩的呢?先来回顾一下:

基本使用

我们以sema.go中的semacquire1方法为例,mutex的使用如下:

  // Easy case.
 if cansemacquire(addr) {
  return
 }
... ...
lockWithRank(&root.lock, lockRankRoot)
... ...
  if cansemacquire(addr) {
....
   unlock(&root.lock)
   break
  }
... ...
  goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4+skipframes)
// goparkunlock里的钩子里藏着unlock方法调用
func parkunlock_c(gp *g, lock unsafe.Pointer) bool {
 unlock((*mutex)(lock))
 return true
}

上述逻辑正如之前所说:

  1. 先通过CAS快速判断是否获取信号量,成功直接返回;
  2. CAS失败,则尝试获取排他锁,因为sync.Mutex是基于sema.go实现,故而这里使用了运行时的mutex来进入临界区
  3. 获得运行时锁则再次尝试CAS成功则解除运行时排他锁返回;否则调用gopark挂起当前协程等待唤醒

那么Go的运行时排他锁是如何实现的呢?先来看看其数据结构

数据结构

mutex结构体比较简单,只有一个用于运行时静态锁排名的lockRankStruct和一个uintpter的key(根据不同平台实现方案用处不同)。在无竞争的情况下,它跟自旋锁一样快,类似的也是在用户空间利用atomic.Cas来尝试抢占锁,失败才会掉进系统调用,进行内核中休眠。

// 互斥锁。
// 零值mutex是未上锁状态(不需要初始化锁)
// 初始化有利于静态锁定级 但是不是必须的
type mutex struct {
 // 如果禁用了锁定级则为空结构体否则包括锁定等级
 lockRankStruct
  // 基于futex的实现将其看作uint32的key,
  // 而基于sema的实现将其看作M* waitm
  // 过去曾经是一个union,但是unions破坏了精确的GC
 key uintptr
}

lockRankStruct提供了一种运行时的静态锁排名的机制。静态锁排名会建立文件化的锁之间的总排序顺序,如果违反总的排序,则会报错。只要锁排序是按照文档设计的顺序,锁排序死锁就不会发生。如果要做Go运行时使用这个机制,你需要设置GOEXPERIMENT=staticlockranking

默认未开启,此时lockRanStruct是一个空结构体。lockWithRank()等效于lock()

mutex跟常规的锁一样提供了LockUnlock方法,不过底层根据平台不同实现也不同,总结起来大致分了三类:

  • dragonfly freebsd linux平台下,根据具体平台实现下面两个方法
// 从原子上讲,如果 *addr == val { sleep },可能发生虚假唤醒,但是这是被允许的
// 休眠时间不超过ns,ns<0表示永远休眠
futexsleep(addr *uint32, val uint32, ns int64)
//如果有任何进程在addr上休眠,则最多唤醒cnt
futexwakeup(addr *uint32, cnt uint32)
  • aix darwin nacl netbsd openbsd plan9 solaris windows平台下,根据具体平台实现下面三个方法
// 如果尚未创建mp信号量,则为其创建一个
func semacreate(mp *m)
// 如果ns<0,则获取m的信号量并返回0
// 如果ns>=0,则在ns纳秒内尝试获取m的信号量
// 如果获取了信号量返回0 如果中断或超时返回-1
func semasleep(ns int64) int32
//唤醒已经或即将在其信号量上休眠的mp
func semawakeup(mp *m)
  • js,wasm平台下的实现,因为js/wasm还不支持线程,所以不存在抢占。

因为绝大多数生产环境都是Linux平台下,所以今天主要来看看Linux的具体实现,其主要利用系统内核提供的futex来实现,好了 上代码:

Lock

// +build !goexperiment.staticlockrankingstaticlockranking未开启的情况下,// +build dragonfly freebsd linux的实现如下

//lock_futex.go
func lock(l *mutex) {
 lockWithRank(l, getLockRank(l))
}
//go:nosplit
func lockWithRank(l *mutex, rank lockRank) {
 lock2(l)
}
func lock2(l *mutex) {
 gp := getg()

 if gp.m.locks < 0 {
  throw("runtime·lock: lock count")
 }
  //g绑定的m的lock属性自增加一
 gp.m.locks++

 // 投机抢占锁 运气好抢到直接返回 不需要进行内核调用
 v := atomic.Xchg(key32(&l.key), mutex_locked)
 if v == mutex_unlocked {
  return
 }

 // wait为MUTEX_LOCKED还是MUTEX_SLEEPING取决于此互斥对象mutex是否存在线程在休眠
  // 如果我们曾经将l->key从MUTEX_SLEEPING更改为其他值,则在返回之前必须小心将其更改回MUTEX_SLEEPING,
  // 以确保休眠线程得到其唤醒调用
 wait := v

 // 单处理器上 无自旋
 // 多处理器上,自旋进行ACTIVE_SPIN尝试
 spin := 0
 if ncpu > 1 {
  spin = active_spin
 }
 for {
  // 尝试锁定 自旋
  for i := 0; i < spin; i++ {
   for l.key == mutex_unlocked {
    if atomic.Cas(key32(&l.key), mutex_unlocked, wait) {
     return
    }
   }
   procyield(active_spin_cnt)
  }

  // 尝试锁定 重新调度
  for i := 0; i < passive_spin; i++ {
   for l.key == mutex_unlocked {
    if atomic.Cas(key32(&l.key), mutex_unlocked, wait) {
     return
    }
   }
   osyield()
  }

  // 休眠
  v = atomic.Xchg(key32(&l.key), mutex_sleeping)
  if v == mutex_unlocked {
   return
  }
  wait = mutex_sleeping
  futexsleep(key32(&l.key), mutex_sleeping, -1)
 }
}

procyield只是执行30次PAUSE

active_spin_cnt = 30
procyield(active_spin_cnt)

// asm_amd64.s
TEXT runtime·procyield(SB),NOSPLIT,$0-0
MOVL cycles+0(FP), AX // 将30放入AX寄存器
again:
PAUSE
SUBL $1, AX // 每次减一
JNZ again
RET

linux系统下osyield()为系统调用的封装

osyield()

//sys_linux_amd64.s
TEXT runtime·osyield(SB),NOSPLIT,$0
MOVL $SYS_sched_yield, AX
SYSCALL
RET

可以看到Lock的逻辑也并不复杂:

  1. 首先给当前g绑定的m的lock属性自增加一,禁止P被抢占
  2. atomic.Xchg尝试修改mutex.keymutex_locked,获取原始值为mutex_unlocked表明获取锁,直接返回;否则失败继续执行
  3. 进入for死循环 首先如果时多核机器,尝试进行自旋+atomic.Cas获取锁,尝试自旋4次每次失败执行30次PAUSE, 成功则返回
  4. 调用osyield()尝试一次重新调度
  5. 尝试休眠,如果此时锁资源被释放 则获取锁直接返回
  6. 调用futexsleep() 进入休眠,休眠失败在从步骤1继续尝试 直到获取锁 或 休眠成功为止

可以看出Lock还是尽量先通过自旋、CAS、osyield(),实在没办法才进入futexsleep(),为什么呢?下面说

接下来我们来一起看看关键的futexsleep()如何实现的:

//src/runtime/os_linux.go
// Atomically,
// if(*addr == val) sleep
// Might be woken up spuriously; that's allowed.
// Don't sleep longer than ns; ns < 0 means forever.
//go:nosplit
func futexsleep(addr *uint32, val uint32, ns int64) {
 // Some Linux kernels have a bug where futex of
 // FUTEX_WAIT returns an internal error code
 // as an errno. Libpthread ignores the return value
 // here, and so can we: as it says a few lines up,
 // spurious wakeups are allowed.
 if ns < 0 {
  futex(unsafe.Pointer(addr), _FUTEX_WAIT_PRIVATE, val, nilnil0)
  return
 }

 var ts timespec
 ts.setNsec(ns)
 futex(unsafe.Pointer(addr), _FUTEX_WAIT_PRIVATE, val, unsafe.Pointer(&ts), nil0)
}

//go:noescape
func futex(addr unsafe.Pointer, op int32, val uint32, ts, addr2 unsafe.Pointer, val3 uint32) int32

可以看到,futex_sleep()最终调用了没有函数体的futex()方法,其汇编实现如下:

// src/runtime/sys_linux_amd64.s
// int64 futex(int32 *uaddr, int32 op, int32 val,
// struct timespec *timeout, int32 *uaddr2, int32 val2);
TEXT runtime·futex(SB),NOSPLIT,$0
MOVQ addr+0(FP), DI
MOVL op+8(FP), SI
MOVL val+12(FP), DX
MOVQ ts+16(FP), R10
MOVQ addr2+24(FP), R8
MOVL val3+32(FP), R9
MOVL $SYS_futex, AX
SYSCALL
MOVL AX, ret+40(FP)
RET

Linux环境下,系统调用是原生支持的(darwin上需要通过cgo来完成),所以runtime.futex直接通过汇编来实现了,通过将$SYS_futex,也就是202号系统调用编号送入了AX,然后通过SYSCALL来完成对futex的系统调用。

linuxfutex是如何实现的呢?

从Linux内核看futex

啥是futex?

futex(Fast Userspace muTEX快速用户空间互斥体)是低级同步原语,为高级API提支持。它的目标是在无竞争的情况下不进入内核或不分配内核资源。它第一次出现在Linux内核开发的2.5.7版本,其语义在2.5.40版本固定下来,然后在2.6.x系列稳定版内核中出现。

futex出现之前,linux的同步机制主要分为两类:

  • 用户态的同步机制  利用原子指令实现自旋锁,但不适用于临界区执行时间过长的场景
  • 内核同步机制 如semaphore等,使用自旋+等待的形式,lock和unlock都是系统调用,即使没有冲突也要通过系统调用进入内核之后才能识别。

理想状态下,在无冲突的情况下在用户空间通过自旋锁解决,如果仍未拿到锁资源需要等待时再通过系统调用完成sleep和wake的语义。那么自旋失败的情况下,将当前线程挂起,然后是不是由持有锁的线程释放锁的时候再将其唤醒呢?带着这个疑问我们一起粗略看看linux内核如何实现futex的吧

数据结构

Linux内核实现主要在kernel/futex.cinclude/linux/futex.h,翻了下3.10版本内核源码,futex大体是所以它的结构跟Java中的HashMap有些类似。futex_hash_bucket为hash桶,futex_q为存储的元素实体结构。如下图

/**
 * struct futex_q - futex等待队列实体,每个等待任务分配一个
 * @list:  等待在futex上带优先级链表
 * @task:  等待在futex上的任务
 * @lock_ptr:  哈希桶自旋锁指针
 * @key:  the key the futex is hashed on
 * @pi_state:  optional priority inheritance state
 * @rt_waiter:  rt_waiter storage for use with requeue_pi
 * @requeue_pi_key: the requeue_pi target futex key
 * @bitset:  bitset for the optional bitmasked wakeup
 *
 * We use this hashed waitqueue, instead of a normal wait_queue_t, so
 * we can wake only the relevant ones (hashed queues may be shared).
 *
 * A futex_q has a woken state, just like tasks have TASK_RUNNING.
 * It is considered woken when plist_node_empty(&q->list) || q->lock_ptr == 0.
 * The order of wakeup is always to make the first condition true, then
 * the second.
 *
 * PI futexes are typically woken before they are removed from the hash list via
 * the rt_mutex code. See unqueue_me_pi().
 */

struct futex_q {
 struct plist_node list;// 等待在futex上按照优先级排序的任务列表

 struct task_struct *task; //等待在futex上的任务
 spinlock_t *lock_ptr; //哈希桶自旋锁指针
 union futex_key key; //futex进行hash的key
 struct futex_pi_state *pi_state; //可选的 优先级继承状态 控制优先级继承
 struct rt_mutex_waiter *rt_waiter;//
 union futex_key *requeue_pi_key;
 u32 bitset//掩码匹配
};
/*
 * Hash buckets are shared by all the futex_keys that hash to the same
 * location.  Each key may have multiple futex_q structures, one for each task
 * waiting on a futex.
 */

struct futex_hash_bucket {
 spinlock_t lock;
 struct plist_head chain;
};
static struct futex_hash_bucket futex_queues[1<<FUTEX_HASHBITS];

futex

外部调用入口

mutex源码分析,我们得出Go最终会通过系统调用内核提供的futexfutex原型为:

#include <linux/futex.h>
#include <stdint.h>
#include <sys/time.h>

long futex(uint32_t *uaddr, int futex_op, uint32_t val,
           const struct timespec *timeout,   /* or: uint32_t val2 */
           uint32_t *uaddr2, uint32_t val3)
;

futex方法有6个入参:*uaddr表示futex字的指针;futex表示针对futex的具体操作类型;val的含义依赖于flags参数;*timeout表示超时操作的超时时间;*uaddr2跟操作有关,必填时指向操作使用的第二个futex字;val3取决于具体操作。futex_op取值有很多,这里我们关心的主要是FUTEX_WAITFUTEX_WAKE

kernel/futex.c中可以发现系统调用最终会调用do_futex,而do_futex最终通过op位运算生成cmd来确定具体的调用的futex的形式。

//kernel/futex.c
SYSCALL_DEFINE6(futex, u32 __user *, uaddr, int, op, u32, val,
  struct timespec __user *, utime, u32 __user *, uaddr2,
  u32, val3)
{
 struct timespec ts;
 ktime_t t, *tp = NULL;
 u32 val2 = 0;
 int cmd = op & FUTEX_CMD_MASK;

 if (utime && (cmd == FUTEX_WAIT || cmd == FUTEX_LOCK_PI ||
        cmd == FUTEX_WAIT_BITSET ||
        cmd == FUTEX_WAIT_REQUEUE_PI)) {
  if (copy_from_user(&ts, utime, sizeof(ts)) != 0)
   return -EFAULT;
  if (!timespec_valid(&ts))
   return -EINVAL;

  t = timespec_to_ktime(ts);
  if (cmd == FUTEX_WAIT)
   t = ktime_add_safe(ktime_get(), t);
  tp = &t;
 }
 /*
  * requeue parameter in 'utime' if cmd == FUTEX_*_REQUEUE_*.
  * number of waiters to wake in 'utime' if cmd == FUTEX_WAKE_OP.
  */

 if (cmd == FUTEX_REQUEUE || cmd == FUTEX_CMP_REQUEUE ||
     cmd == FUTEX_CMP_REQUEUE_PI || cmd == FUTEX_WAKE_OP)
  val2 = (u32) (unsigned long) utime;

 return do_futex(uaddr, op, val, tp, uaddr2, val2, val3);
}

long do_futex(u32 __user *uaddr, int op, u32 val, ktime_t *timeout,
  u32 __user *uaddr2, u32 val2, u32 val3)

{
 int cmd = op & FUTEX_CMD_MASK;
 unsigned int flags = 0;

 if (!(op & FUTEX_PRIVATE_FLAG))
  flags |= FLAGS_SHARED;

 if (op & FUTEX_CLOCK_REALTIME) {
  flags |= FLAGS_CLOCKRT;
  if (cmd != FUTEX_WAIT_BITSET && cmd != FUTEX_WAIT_REQUEUE_PI)
   return -ENOSYS;
 }

 switch (cmd) {
 case FUTEX_LOCK_PI:
 case FUTEX_UNLOCK_PI:
 case FUTEX_TRYLOCK_PI:
 case FUTEX_WAIT_REQUEUE_PI:
 case FUTEX_CMP_REQUEUE_PI:
  if (!futex_cmpxchg_enabled)
   return -ENOSYS;
 }

 switch (cmd) {
 case FUTEX_WAIT:
  val3 = FUTEX_BITSET_MATCH_ANY;
 case FUTEX_WAIT_BITSET:
      // 若*uaddr==val则阻塞当前进程/线程 放入阻塞队列
  return futex_wait(uaddr, flags, val, timeout, val3);
 case FUTEX_WAKE:
  val3 = FUTEX_BITSET_MATCH_ANY;
 case FUTEX_WAKE_BITSET:
      // 最多唤醒val个在uaddr等待的线程/进程
  return futex_wake(uaddr, flags, val, val3);
 case FUTEX_REQUEUE:
  return futex_requeue(uaddr, flags, uaddr2, val, val2, NULL0);
 case FUTEX_CMP_REQUEUE:
  return futex_requeue(uaddr, flags, uaddr2, val, val2, &val3, 0);
 case FUTEX_WAKE_OP:
  return futex_wake_op(uaddr, flags, uaddr2, val, val2, val3);
 case FUTEX_LOCK_PI:
  return futex_lock_pi(uaddr, flags, val, timeout, 0);
 case FUTEX_UNLOCK_PI:
  return futex_unlock_pi(uaddr, flags);
 case FUTEX_TRYLOCK_PI:
  return futex_lock_pi(uaddr, flags, 0, timeout, 1);
 case FUTEX_WAIT_REQUEUE_PI:
  val3 = FUTEX_BITSET_MATCH_ANY;
  return futex_wait_requeue_pi(uaddr, flags, val, timeout, val3,
          uaddr2);
 case FUTEX_CMP_REQUEUE_PI:
  return futex_requeue(uaddr, flags, uaddr2, val, val2, &val3, 1);
 }
 return -ENOSYS;
}

故而,根据参数分析,go运行时调用的futexsleep()futexwakeup()方法

// 原子上,如果 *addr == val { sleep } ;虚假唤醒是被允许的;不会阻塞超过ns,ns<0表示永远休眠
futexsleep(addr *uint32, val uint32, ns int64)
//如果任何线程阻塞在addr上,则唤醒至少cnt个阻塞的任务
futexwakeup(addr *uint32, cnt uint32

实则对应了futex()futex_opFUTEX_WAITFUTEX_WAKE的调用,接下来,我们主要来分析下这两种操作的大概流程。

futex_wait

futex()方法中的futex_opFUTEX_WAIT时会调用``futex_wait()方法。主要逻辑为判断*uaddr指向的值和val值若相等,则将当前线程/进程阻塞在uaddr所指向的futex`的值上,放入等待队列。

static int futex_wait(u32 __user *uaddr, unsigned int flags, u32 val,
        ktime_t *abs_time, u32 bitset)

{
 struct hrtimer_sleeper timeout, *to = NULL;
 struct restart_block *restart;
 struct futex_hash_bucket *hb;
 struct futex_q q = futex_q_init;
 int ret;
...
 // 设置定时任务,超时abs_time后线程仍未被唤醒,则由定时任务唤醒它
 if (abs_time) {
  to = &timeout;
  hrtimer_init_on_stack(&to->timer, (flags & FLAGS_CLOCKRT) ?
          CLOCK_REALTIME : CLOCK_MONOTONIC,
          HRTIMER_MODE_ABS);
  hrtimer_init_sleeper(to, current);
  hrtimer_set_expires_range_ns(&to->timer, *abs_time,
          current->timer_slack_ns);
 }

retry:
  // 准备等待在uaddr上 如果成功则持有hash_bucket的锁
  // 比较uaddr和期望值val是否相等 并初始化q.key
 ret = futex_wait_setup(uaddr, val, flags, &q, &hb);
 if (ret)//val被改变直接返回
  goto out;
  // 插入futex等待队列,等待超时或通过信号来唤醒
 futex_wait_queue_me(hb, &q, to);

 /* If we were woken (and unqueued), we succeeded, whatever. */
 ret = 0;
 /* unqueue_me() drops q.key ref */
  // 若unqueue_me成功,则说明是超时触发(因为futex_wake唤醒时,会将该进程移除等待队列,所以这里会失败)
 if (!unqueue_me(&q))
  goto out;
 ret = -ETIMEDOUT;
 if (to && !to->task)
  goto out;

 /*
  * We expect signal_pending(current), but we might be the
  * victim of a spurious wakeup as well.
  */

 if (!signal_pending(current))
  goto retry;

 ret = -ERESTARTSYS;
 if (!abs_time)
  goto out;

 restart = &current_thread_info()->restart_block;
 restart->fn = futex_wait_restart;
 restart->futex.uaddr = uaddr;
 restart->futex.val = val;
 restart->futex.time = abs_time->tv64;
 restart->futex.bitset = bitset;
 restart->futex.flags = flags | FLAGS_HAS_TIMEOUT;

 ret = -ERESTART_RESTARTBLOCK;

out:
 if (to) {
    // 取消定时任务
  hrtimer_cancel(&to->timer);
  destroy_hrtimer_on_stack(&to->timer);
 }
 return ret;
}


futex_wait()首先设置定时任务,若超时abs_time后线程仍未被唤醒,则由定时任务唤醒它;然后进入goto retry循环:

  1. 调用futex_wait_setup(uaddr, val, flags, &q, &hb)根据uaddrhash找到futex_hash_bucket,并初始化futex_q.key

  2. 调用futex_wait_queue_me(hb, &q, to)插入futex等待队列,等待超时或被信号唤醒

/**
 * futex_wait_setup() - 准备等待在一个futex变量上
 * @uaddr: futex用户空间地址
 * @val: 期望的值
 * @flags: futex标识 (FLAGS_SHARED, etc.)
 * @q:  关联的futex_q
 * @hb:  hash_bucket的指针 返回给调用者
 *
 * Setup the futex_q and locate the hash_bucket.  Get the futex value and
 * compare it with the expected value.  Handle atomic faults internally.
 * Return with the hb lock held and a q.key reference on success, and unlocked
 * with no q.key reference on failure.
 *
 * Return:
 *  0 - uaddr contains val and hb has been locked;
 * <1 - -EFAULT or -EWOULDBLOCK (uaddr does not contain val) and hb is unlocked
 */

static int futex_wait_setup(u32 __user *uaddr, u32 val, unsigned int flags,
      struct futex_q *q, struct futex_hash_bucket **hb)

{
 u32 uval;
 int ret;

 /*
  * Access the page AFTER the hash-bucket is locked.
  * Order is important:
  *
  *   Userspace waiter: val = var; if (cond(val)) futex_wait(&var, val);
  *   Userspace waker:  if (cond(var)) { var = new; futex_wake(&var); }
  *
  * The basic logical guarantee of a futex is that it blocks ONLY
  * if cond(var) is known to be true at the time of blocking, for
  * any cond.  If we locked the hash-bucket after testing *uaddr, that
  * would open a race condition where we could block indefinitely with
  * cond(var) false, which would violate the guarantee.
  *
  * On the other hand, we insert q and release the hash-bucket only
  * after testing *uaddr.  This guarantees that futex_wait() will NOT
  * absorb a wakeup if *uaddr does not match the desired values
  * while the syscall executes.
  */

retry:
   //初始化futex_q 填充q->key
 ret = get_futex_key(uaddr, flags & FLAGS_SHARED, &q->key, VERIFY_READ);
 if (unlikely(ret != 0))
  return ret;

retry_private:
  //1.对q->key进行hash 然后通过&futex_queues[hash & ((1 << FUTEX_HASHBITS)-1)]找到futex_hash_bucket
  //2. spin_lock(&hb->lock)获得自旋锁
 *hb = queue_lock(q);
 //原子的将uaddr的值拷贝到uval中
 ret = get_futex_value_locked(&uval, uaddr);
 if (ret) { //拷贝操作失败 重试
    ...
  goto retry;
 }
 // 如果uval的值(即uaddr)不等于期望值val 则表明其他线程在修改
  // 直接返回无需等待
 if (uval != val) {
  queue_unlock(q, *hb);
  ret = -EWOULDBLOCK;
 }

out:
 if (ret)
  put_futex_key(&q->key);
 return ret;
}

futex_wait_setup()方法主要是为阻塞在uaddr上做准备,主要步骤:

  1. 初始化futex_q,并初始化futex_q.key的引用,
  2. futex_q.key进行hash通过&futex_queues[hash & ((1 << FUTEX_HASHBITS)-1)]找到futex_hash_bucket
  3. 调用spin_lock(&hb->lock)尝试获得自旋锁 失败则进行重试回到步骤1
  4. 判断*uaddr的值跟val是否相等,不相等说明其他线程在修改则释放持有的hb.lock自旋锁,返回-EWOULDBLOCK(\#define EWOULDBLOCK 246 /* Operation would block (Linux returns EAGAIN) */即linux中的EAGAIN)
/**
 * futex_wait_queue_me() - queue_me() and wait for wakeup, timeout, or signal
 * @hb:  the futex hash bucket, must be locked by the caller
 * @q:  the futex_q to queue up on
 * @timeout: the prepared hrtimer_sleeper, or null for no timeout
 */

static void futex_wait_queue_me(struct futex_hash_bucket *hb, struct futex_q *q,
    struct hrtimer_sleeper *timeout)

{
  // task状态保证在另一个task唤醒它之前被设置,set_current_state利用set_mb()实现
  // 设置task状态为TASK_INTERRUPTIBLE CPU只会调度状态为TASK_RUNNING的任务
 set_current_state(TASK_INTERRUPTIBLE);
  //将q插入到等待队列中然后释放自旋锁
 queue_me(q, hb);
 //启动定时器
 if (timeout) {
  hrtimer_start_expires(&timeout->timer, HRTIMER_MODE_ABS);
  if (!hrtimer_active(&timeout->timer))
   timeout->task = NULL;
 }

 /*
  * If we have been removed from the hash list, then another task
  * has tried to wake us, and we can skip the call to schedule().
  */

 if (likely(!plist_node_empty(&q->list))) {
  /*
   * If the timer has already expired, current will already be
   * flagged for rescheduling. Only call schedule if there
   * is no timeout, or if it has yet to expire.
   */

    // 未设置过期时间 或过期时间还未到期才进行调度
  if (!timeout || timeout->task)
      //系统重新进行调度,此时CPU会去执行其他任务,当前任务会被阻塞
   schedule();
 }
  // 走到这里说明当前任务被CPU选中执行
 __set_current_state(TASK_RUNNING);
}

futex_wait_queue_me主要做了几件事:

  1. 将设置task状态为TASK_INTERRUPTIBLE (CPU只会调度状态为TASK_RUNNING的任务)
  2. 调用queueme()futex_q插入到等待队列
  3. 启动定时任务
  4. 若未设置过期时间或过期时间未到期 重新调度进程
  5. 获的执行资格 设置task状态为TASK_RUNNING

总结一下futex_wait的工作机制:futex_wait_setup()方法会根据futex_q.keyhash找到对应futex_hash_bucket,并通过spin_lock(futex_hash_bucket.lock)获取自旋锁;futex_wait_queue_me()方法先是将task设置为TASK_INTERRUPTIBLE然后调用queue_me()futex_q入队,然后才spin_unlock(&hb->lock);释放持有的自旋锁。也就是说检查*uaddr值和进程/线程挂起放在了一个临界区中,保证了条件和等待之间的原子性。

futex_wake

futex_wake()唤醒阻塞在*uaddr上的任务。

/*
 * Wake up waiters matching bitset queued on this futex (uaddr).
 */

static int
futex_wake(u32 __user *uaddr, unsigned int flags, int nr_wake, u32 bitset)
{
 struct futex_hash_bucket *hb;
 struct futex_q *this, *next;
 struct plist_head *head;
 union futex_key key = FUTEX_KEY_INIT;
 int ret;

 if (!bitset)
  return -EINVAL;
  //根据uaddr的值填充&key的内容
 ret = get_futex_key(uaddr, flags & FLAGS_SHARED, &key, VERIFY_READ);
 if (unlikely(ret != 0))
  goto out;
 //根据&key获取对应uaddr所在的futex_hash_bucket
 hb = hash_futex(&key);
  //对该hb加自旋锁
 spin_lock(&hb->lock);
 head = &hb->chain;
 //遍历该hb的链表 注意链表中存储的节点时plist_node类型,而这里的this却是futex_q类型
  //这种类型转换是通过c中的container_of机制实现的
 plist_for_each_entry_safe(this, next, head, list) {
  if (match_futex (&this->key, &key)) {
   if (this->pi_state || this->rt_waiter) {
    ret = -EINVAL;
    break;
   }

   /* Check if one of the bits is set in both bitsets */
   if (!(this->bitset & bitset))
    continue;
   //唤醒对应进程
   wake_futex(this);
   if (++ret >= nr_wake)
    break;
  }
 }
 //释放自旋锁
 spin_unlock(&hb->lock);
 put_futex_key(&key);
out:
 return ret;
}

关于wake_futex,调用wake_futex必须持有futex_hash_bucket的自旋锁,之后不能访问入参的futex_q

static void wake_futex(struct futex_q *q)
{
 struct task_struct *p = q->task;

 if (WARN(q->pi_state || q->rt_waiter, "refusing to wake PI futex\n"))
  return;

  // 在唤醒任务之前将q->lock_ptr设置为NULL
  // 如果另一个CPU发生了非futex方式的唤醒,则任务可能退出。p解引用则是一个不存在的task结构体
  // 为避免这种case,需要持有p引用来进行唤醒
 get_task_struct(p);
 //将q出队列
 __unqueue_futex(q);
 /*
  * 只要写入q-> lock_ptr = NULL,等待的任务就可以释放futex_q,而无需获取任何锁。
   * 这里需要一个内存屏障,以防止lock_ptr的后续存储超越plist_del。
  */

 smp_wmb();
 q->lock_ptr = NULL;
 // 将TASK_INTERRUPTIBLE | TASK_UNINTERRUPTIBLE状态的task唤醒
 wake_up_state(p, TASK_NORMAL);
  // task释放futex_q
 put_task_struct(p);
}

futex_wake()流程如下:

  1. 根据*uaddr填充futex_q->key,调用hash_futex(&key);查找对应的futex_hash_bucket
  2. 调用spin_lock(&hb->lock);尝试获取hb的自旋锁
  3. 遍历futex_hash_bucket挂载的链表,找到uaddr对应的节点
  4. 调用wake_futex唤起等待的进程,其实就是将task设置为TASK_RUNNING并放入调度队列中等待CPU调度同时释放futex_q
  5. 释放自旋锁

此外futex同步机制即可用于线程之间同步,也可用于进程之间同步。

  • 用于线程比较简单,因为线程共享虚拟内存空间,futex变量由唯一的虚拟地址表示,线程即可用虚拟地址访问futex变量
  • 用于进程稍微复杂一些,因为进程间是分配的独立的虚拟内存地址空间。需要通过mmap让进程可以共享一段地址空间来使用futex变量。故而每个进程访问futex的虚拟地址不一样,但是操作系统知道所有这些虚拟地址映射到同一个表示futex变量的物理地址。

Unlock

Lock对应操作,帮助唤醒阻塞的协程。

func unlock(l *mutex) {
 unlockWithRank(l)
}
//go:nosplit
func unlockWithRank(l *mutex) {
 unlock2(l)
}
func unlock2(l *mutex) {
 v := atomic.Xchg(key32(&l.key), mutex_unlocked)
 if v == mutex_unlocked {
  throw("unlock of unlocked lock")
 }
 if v == mutex_sleeping {
  futexwakeup(key32(&l.key), 1)
 }

 gp := getg()
 gp.m.locks--
 if gp.m.locks < 0 {
  throw("runtime·unlock: lock count")
 }
 if gp.m.locks == 0 && gp.preempt { // restore the preemption request in case we've cleared it in newstack
  gp.stackguard0 = stackPreempt
 }
}

其逻辑相比Lock要更简单一些:

  1. 通过atomic.Xchg更改mutex.key值 若为mutex_unlocked 说明当前没有锁占用直接panic;
  2. 若为mutex_sleeping则调用futexwakeup()唤醒阻塞的任务
  3. 将当前g绑定的m的locks属性减一 解除禁止抢占

关于futexwakeup()最终也是调用了futex()方法进而通过系统调用linux的futex实现。

//src/runtime/os_linux.go
// If any procs are sleeping on addr, wake up at most cnt.
//go:nosplit
func futexwakeup(addr *uint32, cnt uint32) {
 ret := futex(unsafe.Pointer(addr), _FUTEX_WAKE_PRIVATE, cnt, nilnil0)
 if ret >= 0 {
  return
 }

 // I don't know that futex wakeup can return
 // EAGAIN or EINTR, but if it does, it would be
 // safe to loop and call futex again.
 systemstack(func() {
  print("futexwakeup addr=", addr, " returned ", ret, "\n")
 })

 *(*int32)(unsafe.Pointer(uintptr(0x1006))) = 0x1006
}

具体futexwakeup的语义内核实现可以参考,futex的原理分析。

总结

理想状态下,在无冲突的情况下,在用户空间通过自旋锁(yield)即可解决资源竞争同步问题,这也是我们所期盼的毕竟用户态的操作开销相对较小;但是一旦冲突加剧或持有锁的操作过长,自旋锁的方式会让CPU空转浪费宝贵的计算资源;进而需要陷入系统调用将当前线程/进程进行休眠,等锁资源释放再唤醒它们。关于futex,也有这么个说法

刚刚不是用户态的自旋锁已经解决不了问题了吗,你可能会想我们可以这么玩:

func lock() {
 for !tryLock(state) {
  wait() //系统调用 进入休眠
 }
}

func tryLock(state) bool {
 i:=0
 for !compareAndSwap(state, 01) {
  i++
  if i>3 {
   return false
  }
 }
 return true
}
func unlock() {
 compareAndSet(state,1,0)
 wakeup() //系统调用唤醒
}

没错思路很好,但是tryLockwait中间存在一个时间窗口问题。比如,我们在lock()之后调用wait()之前调用了unlock(),这个时候当前线程调用wait()后就没人管它了。怎么办呢?为了解决这个问题,随即引入了futex,多数语言的玩法都大同小异。

如有异议 请轻拍 欢迎交流。


如果阅读过程中发现本文存疑或错误的地方,可以关注公众号留言。点赞在看人灿烂😁 




good-icon 0
favorite-icon 0
收藏
回复数量: 0
    暂无评论~~
    Ctrl+Enter