首页 文章详情

Dapr牵手.NET学习笔记:发布-订阅

DotNet NB | 130 2021-10-26 17:35 0 0 0
UniSMS (合一短信)

queue,是很好的削峰填谷工具,在业内也是主流;发布订阅,可以有效的解耦两个应用,所以dapr把他们进行了有效的封装,我们使用起来更简单高效。

本篇的案例是下完订单后,会把消息发布到redis(当然也可以是其他)中,通知系统和支付系统会订单这个消息,同时,通知系统和支付系统的两个实例中,只会有一个实例接收到这个消息,进行处理,调用示意图如下:


项目结构如下:


一、配置

用docker-compose部署,docker-compose.yml内容

version: '3.4'
services: #┌────────────────────────────────┐ #│ ordersystem app + Dapr sidecar │ #└────────────────────────────────┘ ordersystem: image: ${DOCKER_REGISTRY-}ordersystem depends_on: - redis - placement build: context: ../ dockerfile: OrderSystem/Dockerfile ports: - "3500:3500" volumes: - ../OrderSystem:/OrderSystem networks: - b2c-dapr ordersystem-dapr: image: "daprio/daprd:latest" command: [ "./daprd", "-app-id", "order", "-app-port", "80","-placement-host-address", "placement:50006","-components-path","/components"] depends_on: - ordersystem network_mode: "service:ordersystem" volumes: - ../components:/components #┌───────────────────────────────────┐ #│ paymentsystem1 app + Dapr sidecar │ #└───────────────────────────────────┘ paymentsystem1: image: ${DOCKER_REGISTRY-}paymentsystem build: context: ../ dockerfile: PaymentSystem/Dockerfile ports: - "3601:3500" volumes: - ../PaymentSystem:/PaymentSystem networks: - b2c-dapr paymentsystem1-dapr: image: "daprio/daprd:latest" command: [ "./daprd", "-app-id", "pay", "-app-port", "80","-placement-host-address", "placement:50006","-components-path","/components" ] depends_on: - paymentsystem1 network_mode: "service:paymentsystem1" volumes: - ../components:/components #┌───────────────────────────────────┐ #│ paymentsystem2 app + Dapr sidecar │ #└───────────────────────────────────┘ paymentsystem2: image: ${DOCKER_REGISTRY-}paymentsystem build: context: ../ dockerfile: PaymentSystem/Dockerfile volumes: - ../PaymentSystem:/PaymentSystem ports: - "3602:3500" networks: - b2c-dapr paymentsystem2-dapr: image: "daprio/daprd:latest" command: [ "./daprd", "-app-id", "pay", "-app-port", "80" ,"-placement-host-address", "placement:50006","-components-path","/components"] depends_on: - paymentsystem2 network_mode: "service:paymentsystem2" volumes: - ../components:/components
#┌───────────────────────────────────┐ #│ noticesystem1 app + Dapr sidecar │ #└───────────────────────────────────┘ noticesystem1: image: ${DOCKER_REGISTRY-}noticesystem build: context: ../ dockerfile: NoticeSystem/Dockerfile ports: - "3701:3500" volumes: - ../NoticeSystem:/NoticeSystem networks: - b2c-dapr noticesystem1-dapr: image: "daprio/daprd:latest" command: [ "./daprd", "-app-id", "notice", "-app-port", "80","-placement-host-address", "placement:50006","-components-path","/components" ] depends_on: - noticesystem1 network_mode: "service:noticesystem1" volumes: - ../components:/components
  #┌───────────────────────────────────┐ #│ noticesystem2 app + Dapr sidecar │ #└───────────────────────────────────┘ noticesystem2: image: ${DOCKER_REGISTRY-}noticesystem build: context: ../ dockerfile: NoticeSystem/Dockerfile ports: - "3702:3500" volumes: - ../NoticeSystem:/NoticeSystem networks: - b2c-dapr noticesystem2-dapr: image: "daprio/daprd:latest" command: [ "./daprd", "-app-id", "notice", "-app-port", "80","-placement-host-address", "placement:50006","-components-path","/components" ] depends_on: - noticesystem2 network_mode: "service:noticesystem2" volumes: - ../components:/components
#┌────────────────────────┐ #│ Dapr placement service │ #└────────────────────────┘ placement: image: "daprio/dapr" command: ["./placement", "-port", "50006"] ports: - "50006:50006" networks: - b2c-dapr
#┌───────────────────┐ #│ Redis state store │ #└───────────────────┘ redis: image: "redis:latest" ports: - "6380:6379" networks: - b2c-daprnetworks: b2c-dapr:


pubsub.yaml(在components文件夹下 )内容是默认,如下

