Java多线程之Semaphore信号量

ProjectDaedalus

共 5411字,需浏览 11分钟

 · 2021-11-30

这里就JUC包中的Semaphore类做相关介绍

abstract.jpeg

概述

JUC包中的Semaphore信号量作为一个并发工具类。其基本思想很简单,对于一个信号量实例而言,其含有指定数量的许可。每当访问资源前,需先向其申请许可。并在处理完毕后释放许可,以供后续申请。其实,这个使用方式就很像现实世界的停车场,即停车场有空余车位,车才可以进车;否则要么等待要么离开(寻找下一个停车场)。当车从停车场的车位驶离时,则会将相应的车位就会空余出来。在整个过程停车场的车位资源是有限的固定的。常见的使用场景是对业务所使用的线程数进行控制,即所谓基于线程数的限流方式。其常用方法及功能如下所示

// 创建一个指定许可数的非公平信号量
public Semaphore(int permits);

// 创建一个指定许可数的公平/非公平信号量
public Semaphore(int permits, boolean fair);

// 释放一个许可
public void release();

// 释放指定数量的许可
public void release(int permits);

// 当前剩余可用的许可数量
public int availablePermits();

/*************************** 获取许可 ******************************/

// 阻塞等待,直到获取一个许可
public void acquire() throws InterruptedException;

// 阻塞等待,直到获取全部所需数量的许可
public void acquire(int permits) throws InterruptedException;

// 阻塞等待(忽略InterruptedException异常),直到获取一个许可
public void acquireUninterruptibly();

// 阻塞等待(忽略InterruptedException异常),直到获取全部所需数量的许可
public void acquireUninterruptibly(int permits);

// 非阻塞式获取一个许可, ture: 获取成功; false: 获取失败
public boolean tryAcquire();

// 非阻塞式获取全部所需数量的许可, ture: 获取成功; false: 获取失败
public boolean tryAcquire(int permits);

// 支持超时机制的tryAcquire方法, 获取一个许可, ture: 获取成功; false: 获取失败
public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException;

// 支持超时机制的tryAcquire方法, 获取全部所需数量的许可, ture: 获取成功; false: 获取失败
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException;

// 一次性获取所有剩余可用的许可, 返回成功获取的许可数
public int drainPermits();

/******************************************************************/

可以看到,对于信号量而言,其支持公平和非公平两种类型。默认为非公平的。值得一提的是,对于tryAcquire()方法而言,其是非阻塞的。并且一旦存在可用的许可,会立即分配给它。不论是否存在其他正在等待许可的线程。即使当前这个信号量实例是公平的,换言之tryAcquire()方法会破坏公平信号量实例的公平性。如果既期望使用非阻塞方式,又期望不破坏公平信号量的公平性,可以使用它的超时机制版本,同时将超时时间设为0。即 tryAcquire(0, TimeUnit.SECONDS) 。方法tryAcquire(int permits)同理,此处不再赘述

基本实践

这里通过一个简单的实例,来进行展示其基本的使用流程

public class SemaphoreTest {

    private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss");

    // 系统最大的并发处理量
    private static Integer maxLimit = 5;

    @Test
    public void test1() {
        System.out.println("---------------------- 系统上线 ----------------------");
        Semaphore semaphore = new Semaphore(maxLimit, true);
        ExecutorService threadPool = Executors.newFixedThreadPool(10);

        IntStream.rangeClosed(1,8)
            .mapToObj( num -> new UserReq("用户#"+num, semaphore) )
            .forEach( threadPool::execute );

        // 主线程等待所有任务执行完毕
        try{ Thread.sleep( 120*1000 ); } catch (Exception e) {}
        System.out.println("---------------------- 系统下线 ----------------------");
    }
    

    /**
     * 打印信息
     * @param msg
     */

    private static void info(String msg) {
        String time = formatter.format(LocalTime.now());
        String log = "["+time+"] "+ msg;
        System.out.println(log);
    }

    @AllArgsConstructor
    private static class UserReq implements Runnable{

        private String name;

        private Semaphore semaphore;

