Add a projection
A projection is a function called once per event, in order. Fold each event into your read model and
record the offset in the same transaction, so that a crash can never leave the two out of step and
each event's effect is applied exactly once. With the FCQRS.FSharp facade the function has the shape
int64 -> obj -> IMessageWithCID list: it takes the offset and the event, and returns the read-model
events to re-publish to subscribers.
open FCQRS.Common
open FCQRS.FSharp
open Microsoft.Data.Sqlite // assumed: provides SqliteConnection
open Dapper                // assumed: provides the conn.Execute extension method

let handle (connString: string) (offset: int64) (event: obj) : IMessageWithCID list =
    use conn = new SqliteConnection(connString)
    conn.Open()
    use tx = conn.BeginTransaction()

    // Fold the event into the read model and collect the events to re-publish
    let notify =
        match event with
        | :? Event<Document.Event> as e ->
            match e.EventDetails with
            | Document.Updated doc ->
                conn.Execute(
                    "insert or replace into Documents (Id, Title, Body) values (@Id, @Title, @Body)",
                    {| Id = doc.Id.ToString(); Title = doc.Title.Value; Body = doc.Content.Value |}, tx)
                |> ignore
                // re-publish so CID subscribers wake
                [ e :> IMessageWithCID ]
            | _ -> []
        | _ -> []

    // advance the offset in the SAME transaction as the writes
    conn.Execute(
        "update Offsets set OffsetCount = @n where OffsetName = 'DocumentProjection'",
        {| n = offset |}, tx)
    |> ignore
    tx.Commit()
    notify
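To see why the offset has to move in the same transaction as the writes, here is a minimal in-memory sketch. It is not FCQRS code: `Snapshot`, `Store`, `apply`, and `run` are hypothetical names, a mutable value stands in for the SQLite database, and a crash is simulated by throwing before the commit. The read model deliberately counts updates, so a double-applied event would show up as a wrong count.

```fsharp
// Hypothetical stand-in for the database: the read model and the offset live
// in one value, so they can only be replaced together (like one transaction).
type Snapshot = { UpdateCounts: Map<string, int>; Offset: int64 }

type Store() =
    let mutable committed = { UpdateCounts = Map.empty; Offset = 0L }
    member _.Current = committed
    // Swapping in the whole snapshot plays the role of tx.Commit().
    member _.Commit(next: Snapshot) = committed <- next

// Fold one event into the snapshot and advance the offset in the same step.
let apply (snap: Snapshot) (offset: int64) (docId: string) =
    let seen = snap.UpdateCounts |> Map.tryFind docId |> Option.defaultValue 0
    { UpdateCounts = snap.UpdateCounts |> Map.add docId (seen + 1); Offset = offset }

// Process every journal entry after the stored offset. If crashBefore matches
// an offset, throw after computing the new snapshot but before committing it.
let run (store: Store) (journal: (int64 * string) list) (crashBefore: int64 option) =
    for (offset, docId) in journal do
        if offset > store.Current.Offset then
            let next = apply store.Current offset docId
            if crashBefore = Some offset then failwith "simulated crash before commit"
            store.Commit next

let journal = [ 1L, "doc-1"; 2L, "doc-1"; 3L, "doc-2" ]
let store = Store()

// First run: crash while handling offset 2. Its uncommitted work is discarded.
try
    run store journal (Some 2L)
with _ -> ()
let afterCrash = store.Current

// Restart: resume after the stored offset, so offset 1 is not applied again.
run store journal None
let final = store.Current
```

After the simulated crash the store still holds offset 1 and a single update for `doc-1`. After the restart it holds offset 3, two updates for `doc-1`, and one for `doc-2`. If the offset were stored separately from the counts, a crash between the two writes could either skip an event or count it twice.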
Register it from your composition root, resuming from the stored offset:
let subscriptions =
    Fcqrs.projection api
        { LastOffset = int (Db.getLastOffset connString)
          Handle = handle connString }
Fcqrs.projection returns an ISubscribe — the same subscription stream .Send and the
read-your-writes wait use. (subscriptions.Subscribe(cid, 1) waits for one event with a given
correlation id.)
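The registration calls `Db.getLastOffset`, which this page does not show. The following is one possible sketch, not the library's or the sample's actual code. It assumes an `Offsets` table with the `OffsetName` and `OffsetCount` columns used by the handler's update statement (the column types are a guess), and it uses Microsoft.Data.Sqlite directly. `ensureOffsetRow` is a hypothetical helper: the handler's `update` silently affects zero rows if the `DocumentProjection` row does not exist, so the row needs to be seeded once.

```fsharp
// Script-only directive; in a project, reference the NuGet package instead.
#r "nuget: Microsoft.Data.Sqlite"
open Microsoft.Data.Sqlite

// Hypothetical stand-in for the Db module used at registration.
module Db =
    // Create the Offsets table (assumed schema) and seed the projection's row,
    // so that the handler's UPDATE statement has a row to modify.
    let ensureOffsetRow (connString: string) =
        use conn = new SqliteConnection(connString)
        conn.Open()
        use cmd = conn.CreateCommand()
        cmd.CommandText <-
            "create table if not exists Offsets (
                 OffsetName text primary key,
                 OffsetCount integer not null);
             insert or ignore into Offsets (OffsetName, OffsetCount)
             values ('DocumentProjection', 0);"
        cmd.ExecuteNonQuery() |> ignore

    // Read the stored offset; fall back to 0 when the row is missing.
    let getLastOffset (connString: string) : int64 =
        use conn = new SqliteConnection(connString)
        conn.Open()
        use cmd = conn.CreateCommand()
        cmd.CommandText <-
            "select OffsetCount from Offsets where OffsetName = 'DocumentProjection'"
        match cmd.ExecuteScalar() with
        | :? int64 as n -> n
        | _ -> 0L
```

In this sketch `ensureOffsetRow` would be called once at startup, before the projection is registered. `getLastOffset` raises an error if the table itself is missing.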
Two things matter most. Return the events you handled: those are re-published on the subscription
stream, which is what lets a caller know the read side has caught up (see read-your-writes). And
rebuild freely: to fix a projection bug, correct this function, delete the read model, reset the
offset to 0, and replay. The journal is untouched.
Background: The read side.
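The "delete the read model, reset the offset" step of a rebuild could look like the sketch below. `resetProjection` is a hypothetical helper, not part of FCQRS. It assumes the `Documents` and `Offsets` tables used by the handler on this page and is meant to run while the application is stopped, so that the next start registers the projection with a stored offset of 0.

```fsharp
// Script-only directive; in a project, reference the NuGet package instead.
#r "nuget: Microsoft.Data.Sqlite"
open Microsoft.Data.Sqlite

// Hypothetical helper: clear the read model and rewind the offset in one
// transaction, so a failure cannot leave an empty table with a stale offset.
let resetProjection (connString: string) =
    use conn = new SqliteConnection(connString)
    conn.Open()
    use tx = conn.BeginTransaction()
    use cmd = conn.CreateCommand()
    cmd.Transaction <- tx
    cmd.CommandText <-
        "delete from Documents;
         update Offsets set OffsetCount = 0 where OffsetName = 'DocumentProjection';"
    cmd.ExecuteNonQuery() |> ignore
    tx.Commit()
```

Only read-side tables are touched here. The event journal is left alone, which is what makes the replay possible.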