Skip to content

InvokeSync 与消息传递

现代分布式系统通常需要将数据库变更复制到其他服务、搜索索引或数据仓库。SmartSql.InvokeSync 包提供了一个将 SQL 调用事件发布到消息队列的框架,使下游消费者能够响应数据变更。配合 Kafka 和 RabbitMQ 的伴侣包,你只需最少的配置即可接入任一消息基础设施。

一览表

包名用途
SmartSql.InvokeSync核心抽象:IPublisherISubscriberISyncServiceISyncFilter
SmartSql.InvokeSync.Kafka基于 Kafka 的发布者和订阅者
SmartSql.InvokeSync.RabbitMQ基于 RabbitMQ 的发布者和订阅者

架构

mermaid
graph TB
    subgraph Producer["Producer Instance"]
        style Producer fill:#161b22,stroke:#30363d,color:#e6edf3
        SQL["SQL Execution"] --> Listener["InvokeSucceedListener"]
        Listener --> SyncSvc["SyncService"]
        SyncSvc --> Filter["SyncFilter"]
        Filter -->|Allowed| Pub["IPublisher"]
        Filter -->|Blocked| Drop["Drop"]
    end

    subgraph MQ["Message Queue"]
        style MQ fill:#161b22,stroke:#30363d,color:#e6edf3
        Topic["Topic / Exchange"]
    end

    subgraph Consumer["Consumer Instance"]
        style Consumer fill:#161b22,stroke:#30363d,color:#e6edf3
        Sub["ISubscriber"] --> Handler["Event Handler"]
        Handler --> CacheSync["Cache Sync"]
        Handler --> Business["Business Logic"]
    end

    Pub --> Topic
    Topic --> Sub

    style SQL fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
    style Listener fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
    style SyncSvc fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
    style Filter fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
    style Pub fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
    style Drop fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
    style Topic fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
    style Sub fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
    style Handler fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
    style CacheSync fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
    style Business fill:#2d333b,stroke:#6d5dfc,color:#e6edf3

同步流程时序

mermaid
sequenceDiagram
autonumber
    participant App as Application
    participant Pipeline as Middleware Pipeline
    participant Listener as InvokeSucceedListener
    participant Filter as SyncFilter
    participant Service as SyncService
    participant Pub as IPublisher
    participant MQ as Message Queue

    App->>Pipeline: Execute SQL
    Pipeline->>Pipeline: Execute middlewares
    Pipeline->>Listener: InvokeSucceeded event
    Listener->>Service: Sync(executionContext)
    Service->>Filter: Filter(executionContext)
    Filter->>Filter: Check StatementType, Scopes, SqlIds

    alt Filter passes
        Service->>Service: AsSyncRequest(executionContext)
        loop Each IPublisher
            Service->>Pub: PublishAsync(syncRequest)
            Pub->>MQ: Send message
        end
    else Filter blocks
        Service-->>Service: Skip (log debug)
    end

核心接口

IPublisher

向消息队列发布 SyncRequest 消息:

成员类型描述
PublishAsync(SyncRequest)Task向队列发送同步请求
Dispose()void清理连接

ISubscriber

从消息队列接收 SyncRequest 消息:

成员类型描述
Receivedevent EventHandler<SyncRequest>消息到达时触发
Start()void开始消费消息
Stop()void停止消费消息

ISyncService

通过应用过滤器和发布来协调同步流程:

成员类型描述
Sync(ExecutionContext)Task过滤并发布执行上下文

ISyncFilter

决定是否应发布给定的执行:

成员类型描述
Filter(ExecutionContext)bool如果执行应被同步则返回 true

SyncFilter 配置

SyncFilter 应用多层过滤器来确定哪些 SQL 执行应被发布:

mermaid
flowchart TD
    subgraph FilterLogic["SyncFilter Decision Logic"]
        style FilterLogic fill:#161b22,stroke:#30363d,color:#e6edf3
        Start["Incoming ExecutionContext"] --> Ignore{"In IgnoreList?<br>(IgnoreStatementType,<br>IgnoreScopes, IgnoreSqlIds)"}
        Ignore -->|Yes| Block["Return false (skip)"]
        Ignore -->|No| Type{"StatementType<br>matches filter?"}
        Type -->|No| Block
        Type -->|Yes| Scope{"Scope in allowed list?<br>(if specified)"}
        Scope -->|No| Block
        Scope -->|Yes| SqlId{"SqlId in allowed list?<br>(if specified)"}
        SqlId -->|No| Block
        SqlId -->|Yes| Pass["Return true (publish)"]
    end

    style Start fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
    style Ignore fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
    style Block fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
    style Type fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
    style Scope fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
    style SqlId fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
    style Pass fill:#2d333b,stroke:#6d5dfc,color:#e6edf3

