Saga Pattern

ℹ️note

The Saga pattern is available only for Domain Services (Java) based on Java Spring Boot Stack 2.0.

Setup local profile

ℹ️note

To use the Saga pattern for transactional use cases in your project, add the following configuration to application-local.yaml. The Saga extension is implemented with Apache Camel, which requires these settings:

camel:
  lra:
    coordinator-url: <camel-server-url>
    local-participant-url: ${k5.sdk.springboot.server.baseurl}/camel
    local-participant-context-path: ""

ℹ️note

Note that the LRA coordinator might not be able to trigger the callbacks if your service is running locally.

Implement Saga Orchestrator

A Saga Orchestrator manages a chain of transactions modeled with the help of participants. For each domain service marked as Saga Orchestrator, an implementation stub is generated under "src/main/java/yourProject/domain/yourDomain/service/YourOrchestrator.java". Depending on the model, the stub comes with the following methods:

  • beforeSaga
    • will be triggered before all Participants of this service are executed
    • provides access to the input of the Orchestrator if it was modelled
    • provides access to the SagaContext object
    • lra-id is accessible through SagaContext object
  • afterSaga
    • will be triggered after all Participants of this service are executed
    • provides access to the input of the Orchestrator if it was modelled
    • defines the output of the Orchestrator if it was modelled
    • will not be triggered if one of the Participants failed with an Exception
    • will be generated only if all Participants have no output
  • onComplete
    • covers additional logic to run on successful completion
    • only exists if On Complete Method was selected while modelling
    • will be called automatically by the lra-coordinator when the Saga completes successfully (asynchronously)
    • if this logic fails, the lra-coordinator will keep triggering it
  • onCompensate
    • covers extra compensation logic for the whole saga
    • only exists if On Compensate Method was selected while modelling
    • will be called automatically by the lra-coordinator when the Saga fails (asynchronously)
    • if this logic fails, the lra-coordinator will keep triggering it

Example:

// Example of orchestrator implementation
@Override
public void beforeSaga(CreditCard input, SagaContext context) {
    log.info("Orchestrator01.beforeSaga() for lra {}", context.getLraId());
}

@Override
protected CreditCards afterSaga(CreditCard input) {
    log.info("Orchestrator01.afterSaga()");
    this.repo.getCc().getCard().insert(input);

    return this.repo.getCc().getCard().findAll();
}

@Override
public void onComplete(SagaContext context) {
    log.info("Orchestrator01.complete() for lra {}", context.getLraId());
}

@Override
public void onCompensate(SagaContext context) {
    log.info("Orchestrator01.compensate() for lra {}", context.getLraId());
}
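
The call order described in the method list above can be illustrated with a simplified, in-memory model. This is only a sketch and not the code generated by the SDK: `SagaLifecycleSketch`, `Step`, and `run` are hypothetical names, and the completion and compensation hooks are recorded inline here, whereas in a real setup the lra-coordinator triggers `onComplete` and `onCompensate` asynchronously.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified in-memory model of the orchestrator lifecycle (hypothetical, not SDK code).
class SagaLifecycleSketch {

    /** A participant step that may fail with an exception. */
    interface Step {
        void execute() throws Exception;
    }

    /** Runs the steps in order and records which lifecycle hooks would fire. */
    static List<String> run(List<Step> participants) {
        List<String> events = new ArrayList<>();
        events.add("beforeSaga");
        try {
            for (int i = 0; i < participants.size(); i++) {
                participants.get(i).execute();
                events.add("participant" + (i + 1));
            }
            // Only reached when no participant threw an exception.
            events.add("afterSaga");
            events.add("onComplete");
        } catch (Exception e) {
            // afterSaga is skipped; compensation takes over.
            events.add("onCompensate");
        }
        return events;
    }

    public static void main(String[] args) {
        List<Step> allSucceed = List.of(() -> {}, () -> {});
        List<Step> secondFails = List.of(() -> {}, () -> { throw new IllegalStateException("boom"); });

        System.out.println(run(allSucceed));  // [beforeSaga, participant1, participant2, afterSaga, onComplete]
        System.out.println(run(secondFails)); // [beforeSaga, participant1, onCompensate]
    }
}
```

The second run shows why `afterSaga` should not contain logic that must also happen on failure: it is never reached once a participant throws.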

To execute the Orchestrator, call its execute or executeAsync method, for example from an API operation, and set the input if needed:

@Autowired
Orchestrator01 orchestrator;

@Override
public ResponseEntity<Void> executeSagaOrchestrator(String id) {

    // create input for Saga
    GetByIdInput input = new GetByIdInputBuilder() //
        .setExternalID(id) //
        .build();

    // trigger Saga execution synchronously
    orchestrator.execute(input);
    // or, alternatively, asynchronously (use one of the two calls, not both):
    // orchestrator.executeAsync(input);

    return new ResponseEntity<Void>(HttpStatusCode.valueOf(202));
}

The executeAsync method runs the Orchestrator asynchronously, so the calling request is not blocked while the Saga executes.
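
As a rough sketch of how a caller might react to the outcome of an asynchronous run without blocking, the example below uses plain `CompletableFuture` handling. `startSaga` is a hypothetical stand-in for `orchestrator.executeAsync(input)`; it assumes that the call returns a `CompletableFuture<Object>`, as the orchestrator interface shown later in this document suggests.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;

// Sketch: reacting to an asynchronous Saga run without blocking the request thread.
class AsyncSagaCallSketch {

    // Hypothetical stand-in for orchestrator.executeAsync(input).
    static CompletableFuture<Object> startSaga(String input) {
        return CompletableFuture.supplyAsync(() -> {
            if (input.isEmpty()) {
                throw new IllegalArgumentException("input must not be empty");
            }
            return "done:" + input;
        });
    }

    // Maps the outcome to a status string once the Saga has finished.
    static CompletableFuture<String> describeOutcome(String input) {
        return startSaga(input).handle((result, error) -> {
            if (error == null) {
                return "completed with " + result;
            }
            // Failures of async stages are usually wrapped in a CompletionException.
            Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                    ? error.getCause() : error;
            return "failed: " + cause.getMessage();
        });
    }

    public static void main(String[] args) throws Exception {
        // Blocking with get() is done here only to print the results of the demo.
        System.out.println(describeOutcome("42").get(5, TimeUnit.SECONDS)); // completed with done:42
        System.out.println(describeOutcome("").get(5, TimeUnit.SECONDS));   // failed: input must not be empty
    }
}
```

In an API operation, the request would typically return (for example with status 202) right after starting the Saga, and the callback attached with `handle` would log or record the outcome later.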

Override AggregationStrategy

If a participant output is defined, the output of the last participant is used as the aggregated result (see UseLatestAggregationStrategy). You can change this behavior by overriding the aggregationStrategy() method of the Orchestrator.

@Override
public AggregationStrategy aggregationStrategy() {
    // return your custom AggregationStrategy
    ...
}
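
Camel's AggregationStrategy combines results pairwise: its aggregate(oldExchange, newExchange) method receives the result collected so far and the newest one, and oldExchange is null on the first call. The sketch below models that pairwise shape with a plain `BinaryOperator` instead of Camel types, so that it runs without dependencies. `AggregationSketch`, `useLatest`, `collectAll`, and `aggregate` are hypothetical names used for illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Plain-Java model of pairwise aggregation; not Camel's AggregationStrategy itself.
class AggregationSketch {

    // "Use latest": each new participant output replaces the previous one.
    static <T> BinaryOperator<T> useLatest() {
        return (oldResult, newResult) -> newResult;
    }

    // Custom alternative: keep every participant output in arrival order.
    static BinaryOperator<List<String>> collectAll() {
        return (oldResult, newResult) -> {
            List<String> merged = new ArrayList<>();
            if (oldResult != null) { // null on the first call, as no result exists yet
                merged.addAll(oldResult);
            }
            merged.addAll(newResult);
            return merged;
        };
    }

    // Folds the participant outputs the way a pairwise strategy is applied.
    static <T> T aggregate(List<T> outputs, BinaryOperator<T> strategy) {
        T result = null;
        for (T output : outputs) {
            result = strategy.apply(result, output);
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<String>> outputs = List.of(List.of("balance"), List.of("limit"));
        System.out.println(aggregate(outputs, useLatest()));  // [limit]
        System.out.println(aggregate(outputs, collectAll())); // [balance, limit]
    }
}
```

A real override would return an implementation of Camel's AggregationStrategy that applies the same idea to the exchange bodies.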

Override Configuration for Saga Orchestrator

To customize the behavior of the Saga Orchestrator, you can override specific configuration methods. These methods allow you to define how participants are processed and how the overall Saga definition is configured.

  • configureSagaDefinition: Override the 'configureSagaDefinition' method to specify the completion mode, the propagation behavior, and the compensation and completion mechanisms for the Saga Orchestrator. This method gives you control over key aspects of the Saga definition, including execution and error handling.

Example:

@Override
protected SagaDefinition configureSagaDefinition(SagaDefinition sagaDefinition) {
    return sagaDefinition
        .completionMode(SagaCompletionMode.AUTO)
        .propagation(SagaPropagation.REQUIRED)
        .compensation(SagaParticipantsRegistry.DOMAINSAGA_SAGAORCHESTRATOR_COMPENSATE)
        .completion(SagaParticipantsRegistry.DOMAINSAGA_SAGAORCHESTRATOR_COMPLETE);
}

  • configureParticipantsExecution: You can override the 'configureParticipantsExecution' method to customize the processing of Saga participants. This allows you to define execution order, parallel processing, and other participant-specific strategies.

Example:

@Override
protected ProcessorDefinition<?> configureParticipantsExecution(SagaDefinition sagaDefinition) {
    return sagaDefinition
        .multicast(aggregationStrategy())
        .parallelProcessing()
        .to(SagaParticipantsRegistry.DOMAINSAGA_SAGAPARTICIPANT01_EXECUTE)
        .to(SagaParticipantsRegistry.DOMAINSAGA_SAGAPARTICIPANT02_EXECUTE)
        .end();
}

Implement custom Saga orchestrator

To implement a custom orchestrator, for example to provide your own Camel configuration, create a new orchestrator class that implements the orchestrator interface, as in the following example:

public class CustomSagaServiceOrchestrator implements SagaServiceOrchestratorInterface {

    protected final CamelContext ctx;
    protected final SagaContextHolder sagaContextHolder;
    protected final Tracer tracer;
    protected ProducerTemplate template;

    private static final Logger log = LoggerFactory.getLogger(CustomSagaServiceOrchestrator.class);

    public CustomSagaServiceOrchestrator(CamelContext ctx, SagaContextHolder sagaContextHolder, Tracer tracer) {
        this.ctx = ctx;
        this.sagaContextHolder = sagaContextHolder;
        this.tracer = tracer;
    }

    // implement your custom orchestrator here
    YourSagaOrchestratorOutputIfExists execute(YourSagaInputIfExist);

    CompletableFuture<Object> executeAsync(YourSagaInputIfExist);
    ...
}

In addition, provide a custom configuration that returns an instance of the custom orchestrator. The return type of the bean method can be either the interface or the concrete custom class.

@Configuration
@Primary
public class CustomOrchestratorConfig {
    @Bean
    public SagaServiceOrchestratorInterface createCustomOrchestrator(CamelContext ctx, SagaContextHolder sch, Tracer tracer)
            throws Exception {
        CustomSagaServiceOrchestrator res = new CustomSagaServiceOrchestrator(ctx, sch, tracer);
        return res;
    }
}

In the stub implementation file of your designed API, make sure to autowire the new orchestrator. The qualifier for the provided orchestrator has to match the method name of the above configuration.

/**
 * A stub that provides implementation for your API
 */
@SuppressWarnings("unused")
@Service
@ComponentScan(basePackages = "k5.springboot.your.package")
public class ExampleSagaApiProvider implements ExampleSagaApiDelegate {

    // typed as the interface so that it matches the bean returned by createCustomOrchestrator
    private final SagaServiceOrchestratorInterface orchestrator;

    @Autowired
    public ExampleSagaApiProvider(@Qualifier("createCustomOrchestrator") SagaServiceOrchestratorInterface sagaOrch) {
        this.orchestrator = sagaOrch;
    }

    @Override
    public ResponseEntity<Void> executeSaga() {
        orchestrator.execute();
        return ResponseEntity.status(200).build();
    }
}

Implement Saga participant

For each Domain Service marked as Saga Participant, an implementation stub is generated under "src/main/java/yourProject/domain/yourDomain/service/YourParticipant.java" which comes with three methods:

  • execute
    • covers execution logic of the Participant
  • onComplete
    • covers additional logic to run on successful completion of the Saga
    • only exists if On Complete Method was selected while modelling
    • will be called automatically by the lra-coordinator when the Saga completes successfully (asynchronously)
    • if this logic fails, the lra-coordinator will keep triggering it
  • onCompensate
    • covers compensation logic (e.g. undoing what happened in the execute) for the participant
    • only exists if On Compensate Method was selected while modelling
    • will be called automatically by the lra-coordinator when the Saga fails (asynchronously)
    • if this logic fails, the lra-coordinator will keep triggering it

Example:

@Service("sagadomainspace_Partcipant")
public class Partcipant extends PartcipantBase {

    private static final Logger log = LoggerFactory.getLogger(Partcipant.class);

    public Partcipant(DomainEntityBuilder entityBuilder, Repository repo) {
        super(entityBuilder, repo);
    }

    // Example of participant implementation
    @Override
    public Balance execute(SagaContext context, CreditCard creditCard) throws CreditCardNotFoundError {
        log.info("Participant.execute()");

        // Use repository to get card instance
        Optional<Card> cardRootEntity = this.repo.getCc().getCard().findById(creditCard.getId());

        // Use Domain Entity Builder from base class to create an instance of Balance entity
        Balance balance = this.entityBuilder.getCc().getBalance().build();
        if (cardRootEntity.isPresent()) {
            // unwrap the Optional before reading the card's values
            Card card = cardRootEntity.get();
            balance.setAvaliableBalance(card.getBalance());
            balance.setHoldAmount(card.getHoldAmount());
        } else {
            String errorMessage = String.format("Credit card with id %s not found", creditCard.getId());
            throw new CreditCardNotFoundError(errorMessage);
        }
        return balance;
    }
}

The LRA id and exchange can be accessed from any Saga participant service using the provided SagaContext:

@Override
public SagaOutput execute(
    SagaInput sagaInput,
    SagaContext context
) {
    log.info("SagaParticipant01.execute()");
    log.info("From exchange " + context.getExchange().getIn().getBody(SagaInput.class).getMessage());

    // sagaOutput stands for the participant output; its construction is omitted in this example
    return sagaOutput;
}

@Override
public void onComplete(SagaContext context) {
    // how LRA can be accessed from context
    log.info("Participant.onComplete() for lra {}", context.getLraId());
}

@Override
public void onCompensate(SagaContext context) {
    log.info("Participant.onCompensate()");
}
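
Because the lra-coordinator keeps triggering onComplete and onCompensate until they succeed, a callback may run more than once for the same LRA, so it helps to make the logic idempotent. The following is a minimal sketch under stated assumptions: it remembers handled LRA ids in memory, whereas a real service would more likely persist such a marker. `IdempotentCompensationSketch`, `undo`, and `undoCount` are hypothetical names and not part of the SDK.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of an idempotent compensation callback keyed by LRA id (hypothetical, not SDK code).
class IdempotentCompensationSketch {

    private final Set<String> compensatedLraIds = ConcurrentHashMap.newKeySet();
    private final AtomicInteger undoCount = new AtomicInteger();

    /** Returns true if the undo logic ran, false if this LRA was already handled. */
    boolean onCompensate(String lraId) {
        // add() returns false when the id is already present, so repeated triggers are skipped.
        if (!compensatedLraIds.add(lraId)) {
            return false;
        }
        try {
            undo(lraId);
            return true;
        } catch (RuntimeException e) {
            // Forget the marker so that the next trigger can attempt the undo again.
            compensatedLraIds.remove(lraId);
            throw e;
        }
    }

    // Hypothetical business action that reverts what execute() did.
    private void undo(String lraId) {
        undoCount.incrementAndGet();
    }

    int undoCount() {
        return undoCount.get();
    }

    public static void main(String[] args) {
        IdempotentCompensationSketch participant = new IdempotentCompensationSketch();
        System.out.println(participant.onCompensate("lra-1")); // true
        System.out.println(participant.onCompensate("lra-1")); // false
        System.out.println(participant.undoCount());           // 1
    }
}
```

In a generated participant, the id would come from `context.getLraId()` as shown in the examples above.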

Implement Saga across multiple services

To share a Saga context across multiple services, it is helpful to use the Saga API Header, which marks the API operation as a Saga participant. To do that, the Saga Pattern Participant flag needs to be set in the operation while modelling.

To establish a cross-service Saga use case, the API operation in turn triggers the execution of an Orchestrator, which is automatically executed in the proper Saga context.

Marking the API operation as Saga Pattern Participant will have the following effects:

  • A Long-Running-Action (LRA) header will be added automatically to the operation headers
  • Any Orchestrator triggered from this API Operation will automatically apply the Saga context from the header
  • The LRA header value can be accessed from the NativeWebRequest in the API Operation

Example:


@Autowired
Orchestrator1 orchestrator;

@Override
public ResponseEntity<Void> apiOperation() {

    // trigger Saga execution --> saga context is automatically applied
    orchestrator.execute();

    // optional: get saga header from request
    Optional<NativeWebRequest> nativeWebRequest = getRequest();
    nativeWebRequest.ifPresent((request) -> {
        String lraHeader = request.getHeader(SagaConstants.Saga_LONG_RUNNING_ACTION);
        log.info("LRA header value: {}", lraHeader);
    });

    return ResponseEntity.ok().build();
}

  • When calling an API operation that has an LRA header, the header value is automatically read from the current Saga context
  • If needed, the header value can be overwritten with a custom LRA header value

Example:

@Override
public void execute(SagaContext context) {
    log.info("Integration service.execute()");

    HttpHeaders headerParams = new HttpHeaders();

    // overwrite saga header with a custom header (usually not needed as it is automatically filled from the current saga context)
    headerParams.add(SagaConstants.Saga_LONG_RUNNING_ACTION, "CustomLRAHeaderValue");

    integrationService.integrationServiceOperation(headerParams);
}
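
The precedence described above (the value from the current Saga context by default, an explicitly set header if provided) can be sketched with a plain map. This is an illustration only and not how the SDK implements it. `LraHeaderSketch` and `outgoingHeaders` are hypothetical names, and the literal header name "Long-Running-Action" is an assumption based on the name Camel uses for its Saga header; in the SDK the name is available through SagaConstants.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of the header precedence for outgoing calls (hypothetical, not SDK code).
class LraHeaderSketch {

    // Assumed header name; in the SDK it is exposed via SagaConstants.
    static final String LRA_HEADER = "Long-Running-Action";

    static Map<String, String> outgoingHeaders(String contextLraId, Map<String, String> customHeaders) {
        Map<String, String> headers = new LinkedHashMap<>();
        if (contextLraId != null) {
            headers.put(LRA_HEADER, contextLraId); // default taken from the current Saga context
        }
        headers.putAll(customHeaders);             // explicitly set values win
        return headers;
    }

    public static void main(String[] args) {
        System.out.println(outgoingHeaders("lra-1", Map.of()));
        // {Long-Running-Action=lra-1}
        System.out.println(outgoingHeaders("lra-1", Map.of(LRA_HEADER, "CustomLRAHeaderValue")));
        // {Long-Running-Action=CustomLRAHeaderValue}
    }
}
```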

Override CamelConfiguration

In some use cases, it might be necessary to customize your Camel configuration, for example to use a different LRASagaService. To do so, create a new class with a suitable name, annotate it with @Configuration, and define a CamelSagaService bean that contains your custom modifications.

Below is an example configuration demonstrating how to use a custom CamelSagaService to modify token propagation behavior based on feature flags:

@Configuration
public class CustomCamelConfiguration {
    @Bean
    public CamelSagaService camelSagaService(
        CamelContext cc,
        LraServiceConfiguration configuration,
        SagaContextHolder sagaContextHolder,
        Environment env
    ) throws Exception {
        LRASagaService service;
        if (FeatureFlagUtils.isActiveFeature("feature.secure-narayana.enabled", env)) {
            service = new AuthenticatingLRASagaService(sagaContextHolder);
        } else {
            service = new LRASagaService();
        }
        service.setCoordinatorUrl(configuration.getCoordinatorUrl());
        service.setCoordinatorContextPath(configuration.getCoordinatorContextPath());
        service.setLocalParticipantUrl(configuration.getLocalParticipantUrl());
        service.setLocalParticipantContextPath(configuration.getLocalParticipantContextPath());

        cc.addService(service);

        return service;
    }
}

Token Propagation and Fallback Mechanism

If the LRA coordinator is configured as secure, the application propagates the Authorization Header (e.g., a JWT token) to the coordinator to authenticate requests. If the Authorization Header is missing or invalid, the application falls back to using a technical user token.
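
The fallback decision can be sketched as a small function. This is a hedged illustration, not the SDK's implementation: `TokenFallbackSketch` and `resolveAuthorization` are hypothetical names, the validity check is passed in as a predicate because the source does not state how validity is determined, and the "Bearer " prefix for the technical user token is an assumption.

```java
import java.util.Optional;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Sketch of the token fallback decision (hypothetical, not SDK code).
class TokenFallbackSketch {

    /**
     * Uses the incoming Authorization header if it is present and valid;
     * otherwise requests a technical user token and uses that instead.
     */
    static String resolveAuthorization(Optional<String> incomingHeader,
                                       Predicate<String> isValid,
                                       Supplier<String> technicalUserToken) {
        return incomingHeader
                .filter(isValid)
                // The supplier is only invoked when the fallback is actually needed.
                .orElseGet(() -> "Bearer " + technicalUserToken.get());
    }

    public static void main(String[] args) {
        Predicate<String> looksLikeBearer = header -> header.startsWith("Bearer ");
        Supplier<String> technicalToken = () -> "technical-token";

        System.out.println(resolveAuthorization(Optional.of("Bearer user-token"), looksLikeBearer, technicalToken));
        // Bearer user-token
        System.out.println(resolveAuthorization(Optional.empty(), looksLikeBearer, technicalToken));
        // Bearer technical-token
    }
}
```

Passing the technical token as a `Supplier` keeps the extra token request off the common path where the caller's own header is usable.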

To enable the fallback mechanism, configure an additional OAuth client capable of issuing tokens for the technical user. Use the following settings (the values can be retrieved from the secret dashboard-oauth-client-secret):

spring:
  security:
    oauth2:
      client:
        provider:
          <the_client_registration_id>:
            token-uri: "<the_token_uri>"
            jwk-set-uri: "<the_jwk_set_uri>"
        registration:
          <the_client_registration_id>:
            client-id: "<the_client_id>"
            client-secret: "<the_client_secret>"
            authorizationGrantType: "client_credentials"
            overwriteParameter:
              audience: "<https://your-api.example.com>"
              scope: "<custom-scope-value>"

k5:
  sdk:
    springboot:
      oidc:
        clientRegistrationId: "<the_client_registration_id>"

ℹ️note

For compatibility with different Identity Providers, you can specify additional parameters such as audience and scope using the overwriteParameter property. Some providers require a specific audience value to identify the intended API, and a custom scope to grant the necessary permissions. This allows flexible, provider-specific adjustments.

If your Identity Provider does not require additional parameters, you can simply omit the overwriteParameter section from the configuration.

ℹ️note

If the LRA coordinator is not configured as secure, token propagation and the fallback mechanism are not required.