Producing Domain Events

Producing Domain Events with AsyncAPI and ZenWave Maven Plugin

In this section you will learn how to produce domain events from your services and publish them to a message broker.

We will be using AsyncAPI specification to define our message structure, and use ZenWave SDK Maven Plugin to generate models (DTOs) and a producer implementation from your API-First definition.

Using ZenWave ZDL as Definition Language for AsyncAPI

While AsyncAPI is the source of truth for Event-Driven communication, with ZDLToAsyncAPIPlugin plugins you can create complete draft version of AsyncAPI specification from your annotated Services and Events.

jbang zw -p io.zenwave360.sdk.plugins.ZDLToAsyncAPIPlugin \
specFile=src/main/resources/model/orders-model.zdl \
idType=integer \
idTypeFormat=int64 \
targetFile=src/main/resources/model/asyncapi.yml

For instance the following ZDL model:

service OrdersService for (CustomerOrder) {
// only emited events will be included in the asyncapi definition
updateOrder(id, CustomerOrderInput) CustomerOrder withEvents OrderStatusUpdated
}
@asyncapi({channel: "OrderUpdatesChannel", topic: "orders.order_updates"})
event OrderStatusUpdated {
id String
dateTime Instant required
status OrderStatus required
previousStatus OrderStatus
}

Will generate the following sections in an AsyncAPI definition:

  • An schema named OrderStatusUpdated with a payload containing the id, dateTime, status and previousStatus fields.
  • A message named OrderStatusUpdatedMessage pointing to OrderStatusUpdated schema.
  • An a Channel named OrderUpdatesChannel containing a reference to the OrderStatusUpdatedMessage message.
  • It also will generate an Operation named onOrderStatusUpdated with and action sendto the OrderUpdatesChannel channel.

πŸ’‘ This is as a compact format as it can get!! Saving you a lot of typing and giving you very concise representation of your events.

API-First Code Generator from AsyncAPI

You can use API-First AsyncAPI Maven Plugin to generate models (DTOs) and a producer implementation.

Configure zenwave-sdk-maven-plugin as follows:

  • Set inputSpec to point to your AsyncAPI specification. You can use classpath, file, http(s) or https as inputSpec.
  • Add asyncapi-spring-cloud-streams3 and asyncapi-jsonschema2pojo as dependencies.
  • Add two executions to generate the models (DTOs) and an Events Producer implementation.
  • Configure modelPackage, producerApiPackage and consumerApiPackage to match your project structure.
  • Configure role to provider or client if you are generating code from a third-party specification. If you are defining all the operations (client and provider) then you are acting as the provider which is the default.
<plugin>
<groupId>io.github.zenwave360.zenwave-sdk</groupId>
<artifactId>zenwave-sdk-maven-plugin</artifactId>
<version>${zenwave.version}</version>
<configuration>
<inputSpec>classpath:/apis/asyncapi.yml</inputSpec><!-- classpath, file, http(s) -->
<addCompileSourceRoot>true</addCompileSourceRoot><!-- default is true -->
<addTestCompileSourceRoot>true</addTestCompileSourceRoot><!-- default is true -->
</configuration>
<executions>
<!-- DTOs -->
<execution>
<id>generate-asyncapi-provider-dtos</id>
<phase>generate-sources</phase>
<goals>
<goal>generate</goal>
</goals>
<configuration>
<generatorName>jsonschema2pojo</generatorName>
<configOptions>
<modelPackage>${basePackage}.core.domain.events</modelPackage>
<jsonschema2pojo.isUseJakartaValidation>true</jsonschema2pojo.isUseJakartaValidation>
<jsonschema2pojo.useLongIntegers>true</jsonschema2pojo.useLongIntegers>
</configOptions>
</configuration>
</execution>
<!-- Generate PROVIDER -->
<execution>
<id>generate-asyncapi-provider-implementation</id>
<phase>generate-sources</phase>
<goals><goal>generate</goal></goals>
<configuration>
<generatorName>spring-cloud-streams3</generatorName>
<configOptions>
<role>provider</role>
<style>imperative</style>
<modelPackage>${basePackage}.core.domain.events</modelPackage>
<producerApiPackage>${basePackage}.core.outbound.events</producerApiPackage>
<consumerApiPackage>${basePackage}.adapters.commands</consumerApiPackage>
</configOptions>
</configuration>
</execution>
</executions>
<!-- add any sdk plugin (custom or standard) as dependency here -->
<dependencies>
<dependency>
<groupId>io.github.zenwave360.zenwave-sdk.plugins</groupId>
<artifactId>asyncapi-spring-cloud-streams3</artifactId>
<version>${zenwave.version}</version>
</dependency>
<dependency>
<groupId>io.github.zenwave360.zenwave-sdk.plugins</groupId>
<artifactId>asyncapi-jsonschema2pojo</artifactId>
<version>${zenwave.version}</version>
</dependency>
</dependencies>
</plugin>

