使用 C# 14 和 SQL Outbox 模式构建事件驱动 CQRS 架构

360影视 动漫周边 2025-05-18 08:40 2

摘要:在分布式系统中,确保数据库状态变更与事件发布之间的一致性一直是一项挑战。而SQL Outbox 模式则提供了一种可靠的解决方案,它允许我们在同一个数据库事务中同时写入业务数据和事件,从而防止事件丢失或顺序错乱的问题。

在分布式系统中,确保数据库状态变更与事件发布之间的一致性一直是一项挑战。而SQL Outbox 模式则提供了一种可靠的解决方案,它允许我们在同一个数据库事务中同时写入业务数据和事件,从而防止事件丢失或顺序错乱的问题。

本文将带你一步步构建一个使用 C# 14 新特性SQL Outbox 模式以及可靠事件处理机制的安全事件驱动 CQRS 微服务。

核心问题:数据库更新成功但事件发送失败,或者相反,会导致系统状态与事件流不一致。尤其在消息中间件(如 RabbitMQ、Kafka)与数据库是两个独立系统时,这种问题更容易发生。

Outbox 模式的解决方案包括:

在数据库事务中,将事件记录插入一个专用的 Outbox表;

后台异步任务轮询该表并将事件发布到消息队列;

发布成功后将事件标记为“已处理”或从表中删除,防止重复发送。

✏️ 写入端(Write API)

接收命令(如创建订单)

在同一事务中写入业务数据与 Outbox 表

🔄 Outbox 处理器

异步读取 Outbox 表中的新事件

发布事件到消息队列(如 RabbitMQ)

标记为已处理,避免重复发布

👓 读取端(Read API)

独立于写模型,基于投影数据库或缓存读取数据

public class order
{
public string Id { get; }
public decimal Total { get; }

publicOrder(string id, decimal total)
{
Id = id;
Total = total;
}

public OrderPlacedEventToEvent => new(Id, Total);
}

public recordOrderPlacedEvent(string OrderId, decimal Total);
👉Order是订单实体,OrderPlacedEvent是对应的领域事件。

将订单和事件写入数据库,放在同一个事务中处理

public async Task PlaceOrderAsync(Order order)
{
using var conn = new SQLConnection(_dbConn);
await conn.OpenAsync;
using var tx = conn.BeginTransaction;
try
{
// 插入订单记录
var insertOrderQuery = "INSERT INTO Orders (Id, Total) VALUES (@Id, @Total)";
await conn.ExecuteAsync(insertOrderQuery, new { order.Id, order.Total }, tx);

// 生成事件并写入 Outbox 表
var evt = order.ToEvent;
var insertOutboxQuery = "INSERT INTO Outbox (Id, Type, Payload) VALUES (@Id, @Type, @Payload)";
await conn.ExecuteAsync(insertOutboxQuery, new
{
Id = Guid.NewGuid,
Type = evt.GetType.Name,
Payload = JSONSerializer.Serialize(evt)
}, tx);

tx.Commit;// 提交事务
}
catch (Exception ex)
{
tx.Rollback;// 回滚事务
throw new ApplicationException("下单过程中发生错误", ex);
}
}

👆 这样即使发布失败,订单数据和事件都会被一致地回滚,避免不一致。

通过后台任务轮询 Outbox 表,并将事件发布到消息中间件:

public async Task ProcessOutboxAsync
{
using var conn = new SqlConnection(_dbConn);
var events = await conn.QueryAsync(
"SELECT TOP 100 * FROM Outbox WHERE Processed = 0");

foreach (var record in events)
{
var evt = DeserializeEvent(record.Type, record.Payload);
await _publisher.PublishAsync(evt);

// 标记为已处理
await conn.ExecuteAsync(
"UPDATE Outbox SET Processed = 1 WHERE Id = @Id", new { record.Id });
}
}

你可以用 C# 14 的 模式匹配 switch来反序列化 JSON 字符串:

object DeserializeEvent(string type, string payload) =>
type switch
{
nameof(OrderPlacedEvent) => JsonSerializer.Deserialize(payload)!,
_ => throw new InvalidOperationException($"未知事件类型:{type}")
};

在事件结构中添加 EventId(全局唯一标识符)

消费者实现幂等处理逻辑

使用 Redis 的 SET或内存缓存做快速去重判断

优点

说明

✅ 一致性强

数据和事件写入同一事务,天然一致

✅ 可重试性

即使发布失败,事件仍保存在数据库中,可后续重发

✅ 服务解耦

生产者和消费者完全分离

✅ CQRS 清晰

写模型和读模型职责分离,更易维护和扩展

通过将 SQL Outbox 模式C# 14 的新特性(如简化构造函数、记录类型、模式匹配)结合使用,我们可以构建出既强一致又易维护的事件驱动微服务架构。

这种模式不仅提升了系统的健壮性,还为将来接入 Kafka、RabbitMQ 等消息系统打下了坚实基础,也为系统提供了可扩展性与最终一致性的能力。

如果你正在构建现代化 .NET 微服务,这将是你不可错过的架构利器。

SQL Outbox 模式是一种在微服务架构中确保数据一致性与事件可靠投递的设计模式,主要用于解决数据库事务与消息发布之间的“不一致”问题✅ 一句话解释:

SQL Outbox 模式允许我们在同一个数据库事务中,既保存业务数据,又记录需要发布的事件,避免事件丢失或乱序。

🛡️ 它解决了哪些问题?

问题

Outbox 如何解决

分布式事务难以实现

通过单库事务替代复杂的分布式事务

消息投递失败

未处理的事件始终保留在 Outbox 表中,可重试

消费重复事件

可配合幂等处理逻辑、事件去重等手段

服务解耦困难

事件驱动 + Outbox 使服务之间只通过事件通信,无直接依赖

🏗️ Outbox 模式架构图简化版:++ ++ ++
| 写服务 (API)|--写入订单-->| 数据库 Orders | | Outbox 表 |
| | | |---事件记录--> (事务内) |

|
| 异步轮询
v
++
| Outbox Processor |
| → 发布事件到 Kafka/Rabbit|

本文使用chatgpt协助翻译。

原文链接:

c-sharpcorner.com/article/event-driven-cqrs-with-c-sharp-14-and-the-sql-outbox-pattern/

来源:opendotnet

相关推荐