User Aggregate
The User
aggregate is responsible for handling user registration and login commands. It maintains the state of the user, including the username and password.
In this example we will try to implement below flows
State, Command, and Event Types
open FCQRS.Common
type State =
{ Username: string option
Password: string option }
type Command =
| Login of string
| Register of string * string
type Event =
| LoginSucceeded
| LoginFailed
| RegisterSucceeded of string * string
| AlreadyRegistered
For each aggregate, we define State, Command, and Event types. The State
type represents the current state of the aggregate, while the Command
type defines the commands that can be sent to the aggregate.
The Event
type defines the events that can be emitted by the aggregate.
Command Handler
Next we define the command handler which these commands will come from the outside world.
let handleCommand (cmd: Command<_>) state =
match cmd.CommandDetails, state with
| Register(userName, password), { Username = None } ->
RegisterSucceeded(userName, password) |> PersistEvent
| Register _, { Username = Some _ } -> AlreadyRegistered |> DeferEvent
| Login password1,
{ Username = Some _
Password = Some password2 } when password1 = password2 ->
LoginSucceeded |> PersistEvent
| Login _, _ -> LoginFailed |> DeferEvent
The command handler takes a Command<_> object which wraps the command details. Command details is our defined command type. Notice the function either return PersistEvent or DeferEvent. PersistEvent is used to persist the event to the event store and send it to the subscribers. DeferEvent is actually same thing but we don't want to persist it. Even though we don't persist it allows us to wait until the event is generated from the command side.
It handles the Register
command by checking if the username is already registered. If not, it emits a RegisterSucceeded
event. If the username is already registered, it emits an AlreadyRegistered
event.
The Login
command is handled by checking if the username and password match. If they do, it emits a LoginSucceeded
event. If not, it emits a LoginFailed
event.
Event Handler
let applyEvent event state =
match event.EventDetails with
| RegisterSucceeded(userName, password) ->
{ state with
Username = Some userName
Password = Some password }
| _ -> state
Not much going on here. By the time applyEvent is called, the event is already persisted. So we just update the state based on the event. And the cycle continues with the new state when a future command is received.
Wiring to Akka.net
Finally some boilerplate code to bind the above functions to the actor. Also initiates a shard. "User" here is the shard region name acts like a type
let init (env: _) (actorApi: IActor) =
let initialState = { Username = None; Password = None }
actorApi.InitializeActor env initialState "User" handleCommand applyEvent
let factory (env: _) actorApi entityId =
(init env actorApi).RefFor DEFAULT_SHARD entityId
You will call factory to create or resume an actor. Actually clusterd sharded actors are eternal. They are never born and never die. You can pretend they exist since the beginning of time and will exist till the end of time. But you can stop them or after 2 minutes they will be passivated to save memory.
<summary> Contains common types like Events and Commands </summary>
<namespacedoc><summary>Functionality for Write Side.</summary></namespacedoc>
type State = { Username: string option Password: string option }
--------------------
type State<'Command,'Event> = { CommandDetails: CommandDetails<'Command,'Event> Sender: IActorRef }
val string: value: 'T -> string
--------------------
type string = System.String
module Command
--------------------
type Command = | Login of string | Register of string * string
--------------------
type Command<'CommandDetails> = { CommandDetails: 'CommandDetails CreationDate: DateTime Id: MessageId option Sender: ActorId option CorrelationId: CID } interface IMessageWithCID interface ISerializable member Equals: Command<'CommandDetails> * IEqualityComparer -> bool override ToString: unit -> string
<summary> Represents a command to be processed by an aggregate actor. <typeparam name="'CommandDetails">The specific type of the command payload.</typeparam> </summary>
--------------------
type Command<'Command,'Event> = | Execute of CommandDetails<'Command,'Event>
<summary> Represents the message sent to the internal subscription mechanism. <typeparam name="'Command">The type of the command payload.</typeparam> <typeparam name="'Event">The type of the expected event payload.</typeparam> </summary>
module Event from Microsoft.FSharp.Control
--------------------
type Event = | LoginSucceeded | LoginFailed | RegisterSucceeded of string * string | AlreadyRegistered
--------------------
type Event<'EventDetails> = { EventDetails: 'EventDetails CreationDate: DateTime Id: MessageId option Sender: ActorId option CorrelationId: CID Version: Version } interface IMessageWithCID interface ISerializable member Equals: Event<'EventDetails> * IEqualityComparer -> bool override ToString: unit -> string
<summary> Represents an event generated by an aggregate actor as a result of processing a command. <typeparam name="'EventDetails">The specific type of the event payload.</typeparam> </summary>
--------------------
type Event<'Delegate,'Args (requires delegate and 'Delegate :> Delegate and reference type)> = new: unit -> Event<'Delegate,'Args> member Trigger: sender: obj * args: 'Args -> unit member Publish: IEvent<'Delegate,'Args>
--------------------
new: unit -> Event<'Delegate,'Args>
<summary> The specific details or payload of the command. </summary>
<summary> Persist the event to the journal. The actor's state will be updated using the event handler *after* persistence succeeds. </summary>
<summary> Defer the event. It will be stashed and processed later, potentially after other events. </summary>
<summary> The specific details or payload of the event. </summary>
<summary> Defines the core functionalities and context provided by the FCQRS environment to actors. This interface provides access to essential Akka.NET services and FCQRS initialization methods. </summary>