Skip to main content

Business Events: Implement Events and Agents

Publish Events​

Using the Business Event Support extension, additional code for easily publishing events will be generated.

For creating the event and setting the payload the event builder can be used. The event producer service and the built-in event publishing offers capabilities to publish the event to the Kafka topic which was specified at the event while modelling.

Event builder​

The generated SDK provides Builders for Events which can be easily used to construct events. It allows to set the payload of the event using the setPayload method. Depending on what has been defined as payload, it allows to use a schema from the schema registry for defining a payload or another entity (deprecated, see Events 1.0).

// Importing overall Event Builder
import de.knowis.cards.sdk.domain.facade.DomainEventBuilder;

// Declare and Inject Domain Event Builder
@Autowired
private DomainEventBuilder eventBuilder;

// Create a new instance of the event that should be published
CardCreatedEvent event = this.eventBuilder.getCc().getCardCreatedEvent().build();

// Create the payload for the event based on a schema and set it
SuccessSchema payloadSchema = new SuccessSchema();
payloadSchema.successful(true);
event.setPayload(payloadSchema);

/**
* Event 1.0 with entity payload
* @deprecated use schemas from the schema registry instead
*/
SuccessEventPayload payloadEntity = entityBuilder.getCc().successEventPayload().build();
event.setPayload(payloadEntity);

💡tip

DomainEventBuilder is also accessible in project generated implementation files (services, commands, agents) derived from their base classes.

Event producer service​

The autogenerated EventProducerService allows to easily publish business events to Kafka topics. The topic which is used is the one, that has been defined for the event during modelling. When using it, the connection to the Kafka will be automatically set up and does not require further code.

When publishing the event, it is possible to set custom headers and a custom message key (optional).

// Declare Event Producer Service
@Autowired
private EventProducerService eventProdcuer;

// Create business event
SuccessEvent event = eventBuilder.getCc().successEvent().build();

// set event payload
// ...

// Publish business event
eventProducer.publish(event);

// Alternative 1: publish the event with custom messageKey
eventProducer.publish(event, "customMessageKey");

// Alternative 2: publish the event with messageHeaders
HashMap<String, Object> map = new HashMap();
map.put("headerKey", "headerValue");
MessageHeaders headers = new MessageHeaders(map);
eventProducer.publish(event, headers);

// Alternative 3: publish the event with messageKey and messageHeaders
eventProducer.publish(event, "customMessageKey", headers);

⚠warning

Custom message keys will be ignored in case of publishing event 1.0 (Entity payload).

Built-in event publishing​

If the event is assigned to a command, service or agent, while modelling the generated base class of those will additionally provide a method to publish the event easily. Also here it is possible to customize the publishing by setting a custom message key and custom headers


// Create business event
SuccessEvent event = eventBuilder.getCc().successEvent().build();

// set event payload
// ...

// Publish business event
this.publishSuccessEvent(event);

// Alternative 1: publish the event with custom messageKey
this.publishSuccessEvent(event, "customMessageKey");

// Alternative 2: publish the event with messageHeaders
HashMap<String, Object> map = new HashMap();
map.put("headerKey", "headerValue");
MessageHeaders headers = new MessageHeaders(map);
this.publishSuccessEvent(event, headers);

// Alternative 3: publish the event with messageKey and messageHeaders
this.publishSuccessEvent(event, "customMessageKey", headers);

Full implementation example​

Implementation example for publishing events assigned to command, service or agent:


//... imports
import myproj.sdk.domain.schemas.SchemaGroup.BalanceCheckedSchema;

@Service
public class CardCommand extends CardCommandBase {

private static Logger log = LoggerFactory.getLogger(CardCommand.class);

@Override
public Card createCreditCard(CreditCard creditCard) throws CreditCardCreationError {
log.info("CardCommands.createCreditCard()");
// ... some command implementation

// Publish an event, using a schema from the schema registry as payload
BalanceCheckedEvent schemaEvent = this.eventBuilder.getCc().getBalanceCheckedEvent().build();

// Create the payload for the event
BalanceCheckedSchema schema = new BalanceCheckedSchema();
schema.setProperty1('value');
schemaEvent.setPayload(schema);

// create custom headers
HashMap<String, Object> map = new HashMap();
map.put("headerKey", "headerValue");
MessageHeaders headers = new MessageHeaders(map);

// publish the event with messageKey and messageHeaders
this.publishBalanceCheckedEvent(schemaEvent, "customMessageKey", headers);

// ... further command implementation
}
}

Implement Agents​

