概念
CQRS的三种实现
单一数据库的CQRS
双数据库的CQRS
事件溯源 (Event Sourcing) CQRS
设计
入门
1.新建ASP.NET Core 空项目Assignment.CqrsDemo
,并安装Masa.Contrib.Dispatcher.Events
,Masa.Contrib.Dispatcher.IntegrationEvents
,Masa.Contrib.Dispatcher.IntegrationEvents.Dapr
,Masa.Contrib.ReadWriteSplitting.Cqrs
,Masa.Contrib.Development.DaprStarter.AspNetCore
dotnet new web -o Assignment.CqrsDemo
cd Assignment.CqrsDemo
dotnet add package Masa.Contrib.Dispatcher.Events --version 0.7.0-preview.9 //使用进程内事件总线
dotnet add package Masa.Contrib.Dispatcher.IntegrationEvents --version 0.7.0-preview.9 //使用跨进程事件总线
dotnet add package Masa.Contrib.Dispatcher.IntegrationEvents.Dapr --version 0.7.0-preview.9 //使用Dapr提供pubsub能力
dotnet add package Masa.Contrib.ReadWriteSplitting.Cqrs --version 0.7.0-preview.9 //使用CQRS
dotnet add package Masa.Contrib.Development.DaprStarter.AspNetCore --version 0.7.0-preview.9 //开发环境下协助 Dapr Sidecar, 用于通过Dapr发布集成事件
Program.cs
PubSub
(https://docs.dapr.io/zh-hans/developing-applications/building-blocks/pubsub/pubsub-overview/)能力即可builder.Services.AddIntegrationEventBus(dispatcherOptions =>
{
dispatcherOptions.UseDapr();//使用 Dapr 提供的PubSub能力
dispatcherOptions.UseEventBus();//使用进程内事件总线
});
Dapr Sidecar
(开发环境使用)if (builder.Environment.IsDevelopment())
builder.Services.AddDaprStarter();
Program.cs
app.MapPost("/goods/add", async (AddGoodsCommand command, IEventBus eventBus) =>
{
await eventBus.PublishAsync(command);
});
/// <summary>
/// 添加商品参数, 用于接受商品参数
/// </summary>
public record AddGoodsCommand : Command
{
public string Name { get; set; }
public string Cover { get; set; }
public decimal Price { get; set; }
public int Count { get; set; }
}
Program.cs
app.MapGet("/goods/{id}", async (Guid id, IEventBus eventBus) =>
{
var query = new GoodsItemQuery(id);
await eventBus.PublishAsync(query);
return query.Result;
});
/// <summary>
/// 用于接收查询商品信息参数
/// </summary>
public record GoodsItemQuery : Query<GoodsItemDto>
{
public Guid Id { get; set; } = default!;
public override GoodsItemDto Result { get; set; }
public GoodsItemQuery(Guid id)
{
Id = id;
}
}
/// <summary>
/// 用于返回商品信息
/// </summary>
public class GoodsItemDto
{
public Guid Id { get; set; }
public string Name { get; set; }
public string Cover { get; set; }
public decimal Price { get; set; }
public int Count { get; set; }
public DateTime DateTime { get; set; }
}
Command
处理程序, 添加类CommandHandler.cs
public class CommandHandler
{
/// <summary>
/// 将商品添加到Db,并发送跨进程事件
/// </summary>
/// <param name="command"></param>
/// <param name="integrationEventBus"></param>
[EventHandler]
public async Task AddGoods(AddGoodsCommand command, IIntegrationEventBus integrationEventBus)
{
//todo: 模拟添加商品到db并发送添加商品集成事件
var goodsId = Guid.NewGuid(); //模拟添加到db后并获取商品id
await integrationEventBus.PublishAsync(new AddGoodsIntegrationEvent(goodsId, command.Name, command.Cover, command.Price,
command.Count));
}
}
/// <summary>
/// 跨进程事件, 发送添加商品事件
/// </summary>
/// <param name="Id"></param>
/// <param name="Name"></param>
/// <param name="Cover"></param>
/// <param name="Price"></param>
/// <param name="Count"></param>
public record AddGoodsIntegrationEvent(Guid Id, string Name, string Cover, decimal Price, int Count) : IntegrationEvent
{
public Guid Id { get; set; } = Id;
public string Name { get; set; } = Name;
public string Cover { get; set; } = Cover;
public decimal Price { get; set; } = Price;
public int Count { get; set; } = Count;
public override string Topic { get; set; } = nameof(AddGoodsIntegrationEvent);
}
Query
处理程序, 添加类QueryHandler.cs
public class QueryHandler
{
/// <summary>
/// 从缓存查询商品信息
/// </summary>
/// <param name="query"></param>
/// <returns></returns>
[EventHandler]
public Task GetGoods(GoodsItemQuery query)
{
//todo: 模拟从cache获取商品
var goods = new GoodsItemDto();
query.Result = goods;
return Task.CompletedTask;
}
}
Program.cs
app.MapPost(
"/integration/goods/add",
[Topic("pubsub", nameof(AddGoodsIntegrationEvent))]
(AddGoodsIntegrationEvent @event, ILogger<Program> logger) =>
{
//todo: 模拟添加商品到缓存
logger.LogInformation("添加商品到缓存, {Event}", @event);
});
// 使用 dapr 来订阅跨进程事件
app.UseRouting();
app.UseCloudEvents();
app.UseEndpoints(endpoint =>
{
endpoint.MapSubscribeHandler();
});
本章源码
参考
开源地址
推荐阅读: .NET周报【12月第1期 2022-12-08】 .NET 7 新增的 IParsable 接口介绍 .NET 云原生架构师训练营(基于 OP Storming 和 Actor 的大型分布式架构一)--学习笔记 一个.NetCore前后端分离、模块化、插件式的通用框架 .NET 为什么推荐Kestrel作为网络开发框架 用最少的代码打造一个Mini版的gRPC框架 点击下方卡片关注DotNet NB
一起交流学习
▲ 点击上方卡片关注DotNet NB,一起交流学习
请在公众号后台
回复 【路线图】获取.NET 2021开发者路线图 回复 【原创内容】获取公众号原创内容 回复 【峰会视频】获取.NET Conf开发者大会视频 回复 【个人简介】获取作者个人简介 回复 【年终总结】获取作者年终总结 回复 【加群】加入DotNet NB 交流学习群 长按识别下方二维码,或点击阅读原文。和我一起,交流学习,分享心得。