探秘分布式异步通信(1)

分布式朝闻道

共 27834字,需浏览 56分钟

 · 2022-09-23

分布式场景下通常会使用同步和异步方式来进行跨进程通信。同步方式较容易理解,它具有通信实时性高的特点。为了满足系统的高吞吐,产生了异步通信方式。我们重点聊聊分布式场景下的异步通信方式。

狭义上的同步通信方式是指,当请求发送出去后,在没有接收到请求的响应之前,调用方用户线程只能阻塞等待返回,这个时间内做不了别的事情。

相对的,狭义的异步通信方式是指,当请求借助异步通信机制发送出去后,用户线程可以继续执行别的操作,并不会阻塞等待应答返回;当请求返回后,会借助通知等方式通知调用者,或者通过回调函数来执行后续的逻辑。

广义上的同步通信方式是指,服务调用方发送一个请求,需要等待服务提供方执行完成的结果,否则就不能继续执行后续逻辑。

相对应的,广义的异步通信方式是指,上游的服务调用方只要确保请求消息成功发送就可以返回(我们称这样的调用方为消息生产者),继续执行后续的业务。业务逻辑的执行交给下游的服务(我们称这样的服务为消息消费者)。这种异步执行的业务逻辑通常是耗时的长事务,比如说物流发货、视频转码等业务场景。

本系列重点关注广义的异步通信方式,目前主流的异步通信方式分为线程池、队列以及回调机制。

1.线程池

基于线程池能够实现异步通信。线程池是一种基于池化概念产生的线程集合。

线程池原理

线程池是为了避免频繁的创建销毁线程为系统带来额外的内存、CPU压力,而对线程进行了复用。当需要使用线程时,从池中取出一个使用,用完后再将此线程“返还”给线程池。 我们称这里的线程为“工作线程”,通过下图来形象表达上述的过程。

首先,向线程池中提交任务1。此时线程中已经存在6个工作线程,其中一个处于忙碌状态,正在执行已提交的任务。 然后,线程池从剩余的5个空闲工作线程中选择一个,分配给任务1,开始执行任务1,如下图所示。此时,工作线程1被分配给任务1,开始执行任务1的业务逻辑,工作线程1的状态变更为忙碌。

在任务被执行完成后,工作线程并不忙着被关闭,而是被返回线程池中,状态仍旧为空闲,方便执行后续到来的任务。

总结来说,线程池就是将“频繁的创建新的线程”变更为“从线程池中直接获取一个线程”;将“在执行任务结束后关闭线程”变更为“向池中归还线程”。这样就避免了频繁创建、销毁线程带来的额外时间和空间的开销。

使用线程池实现异步通信

从上面的介绍中我们知道,线程池是通过将任务分配给工作线程,实现对任务的执行的。即,如果要实现对外的接口调用(或者网络通信),则完全可以在用户线程中向线程池中提交一个接口调用任务,然后工作线程就可以立即返回执行其他业务操作,待接口调用返回后通知工作线程对结果进行处理即可。

当用户线程提交任务到线程池成功后,就接着执行其他业务逻辑了。在程池中工作线程对外部接口的调用返回后,会通知用户线程继续返回的结果。

实际上,在Java的JUC包中已经实现了这种通知机制,通过线程池+Callable+Future的方式,就能够很方便、快捷地实现高性能的异步通信机制。 而线程池在Java的JUC包中也提供了参考实现,即ThreadPoolExecutor。另外,Java还提供了Executors工具类,以便我们快速创建模板线程池。这些优秀的代码实现在实战开发中都得到了广泛应用。

[实战]使用线程池实现异步通信

下面通过一段代码直观感受一下在Java中使用线程池实现异步通信的过程。

实现远程订单服务,MockRemoteOrderService.java,用于模拟外部系统的订单查询功能。

/**
 * @className MockRemoteOrderService
 * @desc 模拟远端订单查询服务
 */
public class MockRemoteOrderService {

    private static final Logger LOGGER = LoggerFactory.getLogger(MockRemoteOrderService.class);

    public OrderQueryResponse queryOrder(OrderQueryRequest orderQueryRequest) {
        try {
            LOGGER.info("queryOrder 开始, orderQueryRequest:{}", JSON.toJSONString(orderQueryRequest));
            // 模拟网络耗时 800ms
            Thread.sleep(800);
            OrderQueryResponse orderQueryResponse =
                    new OrderQueryResponse(orderQueryRequest.getOrderId(), 19.99);
            LOGGER.info("queryOrder 成功, orderQueryResponse:{}", JSON.toJSONString(orderQueryResponse));
            return orderQueryResponse;
        } catch (Exception e) {
            LOGGER.error("queryOrder 异常, orderQueryRequest:{}", JSON.toJSONString(orderQueryRequest), e);
            return null;
        }
    }
}

