Saga

Like aggregates, sagas have state. They work the same way but in the opposite direction: aggregates take commands and issue events, whereas sagas take events and issue commands. Sagas start automatically and do not passivate. They also need to implement an additional function that applies side effects. The diagram below shows the saga starting process.
Figure: Saga Starter

Defining states

The first thing we do is define the states. There are some constraints, though: the first two states must be NotStarted and Started, and Started carries the event that started the saga. This ensures that after a crash we can recover and land in the right state. The remaining states can have any names you like.

type State =
    | NotStarted
    | Started of SagaStartingEvent<Event<User.Event>>
    | SendingMail of Mail
    | Completed

We can also define data that is shared across states. It is not used here, but we still have to define it.

type SagaData = NA

Next comes the initial state. The apply function does nothing here; it is just a placeholder for this particular example. In general, you can use Data to hold whatever you want to share between states.

let initialState =
    { State = NotStarted
      Data = (NA: SagaData) }

let apply (sagaState: SagaState<SagaData, State>) = sagaState
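
To illustrate how Data could carry information across states, here is a minimal sketch that counts mail attempts. Everything in it is hypothetical: the SagaState record is a local stand-in with the same two fields as the FCQRS type so the snippet compiles on its own, MailFlow and RetryData are invented for the example, and it assumes apply is invoked once per state change, which this page does not confirm.

```fsharp
// Local stand-in with the same fields as FCQRS's SagaState record,
// declared here only so the sketch is self-contained.
type SagaState<'SagaData, 'State> = { Data: 'SagaData; State: 'State }

// Hypothetical reduced state set for this sketch.
type MailFlow =
    | NotStarted
    | SendingMail
    | Completed

// Hypothetical cross-state data: how many times a mail was attempted.
type RetryData = { MailAttempts: int }

let demoInitialState =
    { State = NotStarted
      Data = { MailAttempts = 0 } }

// Bump the counter whenever the saga is in SendingMail; pass through otherwise.
// Assumption: apply runs once per state change.
let demoApply (sagaState: SagaState<RetryData, MailFlow>) =
    match sagaState.State with
    | SendingMail ->
        { sagaState with
            Data = { MailAttempts = sagaState.Data.MailAttempts + 1 } }
    | _ -> sagaState
```

Because the function is pure, it can be checked in isolation by passing a state value and inspecting the returned record.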

Define event handler

The event handler is the function called whenever an event is received. These events come from other aggregates or services. The handler receives the event and the current state of the saga and decides the saga's next state; entering that state is what triggers the side effects.

let handleEvent (event: obj) (state: SagaState<SagaData, State>) = 
    match event, state with

    | :? string as str, _ when str = "mail sent" ->
        Completed |> StateChangedEvent

    | :? (Common.Event<User.Event>) as { EventDetails = userEvent }, state ->
        match userEvent, state with
        | User.VerificationRequested(email, _, code), _ ->
            SendingMail
                { To = email
                  Subject = "Your code"
                  Body = $"Your code is {code} !!" }
            |> StateChangedEvent
        | _ -> UnhandledEvent


    | _ -> UnhandledEvent

The first case, included for demo purposes, matches a plain string signalling that the mail was sent. The second case matches the verification-requested event sent by the user aggregate; it carries the email address and the code, which we use to build the mail. The final case is a catch-all that returns UnhandledEvent for anything else. Note that state changes are wrapped in StateChangedEvent; these state changes are persisted.
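
Outside a demo, a dedicated message type is usually easier to match safely than a raw string. The sketch below shows the same type-test pattern on an obj event with a hypothetical MailSent record, and it also checks the current state. EventAction here is a local stand-in with only the two cases used on this page, and DeliveryState and MailSent are invented for the example; none of this is the FCQRS API itself.

```fsharp
// Local stand-in with only the two cases used on this page.
type EventAction<'T> =
    | StateChangedEvent of 'T
    | UnhandledEvent

// Hypothetical states for the sketch.
type DeliveryState =
    | Waiting
    | Delivered

// Hypothetical confirmation message replacing the "mail sent" string.
type MailSent = { Recipient: string }

let demoHandleEvent (event: obj) (state: DeliveryState) : EventAction<DeliveryState> =
    match event, state with
    // React only when the message has the expected type AND we are waiting for it.
    | (:? MailSent), Waiting -> StateChangedEvent Delivered
    // Anything else is ignored, including a duplicate confirmation.
    | _ -> UnhandledEvent
```

Matching on the state as well as the event means a late or repeated confirmation falls through to UnhandledEvent instead of causing another state change.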

Define side effects handler

This is probably the most complex part of the saga. It is where you issue commands and, optionally, switch to the next state. The first three parameters are not required in general; here they denote external dependencies.

let applySideEffects
    (actorRef: unit -> IActorRef<obj>)
    env
    userFactory
    (sagaState: SagaState<SagaData, State>)
    (startingEvent: option<SagaStartingEvent<_>>)
    recovering =

The originator is the actor that started the saga. We use it to send messages back to that actor.

    let originator =
            FactoryAndName
                { Factory = userFactory; Name = Originator }

    match sagaState.State with

We must begin in NotStarted and then move to Started. Note that each case returns a tuple of three values: the saga's own internal effect, the optional next state, and the list of commands to send.

    | NotStarted -> NoEffect, Some(Started startingEvent.Value), []

The section below is almost boilerplate. We check whether recovering is true, which happens only in the rare case that the saga is recovering from a crash. In that case we check whether the aggregate actually resumed and, depending on the answer, continue or abort the saga. When not recovering, we return ResumeFirstEvent. More on this later.

    | Started _ ->
        if recovering then
            let startingEvent = startingEvent.Value.Event

            NoEffect,
            None,
            [ { TargetActor = originator
                Command = ContinueOrAbort startingEvent
                DelayInMs = None } ]
        else
            ResumeFirstEvent, None, []

Then we define what happens when we switch to the SendingMail state: we send the mail as a command to actorRef, the mail sender actor that comes from the SendMail module.

    | SendingMail mail ->
        NoEffect,
        None,
        [ { TargetActor = ActorRef(actorRef ())
            Command = mail
            DelayInMs = None } ]

Finally, we define the Completed state. This is the final state of the saga, and it stops the saga. Notice the use of the StopActor internal effect.

    | Completed ->
        StopActor, None, []
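
The shape of the returned tuple is easier to see without the actor plumbing. The following is a reduced sketch of the same decision table, not the FCQRS implementation: Effect, FlowState and DemoCommand are local stand-ins (the real types have other fields and possibly more cases), and the target names are plain strings invented for the example.

```fsharp
// Local stand-in listing only the effects used on this page.
type Effect =
    | NoEffect
    | ResumeFirstEvent
    | StopActor

// Hypothetical states; SendingMail carries just the recipient address.
type FlowState =
    | NotStarted
    | Started
    | SendingMail of string
    | Completed

// Hypothetical simplified command: a target name and a payload.
type DemoCommand = { Target: string; Payload: string }

// Returns (internal effect, optional next state, commands to send).
let demoSideEffects (state: FlowState) (recovering: bool) : Effect * FlowState option * DemoCommand list =
    match state with
    | NotStarted -> NoEffect, Some Started, []
    // After a crash, ask the originator whether to continue or abort.
    | Started when recovering ->
        NoEffect, None, [ { Target = "originator"; Payload = "ContinueOrAbort" } ]
    | Started -> ResumeFirstEvent, None, []
    | SendingMail address ->
        NoEffect, None, [ { Target = "mailer"; Payload = address } ]
    | Completed -> StopActor, None, []
```

Written this way, each state can be checked by calling the function and comparing the tuple, with no actor system involved.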

Initializers

In the init function we create a factory for our mail sender and then pass it, together with the other pieces defined above, to InitializeSaga.

let init (env: _) (actorApi: IActor) =
    let userFactory = User.factory env actorApi

    let mailSenderRef =
        fun () -> spawnAnonymous actorApi.System (props behavior) |> retype

    actorApi.InitializeSaga
        env
        initialState
        handleEvent
        (applySideEffects mailSenderRef env userFactory)
        apply
        "UserSaga"


let factory (env: _) actorApi entityId =
    (init env actorApi).RefFor DEFAULT_SHARD entityId