SyncFilterOptions

属性类型默认值描述
StatementTypeStatementTypeWrite同步哪些语句类型
ScopesIEnumerable<string>nullscope 白名单(null = 全部)
SqlIdsIEnumerable<string>nullSQL ID 白名单
FullSqlIdsIEnumerable<string>null完整 SQL ID 白名单(Scope.SqlId)
IgnoreStatementTypeStatementType?null排除的语句类型
IgnoreScopesIEnumerable<string>null排除的 scope
IgnoreSqlIdsIEnumerable<string>null排除的 SQL ID
IgnoreFullSqlIdsIEnumerable<string>null排除的完整 SQL ID

Kafka 实现

KafkaOptions

属性类型默认值描述
Serversstring--Kafka 代理地址
Topicstring--Kafka 主题名称
ConfigIDictionary<string, string>额外的 Confluent.Kafka 配置

注册

csharp
services
    .AddSmartSql("SmartSql")
    .AddInvokeSync(options =>
    {
        options.StatementType = StatementType.Write;
    })
    .AddKafkaPublisher(options =>
    {
        options.Servers = "localhost:9092";
        options.Topic = "smartsql-sync";
    })
    .AddKafkaSubscriber(options =>
    {
        options.Servers = "localhost:9092";
        options.Topic = "smartsql-sync";
    });

Kafka 发布者使用 Confluent.Kafka 的 IProducer<string, string>。消息以 {Scope}.{SqlId} 作为 Key,用于分区局部性。

RabbitMQ 实现

RabbitMQOptions

属性类型默认值描述
HostNamestring"localhost"RabbitMQ 主机
VirtualHoststring"/"虚拟主机
UserNamestring--认证用户名
Passwordstring--认证密码
Exchangestring"smartsql"交换机名称
ExchangeTypestring"direct"交换机类型
RoutingKeystring"sync"路由键
RequestedHeartbeatushort60心跳间隔
AutomaticRecoveryEnabledbooltrue自动重连

注册

csharp
services
    .AddSmartSql("SmartSql")
    .AddInvokeSync(options => { })
    .AddRabbitMQPublisher(options =>
    {
        options.HostName = "localhost";
        options.UserName = "guest";
        options.Password = "guest";
        options.Exchange = "smartsql";
        options.RoutingKey = "smartsql.sync";
    })
    .AddRabbitMQSubscriber(options =>
    {
        options.HostName = "localhost";
        options.Exchange = "smartsql";
        options.RoutingKey = "smartsql.sync";
    });

完整配置

以下时序展示了包含发布者和订阅者注册的完整启动流程:

mermaid
sequenceDiagram
autonumber
    participant App as Startup
    participant DI as IServiceCollection
    participant SP as IServiceProvider
    participant SmartSql as SmartSqlBuilder
    participant MQ as Message Queue

    App->>DI: AddSmartSql("SmartSql")
    App->>DI: AddInvokeSync(filterOptions)
    DI->>DI: Register SyncFilterOptions, ISyncFilter, ISyncService
    App->>DI: AddKafkaPublisher(kafkaOptions)
    DI->>DI: Register IPublisher -> KafkaPublisher
    App->>DI: AddKafkaSubscriber(kafkaOptions)
    DI->>DI: Register ISubscriber -> KafkaSubscriber

    Note over SP: App starts
    SP->>SmartSql: UseSmartSqlSync()
    SmartSql->>SmartSql: Hook InvokeSucceedListener
    SP->>MQ: UseSmartSqlSubscriber(handler)
    MQ->>MQ: subscriber.Start() - begin consuming

SyncRequest 载荷

发布到消息队列的 SyncRequest 对象包含:

属性类型描述
IdGuid唯一消息标识符
ScopestringSQL 映射 scope
SqlIdstring语句 ID
StatementTypeStatementType?Select、Insert、Update、Delete
RealSqlstring实际执行的 SQL
ParametersIDictionary<string, object>SQL 参数值
Resultobject执行结果(行数、实体等)
DataSourceChoiceDataSourceChoice使用的读或写数据源
TransactionIsolationLevel?活动事务的隔离级别
IsStatementSqlbool是否为真实 SQL 操作

交叉参考

  • 缓存同步 -- 使用 ISubscriber 在远程变更时使本地缓存失效。
  • Redis 缓存 -- 受益于缓存同步的分布式缓存。
  • DI 集成 -- 与 SmartSql DI 共享的注册模式。
  • AOP 事务 -- 事务可包含多个触发同步的操作。

参考资料

基于 MIT 许可证发布。