Header menu logo FCQRS

User Aggregate

The User aggregate is responsible for handling user registration and login commands. It maintains the state of the user, including the username and password. In this example we will try to implement below flows
User Flow

State, Command, and Event Types

open FCQRS.Common

type State =
    { Username: string option
      Password: string option }

type Command =
    | Login of string
    | Register of string * string

type Event =
    | LoginSucceeded
    | LoginFailed
    | RegisterSucceeded of string * string
    | AlreadyRegistered

For each aggregate, we define State, Command, and Event types. The State type represents the current state of the aggregate, while the Command type defines the commands that can be sent to the aggregate. The Event type defines the events that can be emitted by the aggregate.

Command Handler

Next we define the command handler which these commands will come from the outside world.

let handleCommand (cmd: Command<_>) state =
    match cmd.CommandDetails, state with
    | Register(userName, password), { Username = None } ->
         RegisterSucceeded(userName, password) |> PersistEvent

    | Register _, { Username = Some _ } -> AlreadyRegistered |> DeferEvent

    | Login password1,
        {   Username = Some _
            Password = Some password2 } when password1 = password2 -> 
                LoginSucceeded |> PersistEvent

    | Login _, _ -> LoginFailed |> DeferEvent

The command handler takes a Command<_> object which wraps the command details. Command details is our defined command type. Notice the function either return PersistEvent or DeferEvent. PersistEvent is used to persist the event to the event store and send it to the subscribers. DeferEvent is actually same thing but we don't want to persist it. Even though we don't persist it allows us to wait until the event is generated from the command side.

It handles the Register command by checking if the username is already registered. If not, it emits a RegisterSucceeded event. If the username is already registered, it emits an AlreadyRegistered event. The Login command is handled by checking if the username and password match. If they do, it emits a LoginSucceeded event. If not, it emits a LoginFailed event.

Event Handler

let applyEvent event state =
    match event.EventDetails with
    | RegisterSucceeded(userName, password) ->
        { state with
            Username = Some userName
            Password = Some password }
    | _ -> state

Not much going on here. By the time applyEvent is called, the event is already persisted. So we just update the state based on the event. And the cycle continues with the new state when a future command is received.

Wiring to Akka.net

Finally some boilerplate code to bind the above functions to the actor. Also initiates a shard. "User" here is the shard region name acts like a type

let init (env: _) (actorApi: IActor) =
    let initialState = { Username = None; Password = None }
    actorApi.InitializeActor env initialState "User" handleCommand applyEvent

let factory (env: _) actorApi entityId =
    (init env actorApi).RefFor DEFAULT_SHARD entityId

You will call factory to create or resume an actor. Actually clusterd sharded actors are eternal. They are never born and never die. You can pretend they exist since the beginning of time and will exist till the end of time. But you can stop them or after 2 minutes they will be passivated to save memory.

namespace FCQRS
module Common from FCQRS
<summary> Contains common types like Events and Commands </summary>
<namespacedoc><summary>Functionality for Write Side.</summary></namespacedoc>
Multiple items
type State = { Username: string option Password: string option }

--------------------
type State<'Command,'Event> = { CommandDetails: CommandDetails<'Command,'Event> Sender: IActorRef }
Multiple items
val string: value: 'T -> string

--------------------
type string = System.String
type 'T option = Option<'T>
Multiple items
module Command

--------------------
type Command = | Login of string | Register of string * string

--------------------
type Command<'CommandDetails> = { CommandDetails: 'CommandDetails CreationDate: DateTime Id: MessageId option Sender: ActorId option CorrelationId: CID } interface IMessageWithCID interface ISerializable member Equals: Command<'CommandDetails> * IEqualityComparer -> bool override ToString: unit -> string
<summary> Represents a command to be processed by an aggregate actor. &lt;typeparam name="'CommandDetails"&gt;The specific type of the command payload.&lt;/typeparam&gt; </summary>

--------------------
type Command<'Command,'Event> = | Execute of CommandDetails<'Command,'Event>
<summary> Represents the message sent to the internal subscription mechanism. &lt;typeparam name="'Command"&gt;The type of the command payload.&lt;/typeparam&gt; &lt;typeparam name="'Event"&gt;The type of the expected event payload.&lt;/typeparam&gt; </summary>
Multiple items
module Event from Microsoft.FSharp.Control

--------------------
type Event = | LoginSucceeded | LoginFailed | RegisterSucceeded of string * string | AlreadyRegistered