Every agent comes with an automatically generated implementation file, which allows to implement business logic that should happen after an event has happened. After publishing an event, all agents bound to the associated topic binding will be executed.

Agent base​

  • For each agent there will be an abstract class Agent Base generated in the SDK

  • The Agent Base provides access to the repository, the entity builder, the event builder and the event producer

  • The Agent Base contains one abstract method named onMessage

  • The onMessage method needs to be implemented in the generated implementation file for the agent

  • The default behavior can be changed by overriding the method onMessage( ConsumerRecord<?, ?> consumerRecord, MessageHeaders headers) in the agent directly. There is also the possibility to access the initial ConsumerRecord.

ℹī¸note

onMessage default behavior changes is only available for Domain Services (Java) based on Java Spring Boot Stack 2.0

  • The agent implementation file onMessage is automatically triggered when the Kafka event that the agent is modelled against is received.

Payload entity​

  • Agent onMessage method will receive Payload Entity as first parameter (if event that the agent receives is modelled with a payload).

  • Agent onMessage method will also receive MessageHeaders as a parameter which is a key-value map that contains Kafka message headers.

Implementation example​

Example of Balance service implementation file.

package de.knowis.cards.operations.domain.cc.agent;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.springframework.messaging.MessageHeaders;

import de.knowis.cards.operations.sdk.domain.cc.agent.CreditCardCreatedAgentBase;
import de.knowis.cards.operations.domain.facade.DomainEntityBuilder;
import de.knowis.cards.operations.domain.facade.DomainEventBuilder;
import de.knowis.cards.operations.sdk.domain.facade.Repository;
import de.knowis.cards.operations.sdk.domain.service.EventProducerService;
import de.knowis.cards.operations.sdk.domain.cc.entity.CreditCard;
import de.knowis.cards.operations.sdk.domain.cc.entity.Card;

@Service
public class CreditCardCreatedAgent extends CreditCardCreatedAgentBase {

private static Logger log = LoggerFactory.getLogger(CreditCardCreatedAgent.class);

// Declare Card Command
private final CardCommandBase cardCommand;

// Adjust your generated agent constructor to inject CardCommandBase so it can be used in agent logic
public CreditCardCreatedAgent(DomainEntityBuilder entityBuilder, DomainEventBuilder eventBuilder, EventProducerService eventProducer, Repository repo, CardCommandBase cardCommand) {
super(entityBuilder,eventBuilder, eventProducer,repo);
this.cardCommand = cardCommand;
}


@Override
public void onMessage(CreditCard creditCard, MessageHeaders headers) {

log.info("Agent CreditCardCreatedAgent received event cc:CreditCardCreated");

// Get root entity through repository
Card card = this.repo.getCc().getCards().findById(creditCard.getId());

// Call activateCard instance command to activate Card
// (this will call implemented logic for activate card command)
cardCommand.activateCard(card);
}
}

Example to override the default behavior of onMessage method (Java Spring Boot Stack 2.0 only) in Balance service implementation file.

package de.knowis.cards.operations.domain.cc.agent;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.springframework.messaging.MessageHeaders;

import de.knowis.cards.operations.sdk.domain.cc.agent.CreditCardCreatedAgentBase;
import de.knowis.cards.operations.domain.facade.DomainEntityBuilder;
import de.knowis.cards.operations.domain.facade.DomainEventBuilder;
import de.knowis.cards.operations.sdk.domain.facade.Repository;
import de.knowis.cards.operations.sdk.domain.service.EventProducerService;
import de.knowis.cards.operations.sdk.domain.cc.entity.CreditCard;
import de.knowis.cards.operations.sdk.domain.cc.entity.Card;

@Service
public class CreditCardCreatedAgent extends CreditCardCreatedAgentBase {

private static Logger log = LoggerFactory.getLogger(CreditCardCreatedAgent.class);

// Declare Card Command
private final CardCommandBase cardCommand;

// Adjust your generated agent constructor to inject CardCommandBase so it can be used in agent logic
public CreditCardCreatedAgent(DomainEntityBuilder entityBuilder, DomainEventBuilder eventBuilder, EventProducerService eventProducer, Repository repo, CardCommandBase cardCommand) {
super(entityBuilder,eventBuilder, eventProducer,repo);
this.cardCommand = cardCommand;
}

@Override
public void onMessage(CreditCard creditCard, MessageHeaders headers) {
log.info("Agent CreditCardCreatedAgent received event cc:CreditCardCreated");
}

@Override
// optional could be implemented in case there is a need for ConsumerRecord
// Gives the opportunity to access message key, headers and timestamp from ConsumerRecord
// only available for Domain Services (Java) based on Java Spring Boot Stack 2.0
public void onMessage(ConsumerRecord<?, ?> consumerRecord, MessageHeaders headers) {

// get message key from consumerRecord
Object messageKey = consumerRecord.key();

// get message header
String customerHeader = new String(consumerRecord.headers().lastHeader("customerHeaderName").value());

// get message Timestamp from consumerRecord
long messageTimeStamp = consumerRecord.timestamp();

log.info("Agent CreditCardCreatedAgent received event cc:CreditCardCreated");

// Get root entity through repository
Card card = this.repo.getCc().getCards().findById(creditCard.getId());

// Call activateCard instance command to activate Card
// (this will call implemented logic for activate card command)
cardCommand.activateCard(card);
}
}

