2. Wiring and running it
Right now our Document is a pair of pure functions with nowhere to live. They
can't remember anything between calls, and nobody can send them a command. This chapter fixes both, and
ends with a tiny demo that makes event sourcing visible: you'll run the program, run it again, and
watch a number go up because the past was waiting on disk. Keep adding to Program.fs.
Two paths, on purpose: writing and reading
Here's the shape of the thing we're about to build:
            decide / fold
command ----------------------> JOURNAL (append-only events; the truth)
                                   |
                                   v
                               projection
                                   |
                                   v
                    read model (a derived view; disposable)
The letters CQRS just mean: the path that writes and the path that reads are separate on purpose. Writes flow through the aggregate and land in the journal, an append-only log of events that is the single source of truth. Reads come from a read model derived from that log.
💡 Mental model. Treat the journal like a bank's transaction ledger and the read model like the balance shown in your banking app. The ledger is authoritative and never edited; the balance is just a convenient running total computed from it. Lose the displayed balance and you recompute it from the ledger: you've lost nothing. Lose the ledger and you've lost everything. That asymmetry is the whole design: the read model is disposable; the journal is not.
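To make the ledger-and-balance picture concrete, here is a minimal sketch in plain F#. The `LedgerEntry` type, `applyEntry`, and the amounts are made up for illustration; nothing here is FCQRS API. It only shows that a running total can be thrown away and recomputed from an append-only list.

```fsharp
// Hypothetical types for illustration only; none of these names come from FCQRS.
type LedgerEntry =
    | Deposited of decimal
    | Withdrew of decimal

// The read model: a running total derived from the ledger.
let applyEntry (balance: decimal) (entry: LedgerEntry) : decimal =
    match entry with
    | Deposited amount -> balance + amount
    | Withdrew amount -> balance - amount

// The journal: append-only, never edited.
let ledger = [ Deposited 100m; Withdrew 30m; Deposited 5m ]

// Losing the displayed balance costs nothing: fold the ledger again.
let balance = List.fold applyEntry 0m ledger

printfn "balance rebuilt from ledger: %M" balance
```

The balance is never stored as a fact of its own; it is whatever the fold says it is, which is why only the ledger needs protecting.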
Create the actor system
Fcqrs.actor builds the entire Akka.NET system from plain values. The only thing you're required to
hand it is a database connection (via Fcqrs.connect). Everything else (serializers, sharding,
snapshots, a one-node cluster that joins itself) comes from defaults baked into the package.
let buildApi () : IActor =
    let config = ConfigurationBuilder().Build()
    let loggerFactory = LoggerFactory.Create(fun _ -> ())
    let connection =
        Fcqrs.connect FCQRS.Actor.DBType.Sqlite "Data Source=tutorial.db;"
    Fcqrs.actor config loggerFactory (Some connection) "tutorial"
Notice the config is an empty ConfigurationBuilder().Build(). If you've used Akka.NET before, this
is the part to do a double-take at:
🤔 Did you know? Akka.NET is normally configured with HOCON, often pages of it for persistence, serialization, and sharding. FCQRS ships those defaults embedded and merges anything you supply over them, so the minimum is no HOCON file at all. (When you eventually need to override a knob, you can; see Configuration. The default is "nothing," not "you can't.")
That tutorial.db file is the journal, and the fact that it's a real file on disk is exactly what
makes the demo at the end work: it outlives the process. (Swapping DBType.Sqlite for Postgres or SQL
Server is a one-line change; the rest of your code doesn't notice. See
Configure the database.)
A read side to listen on
A projection is a function run once per event, in order. A real one folds each event into a SQL
table you can query; ours does the smallest thing that still demonstrates the principle: it recognises
Document events and hands each one back:
let handle (_offset: int64) (event: obj) : IMessageWithCID list =
    match event with
    | :? Event<Document.Event> as e -> [ e :> IMessageWithCID ]
    | _ -> []
Why hand the event back instead of just doing the write? Because the list you return is re-published to subscribers, and that re-publish is what lets a caller know its write has reached the read side. That's the mechanism behind the next step. (This projection is deliberately minimal; a production one also writes to a table and tracks its offset, which is its own how-to: Add a projection.)
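For a feel of what "writes to a table and tracks its offset" might look like, here is a rough sketch in plain F#. It assumes a simplified `DocEvent` type, an in-memory `Dictionary` standing in for the SQL table, and a `project` function; all three are hypothetical and are not part of FCQRS.

```fsharp
open System.Collections.Generic

// Hypothetical event type for illustration; the tutorial's real type is Document.Event.
type DocEvent = Updated of docId: string * title: string

// A stand-in for a SQL table: document id -> latest title.
let titles = Dictionary<string, string>()

// The last offset this projection has processed.
let mutable lastOffset = 0L

// Called once per event, in order. Returns the events to re-publish.
let project (offset: int64) (event: DocEvent) : DocEvent list =
    match event with
    | Updated (docId, title) ->
        titles.[docId] <- title // upsert the "row"
        lastOffset <- offset    // remember how far we got
        [ event ]               // hand it back so subscribers can be notified

// Feed two events through in journal order.
let republished =
    [ (1L, Updated ("doc-1", "Welcome"))
      (2L, Updated ("doc-1", "Welcome back")) ]
    |> List.collect (fun (offset, e) -> project offset e)

printfn "title=%s lastOffset=%d republished=%d" titles.["doc-1"] lastOffset republished.Length
```

The stored offset is what would let a real projection resume where it stopped instead of replaying the whole journal on every start.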
Send a command and read your own write
Now the subtle part, and the reason well-built CQRS systems feel solid instead of flaky. Writes are asynchronous: the command is handled, the event is journaled, and then it propagates to the read side a beat later. So the naïve "save, then immediately query" can read stale data: the classic bug where you create something, redirect to its page, and it's not there yet.
You might reach for a Thread.Sleep or a retry loop. Don't: there's an exact signal. FCQRS threads a
correlation id (CID) through the whole round trip, and the move is:
mint a CID
    |
    v
SUBSCRIBE to that CID   <-- before sending, so the answer can't slip past
    |
    v
send command --> aggregate --> journal --> projection re-publishes --> your wait wakes
🎯 Key principle. Subscribe before you send. If you subscribed afterward, the event could be processed in the gap and you'd wait forever for a notification that already fired. Subscribe-then-send closes that window by construction, the same reason you start recording before the rocket launches, not after.
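The race is easy to reproduce in miniature. The sketch below assumes a toy in-memory bus built on `TaskCompletionSource`; the `waiters` dictionary and the `subscribe` and `publish` functions are invented for this illustration and say nothing about how FCQRS implements subscriptions internally.

```fsharp
open System
open System.Collections.Generic
open System.Threading.Tasks

// Hypothetical in-memory bus for illustration; not the FCQRS implementation.
let waiters = Dictionary<Guid, TaskCompletionSource<string>>()

// Register interest in a correlation id and get a task to wait on.
let subscribe (cid: Guid) : Task<string> =
    let tcs = TaskCompletionSource<string>()
    waiters.[cid] <- tcs
    tcs.Task

// Publish a notification; if nobody is subscribed yet, it is simply lost.
let publish (cid: Guid) (message: string) : unit =
    match waiters.TryGetValue cid with
    | true, tcs -> tcs.TrySetResult message |> ignore
    | _ -> ()

// Correct order: subscribe, then send.
let goodCid = Guid.NewGuid()
let goodWait = subscribe goodCid
publish goodCid "Updated"
printfn "subscribe-then-send completed: %b" goodWait.IsCompleted

// Wrong order: the notification fires in the gap before anyone listens.
let badCid = Guid.NewGuid()
publish badCid "Updated"
let badWait = subscribe badCid
printfn "send-then-subscribe completed: %b" badWait.IsCompleted
```

In the second half the task never completes, which is exactly the "wait forever" failure the principle is guarding against.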
Fcqrs.aggregate registers the aggregate and returns a handle whose .Send ties it together: you give
it the CID, the aggregate id, the command, and a predicate for the event you're waiting on.
let run () =
    async {
        let api = buildApi ()
        let documents =
            Fcqrs.aggregate api
                { Name = "Document"
                  Initial = Document.initial
                  Decide = Document.decide
                  Fold = Document.fold }
        // No sagas yet: the saga-starter still has to be wired, with none. (Ch. 3 adds one.)
        Fcqrs.wireSagaStarters api []
        let subs = Fcqrs.projection api { LastOffset = 0; Handle = handle }
        let cid = Fcqrs.newCid ()
        // A FIXED id, so every run addresses the SAME document; that's what lets the version climb.
        let id = Fcqrs.aggregateId "11111111-1111-1111-1111-111111111111"
        // Subscribe to this CID *before* sending so the confirmation can't be missed.
        use awaiter = subs.Subscribe(cid, 1)
        match Document.Root.TryCreate(Guid "11111111-1111-1111-1111-111111111111", "Welcome", "draft") with
        | Error e -> printfn "rejected: %s" e
        | Ok doc ->
            let! event =
                documents.Send cid id (Document.CreateOrUpdate doc)
                    (fun e ->
                        match e with
                        | Document.Updated _ -> true)
            do! awaiter.Task |> Async.AwaitTask // read side is now up to date
            printfn "saved %A at version %A" event.EventDetails event.Version
    }
Wire it to your entry point:
[<EntryPoint>]
let main _ =
    run () |> Async.RunSynchronously
    0
Run it, then run it again
This is the moment the whole tutorial has been building toward, so do it for real: run the program, let it exit, then run it again.
Sit with what just happened. Between the two runs the process exited completely; every byte of
in-memory state was thrown away. Yet the second run reports version 2. Nobody wrote "load the
document" code, because there isn't any to write. On startup FCQRS found the document's one stored event,
replayed it through your fold (0 → 1), and then applied the new write (1 → 2). The number climbs
because the journal persisted across the restart and fold is pure enough to reconstruct the exact same
state every time.
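The same effect can be imitated without any framework. In the sketch below a temporary text file plays the journal (one line per event) and `runOnce` plays one process lifetime; the `State` record, `fold`, and `runOnce` are hypothetical stand-ins, not the tutorial's Document code or FCQRS's recovery logic.

```fsharp
open System.IO

// Hypothetical stand-ins: a text file plays the journal, one line per event.
type State = { Title: string option; Version: int64 }

let initial = { Title = None; Version = 0L }

// Pure fold: the same events always rebuild the same state.
let fold (state: State) (title: string) : State =
    { Title = Some title; Version = state.Version + 1L }

// One "process lifetime": replay what is on disk, then append one new event.
let runOnce (journalPath: string) (title: string) : State =
    let past =
        if File.Exists journalPath then File.ReadAllLines journalPath else [||]
    let recovered = Array.fold fold initial past
    File.AppendAllLines(journalPath, [ title ])
    fold recovered title

let journal = Path.Combine(Path.GetTempPath(), Path.GetRandomFileName())
let first = runOnce journal "Welcome"
let second = runOnce journal "Welcome"
printfn "first run: version %d, second run: version %d" first.Version second.Version

// Clean up the temporary journal file.
File.Delete journal
```

Nothing is carried in memory between the two calls; the second one reaches a higher version only because the first call's event was still on disk to be replayed.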
💡 Try this. Delete tutorial.db* and run again, and you're back to version 1. That's the asymmetry from the start of the chapter made tangible: erase the derived state and it rebuilds from the log; there's nothing else to lose because the log was the truth.
⚠️ And the honest counterweight: for one document with one field, this is a lot of moving parts to print a number. You wouldn't reach for it here. What you just watched scales, though, to thousands of entities that must stay consistent under concurrent edits, each with a perfect audit trail and free rebuildable views. You got all three without writing the persistence, the loading, or the concurrency control. That's the trade you're actually evaluating, not the line count of a hello-world.
What you now understand
A command doesn't change anything by itself: it produces an event, the event lands in a durable log, and everything else (current state, read models, the answer to your caller) is derived from that log. You also met the one piece of discipline that makes async writes feel synchronous to a caller: subscribe to a correlation id, then send, then wait.
Common mistakes
- Querying right after sending, without waiting on the CID. Intermittent "it's not there yet" bugs that only show under load. Subscribe-then-send-then-await is the fix.
- Subscribing after sending. The event can fire in the gap; your wait never completes. Always subscribe first.
- Treating the read model as precious. It's rebuildable from the journal, so back up the journal, not the read model.
- Forgetting Fcqrs.wireSagaStarters. Even with zero sagas the starter must be wired (with []); chapter 3 is where it earns a real argument.
Further study
- The read side: projections, offsets, and why a read model can always be thrown away and rebuilt.
- Consistency and recovery: correlation ids, read-your-writes, snapshots, and what happens across a restart.
- Configure the database: point the journal at Postgres or SQL Server with a one-line change.
Next, a rule a single aggregate structurally cannot enforce (a per-user quota) and the saga that coordinates the two aggregates it takes.