InvokeSync & Messaging

Modern distributed systems often need to replicate database changes to other services, search indexes, or data warehouses. The SmartSql.InvokeSync package provides a framework for publishing SQL invocation events to message queues, enabling downstream consumers to react to data changes. With companion packages for Kafka and RabbitMQ, you can plug into either messaging infrastructure with minimal configuration.

At a Glance

| Package | Purpose |
| --- | --- |
| SmartSql.InvokeSync | Core abstractions: IPublisher, ISubscriber, ISyncService, ISyncFilter |
| SmartSql.InvokeSync.Kafka | Kafka-backed publisher and subscriber |
| SmartSql.InvokeSync.RabbitMQ | RabbitMQ-backed publisher and subscriber |

Architecture

mermaid
graph TB
    subgraph Producer["Producer Instance"]
        style Producer fill:#161b22,stroke:#30363d,color:#e6edf3
        SQL["SQL Execution"] --> Listener["InvokeSucceedListener"]
        Listener --> SyncSvc["SyncService"]
        SyncSvc --> Filter["SyncFilter"]
        Filter -->|Allowed| Pub["IPublisher"]
        Filter -->|Blocked| Drop["Drop"]
    end

    subgraph MQ["Message Queue"]
        style MQ fill:#161b22,stroke:#30363d,color:#e6edf3
        Topic["Topic / Exchange"]
    end

    subgraph Consumer["Consumer Instance"]
        style Consumer fill:#161b22,stroke:#30363d,color:#e6edf3
        Sub["ISubscriber"] --> Handler["Event Handler"]
        Handler --> CacheSync["Cache Sync"]
        Handler --> Business["Business Logic"]
    end

    Pub --> Topic
    Topic --> Sub

    style SQL fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
    style Listener fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
    style SyncSvc fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
    style Filter fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
    style Pub fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
    style Drop fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
    style Topic fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
    style Sub fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
    style Handler fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
    style CacheSync fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
    style Business fill:#2d333b,stroke:#6d5dfc,color:#e6edf3

Sync Flow Sequence

mermaid
sequenceDiagram
autonumber
    participant App as Application
    participant Pipeline as Middleware Pipeline
    participant Listener as InvokeSucceedListener
    participant Filter as SyncFilter
    participant Service as SyncService
    participant Pub as IPublisher
    participant MQ as Message Queue

    App->>Pipeline: Execute SQL
    Pipeline->>Pipeline: Execute middlewares
    Pipeline->>Listener: InvokeSucceeded event
    Listener->>Service: Sync(executionContext)
    Service->>Filter: Filter(executionContext)
    Filter->>Filter: Check StatementType, Scopes, SqlIds

    alt Filter passes
        Service->>Service: AsSyncRequest(executionContext)
        loop Each IPublisher
            Service->>Pub: PublishAsync(syncRequest)
            Pub->>MQ: Send message
        end
    else Filter blocks
        Service-->>Service: Skip (log debug)
    end
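
The flow in the diagram can be sketched as a small orchestration class. This is a minimal illustration, not the library's SyncService: the SyncRequest and IPublisher types below are cut-down stand-ins, InMemoryPublisher and SketchSyncService are hypothetical names, and the filter is modelled as a plain delegate over the request rather than over the ExecutionContext.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Stand-in for the real payload type; only the fields this sketch needs.
public sealed class SyncRequest
{
    public Guid Id { get; set; } = Guid.NewGuid();
    public string Scope { get; set; }
    public string SqlId { get; set; }
}

// Stand-in mirroring the PublishAsync member documented for IPublisher.
public interface IPublisher
{
    Task PublishAsync(SyncRequest request);
}

// Hypothetical publisher that records messages instead of contacting a broker.
public sealed class InMemoryPublisher : IPublisher
{
    public List<SyncRequest> Sent { get; } = new List<SyncRequest>();

    public Task PublishAsync(SyncRequest request)
    {
        Sent.Add(request);
        return Task.CompletedTask;
    }
}

// Hypothetical orchestrator: filter first, then fan out to every publisher.
public sealed class SketchSyncService
{
    private readonly Func<SyncRequest, bool> _filter;
    private readonly IReadOnlyList<IPublisher> _publishers;

    public SketchSyncService(Func<SyncRequest, bool> filter, IReadOnlyList<IPublisher> publishers)
    {
        _filter = filter;
        _publishers = publishers;
    }

    // Returns true when the request passed the filter and reached every publisher.
    public async Task<bool> SyncAsync(SyncRequest request)
    {
        if (!_filter(request))
        {
            return false; // filter blocks: nothing is published
        }

        foreach (var publisher in _publishers)
        {
            await publisher.PublishAsync(request);
        }

        return true;
    }
}
```

Publishing to each IPublisher in turn means a single SQL execution can fan out to several brokers, at the cost of the slowest publisher setting the latency of the whole call.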

Core Interfaces

IPublisher

Publishes SyncRequest messages to a message queue:

| Member | Type | Description |
| --- | --- | --- |
| PublishAsync(SyncRequest) | Task | Send a sync request to the queue |
| Dispose() | void | Clean up connections |

ISubscriber

Receives SyncRequest messages from a message queue:

| Member | Type | Description |
| --- | --- | --- |
| Received | event EventHandler<SyncRequest> | Fired when a message arrives |
| Start() | void | Begin consuming messages |
| Stop() | void | Stop consuming messages |

ISyncService

Orchestrates the sync process by applying filters and publishing:

| Member | Type | Description |
| --- | --- | --- |
| Sync(ExecutionContext) | Task | Filter and publish the execution context |

ISyncFilter

Determines whether a given execution should be published:

| Member | Type | Description |
| --- | --- | --- |
| Filter(ExecutionContext) | bool | Returns true if the execution should be synced |

SyncFilter Configuration

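The SyncFilter applies its checks in a fixed order to decide which SQL executions are published: the ignore lists first, then the statement type, then the scope whitelist, then the SQL ID whitelist. An execution is published only if it passes every check: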

mermaid
flowchart TD
    subgraph FilterLogic["SyncFilter Decision Logic"]
        style FilterLogic fill:#161b22,stroke:#30363d,color:#e6edf3
        Start["Incoming ExecutionContext"] --> Ignore{"In IgnoreList?<br>(IgnoreStatementType,<br>IgnoreScopes, IgnoreSqlIds)"}
        Ignore -->|Yes| Block["Return false (skip)"]
        Ignore -->|No| Type{"StatementType<br>matches filter?"}
        Type -->|No| Block
        Type -->|Yes| Scope{"Scope in allowed list?<br>(if specified)"}
        Scope -->|No| Block
        Scope -->|Yes| SqlId{"SqlId in allowed list?<br>(if specified)"}
        SqlId -->|No| Block
        SqlId -->|Yes| Pass["Return true (publish)"]
    end

    style Start fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
    style Ignore fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
    style Block fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
    style Type fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
    style Scope fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
    style SqlId fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
    style Pass fill:#2d333b,stroke:#6d5dfc,color:#e6edf3

SyncFilterOptions

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| StatementType | StatementType | Write | Which statement types to sync |
| Scopes | IEnumerable<string> | null | Whitelist of scopes (null = all) |
| SqlIds | IEnumerable<string> | null | Whitelist of SQL IDs |
| FullSqlIds | IEnumerable<string> | null | Whitelist of full SQL IDs (Scope.SqlId) |
| IgnoreStatementType | StatementType? | null | Statement types to exclude |
| IgnoreScopes | IEnumerable<string> | null | Scopes to exclude |
| IgnoreSqlIds | IEnumerable<string> | null | SQL IDs to exclude |
| IgnoreFullSqlIds | IEnumerable<string> | null | Full SQL IDs to exclude |
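
To make the decision order concrete, here is a minimal sketch of the same checks as a pure function. It is an illustration under assumptions, not the library's SyncFilter: the StatementType enum values and the cut-down SyncFilterOptions class are stand-ins, SketchSyncFilter is a hypothetical name, and the FullSqlIds and IgnoreFullSqlIds options are left out for brevity.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Stand-in flags enum; the numeric values are assumptions for this sketch.
[Flags]
public enum StatementType
{
    Unknown = 0,
    Insert = 1,
    Update = 2,
    Delete = 4,
    Select = 8,
    Write = Insert | Update | Delete
}

// Stand-in carrying a subset of the options from the table above.
public sealed class SyncFilterOptions
{
    public StatementType StatementType { get; set; } = StatementType.Write;
    public IEnumerable<string> Scopes { get; set; }
    public IEnumerable<string> SqlIds { get; set; }
    public StatementType? IgnoreStatementType { get; set; }
    public IEnumerable<string> IgnoreScopes { get; set; }
    public IEnumerable<string> IgnoreSqlIds { get; set; }
}

public static class SketchSyncFilter
{
    // Returns true when the execution should be published.
    public static bool Filter(SyncFilterOptions o, StatementType type, string scope, string sqlId)
    {
        // 1. Ignore lists are checked first and always block.
        if (o.IgnoreStatementType.HasValue && (o.IgnoreStatementType.Value & type) != 0) return false;
        if (o.IgnoreScopes != null && o.IgnoreScopes.Contains(scope)) return false;
        if (o.IgnoreSqlIds != null && o.IgnoreSqlIds.Contains(sqlId)) return false;

        // 2. The statement type must overlap the configured flags.
        if ((o.StatementType & type) == 0) return false;

        // 3. Whitelists apply only when specified (null means "allow all").
        if (o.Scopes != null && !o.Scopes.Contains(scope)) return false;
        if (o.SqlIds != null && !o.SqlIds.Contains(sqlId)) return false;

        return true;
    }
}
```

With the defaults, every Insert, Update, and Delete passes and every Select is dropped; setting Scopes narrows that to the listed scopes, and any Ignore* entry overrides a whitelist match.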

Kafka Implementation

KafkaOptions

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| Servers | string | -- | Kafka broker addresses |
| Topic | string | -- | Kafka topic name |
| Config | IDictionary<string, string> | empty | Additional Confluent.Kafka config |

Registration

csharp
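services
    .AddSmartSql("SmartSql")
    // Filter options: publish write statements only (the documented default).
    .AddInvokeSync(options =>
    {
        options.StatementType = StatementType.Write;
    })
    // Publisher side: where sync requests are sent.
    .AddKafkaPublisher(options =>
    {
        options.Servers = "localhost:9092";
        options.Topic = "smartsql-sync";
    })
    // Subscriber side: uses the same topic so it receives the publisher's messages.
    .AddKafkaSubscriber(options =>
    {
        options.Servers = "localhost:9092";
        options.Topic = "smartsql-sync";
    });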

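The Kafka publisher uses IProducer<string, string> from Confluent.Kafka. Each message is keyed by {Scope}.{SqlId}, so with key-based partitioning all messages for the same statement go to the same partition and keep their relative order.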
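
The effect of the key can be illustrated with a short sketch. SketchMessageKey is a hypothetical helper, and the FNV-1a hash below is only a stand-in to show that equal keys always map to the same partition; it is not the hash that Kafka clients actually use.

```csharp
using System;
using System.Text;

public static class SketchMessageKey
{
    // Builds the {Scope}.{SqlId} key described above.
    public static string Build(string scope, string sqlId) => $"{scope}.{sqlId}";

    // Illustrative partition choice: deterministic hash of the key modulo the partition count.
    public static int PartitionFor(string key, int partitionCount)
    {
        if (partitionCount <= 0) throw new ArgumentOutOfRangeException(nameof(partitionCount));

        unchecked
        {
            uint hash = 2166136261u;            // FNV-1a offset basis
            foreach (byte b in Encoding.UTF8.GetBytes(key))
            {
                hash ^= b;
                hash *= 16777619u;              // FNV-1a prime
            }
            return (int)(hash % (uint)partitionCount);
        }
    }
}
```

Because the mapping depends on the partition count, changing the number of partitions on a live topic can move a key to a different partition, which breaks ordering across the change.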

RabbitMQ Implementation

RabbitMQOptions

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| HostName | string | "localhost" | RabbitMQ host |
| VirtualHost | string | "/" | Virtual host |
| UserName | string | -- | Authentication username |
| Password | string | -- | Authentication password |
| Exchange | string | "smartsql" | Exchange name |
| ExchangeType | string | "direct" | Exchange type |
| RoutingKey | string | "sync" | Routing key |
| RequestedHeartbeat | ushort | 60 | Heartbeat interval |
| AutomaticRecoveryEnabled | bool | true | Auto-reconnect |

Registration

csharp
services
    .AddSmartSql("SmartSql")
    // Empty callback: keep the SyncFilterOptions defaults.
    .AddInvokeSync(options => { })
    // Publisher side: overrides the default routing key ("sync").
    .AddRabbitMQPublisher(options =>
    {
        options.HostName = "localhost";
        options.UserName = "guest";
        options.Password = "guest";
        options.Exchange = "smartsql";
        options.RoutingKey = "smartsql.sync";
    })
    // Subscriber side: same exchange and routing key as the publisher.
    .AddRabbitMQSubscriber(options =>
    {
        options.HostName = "localhost";
        options.Exchange = "smartsql";
        options.RoutingKey = "smartsql.sync";
    });
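
The publisher and subscriber above share the same Exchange and RoutingKey because a direct exchange delivers a message only to queues whose binding key equals the message's routing key exactly. The sketch below models that rule in memory; SketchDirectExchange is a hypothetical class for illustration and does not use the RabbitMQ client API.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory model of direct-exchange routing.
public sealed class SketchDirectExchange
{
    private readonly Dictionary<string, List<string>> _queuesByBindingKey =
        new Dictionary<string, List<string>>(StringComparer.Ordinal);

    // Binds a queue to the exchange under an exact binding key.
    public void Bind(string queue, string bindingKey)
    {
        if (!_queuesByBindingKey.TryGetValue(bindingKey, out var queues))
        {
            queues = new List<string>();
            _queuesByBindingKey[bindingKey] = queues;
        }
        queues.Add(queue);
    }

    // Returns the queues that would receive a message published with routingKey.
    public IReadOnlyList<string> Route(string routingKey)
    {
        return _queuesByBindingKey.TryGetValue(routingKey, out var queues)
            ? queues
            : (IReadOnlyList<string>)Array.Empty<string>();
    }
}
```

In this model, a subscriber bound with "smartsql.sync" receives nothing from a publisher that still uses the default "sync" key, which is the usual symptom of mismatched options on the two sides.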

Wiring It Up

The following sequence shows the full startup flow including publisher and subscriber registration:

mermaid
sequenceDiagram
autonumber
    participant App as Startup
    participant DI as IServiceCollection
    participant SP as IServiceProvider
    participant SmartSql as SmartSqlBuilder
    participant MQ as Message Queue

    App->>DI: AddSmartSql("SmartSql")
    App->>DI: AddInvokeSync(filterOptions)
    DI->>DI: Register SyncFilterOptions, ISyncFilter, ISyncService
    App->>DI: AddKafkaPublisher(kafkaOptions)
    DI->>DI: Register IPublisher -> KafkaPublisher
    App->>DI: AddKafkaSubscriber(kafkaOptions)
    DI->>DI: Register ISubscriber -> KafkaSubscriber

    Note over SP: App starts
    SP->>SmartSql: UseSmartSqlSync()
    SmartSql->>SmartSql: Hook InvokeSucceedListener
    SP->>MQ: UseSmartSqlSubscriber(handler)
    MQ->>MQ: subscriber.Start() - begin consuming
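
On the consumer side, the handler passed at startup ends up attached to the subscriber's Received event. The sketch below shows that pattern with stand-in types: ISubscriber mirrors the members documented earlier, while InMemorySubscriber, its Deliver method, and ScopeCacheInvalidator are hypothetical and exist only to make the example runnable without a broker.

```csharp
using System;
using System.Collections.Generic;

// Stand-in payload; only the field this sketch needs.
public sealed class SyncRequest
{
    public string Scope { get; set; }
    public string SqlId { get; set; }
}

// Stand-in mirroring the documented ISubscriber members.
public interface ISubscriber
{
    event EventHandler<SyncRequest> Received;
    void Start();
    void Stop();
}

// Hypothetical subscriber that is fed by hand instead of by a message queue.
public sealed class InMemorySubscriber : ISubscriber
{
    private bool _running;

    public event EventHandler<SyncRequest> Received;

    public void Start() => _running = true;

    public void Stop() => _running = false;

    // Simulates a message arriving; dropped unless Start() was called.
    public void Deliver(SyncRequest request)
    {
        if (_running) Received?.Invoke(this, request);
    }
}

// Hypothetical handler: evicts a scope from a local cache when it changes remotely.
public sealed class ScopeCacheInvalidator
{
    private readonly HashSet<string> _cachedScopes;

    public ScopeCacheInvalidator(IEnumerable<string> cachedScopes)
    {
        _cachedScopes = new HashSet<string>(cachedScopes);
    }

    public IReadOnlyCollection<string> CachedScopes => _cachedScopes;

    public void OnReceived(object sender, SyncRequest request)
    {
        _cachedScopes.Remove(request.Scope);
    }
}
```

A typical wiring would be `subscriber.Received += invalidator.OnReceived;` followed by `subscriber.Start();`. Attaching the handler before Start() avoids missing messages that arrive immediately.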

SyncRequest Payload

The SyncRequest object published to the message queue contains:

| Property | Type | Description |
| --- | --- | --- |
| Id | Guid | Unique message identifier |
| Scope | string | SQL map scope |
| SqlId | string | Statement ID |
| StatementType | StatementType? | Select, Insert, Update, Delete |
| RealSql | string | The actual SQL executed |
| Parameters | IDictionary<string, object> | SQL parameter values |
| Result | object | Execution result (row count, entity, etc.) |
| DataSourceChoice | DataSourceChoice | Read or Write source used |
| Transaction | IsolationLevel? | Transaction isolation level if active |
| IsStatementSql | bool | Whether this was a real SQL operation |
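
Since Parameters and Result are typed as object, consumers should not assume the original CLR types survive the trip through the queue. The sketch below illustrates this with a JSON round trip. It assumes a JSON wire format and uses System.Text.Json purely for illustration; the serializer the Kafka and RabbitMQ packages actually use is not stated here, and SyncRequest is a cut-down stand-in with SketchPayloadCodec as a hypothetical helper.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Stand-in carrying a subset of the documented SyncRequest properties.
public sealed class SyncRequest
{
    public Guid Id { get; set; }
    public string Scope { get; set; }
    public string SqlId { get; set; }
    public string RealSql { get; set; }
    public IDictionary<string, object> Parameters { get; set; }
}

public static class SketchPayloadCodec
{
    // Producer side: turn the request into a JSON string.
    public static string Encode(SyncRequest request) => JsonSerializer.Serialize(request);

    // Consumer side: rebuild the request; object-typed values come back as JsonElement.
    public static SyncRequest Decode(string json) => JsonSerializer.Deserialize<SyncRequest>(json);
}
```

After decoding, a parameter that was published as an int is a JsonElement on the consumer and has to be read with a call such as GetInt32(). Other serializers make different choices, so handlers are safer when they convert parameter values explicitly.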

Cross-References

  • Cache Sync -- Uses ISubscriber to invalidate local caches on remote changes.
  • Redis Cache -- Distributed caching that benefits from cache synchronization.
  • DI Integration -- Registration patterns shared with SmartSql DI.
  • AOP Transactions -- Transactions can encompass multiple sync-triggering operations.

Released under the MIT License.