Business Events: Implement Events and Agents
Publish Events
Using the Business Event Support extension, additional code for publishing events is generated.
The event builder is used to create an event and set its payload. The event producer service and the built-in event publishing provide the capabilities to publish the event to the Kafka topic that was specified for the event during modelling.
Event builder
The generated SDK provides builders for events that can be used to construct events. The payload of an event is set using the setPayload
method. Depending on what has been defined as the payload, either a schema from the schema registry or another entity (deprecated, see Events 1.0) can be used.
// Importing overall Event Builder
import de.knowis.cards.sdk.domain.facade.DomainEventBuilder;
// Declare and Inject Domain Event Builder
@Autowired
private DomainEventBuilder eventBuilder;
// Create a new instance of the event that should be published
CardCreatedEvent event = this.eventBuilder.getCc().getCardCreatedEvent().build();
// Create the payload for the event based on a schema and set it
SuccessSchema payloadSchema = new SuccessSchema();
payloadSchema.successful(true);
event.setPayload(payloadSchema);
/**
* Event 1.0 with entity payload
* @deprecated use schemas from the schema registry instead
*/
// entityBuilder is the injected DomainEntityBuilder (available like the event builder)
SuccessEventPayload payloadEntity = entityBuilder.getCc().successEventPayload().build();
event.setPayload(payloadEntity);
The DomainEventBuilder is also accessible in the project's generated implementation files (services, commands, agents) through their base classes.
Event producer service
The autogenerated EventProducerService
allows business events to be published to Kafka topics. The topic used is the one that has been defined for the event during modelling. The connection to Kafka is set up automatically and does not require additional code.
When publishing an event, custom headers and a custom message key can optionally be set.
// Declare Event Producer Service
@Autowired
private EventProducerService eventProducer;
// Create business event
SuccessEvent event = eventBuilder.getCc().successEvent().build();
// set event payload
// ...
// Publish business event
eventProducer.publish(event);
// Alternative 1: publish the event with custom messageKey
eventProducer.publish(event, "customMessageKey");
// Alternative 2: publish the event with messageHeaders
HashMap<String, Object> map = new HashMap<>();
map.put("headerKey", "headerValue");
MessageHeaders headers = new MessageHeaders(map);
eventProducer.publish(event, headers);
// Alternative 3: publish the event with messageKey and messageHeaders
eventProducer.publish(event, "customMessageKey", headers);
Custom message keys are ignored when publishing an event 1.0 (entity payload).
Built-in event publishing
If an event is assigned to a command, service, or agent during modelling, the generated base class of that element additionally provides a method to publish the event. Here, too, publishing can be customized by setting a custom message key and custom headers.
// Create business event
SuccessEvent event = eventBuilder.getCc().successEvent().build();
// set event payload
// ...
// Publish business event
this.publishSuccessEvent(event);
// Alternative 1: publish the event with custom messageKey
this.publishSuccessEvent(event, "customMessageKey");
// Alternative 2: publish the event with messageHeaders
HashMap<String, Object> map = new HashMap<>();
map.put("headerKey", "headerValue");
MessageHeaders headers = new MessageHeaders(map);
this.publishSuccessEvent(event, headers);
// Alternative 3: publish the event with messageKey and messageHeaders
this.publishSuccessEvent(event, "customMessageKey", headers);
Full implementation example
Implementation example for publishing an event assigned to a command, service, or agent:
//... imports
import myproj.sdk.domain.schemas.SchemaGroup.BalanceCheckedSchema;
@Service
public class CardCommand extends CardCommandBase {
private static Logger log = LoggerFactory.getLogger(CardCommand.class);
@Override
public Card createCreditCard(CreditCard creditCard) throws CreditCardCreationError {
log.info("CardCommands.createCreditCard()");
// ... some command implementation
// Publish an event, using a schema from the schema registry as payload
BalanceCheckedEvent schemaEvent = this.eventBuilder.getCc().getBalanceCheckedEvent().build();
// Create the payload for the event
BalanceCheckedSchema schema = new BalanceCheckedSchema();
schema.setProperty1("value");
schemaEvent.setPayload(schema);
// create custom headers
HashMap<String, Object> map = new HashMap<>();
map.put("headerKey", "headerValue");
MessageHeaders headers = new MessageHeaders(map);
// publish the event with messageKey and messageHeaders
this.publishBalanceCheckedEvent(schemaEvent, "customMessageKey", headers);
// ... further command implementation
}
}
Implement Agents
Every agent comes with an automatically generated implementation file, in which the business logic that should run after an event has occurred is implemented. After an event is published, all agents bound to the associated topic binding are executed.
Agent base
- For each agent, an abstract agent base class is generated in the SDK.
- The agent base provides access to the repository, the entity builder, the event builder, and the event producer.
- The agent base contains one abstract method named onMessage.
- The onMessage method needs to be implemented in the generated implementation file for the agent.
- The default behavior can be changed by overriding the method onMessage(ConsumerRecord<?, ?> consumerRecord, MessageHeaders headers) directly in the agent. This also makes the initial ConsumerRecord accessible. Changing the default onMessage behavior is only available for Domain Services (Java) based on Java Spring Boot Stack 2.0.
- The onMessage method in the agent implementation file is triggered automatically when the Kafka event that the agent is modelled against is received.
Payload entity
- The agent's onMessage method receives the payload entity as its first parameter (if the event that the agent receives is modelled with a payload).
- The agent's onMessage method also receives MessageHeaders as a parameter, a key-value map that contains the Kafka message headers. A minimal sketch of the generated base class is shown below.
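For orientation, the following is a minimal sketch of the shape such a generated agent base class could take, using the CreditCardCreatedAgent example from below. The field names and exact constructor signature are assumptions; the real class is generated by the SDK and may differ.
// Minimal sketch only; the actual class is generated by the SDK and may differ.
import org.springframework.messaging.MessageHeaders;
import de.knowis.cards.operations.domain.facade.DomainEntityBuilder;
import de.knowis.cards.operations.domain.facade.DomainEventBuilder;
import de.knowis.cards.operations.sdk.domain.facade.Repository;
import de.knowis.cards.operations.sdk.domain.service.EventProducerService;
import de.knowis.cards.operations.sdk.domain.cc.entity.CreditCard;
public abstract class CreditCardCreatedAgentBase {
  // Access to the repository, builders, and event producer (field names are assumptions)
  protected final Repository repo;
  protected final DomainEntityBuilder entityBuilder;
  protected final DomainEventBuilder eventBuilder;
  protected final EventProducerService eventProducer;
  protected CreditCardCreatedAgentBase(DomainEntityBuilder entityBuilder, DomainEventBuilder eventBuilder,
      EventProducerService eventProducer, Repository repo) {
    this.entityBuilder = entityBuilder;
    this.eventBuilder = eventBuilder;
    this.eventProducer = eventProducer;
    this.repo = repo;
  }
  // Abstract method to be implemented in the generated implementation file:
  // receives the payload entity and the Kafka message headers.
  public abstract void onMessage(CreditCard creditCard, MessageHeaders headers);
}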
Implementation example
Example of an agent implementation file:
package de.knowis.cards.operations.domain.cc.agent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.springframework.messaging.MessageHeaders;
import de.knowis.cards.operations.sdk.domain.cc.agent.CreditCardCreatedAgentBase;
import de.knowis.cards.operations.domain.facade.DomainEntityBuilder;
import de.knowis.cards.operations.domain.facade.DomainEventBuilder;
import de.knowis.cards.operations.sdk.domain.facade.Repository;
import de.knowis.cards.operations.sdk.domain.service.EventProducerService;
import de.knowis.cards.operations.sdk.domain.cc.entity.CreditCard;
import de.knowis.cards.operations.sdk.domain.cc.entity.Card;
// Import of the command base class used below (the package path may differ in your generated SDK)
import de.knowis.cards.operations.sdk.domain.cc.command.CardCommandBase;
@Service
public class CreditCardCreatedAgent extends CreditCardCreatedAgentBase {
private static Logger log = LoggerFactory.getLogger(CreditCardCreatedAgent.class);
// Declare Card Command
private final CardCommandBase cardCommand;
// Adjust your generated agent constructor to inject CardCommandBase so it can be used in agent logic
public CreditCardCreatedAgent(DomainEntityBuilder entityBuilder, DomainEventBuilder eventBuilder, EventProducerService eventProducer, Repository repo, CardCommandBase cardCommand) {
super(entityBuilder, eventBuilder, eventProducer, repo);
this.cardCommand = cardCommand;
}
@Override
public void onMessage(CreditCard creditCard, MessageHeaders headers) {
log.info("Agent CreditCardCreatedAgent received event cc:CreditCardCreated");
// Get root entity through repository
Card card = this.repo.getCc().getCards().findById(creditCard.getId());
// Call activateCard instance command to activate Card
// (this will call implemented logic for activate card command)
cardCommand.activateCard(card);
}
}
Example of overriding the default behavior of the onMessage method (Java Spring Boot Stack 2.0 only) in the agent implementation file:
package de.knowis.cards.operations.domain.cc.agent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.springframework.messaging.MessageHeaders;
import de.knowis.cards.operations.sdk.domain.cc.agent.CreditCardCreatedAgentBase;
import de.knowis.cards.operations.domain.facade.DomainEntityBuilder;
import de.knowis.cards.operations.domain.facade.DomainEventBuilder;
import de.knowis.cards.operations.sdk.domain.facade.Repository;
import de.knowis.cards.operations.sdk.domain.service.EventProducerService;
import de.knowis.cards.operations.sdk.domain.cc.entity.CreditCard;
import de.knowis.cards.operations.sdk.domain.cc.entity.Card;
import org.apache.kafka.clients.consumer.ConsumerRecord;
// Import of the command base class used below (the package path may differ in your generated SDK)
import de.knowis.cards.operations.sdk.domain.cc.command.CardCommandBase;
@Service
public class CreditCardCreatedAgent extends CreditCardCreatedAgentBase {
private static Logger log = LoggerFactory.getLogger(CreditCardCreatedAgent.class);
// Declare Card Command
private final CardCommandBase cardCommand;
// Adjust your generated agent constructor to inject CardCommandBase so it can be used in agent logic
public CreditCardCreatedAgent(DomainEntityBuilder entityBuilder, DomainEventBuilder eventBuilder, EventProducerService eventProducer, Repository repo, CardCommandBase cardCommand) {
super(entityBuilder, eventBuilder, eventProducer, repo);
this.cardCommand = cardCommand;
}
@Override
public void onMessage(CreditCard creditCard, MessageHeaders headers) {
log.info("Agent CreditCardCreatedAgent received event cc:CreditCardCreated");
}
@Override
// optional could be implemented in case there is a need for ConsumerRecord
// Gives the opportunity to access message key, headers and timestamp from ConsumerRecord
// only available for Domain Services (Java) based on Java Spring Boot Stack 2.0
public void onMessage(ConsumerRecord<?, ?> consumerRecord, MessageHeaders headers) {
// get message key from consumerRecord
Object messageKey = consumerRecord.key();
// get a message header by name (lastHeader(..) returns null if the header is not present)
String customerHeader = new String(consumerRecord.headers().lastHeader("customerHeaderName").value());
// get message Timestamp from consumerRecord
long messageTimeStamp = consumerRecord.timestamp();
log.info("Agent CreditCardCreatedAgent received event cc:CreditCardCreated");
// Note: the typed CreditCard payload is not available in this override.
// The raw payload can be read from consumerRecord.value() and would need to be
// deserialized before it can be used for a repository lookup and command call
// (as shown in the previous example with the typed onMessage method).
}
}
Kafka customizers
The SDK provides the base interface KafkaCustomizer,
which a solution engineer can implement to customize the properties used for the Kafka producer and the Kafka consumer.
kafkaConsumerCustomizer & kafkaProducerCustomizer
- Implement the KafkaCustomizer interface from de.knowis.cards.sdk.domain.config.KafkaCustomizer, where de.knowis.cards is the service project base path and solution acronym, and sdk.domain.config is the configuration package in the SDK that contains the KafkaCustomizer interface.
- Override the Kafka consumer/producer configuration by implementing that interface.
- For a Kafka consumer customizer, create a bean with the qualifier "kafkaConsumerCustomizer".
- For a Kafka producer customizer, create a bean with the qualifier "kafkaProducerCustomizer" (an example sketch is shown after the interface definition below).
- The KafkaCustomizer interface has only one method, getConfig, which is called for each Kafka topic, identified by the topicAlias parameter. It also receives the KafkaBinding configuration for that topic.
- Your implementation logic should return a Map<String, Object> that contains the Kafka configuration you want to be used for creating the Kafka producer / Kafka consumer.
public interface KafkaCustomizer {
/**
*
* @param topicAlias Alias name of Event Topic
* @param kafkaBindingConfig for that topic alias
* @return Customized Configurations to use for that topic alias.
*/
public Map<String, Object> getConfig(String topicAlias, KafkaBinding kafkaBindingConfig);
}
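For illustration, the following is a minimal sketch of how such customizer beans could be registered. It assumes the KafkaCustomizer interface shown above under the example base path de.knowis.cards (adjust the import to your generated SDK); the property keys used are standard Apache Kafka client settings, and the chosen values are arbitrary examples.
// Sketch only: registers customizer beans under the qualifiers expected by the SDK.
// The SDK import path below is an assumption; adjust it to your generated project.
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import de.knowis.cards.sdk.domain.config.KafkaCustomizer;
@Configuration
public class KafkaCustomizerConfig {
  @Bean(name = "kafkaConsumerCustomizer")
  public KafkaCustomizer kafkaConsumerCustomizer() {
    // Called per topic alias; return only the consumer properties you want to override.
    return (topicAlias, kafkaBindingConfig) -> {
      Map<String, Object> config = new HashMap<>();
      config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
      return config;
    };
  }
  @Bean(name = "kafkaProducerCustomizer")
  public KafkaCustomizer kafkaProducerCustomizer() {
    // Called per topic alias; return only the producer properties you want to override.
    return (topicAlias, kafkaBindingConfig) -> {
      Map<String, Object> config = new HashMap<>();
      config.put(ProducerConfig.ACKS_CONFIG, "all");
      return config;
    };
  }
}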
Used Kafka properties from Kafka binding secret
These properties apply to both the Kafka producer and the Kafka consumer. Their values come from the messagehub binding secret that is linked with each topic via the topic binding configuration.
security.protocol
Value comes from securityProtocol
in your kafka binding secret properties; the default value is "SASL_SSL".
sasl.mechanism
Value comes from saslMechanism
in your kafka binding secret properties; the default value is "SCRAM-SHA-512".
ssl.protocol
Value comes from sslProtocol
in your kafka binding secret properties; the default value is "TLSv1.2".
ssl.enabled.protocols
Value comes from sslEnabledProtocols
in your kafka binding secret properties; the default value is "TLSv1.2".
ssl.endpoint.identification.algorithm
Value comes from sslIdentificationAlgorithm
in your kafka binding secret properties; the default value is "HTTPS".
sasl.jaas.config
Value is built using the template "{} required username=\"{}\" password=\"{}\"",
where:
- The first placeholder is replaced with the saslJaasConfigLoginModuleQualifiedName value from the kafka binding secret; the default value is "org.apache.kafka.common.security.scram.ScramLoginModule".
- The second placeholder is replaced with the "user" value from the kafka binding secret.
- The third placeholder is replaced with the "password" value from the kafka binding secret.
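For illustration, with the default login module and hypothetical secret values appUser and appPassword, the resulting sasl.jaas.config value would look roughly like this (a trailing semicolon is required by the JAAS configuration syntax):
org.apache.kafka.common.security.scram.ScramLoginModule required username="appUser" password="appPassword";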
bootstrap.servers
Value comes from kafka_brokers_sasl
in your kafka binding secret properties.
Advanced Topics
Dead Letter queue
If any exception (except EventJsonValidationException)
occurs during agent execution, the agent execution is retried up to five times. If it still fails, the event is published to a dead-letter queue. The name of this queue follows the pattern {Original_Topic_Name}.DLT;
for example, events from a topic named my.topic would end up in my.topic.DLT.
It is also possible to provide a different error handler implementation by defining a bean named {Original_Topic_Name}ErrorHandler,
as in the example below:
// Imports needed for this configuration (SLF4J, Spring Framework, Spring Kafka)
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;
@Configuration
public class SomeCustomConfig {
  private static final Logger log = LoggerFactory.getLogger(SomeCustomConfig.class);
  // The bean name must follow the pattern {Original_Topic_Name}ErrorHandler
  @Bean(name = "topic_nameErrorHandler")
  public CommonErrorHandler topicNameErrorHandler() {
    // DefaultErrorHandler with a FixedBackOff of one retry and no delay
    DefaultErrorHandler errorHandler = new DefaultErrorHandler(
        (record, exception) -> {
          log.error("Failed to consume the current event " + record.key(), exception);
        },
        new FixedBackOff(0L, 1)
    );
    return errorHandler;
  }
}
If an EventJsonValidationException occurs, the event is published directly to the dead-letter queue without any retry.