通过Thread.sleep(800),模拟网络耗时。

实现订单查询请求和响应实体——OrderQueryRequest.java和OrderQueryResponse.java。

/**
 * @className OrderQueryRequest
 * @desc 订单查询请求
 */
public class OrderQueryRequest {

    /**订单号*/
    private String orderId;

    public OrderQueryRequest(String orderId) {
        this.orderId = orderId;
    }
    省略getter、setter

/**
 * @className OrderQueryResponse
 * @desc 订单查询结果响应
 */
public class OrderQueryResponse {

    /**订单号*/
    private String orderId;
    /**订单金额*/
    private Double orderAmount;

    public OrderQueryResponse(String orderId, Double orderAmount) {
        this.orderId = orderId;
        this.orderAmount = orderAmount;
    }
    省略getter、setter 

实现本地订单查询业务逻辑LocalBizService.java。

/**
 * @className LocalBizService
 * @desc 本地业务逻辑
 */
public class LocalBizService {

    private static final Logger LOGGER = LoggerFactory.getLogger(LocalBizService.class);

    MockRemoteOrderService mockRemoteOrderService;

    /**自定义订单查询线程池*/
    private static final ExecutorService ORDER_QUERY_THREAD_POOL =
            new ThreadPoolExecutor(
                    Runtime.getRuntime().availableProcessors() + 1,
                    Runtime.getRuntime().availableProcessors() * 2,
                    60,
                    TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>(500),
                    new ThreadFactory() {
                        @Override
                        public Thread newThread(Runnable r) {
                            Thread thread = new Thread(r, "order-query-thread-pool");
                            thread.setDaemon(true);
                            return thread;
                        }
                    },
                    new ThreadPoolExecutor.CallerRunsPolicy());

    public LocalBizService(MockRemoteOrderService mockRemoteOrderService){
        this.mockRemoteOrderService = mockRemoteOrderService;
    }

    public void execute(String orderId){
        // 参数校验
        if (StringUtils.isBlank(orderId)) {
            throw new RuntimeException("orderId为空! orderId:" + orderId);
        }
        // 组装请求
        OrderQueryRequest orderQueryRequest = new OrderQueryRequest(orderId);
        // 提交订单查询任务
        ORDER_QUERY_THREAD_POOL.submit(new Runnable() {
            @Override
            public void run() {
                mockRemoteOrderService.queryOrder(orderQueryRequest);
            }
        });
        LOGGER.info("订单查询任务提交成功, orderId:{}", orderId);
        // 其他业务逻辑
        doSomething();
    }

    private void doSomething() {
        LOGGER.info("查询订单期间继续执行其他业务逻辑......");
    }