--------------------
type Event<'EventDetails> = { EventDetails: 'EventDetails CreationDate: DateTime Id: MessageId option Sender: ActorId option CorrelationId: CID Version: Version } interface IMessageWithCID interface ISerializable member Equals: Event<'EventDetails> * IEqualityComparer -> bool override ToString: unit -> string
<summary> Represents an event generated by an aggregate actor as a result of processing a command. &lt;typeparam name="'EventDetails"&gt;The specific type of the event payload.&lt;/typeparam&gt; </summary>

--------------------
type Event<'Delegate,'Args (requires delegate and 'Delegate :> Delegate and reference type)> = new: unit -> Event<'Delegate,'Args> member Trigger: sender: obj * args: 'Args -> unit member Publish: IEvent<'Delegate,'Args>

--------------------
new: unit -> Event<'Delegate,'Args>
val handleCommand: cmd: Command<Command> -> state: State -> EventAction<Event>
val cmd: Command<Command>
val state: State
Command.CommandDetails: Command
<summary> The specific details or payload of the command. </summary>
union case Command.Register: string * string -> Command
val userName: string
val password: string
union case Option.None: Option<'T>
union case Event.RegisterSucceeded: string * string -> Event
union case EventAction.PersistEvent: 'T -> EventAction<'T>
<summary> Persist the event to the journal. The actor's state will be updated using the event handler *after* persistence succeeds. </summary>
union case Option.Some: Value: 'T -> Option<'T>
union case Event.AlreadyRegistered: Event
union case EventAction.DeferEvent: 'T -> EventAction<'T>
<summary> Defer the event. It will be stashed and processed later, potentially after other events. </summary>
union case Command.Login: string -> Command
val password1: string
val password2: string
union case Event.LoginSucceeded: Event
union case Event.LoginFailed: Event
val applyEvent: event: Event<Event> -> state: State -> State
val event: Event<Event>
Event.EventDetails: Event
<summary> The specific details or payload of the event. </summary>
val init: env: 'a -> actorApi: IActor -> Akkling.Cluster.Sharding.EntityFac<obj> (requires 'a :> IConfigurationWrapper and 'a :> ILoggerFactoryWrapper)
val env: 'a (requires 'a :> IConfigurationWrapper and 'a :> ILoggerFactoryWrapper)
val actorApi: IActor
type IActor = abstract CreateCommandSubscription: (string -> IEntityRef<obj>) -> CID -> ActorId -> 'b -> ('c -> bool) -> Async<Event<'c>> abstract InitializeActor: 'a -> 'a0 -> string -> (Command<'c> -> 'a0 -> EventAction<'b>) -> (Event<'b> -> 'a0 -> 'a0) -> EntityFac<obj> (requires 'a :> IConfigurationWrapper and 'a :> ILoggerFactoryWrapper) abstract InitializeSaga: 'a -> SagaState<'SagaState,'State> -> (obj -> SagaState<'SagaState,'State> -> EventAction<'State>) -> (SagaState<'SagaState,'State> -> SagaStartingEvent<Event<'c>> option -> bool -> Effect * 'State option * ExecuteCommand list) -> (SagaState<'SagaState,'State> -> SagaState<'SagaState,'State>) -> string -> EntityFac<obj> (requires 'a :> IConfigurationWrapper and 'a :> ILoggerFactoryWrapper) abstract InitializeSagaStarter: (obj -> ((string -> IEntityRef<obj>) * PrefixConversion * obj) list) -> unit abstract Stop: unit -> Task abstract SubscribeForCommand: Command<'a,'b> -> Async<Event<'b>> abstract LoggerFactory: ILoggerFactory abstract Materializer: ActorMaterializer abstract Mediator: IActorRef abstract System: ActorSystem ...
<summary> Defines the core functionalities and context provided by the FCQRS environment to actors. This interface provides access to essential Akka.NET services and FCQRS initialization methods. </summary>
val initialState: State
abstract IActor.InitializeActor: 'a -> 'a0 -> string -> (Command<'c> -> 'a0 -> EventAction<'b>) -> (Event<'b> -> 'a0 -> 'a0) -> Akkling.Cluster.Sharding.EntityFac<obj> (requires 'a :> IConfigurationWrapper and 'a :> ILoggerFactoryWrapper)
val factory: env: 'a -> actorApi: IActor -> entityId: string -> Akkling.Cluster.Sharding.IEntityRef<obj> (requires 'a :> IConfigurationWrapper and 'a :> ILoggerFactoryWrapper)
val entityId: string
[<Literal>] val DEFAULT_SHARD: string = "default-shard"

Type something to start searching.