Header menu logo FCQRS

Add a projection

A projection is a function called once per event, in order. Fold each event into your read model and record the offset in the same transaction, so processing is exactly-once across a crash. With the FCQRS.FSharp facade the function has the shape int64 -> obj -> IMessageWithCID list: the offset, the event, and the read-model events to re-publish to subscribers.

open FCQRS.Common
open FCQRS.FSharp

let handle (connString: string) (offset: int64) (event: obj) : IMessageWithCID list =
    use conn = new SqliteConnection(connString)
    conn.Open()
    use tx = conn.BeginTransaction()

    let notify =
        match event with
        | :? Event<Document.Event> as e ->
            match e.EventDetails with
            | Document.Updated doc ->
                conn.Execute(
                    "insert or replace into Documents (Id, Title, Body) values (@Id, @Title, @Body)",
                    {| Id = doc.Id.ToString(); Title = doc.Title.Value; Body = doc.Content.Value |}, tx)
                |> ignore
                // re-publish so CID subscribers wake
                [ e :> IMessageWithCID ]
            | _ -> []
        | _ -> []

    // advance the offset in the SAME transaction as the writes
    conn.Execute(
        "update Offsets set OffsetCount = @n where OffsetName = 'DocumentProjection'",
        {| n = offset |}, tx)
    |> ignore
    tx.Commit()
    notify
// C#: the same projection as a method returning the events to re-publish.
using static FCQRS.Common;        // Event<>
using static FCQRS.Model.Data;     // IMessageWithCID
using Dapper;
using Microsoft.Data.Sqlite;

public static IList<IMessageWithCID> HandleEventWrapper(
    string connString, long offset, object eventObj)
{
    using var conn = new SqliteConnection(connString);
    conn.Open();
    using var tx = conn.BeginTransaction();
    var notify = new List<IMessageWithCID>();

    if (eventObj is Event<DocumentEvent> e && e.EventDetails is DocumentEvent.Updated u)
    {
        conn.Execute(
            "insert or replace into Documents (Id, Title, Body) values (@Id, @Title, @Body)",
            new { Id = u.Document.Id.ToString(), Title = u.Document.Title.ToString(), Body = u.Document.Content.ToString() }, tx);
        notify.Add(e);   // re-publish so CID subscribers wake
    }

    // advance the offset in the SAME transaction as the writes
    conn.Execute(
        "update Offsets set OffsetCount = @n where OffsetName = 'DocumentProjection'",
        new { n = offset }, tx);
    tx.Commit();
    return notify;
}

Register it from your composition root, resuming from the stored offset:

let subscriptions =
    Fcqrs.projection api
        { LastOffset = int (Db.getLastOffset connString)
          Handle = handle connString }
// C#: register via the DI host-builder, resuming from the stored offset.
services.AddProjection(
    handler: sp => (offset, evt) => HandleEventWrapper(connString, offset, evt),
    lastOffset: _ => (int)ServerQuery.GetLastOffset(connString));

Fcqrs.projection returns an ISubscribe — the same subscription stream .Send and the read-your-writes wait use. (subscriptions.Subscribe(cid, 1) waits for one event with a given correlation id.)

Two things matter most. Return the events you handled — those are re-published on the subscription stream, which is what lets a caller know the read side has caught up (see read-your-writes). And rebuild freely: to fix a projection bug, correct this function, delete the read model, reset the offset to 0, and replay — the journal is untouched. Background: The read side.

val handle: connString: string -> offset: int64 -> event: obj -> 'a list
val connString: string
Multiple items
val string: value: 'T -> string

--------------------
type string = System.String
val offset: int64
Multiple items
val int64: value: 'T -> int64 (requires member op_Explicit)

--------------------
type int64 = System.Int64

--------------------
type int64<'Measure> = int64
val event: obj
type obj = System.Object
type 'T list = List<'T>
val conn: System.IDisposable
val tx: System.IDisposable
val notify: 'a list
Multiple items
module Event from Microsoft.FSharp.Control

--------------------
type Event<'T> = new: unit -> Event<'T> member Trigger: arg: 'T -> unit member Publish: IEvent<'T>

--------------------
type Event<'Delegate,'Args (requires delegate and 'Delegate :> Delegate and reference type)> = new: unit -> Event<'Delegate,'Args> member Trigger: sender: obj * args: 'Args -> unit member Publish: IEvent<'Delegate,'Args>

--------------------
new: unit -> Event<'T>

--------------------
new: unit -> Event<'Delegate,'Args>
val e: obj
val ignore: value: 'T -> unit
Multiple items
val int: value: 'T -> int (requires member op_Explicit)

--------------------
type int = int32

--------------------
type int<'Measure> = int

Type something to start searching.