    public static void main(String[] args) {
        // 实例化mock远程订单查询服务
        MockRemoteOrderService mockRemoteOrderService = new MockRemoteOrderService();
        // 实例化本地业务逻辑
        LocalBizService localBizService = new LocalBizService(mockRemoteOrderService);
        // 提交订单查询任务
        String orderId = "ORDER_" + UUID.randomUUID().toString();
        localBizService.execute(orderId);
        while (true) {
            // hold住主线程
        }
    }
}
  • 在main()方法中,首先实例化MockRemoteOrderService对象,模拟调用远程订单查询服务;
  • 接着实例化LocalBizService对象,通过构造注入MockRemoteOrderService对象,模拟本地业务逻辑;
  • 初始化一个订单Id,通过调用localBizService.execute(orderId)方法,通过线程池提交了一个订单查询任务,模拟发起一次异步的远程通信;
  • 在execute()方法中,通过线程池的submit()方法提交了任务到线程池后,继续执行后续的业务逻辑即doSomething()。

最后运行main()方法,运行结果如下:

23:38:59.070 [main] INFO com.snowalker.from.distributed.to.cloudnative.section_1_2_2.async_threadpool.LocalBizService - 订单查询任务提交成功, orderId:ORDER_98c0e2e1-964a-4f94-9ec6-f50669d700dd
23:38:59.075 [main] INFO com.snowalker.from.distributed.to.cloudnative.section_1_2_2.async_threadpool.LocalBizService - 查询订单期间继续执行其他业务逻辑......
23:38:59.150 [order-query-thread-pool] INFO com.snowalker.from.distributed.to.cloudnative.section_1_2_2.async_threadpool.service.MockRemoteOrderService - queryOrder 开始, orderQueryRequest:{"orderId":"ORDER_98c0e2e1-964a-4f94-9ec6-f50669d700dd"}
23:38:59.976 [order-query-thread-pool] INFO com.snowalker.from.distributed.to.cloudnative.section_1_2_2.async_threadpool.service.MockRemoteOrderService - queryOrder 成功, orderQueryResponse:{"orderAmount":19.99,"orderId":"ORDER_98c0e2e1-964a-4f94-9ec6-f50669d700dd"}

通过日志打印可以直观的看到,在任务提交成功后,主线程继续执行后续业务逻辑,而耗时的订单查询任务会通过线程池的工作线程异步执行。

这就是通过线程池实现异步通信的代码实现。

消息队列

广义上的队列分为进程内的队列及进程间队列,例如,Java中的ArrayBlockingQueue、LinkedBlockingQueue等就属于进程内的队列;而ActiveMQ、Kafka、RocketMQ、RabbitMQ等就属于进程间的队列。我们在日常开发中说的消息队列,如果没有特指,往往指的是进程间的队列。

那么在分布式系统中,如何基于进程间的消息队列实现异步通信呢? 先看一个实际的案例:这是一个简化版的分布式电商系统,它的订单中心中保存了核心的订单交易数据。在系统建设的初期,商品中心进行库存扣减时,需要关联订单数据;物流中心在发货时也需要同步订单数据;支付中心在支付发起时,也需要同步订单数据。此时的交互过程如图所示。

订单中心直接调用商品中心、物流中心、支付中心提供的数据同步接口,将订单相关数据同步发送给了其他的系统。这看起来很正常,也没有什么问题发生。

随着系统规模逐渐增长,又有更多的系统需要获取订单数据进行各自的业务操作和数据分析,比如:风控中心需要通过订单数据进行风控相关的操作;仓储中心需要通过分析订单数据对货品管理进行调控;广告投放中心需要通过订单数据对广告转化率进行计算和优化;数据中心需要通过对订单数据进行汇总清洗,产出报表和大盘;财务中心需要通过订单数据建立起实时的结算体系等。

于是订单中心逐步增加了对其他系统的数据同步的代码,用以支持它们各自的需求。

由于采用的是同步接口对接方式进行数据同步,因此订单中心中不得不对新增的代码添加异常处理代码,防止因为下游服务异常而导致订单中心自己的业务出现级联影响。随着业务逐步发展,下游系统也在进行改造,下游系统的数据同步接口一旦发生修改,比如增加字段、变更服务地址等,都需要通知订单中心进行代码修改,一时间订单中心的研发人员苦不堪言。

实际的业务场景中,订单中心面临的下游系统不只图上的这些。在大型互联网公司中,下游系统本身也是一个复杂的分布式系统,其中包含了数十上百的子服务。这样一来,订单中心需要对接的系统可能会达到数百上千个,甚至更多。这其中的调用复杂关系可想而知,代码的复杂程度也难以想象,一个数据同步接口中动辄几千行代码不足为奇。

长期发展下去,势必会造成线上问题频出,况且数据同步接口并非核心业务逻辑,但是却需要投入大量的成本去维护,降低了上下游的迭代效率,还使得开发运维人员疲于奔命,对于个人亦或者企业而言都是不值得的。而这一切的根源都是因为采用了接口同步调用的方式去传递订单数据。而同步调用本身就是一种强耦合的通信方式。

那么改造的思路就显而易见,就是想办法优化数据传输方式,降低系统间的耦合度。改造思路就是通过使用消息队列来实现系统之间的松耦合。

上图所示,订单中心不再像之前那样通过调用下游各个中心的订单同步接口,而是直接将订单同步消息发送到MQ消息队列(如Kakfa、RocketMQ等)中的订单同步Topic。下游的中心如果对订单消息感兴趣,则自行订阅该Topic,拉取消息进行消费即可实现订单同步。一旦不需要同步订单消息,则下游的中心主动取消订阅Topic即可,上游的订单中心完全不需要感知下游如何去消费消息。而且一旦又有新的下游的服务也需要进行订单同步,实现新的业务逻辑,则该中心只需要实现消费订单消息的逻辑即可,直接与MQ消息队列进行交互,同样不需要上游的订单消息进行感知。

对于上游的订单消息而言,它要做的就是专注于将订单同步消息生产出来并发送(投递)到MQ消息队列,并保证消息发送成功。实现了与下游各种中心的松耦合,则在代码量大幅度减少的同时,订单中心的稳定性得到了提高。原先订单中心需要保证订单同步请求被下游的系统接收并处理完成才能继续后续的操作。

假设调用每个服务的平均响应时长是100ms(实际情况中可能会由于网络拥塞变得时间更长,这里是一种理想化的情况),那么上图中的8个中心同步数据的总时长为100ms × 8 = 800ms。而通过MQ消息队列优化后的平均响应时长能够降低数十毫秒,比如20ms。这是因为,消息投递本身是一个高性能的操作,只要保证消息发送成功并被MQ消息队列接收并持久化即可。后续的消费者对消息的消费过程,上游的发送者完全不用关注。

即当订单中心在逻辑0中完成发送订单同步消息的操作后,就可以继续执行逻辑1,逻辑2等后续的业务逻辑。

对于整个链路而言,订单同步过程就变成了一种异步的通信方式,这样降低了上游业务逻辑执行耗时,解耦了上下游之间的交互过程,提升了系统的处理能力和吞吐量。对于用户而言,等待时长也明显变短,提升了用户体验。

[实战]使用消息队列实现异步订单同步

下面通过一段代码模拟改造后的场景,直观地体验一下如何通过消息队列实现异步通信的目的。这里使用到的消息队列是RocketMQ。关于RocketMQ的搭建和原理会在后续的章节中详细展开,这里主要是展示具体的使用方法。

定义一个订单同步类,它的属性是需要通过消息发送方式异步同步给下游各种中心.

/**
 * @className OrderInfo
 * @desc 订单同步实体
 */
public class OrderSyncInfo {

