OpenSleigh: tackling state persistence, part 2
Hi all! Welcome back to the second part of this series on OpenSleigh. Today we’ll continue our discussion of Saga State persistence and we’ll also see some code.
Last time we started talking about the general flow and what OpenSleigh does when it receives a new message.
The important thing to understand here is that a Saga message handler can mutate the Saga State and publish other messages.
We have to make sure that all this happens in a single transaction.
We absolutely don’t want to lose any modification to the State and we also need to publish those new messages safely.
The core logic lives in the RunAsync() method of the SagaRunner class. Let’s go over a super simplified version, step by step:

    public async Task RunAsync<TM>(TM message) where TM : IMessage
    {
        var (state, lockId) = await _sagaStateService.GetAsync(message);

        if (state.IsCompleted() || state.CheckWasProcessed(message))
            return;

This first part takes care of fetching the Saga state from the Persistence layer using the Correlation ID of the message.
Internally, the SagaStateService will fetch an existing State. If missing, it’ll create a new one for us, but only if the message can start the current Saga. It will also take care of locking the State and preventing unwanted modifications. This is very important as we don’t want other Saga instances to process the same message concurrently.
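To make the fetch-or-create-and-lock idea more concrete, here is a minimal in-memory sketch. It is not OpenSleigh's actual SagaStateService: the types `IncomingMessage`, `SagaState`, `LockException` and `InMemorySagaStateService` are hypothetical, the methods are synchronous for brevity, and a real Persistence layer would rely on the locking features of the underlying database instead of a dictionary.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical, simplified types used only for this sketch.
public record IncomingMessage(Guid CorrelationId, bool CanStartSaga);

public class SagaState
{
    public Guid CorrelationId { get; }
    public SagaState(Guid correlationId) => CorrelationId = correlationId;
}

public class LockException : Exception
{
    public LockException(string message) : base(message) { }
}

public class InMemorySagaStateService
{
    private readonly ConcurrentDictionary<Guid, SagaState> _states = new();
    private readonly ConcurrentDictionary<Guid, Guid> _locks = new();

    public (SagaState state, Guid lockId) Get(IncomingMessage message)
    {
        // Take the lock first, so two workers cannot load or create the same State.
        var lockId = Guid.NewGuid();
        if (!_locks.TryAdd(message.CorrelationId, lockId))
            throw new LockException($"State '{message.CorrelationId}' is already locked");

        // Existing State: return it together with the lock id.
        if (_states.TryGetValue(message.CorrelationId, out var state))
            return (state, lockId);

        // Missing State: create one only if this message can start the Saga.
        if (!message.CanStartSaga)
        {
            _locks.TryRemove(message.CorrelationId, out _);
            throw new InvalidOperationException("no State found and the message cannot start the Saga");
        }

        state = new SagaState(message.CorrelationId);
        _states[message.CorrelationId] = state;
        return (state, lockId);
    }

    public void Save(SagaState state, Guid lockId)
    {
        // Only the current lock owner is allowed to persist and release the State.
        if (!_locks.TryGetValue(state.CorrelationId, out var owner) || owner != lockId)
            throw new LockException($"State '{state.CorrelationId}' is not locked by this caller");

        _states[state.CorrelationId] = state;
        _locks.TryRemove(state.CorrelationId, out _);
    }
}
```

In this sketch a second call to `Get` with the same Correlation ID throws until the first caller invokes `Save`, which is one simple way to keep two workers from touching the same State at the same time.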

        var transaction = await _transactionManager.StartTransactionAsync();
        try
        {
            var saga = _sagaFactory.Create(state);
            if (null == saga)
                throw new SagaNotFoundException("unable to create Saga");

            // run the handler, which may mutate the State and publish messages
            await saga.HandleAsync(message);

            // remember that this Saga instance has handled the message
            state.SetAsProcessed(message);

            await _sagaStateService.SaveAsync(state, lockId);

            await transaction.CommitAsync();
        }
        catch
        {
            await transaction.RollbackAsync();
            throw;
        }
    }

The next thing we do is start a Transaction. Inside its scope, we do a few interesting things:
- run the current message handler
- set the message as processed for the current Saga instance
- persist the Saga State
Point #2 is quite important, as we’ll see in another article, for two good reasons. First of all, we don’t want the same message to be processed more than once by the same Saga (maybe running on another worker instance).
At the same time, however, some messages might be events instead of commands. This means that the very same message can be handled by multiple different Sagas, so processed messages have to be tracked per Saga instance rather than globally.
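One simple way to satisfy both requirements is to keep the set of processed message IDs inside each Saga State. The sketch below assumes that every message exposes a unique `Id`. The `IMessage` shape, the `OrderCreated` event and the `TrackedSagaState` class are hypothetical stand-ins; only the method names `CheckWasProcessed` and `SetAsProcessed` come from the snippet above, and OpenSleigh's real implementation may well differ.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical message contract: assumes each message carries a unique Id.
public interface IMessage
{
    Guid Id { get; }
}

// Hypothetical event that several different Sagas might be interested in.
public record OrderCreated(Guid Id) : IMessage;

public class TrackedSagaState
{
    // Processed message IDs are stored per State, not in a shared global list.
    private readonly HashSet<Guid> _processedMessages = new();

    public bool CheckWasProcessed(IMessage message) =>
        _processedMessages.Contains(message.Id);

    public void SetAsProcessed(IMessage message) =>
        _processedMessages.Add(message.Id);
}
```

With this layout, marking an `OrderCreated` event as processed on one State does not affect another State. A second Saga can still handle the same event, while a redelivery to the first Saga is detected and skipped.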
If everything goes smoothly, we can finally commit the Transaction and call it a day. Otherwise, we roll it back and rethrow the exception.
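To illustrate why the commit-or-rollback step protects the State, here is a toy transaction that buffers writes and applies them only on commit. Everything in it is an assumption made for the example: the `ITransaction` interface name, the `InMemoryTransaction` class, its `Enlist` method and the `TransactionalRunner` helper are hypothetical. Only `CommitAsync()` and `RollbackAsync()` mirror the calls shown in the snippet above.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Assumed shape of the transaction object, based on the calls used in RunAsync().
public interface ITransaction
{
    Task CommitAsync();
    Task RollbackAsync();
}

// Hypothetical in-memory transaction: writes are buffered until commit.
public class InMemoryTransaction : ITransaction
{
    private readonly List<Action> _pendingWrites = new();

    public void Enlist(Action write) => _pendingWrites.Add(write);

    public Task CommitAsync()
    {
        // Apply every buffered write, then forget them.
        foreach (var write in _pendingWrites)
            write();
        _pendingWrites.Clear();
        return Task.CompletedTask;
    }

    public Task RollbackAsync()
    {
        // Drop the buffered writes: nothing reaches the store.
        _pendingWrites.Clear();
        return Task.CompletedTask;
    }
}

public static class TransactionalRunner
{
    // Same try/commit/catch/rollback shape as the RunAsync() snippet.
    public static async Task RunAsync(ITransaction transaction, Func<Task> work)
    {
        try
        {
            await work();
            await transaction.CommitAsync();
        }
        catch
        {
            await transaction.RollbackAsync();
            throw;
        }
    }
}
```

If `work` throws, the buffered writes are discarded and the exception still reaches the caller, so a failing handler leaves no partial change behind.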
That’s all for today. Next time we’ll keep discussing State persistence: we’ll see how OpenSleigh makes sure that neither outbound messages nor Saga State modifications get lost.
Ciao!