apiVersion: dapr.io/v1alpha1kind: Componentmetadata:  name: pubsubspec:  type: pubsub.redis  version: v1  metadata:  - name: redisHost    value: redis:6379  - name: redisPassword    value: ""

订阅配置文件如下subscription.yaml(在components文件夹下 )

apiVersion: dapr.io/v1alpha1kind: Subscriptionmetadata:  name: myevent-subscriptionspec:  topic: orderComplete  route: /ordercomplete  pubsubname: pubsubscopes:- pay- notice


二、代码

OrderSystem项目的appsettings.json

 "PublishUrl": "http://localhost:3500/v1.0/publish/pubsub/orderComplete"


OrderSystem项目的发布方法

    [HttpGet("/orderpub/{orderno}")]    public async Task<IActionResult> OrderPub(string orderno)    {        try        {            _logger.LogInformation($"Order,publish");            await Task.Delay(400);            var client = _clientFactory.CreateClient();
var stringContent = new StringContent(Newtonsoft.Json.JsonConvert.SerializeObject(new { OrderNo = orderno, Amount = 30000, OrderTime = DateTime.UtcNow}), System.Text.Encoding.UTF8, "application/json"); _logger.LogInformation(stringContent.ToString()); var content = await client.PostAsync(_publishUrl, stringContent); return new JsonResult(new { order_result = "Order success,and publish", pay_result = content }); } catch (Exception exc) { _logger.LogCritical(exc, exc.Message); return new JsonResult(new { order_result = "Order success,and publish,pay exception", message = exc.Message }); } }


PaymentSystem和NoticeSystem项目中的订阅实现

两个实体类

public class PubBody{    public string id { get; set; }    public string source { get; set; }    public string pubsubname { get; set; }    public string traceid { get; set; }    public PubOrder data { get; set; }    public string specversion { get; set; }    public string datacontenttype { get; set; }    public string type { get; set; }    public string topic { get; set; }}
public class PubOrder{ public string OrderNo { get; set; } public decimal Amount { get; set; } public DateTime OrderTime { get; set; }}


NoticeSystem和PaymentSystem两个项目中的订阅方法如下

    [HttpPost("/ordercomplete")]    public async Task<IActionResult> OrderComplete()    {        try        {            _logger.LogInformation("PaymentSystem OrderComplete runing……");            using var reader = new StreamReader(Request.Body, System.Text.Encoding.UTF8);            var content = await reader.ReadToEndAsync();            var pubBody = Newtonsoft.Json.JsonConvert.DeserializeObject<PubBody>(content);            _logger.LogInformation($"---------  HostName:{Dns.GetHostName()},OrderNo:{pubBody?.data.OrderNo},OrderAmount:{pubBody?.data.Amount},OrderTime:{pubBody?.data.OrderTime} -----------");            await Task.Delay(200);            _logger.LogInformation($"subscription pay complete");            _logger.LogInformation($"return  SUCCESS");            return new JsonResult(new            {                Status = "SUCCESS"            });        }        catch (Exception exc)        {            _logger.LogCritical(exc, exc.Message);            _logger.LogInformation($"return  RETRY");            return new JsonResult(new            {                Status = "RETRY"            });        }    }


三、发布测试

进入在B2C目发,用命令行启动docker compose

docker-compose up -d


可以测试了,调用OrderSystem的对外地址,下订单NO0001,和NO0002

localhost:3500/v1.0/invoke/order/method/orderpub/NO0001和

localhost:3500/v1.0/invoke/order/method/orderpub/NO0001

查看容器noticesystem1

查看容器noticesystem2

查看容器paymentsystem1

查看容器paymentsystem2


NoticeSystem和PaymentSystem同时订阅OrderSystem项目的发布orderComplete,两个实例会轮询处理订阅结果。Dapr就这样,把复杂的发布订阅,封装成一个api一样的简单调用和接收,项目中没有一点的痕迹。


推荐阅读:
Kubernetes全栈架构师(Kubeadm高可用安装k8s集群)--学习笔记
.NET 云原生架构师训练营(模块一 架构师与云原生)--学习笔记
.NET Core开发实战(第1课:课程介绍)--学习笔记

点击下方卡片关注DotNet NB

一起交流学习

▲ 点击上方卡片关注DotNet NB,一起交流学习

请在公众号后台


回复 【路线图】获取.NET 2021开发者路线图
回复 【原创内容】获取公众号原创内容
回复 【峰会视频】获取.NET Conf开发者大会视频
回复 【个人简介】获取作者个人简介
回复 【年终总结】获取作者年终总结
回复 加群加入DotNet NB 交流学习群

长按识别下方二维码,或点击阅读原文。和我一起,交流学习,分享心得。


good-icon 0
favorite-icon 0
收藏
回复数量: 0
    暂无评论~~
    Ctrl+Enter