        @Override
        public void run() {
            // 模拟用户不定时发起请求
            try{ Thread.sleep(RandomUtils.nextLong(5002000)); } catch (Exception e) {}
            String msg = name + ": 发起请求, 系统可用资源数: " + semaphore.availablePermits();
            info(msg);

            // 阻塞等待,直到获取许可
            try {
                semaphore.acquire();
            }catch (InterruptedException e) {
                System.out.println( "Happen Exception: " + e.getMessage());
            }

            info(name + ": 系统开始处理请求");
            // 模拟业务耗时
            try{ Thread.sleep(RandomUtils.nextInt(520)*1000); } catch (Exception e) {}

            // 用户请求处理完毕,释放许可
            semaphore.release();
            info(name + ": 系统处理完毕");
        }
    }
}

测试结果如下,符合预期

figure 1.jpeg

实现原理

构造器

Semaphore信号量类的实现过程同样依赖于AQS。具体地,其是对AQS中共享锁的使用。在构建Semaphore实例过程时,一方面,通过sync变量持有AQS的实现类Sync,同时按公平性与否进一步地可细分为NonfairSync、FairSync;另一方面,通过AQS的state字段来存储许可的数量

public class Semaphore implements java.io.Serializable {

    private final Sync sync;

    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

    abstract static class Sync extends AbstractQueuedSynchronizer {
        Sync(int permits) {
            setState(permits);
        }
    }

    static final class NonfairSync extends Sync {
        NonfairSync(int permits) {
            super(permits);
        }
    }

    static final class FairSync extends Sync {
        FairSync(int permits) {
            super(permits);
        }
    }

}

acquire方法

首先来看Semaphore的acquire()方法。其委托sync调用AQS的acquireSharedInterruptibly方法。而在AQS中通过调用tryAcquireShared方法判断是否需要阻塞调用线程。具体地,在Semaphore的NonfairSync、FairSync内部类分别实现了该tryAcquireShared方法的两个版本:非公平、公平。可以看到两种实现基本一致。tryAcquireShared如果返回负值,则说明当前许可数不够,当前线程需要进入AQS阻塞队列;反之则获取成功。只是在公平版本的实现中,会调用AQS的hasQueuedPredecessors方法来判断是否有其他线程已经在AQS队列中进行排队。如果有,则tryAcquireShared直接返回-1,即当前调用线程放弃获取,转而准备进入AQS队列以保障公平性

public class Semaphore implements java.io.Serializable {

    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    abstract static class Sync extends AbstractQueuedSynchronizer {
        // 非公平信号量获取许可
        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }    

    static final class NonfairSync extends Sync {
        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }

    static final class FairSync extends Sync {
        // 公平信号量获取许可
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                // 对于公平性实现而言, 如果AQS队列存在排队的节点
                // 则直接返回-1, 即进入AQS队列进行排队以保证公平性
                    return -1;
                // 通过访问AQS的state字段, 获取当前可用的许可数量    
                int available = getState();
                // 计算剩余可用的许可数量
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }
}

...

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {

    public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
        // 线程被中断则直接抛出异常
        if (Thread.interrupted())
            throw new InterruptedException();

        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
    
    // 需要子类去实现
    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }
}

release方法

Semaphore的release()方法类似。其同样是委托sync调用AQS的releaseShared方法。然后AQS执行tryReleaseShared方法,如果该方法返回true,则会进一步调用AQS的doReleaseShared方法来唤醒AQS队列中其他线程。可以看到在Semaphore的Sync内部类中,tryReleaseShared总是会返回true。其实现过程也很简单,如下所示

public class Semaphore implements java.io.Serializable {

    public void release() {
        sync.releaseShared(1);
    }

    abstract static class Sync extends AbstractQueuedSynchronizer {
        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                // 通过访问AQS的state字段, 获取当前可用的许可数量    
                int current = getState();
                // 将释放的许可数添加到当前可用许可数量上
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                // 通过CAS的方式更新state字段
                if (compareAndSetState(current, next))
                    return true;
            }
        }
    }

}

...

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
    
    // 需要子类去实现
    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }
}

参考文献

  1. Java并发编程之美 翟陆续、薛宾田著
浏览 16
点赞
评论
收藏
分享

手机扫一扫分享

举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

举报