Kafka customizers​

SDK provides base interface KafkaCustomizer that solution engineer can provide implementation for and customize Kafka Producer & Kafka Consumer used properties.

kafkaConsumerCustomizer & kafkaProducerCustomizer​

  • Implement KafkaCustomizer interface from de.knowis.cards.sdk.domain.config.KafkaCustomizer where:

    1. de.knowis.cards is service project base path and solution acronym.
    2. sdk.domain.config is the configuration package in the sdk that contains KafkaCustomizer interface.
  • Override kafka consumer/producer configuration by implementing that interface.

    1. For kafka consumer customizer create a bean with qualifier "kafkaConsumerCustomizer".
    2. For kafka producer customizer create a bean with qualifier "kafkaProducerCustomizer".
  • KafkaCustomizer interface has only one method getConfig that gets called for each kafka topic, identified by topicAlias parameter. It will also get the KafkaBinding configuration for that topic.

  • Your implementation logic should return Map<String, Object> that contains the kafka configuration that you want to be used for creatng Kafka Producer / Kafka Consumer.

    public interface KafkaCustomizer {

/**
*
* @param topicAlias Alias name of Event Topic
* @param kafkaBindingConfig for that topic alias
* @return Customized Configurations to use for that topic alias.
*/
public Map<String, Object> getConfig(String topicAlias, KafkaBinding kafkaBindingConfig);

}

Used Kafka properties from Kafka binding secret​

These properties apply to both Kafka producer and consumer. Their values come from the messagehub binding secret that is linked with each topic via topic binding configuration.

security.protocol​

Value comes from securityProtocol in your kafka binding secret properties, default value is "SASL_SSL".

sasl.mechanism​

Value comes from saslMechanism in your kafka binding secret properties, default value is "SCRAM-SHA-512".

ssl.protocol​

Value comes from sslProtocol in your kafka binding secret properties, default value is "TLSv1.2".

ssl.enabled.protocols​

Value comes from sslEnabledProtocols in your kafka binding secret properties, default value is "TLSv1.2".

ssl.endpoint.identification.algorithm​

Value comes from sslIdentificationAlgorithm your kafka binding secret properties, default value is "HTTPS".

sasl.jaas.config​

Value is built using template "{} required username=\"{}\" password=\"{}\" where:

  1. First is replaced with saslJaasConfigLoginModuleQualifiedName value from kafka binding secret, default value is "org.apache.kafka.common.security.scram.ScramLoginModule"

  2. Second is replaced with "user" value from kafka binding secret.

  3. Third is replaced with "password" value from kafka binding secret.

bootstrap.servers​

Value comes from kafka_brokers_sasl in your kafka binding secret properties.

Advanced Topics​

Dead Letter queue​

If any exception (except EventJsonValidationException) occurs during agent execution, the agent execution will retry up to five times. If it's still failing, the event will be published to a dead-letter-queue. The name for this queue has the following pattern {Original_Topic_Name}.DLT.

It's also possible to have a different implementation for error handler by providing a bean with the following name {Original_Topic_Name}ErrorHandler as the below example

@Configuration
public class SomeCustomConfig {

private static final Logger log = LoggerFactory.getLogger(SomeCustomConfig.class);

@Bean(name = "topic_nameErrorHandler")
public CommonErrorHandler Topic_nameErrorHandler(){
DefaultErrorHandler errorHandler = new DefaultErrorHandler(
(record, exception) -> {
log.error("Failed to consume the current event " + record.key(), exception);
},
new FixedBackOff(0L, 1)
);
return errorHandler;
}
}
ℹī¸note

If an EventJsonValidationException occurred, the event will be published directly to a dead-letter-queue without any retry.

Course Implement Domain Logic