Saga
Like aggregates, sagas have state. They work the same way but in the opposite direction: aggregates take commands and issue events, whereas sagas take events and issue commands.
Sagas start automatically and do not passivate. They also need an additional function that applies side effects.
The image below shows the saga starting process.
Defining states
The first thing we do is define the states. There are some constraints, though. The first two states must be NotStarted and Started, where Started carries the initial event. This ensures that after any crash we can recover and land in the right state. The rest of the states can have any name you like.
type State =
    | NotStarted
    | Started of SagaStartingEvent<Event<User.Event>>
    | SendingMail of Mail
    | Completed
We can also define data that is shared across states. It is not used here, but we still have to define it.
type SagaData = NA
Next comes the initial state. The apply function doesn't do anything here; it is just a placeholder for this particular example. In general, you can use Data for anything you want to share between states.
let initialState =
    { State = NotStarted
      Data = (NA: SagaData) }
let apply (sagaState: SagaState<SagaData, State>) = sagaState
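To make the idea of cross-state data more concrete, here is a minimal sketch of an apply function that is not a placeholder. Everything in it is an assumption for illustration: SagaState is redeclared locally as a two-field record (mirroring the fields used in initialState above), the MailAttempts counter is hypothetical, and when exactly the library calls apply is not covered here, so the sketch only shows the shape of such a function.

```fsharp
// Local stand-in for the library's SagaState record so the sketch compiles
// on its own (assumption: it has the Data and State fields used above).
type SagaState<'SagaData, 'State> = { Data: 'SagaData; State: 'State }

// Simplified states, without payloads, for this sketch only.
type State =
    | NotStarted
    | SendingMail
    | Completed

// Hypothetical cross-state data: how many times a mail was attempted.
type SagaData = { MailAttempts: int }

let initialState =
    { State = NotStarted
      Data = { MailAttempts = 0 } }

// Hypothetical apply: bump the counter whenever the saga is in SendingMail,
// and leave the saga state untouched otherwise.
let apply (sagaState: SagaState<SagaData, State>) =
    match sagaState.State with
    | SendingMail ->
        { sagaState with
            Data = { MailAttempts = sagaState.Data.MailAttempts + 1 } }
    | _ -> sagaState
```

Because Data lives next to State rather than inside a single case, the counter survives every state transition.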
Define event handler
The event handler is the function that is called when an event is received. These events come from other aggregates or services. The handler is called with the event and the current state of the saga. It then decides the next state of the saga, and that state determines which side effects are issued.
let handleEvent (event: obj) (state: SagaState<SagaData, State>) =
    match event, state with
    | :? string as str, _ when str = "mail sent" ->
        Completed |> StateChangedEvent
    | :? (Common.Event<User.Event>) as { EventDetails = userEvent }, state ->
        match userEvent, state with
        | User.VerificationRequested(email, _, code), _ ->
            SendingMail
                { To = email
                  Subject = "Your code"
                  Body = $"Your code is {code} !!" }
            |> StateChangedEvent
        | _ -> UnhandledEvent
    | _ -> UnhandledEvent
The first event we handle, for demo purposes, is a simple string signalling that the mail was sent. The second is the verification requested event, which is sent from the user aggregate. It contains the email address and the code, and we use this information to send the email. The remaining cases are catch-alls that return UnhandledEvent for any event that is not handled. Observe that for state changes we use StateChangedEvent. These state changes are persisted.
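The handler's decision logic can be tried in isolation. The following is a minimal sketch, not the library's API: Event, UserEvent, EventAction and the payload-free NotStarted state are simplified local stand-ins, the handler takes the bare State instead of SagaState, and the three string fields of VerificationRequested are an assumption about its shape.

```fsharp
type Mail = { To: string; Subject: string; Body: string }

// Stand-in for User.Event (the field types are assumed).
type UserEvent =
    | VerificationRequested of string * string * string
    | OtherUserEvent

// Stand-in for Common.Event<_>, reduced to the one field the handler reads.
type Event<'T> = { EventDetails: 'T }

type State =
    | NotStarted
    | SendingMail of Mail
    | Completed

// Stand-in for the handler's result type.
type EventAction =
    | StateChangedEvent of State
    | UnhandledEvent

let handleEvent (event: obj) (state: State) =
    match event with
    // Demo signal: a plain string tells the saga the mail went out.
    | :? string as str when str = "mail sent" -> StateChangedEvent Completed
    // An event coming from the user aggregate.
    | :? Event<UserEvent> as e ->
        match e.EventDetails with
        | VerificationRequested(email, _, code) ->
            SendingMail
                { To = email
                  Subject = "Your code"
                  Body = $"Your code is {code} !!" }
            |> StateChangedEvent
        | _ -> UnhandledEvent
    // Anything else is ignored.
    | _ -> UnhandledEvent

// Usage: box the incoming message, as the handler receives obj.
let requested =
    handleEvent (box { EventDetails = VerificationRequested("a@example.com", "", "1234") }) NotStarted
```

Matching on obj keeps the handler open to messages from several sources at the cost of compile-time exhaustiveness, which is why the catch-all cases matter.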
Define side effects handler
This is probably the most complex part of the saga. This is where you issue commands and can also switch to the next state. The first three parameters aren't required in general; here they denote external dependencies.
let applySideEffects
    (actorRef: unit -> IActorRef<obj>)
    env
    userFactory
    (sagaState: SagaState<SagaData, State>)
    (startingEvent: option<SagaStartingEvent<_>>)
    recovering =
The originator is the actor that started the saga. We use it to send messages back to that actor.
    let originator =
        FactoryAndName
            { Factory = userFactory; Name = Originator }

    match sagaState.State with
We must start with NotStarted and then move to Started; these are the first two states of the saga. Note that each branch returns a tuple of three values: the saga's own internal side effect, the next state (if any), and the list of commands to send.
    | NotStarted -> NoEffect, Some(Started startingEvent.Value), []
The section below is almost boilerplate. We check whether recovering is true, which is the rare case where the saga is recovering from a crash. In that case we send ContinueOrAbort to the originator to find out whether the aggregate actually resumed, and depending on the answer the saga continues or aborts. Otherwise we resume with the first event. More on this later.
    | Started _ ->
        if recovering then
            let startingEvent = startingEvent.Value.Event

            NoEffect,
            None,
            [ { TargetActor = originator
                Command = ContinueOrAbort startingEvent
                DelayInMs = None } ]
        else
            ResumeFirstEvent, None, []
Then we define what happens when we switch to the SendingMail state. Essentially, we send the mail as a command to the actor returned by actorRef, which comes from SendMail.
    | SendingMail mail ->
        NoEffect,
        None,
        [ { TargetActor = ActorRef(actorRef ())
            Command = mail
            DelayInMs = None } ]
Finally we define the Completed state. This is the final state of the saga, and it stops the saga. Notice the use of the StopActor internal effect.
    | Completed ->
        StopActor, None, []
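The three-value result is easier to see in a stripped-down model. The sketch below is an illustration under assumptions, not the library's types: Effect and OutgoingCommand are local stand-ins, targets are plain strings instead of TargetActor values, and the starting event is reduced to a string.

```fsharp
type Mail = { To: string; Subject: string; Body: string }

type State =
    | NotStarted
    | Started of string // the starting event, reduced to a string here
    | SendingMail of Mail
    | Completed

// Stand-ins for the saga's internal effects.
type Effect =
    | NoEffect
    | ResumeFirstEvent
    | StopActor

// Stand-in for a command the saga wants delivered.
type OutgoingCommand =
    { Target: string
      Command: obj
      DelayInMs: int option }

// Returns (internal effect, optional next state, commands to send).
let applySideEffects (state: State) (startingEvent: string option) (recovering: bool) =
    match state with
    // Always move from NotStarted to Started first.
    | NotStarted -> NoEffect, Some(Started startingEvent.Value), []
    | Started ev ->
        if recovering then
            // After a crash, ask the originator whether to continue or abort.
            NoEffect,
            None,
            [ { Target = "originator"
                Command = box ("ContinueOrAbort " + ev)
                DelayInMs = None } ]
        else
            ResumeFirstEvent, None, []
    | SendingMail mail ->
        NoEffect,
        None,
        [ { Target = "mailSender"
            Command = box mail
            DelayInMs = None } ]
    // Final state: stop the saga.
    | Completed -> StopActor, None, []
```

Only the NotStarted branch returns a next state directly; in every other branch the state changes when a later event reaches the event handler.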
Initializers
In the init function we create a factory for our mail sender. Then we pass it to InitializeSaga together with the other pieces defined above.
let init (env: _) (actorApi: IActor) =
    let userFactory = User.factory env actorApi

    let mailSenderRef =
        fun () -> spawnAnonymous actorApi.System (props behavior) |> retype

    actorApi.InitializeSaga
        env
        initialState
        handleEvent
        (applySideEffects mailSenderRef env userFactory)
        apply
        "UserSaga"

let factory (env: _) actorApi entityId =
    (init env actorApi).RefFor DEFAULT_SHARD entityId