Producing Domain Events with Generated Code

By default, ZenWave generates Spring Cloud Streams implementation that counts with different binders for virtually any message broker:

ZenWave360 AsyncAPI Spring Cloud Streams
πŸ“¦ target/generated-sources/zenwave
πŸ“¦ src/main/java
└─ <modelPackage> models (DTOs)
└─ AddressDTO.java
└─ CustomerDTO.java
└─ <producerPackage>
└─ ICustomerEventsProducer (interface and header objects)
└─ CustomerEventsProducer (spring-cloud-streams producer)
πŸ“¦ src/test/java
└─ <producerPackage>
└─ CustomerEventsProducerCaptor (in-memory producer/captor)
└─ EventsProducerInMemoryContext (spring-boot/manual context)

With this schema you just need to:

  • Autowire ICustomerEventsProducer wherever you would like to produce events.
  • Add CustomerEventsProducer to your Spring Boot context and set any required configuration for spring.streams.bindings in application.yml.
public CustomerServiceImpl {
@Autowired
ICustomerEventsProducer eventsProducer;
// ...
public Customer createCustomer(Customer input) {
// ...
eventsProducer.onCustomerEvent(customerEvent);
// ...
}
}
spring:
stream:
bindings:
on-order-event-out-0: ## you can get this name from 'CustomerEventsProducer' class
destination: orders.orders

InMemory Event Producer (Captor) for your Tests

ZenWave SDK Maven Plugin will generate an in-memory implementation of your Events Producer that you can use in your tests to capture events and perform assertions on them. All this without a message broker, TestContainers or Mockito.

@Configuration
@Profile("in-memory")
public class ServicesInMemoryConfig extends RepositoriesInMemoryConfig {
protected final EventsProducerInMemoryContext eventsProducerInMemoryContext = new EventsProducerInMemoryContext();
protected final CustomerServiceImpl customerService = new CustomerServiceImpl(
customerRepository(),
eventsProducerInMemoryContext.customerEventsProducer());
@Bean
public CustomerServiceImpl customerService() {
return customerService;
}
}

Now you can perform assertions on captured events in your tests:

public class CustomerServiceTest {
CustomerEventsProducerCaptor eventsProducerCaptor = serviceInMemoryContext.customerEventsProducerCaptor();
@Test
void createCustomerTest() {
var input = new Customer();
// TODO fill input data
var customer = customerService.createCustomer(input);
assertNotNull(customer.getId());
// Assertions on captured events
var customerEvents = eventsProducerCaptor.getCapturedMessages(eventsProducerCaptor.onCustomerEventBindingName);
Assertions.assertEquals(1, customerEvents.size());
}
}

Letting ZenWave SDK Include Event Producers in Your Services

Because this depends on ZenWave AsyncAPI Plugins naming conventions, you need to explicitly set includeEmitEventsImplementation to true in ZenWave SDK Backend Plugin option to include event publishing code in your core services.

// This will only be generated if includeEmitEventsImplementation is set to true
private final EventsMapper eventsMapper = EventsMapper.INSTANCE;
private final ICustomerEventsProducer eventsProducer;
@Transactional
public Customer createCustomer(Customer input) {
log.debug("Request to save Customer: {}", input);
var customer = customerServiceMapper.update(new Customer(), input);
customer = customerRepository.save(customer);
// This will only be generated if includeEmitEventsImplementation is set to true
var customerEvent = eventsMapper.asCustomerEvent(customer);
eventsProducer.onCustomerEvent(customerEvent);
return customer;
}