    /**订单id*/
    private String orderId;
    /**订单金额*/
    private Double orderAmount;
    /**支付金额*/
    private Double payAmount;
    /**优惠券id*/
    private long voucherId;
    /**产品id*/
    private long productId;
    /**商品名称*/
    private String productName;
    /**创建时间*/
    private Date createTime;
    /**发货时间*/
    private Date deliverTime;

    public OrderSyncInfo(String orderId,
                         Double orderAmount,
                         Double payAmount,
                         long voucherId,
                         long productId,
                         String productName,
                         Date createTime,
                         Date deliverTime) {
        this.orderId = orderId;
        this.orderAmount = orderAmount;
        this.payAmount = payAmount;
        this.voucherId = voucherId;
        this.productId = productId;
        this.productName = productName;
        this.createTime = createTime;
        this.deliverTime = deliverTime;
    }
    省略getter setter

在上面的代码中,通过构造方法传递对应的订单属性,即可实现对订单同步对象的初始化。

编写一个订单同步生产者类OrderInfoSyncProducer.java,用以发送订单同步消息。

/**
 * @className OrderInfoSyncProducer
 * @desc 订单信息同步生产者
 */
public class OrderInfoSyncProducer {

    private DefaultMQProducer producer;

    public OrderInfoSyncProducer(String producerGroup) {
        // 初始化生产者实例并指定生产者组
        producer = new DefaultMQProducer(producerGroup);
        // NameServer地址
        producer.setNamesrvAddr("192.168.1.108");
        try {
            producer.start();
        } catch (MQClientException e) {
            throw new RuntimeException("DefaultMQProducer init error!", e);
        }
    }

    /**
     * 发送消息
     * @param orderSyncInfo
     * @return
     */
    public SendResult send(OrderSyncInfo orderSyncInfo) {
        // 序列化订单对象为字符串消息体
        String messageBody = JSON.toJSONString(orderSyncInfo);
        try {
            // 构造Message对象
            Message message = new Message(
                    "order_info_sync_topic" /* Topic */,
                    "sync" /* Tag */,
                    messageBody.getBytes(RemotingHelper.DEFAULT_CHARSET));
            // 发送消息
            SendResult sendResult = producer.send(message);
            return sendResult;
        } catch (Exception e) {
            throw new RuntimeException("send orderInfo sync message error! orderInfo:" + messageBody, e);
        }
    }

    /**
     * 关闭生产者
     */
    public void shutdown() {
        producer.shutdown();
    }
}
  • 通过构造方法,传递了生产者组。一般生产者组都会带业务属性,方便维护;同时指定了NameServer地址,用于发现broker地址;
  • 提供了send()方法,用于将OrderSyncInfo对象序列化为JSON格式的消息体,通过Message对象包装后指定要发送到消息队列的Topic,通过producer的send()方法发送出去。

编写订单同步消费者的代码。不同的中心均可以通过该类实现对订单的同步逻辑。

/**
 * @className OrderInfoSyncConsumer
 * @desc 订单信息同步消费者
 */
public class OrderInfoSyncConsumer {

