Skip to content

缓存同步

在多实例部署中,每个应用实例维护自己的本地缓存。当某个实例执行写操作(INSERT、UPDATE、DELETE)时,其他实例的缓存会变得过时。SmartSql.Cache.Sync 包通过监听 InvokeSync 扩展发布的消息队列事件,在其他实例上执行数据变更 SQL 语句时刷新本地缓存,从而解决这一问题。

一览表

特性描述
包名SmartSql.Cache.Sync
关键类SyncCacheManager
继承自AbstractCacheManager
依赖于SmartSql.InvokeSyncISubscriber
机制订阅 ISubscriber.Received 事件,刷新匹配的缓存

工作原理

SyncCacheManager 扩展了标准的 AbstractCacheManager,并覆盖了 ListenInvokeSucceeded() 以订阅消息队列事件,替代(或补充)本地调用事件:

mermaid
sequenceDiagram
autonumber
    participant App1 as Instance A (Writer)
    participant MQ as Message Queue<br>(Kafka / RabbitMQ)
    participant App2 as Instance B (Reader)
    participant SCM as SyncCacheManager
    participant Cache as Local Cache

    App1->>App1: Execute INSERT statement
    App1->>MQ: Publish SyncRequest (via IPublisher)

    MQ->>App2: Deliver to ISubscriber
    App2->>SCM: Subscriber.Received event fires
    SCM->>SCM: Check IsStatementSql
    SCM->>SCM: Build FullSqlId = "{Scope}.{SqlId}"
    SCM->>Cache: FlushOnExecuted(FullSqlId)
    Cache->>Cache: Flush all caches registeredfor this FullSqlId

架构

mermaid
graph TB
    subgraph InstanceA["Instance A"]
        style InstanceA fill:#161b22,stroke:#30363d,color:#e6edf3
        IA_Sql["SQL Execution"] --> IA_Pub["IPublisher"]
    end

    subgraph MQ["Message Queue"]
        style MQ fill:#161b22,stroke:#30363d,color:#e6edf3
        Queue["Kafka / RabbitMQ Topic"]
    end

    subgraph InstanceB["Instance B"]
        style InstanceB fill:#161b22,stroke:#30363d,color:#e6edf3
        IB_Sub["ISubscriber"] --> IB_SCM["SyncCacheManager"]
        IB_SCM --> IB_Cache["Local Cache Flush"]
    end

    subgraph InstanceC["Instance C"]
        style InstanceC fill:#161b22,stroke:#30363d,color:#e6edf3
        IC_Sub["ISubscriber"] --> IC_SCM["SyncCacheManager"]
        IC_SCM --> IC_Cache["Local Cache Flush"]
    end

    IA_Pub --> Queue
    Queue --> IB_Sub
    Queue --> IC_Sub

    style IA_Sql fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
    style IA_Pub fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
    style Queue fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
    style IB_Sub fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
    style IB_SCM fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
    style IB_Cache fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
    style IC_Sub fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
    style IC_SCM fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
    style IC_Cache fill:#2d333b,stroke:#6d5dfc,color:#e6edf3

SyncCacheManager 内部实现

实现非常简洁。SyncCacheManager 覆盖了 AbstractCacheManagerListenInvokeSucceeded() 钩子:

csharp
protected override void ListenInvokeSucceeded()
{
    _subscriber.Received += SubscriberOnReceived;
}

private void SubscriberOnReceived(object sender, SyncRequest e)
{
    if (!e.IsStatementSql)
    {
        return;
    }
    FlushOnExecuted($"{e.Scope}.{e.SqlId}");
}

关键行为:

  1. 仅处理 IsStatementSql == true 的请求(跳过非 SQL 操作)。
  2. SyncRequestScopeSqlId 构造 FullSqlId
  3. 调用 FlushOnExecuted(),触发为该语句注册的所有缓存刷新处理器。

配置

注册

csharp
services
    .AddSmartSql("SmartSql")
    .AddInvokeSync(options =>
    {
        options.StatementType = StatementType.Write;
    })
    .AddKafkaPublisher(options =>
    {
        options.Servers = "localhost:9092";
        options.Topic = "smartsql-sync";
    })
    .AddKafkaSubscriber(options =>
    {
        options.Servers = "localhost:9092";
        options.Topic = "smartsql-sync";
    });

// In Configure():
app.ApplicationServices.UseSmartSqlSync();
app.ApplicationServices.UseSmartSqlSubscriber(syncRequest => { });

替换默认缓存管理器

要使用 SyncCacheManager 替代默认缓存管理器,注入 ISubscriber 并手动创建:

csharp
var subscriber = sp.GetRequiredService<ISubscriber>();
builder.UseCacheManager(new SyncCacheManager(subscriber));

SyncRequest 结构

SyncRequest 对象携带触发同步的 SQL 操作的所有信息:

属性类型描述
IdGuid唯一请求标识符
ScopestringXML SqlMap scope(如 "User")
SqlIdstring语句 ID(如 "Insert"、"Update")
IsStatementSqlbool是否为真实 SQL 语句
StatementTypeStatementType?Select、Insert、Update、Delete
ParametersIDictionary<string, object>使用的 SQL 参数
Resultobject执行结果

交叉参考

参考资料

基于 MIT 许可证发布。