    private DefaultMQPushConsumer consumer;

    private String consumerGroup;

    public OrderInfoSyncConsumer(String consumerGroup) {
        this.consumerGroup = consumerGroup;
        // 根据consumerGroup初始化DefaultMQPushConsumer
        consumer = new DefaultMQPushConsumer(consumerGroup);
        // 集群消费模式
        consumer.setMessageModel(MessageModel.CLUSTERING);
        // 从哪里开始消费,此处为从offset头部开始
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // NameServer地址
        consumer.setNamesrvAddr("192.168.1.108");
        // 订阅主题
        try {
            consumer.subscribe("order_info_sync_topic""*");
        } catch (MQClientException e) {
            throw new RuntimeException("subscribe topic error!", e);
        }
        //注册消息消费回调,用以执行消费逻辑
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
    }

    public void start() {
        try {
            consumer.start();
        } catch (MQClientException e) {
            throw new RuntimeException("start consumer error! consumerGroup:" + consumerGroup, e);
        }
        System.out.printf("Consumer Started. consumerGroup:%s", consumerGroup);
    }
}
  • 首先通过构造方法传递当前消费者实例的消费者组consumerGroup,通过consumerGroup对DefaultMQPushConsumer进行实例化;
  • 为DefaultMQPushConsumer设置各种属性,包括:从何处开始消费、消费模式、NameServer地址、注册消息消费回调接口(案例中通过匿名内部类实现默认的消费逻辑)、通过subscribe订阅订单同步主题;
  • consumeMessage方法即为核心的消费业务逻辑;
  • 通过调用DefaultMQPushConsumer的start方法启动消费者,并开始执行消费。

最后看一下代码具体是如何使用的。

/**
 * @className Client
 * @desc 订单同步测试类
 */
public class Client {

    public static void main(String[] args) {
        // 生产者发送订单同步消息
        OrderInfoSyncProducer orderInfoSyncProducer = new OrderInfoSyncProducer("order_info_sync_group");
        OrderSyncInfo orderSyncInfo = new OrderSyncInfo(
                    "order_" + UUID.randomUUID().toString(),
                    19.99,
                    19.99,
                    100001,
                    200001,
                    "IPhone11手机壳",
                    new Date(System.currentTimeMillis()),
                    new Date(System.currentTimeMillis())
                );
        // 发送消息
        orderInfoSyncProducer.send(orderSyncInfo);

        // 订单中心消费订单同步消息
        OrderInfoSyncConsumer orderCenterConsumer = new OrderInfoSyncConsumer("order_center_sync_group");
        orderCenterConsumer.start();
        // 数据中心消费订单同步消息
        OrderInfoSyncConsumer dataCenterConsumer = new OrderInfoSyncConsumer("data_center_sync_group");
        dataCenterConsumer.start();
        // 仓储中心消费订单同步消息
        OrderInfoSyncConsumer storageCenterConsumer = new OrderInfoSyncConsumer("storage_center_sync_group");
        storageCenterConsumer.start();
    }
}

下面对上一段代码中的main()方法进行简单的讲解:

  • 初始化了生产组名为“order_info_sync_group”的订单同步生产者实例。这样在构造方法执行完成后会自行调用start启动生产者;
  • 实例化了一个订单同步对象。通过构造方法赋值属性,可以看到该订单同步实体为一个IPhone11手机壳。
  • 调用send()方法发送消息。
  • 实例化了orderCenterConsumer(订单中心订单同步消费者)、dataCenterConsumer(数据中心订单同步消费者)、storageCenterConsumer(仓储中心订单同步消费者)等消费者实例。在构造方法初始化过程中,每个消费者组会分别订阅订单同步主题,并分别开启消费,互不影响,这与RocketMQ消费消息的机制有关,此处不详细展开。最后分别调用各自的start()方法开启消费。

通过代码案例可以发现,下游的各种中心只要实现自己的OrderInfoSyncConsumer逻辑并开启消费,就能够实现在上游生产者不耦合下游消费者的前提下,完成跨多个系统的异步订单同步功能。这就是消息队列的解耦特性在实战应用的体现。

通过图例分析、案例解析结合代码,我们对消息队列的异步通信方式建立起了一个感性的认知。

下一篇中,我们将继续对异步通信中的回调机制进行学习。敬请期待。


浏览 43
点赞
评论
收藏
分享

手机扫一扫分享

举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

举报