Implementing a Transactional OutBox With AsyncAPI, SpringModulith and ZenWaveSDK
Introduction
When processes span multiple services without shared transaction boundaries, ensuring atomicity and consistency in distributed systems is challenging.
Distributed transactions with Two-Phase Commits (2PC) are complex, introduce significant performance overhead, and may not even be feasible for certain services.
The Outbox Pattern solves this by using a Database Transaction to store events in a dedicated "outbox" table within the same transaction as a database update. These events are then published to external systems, such as an email service or message broker, ensuring eventual consistency without the need for distributed transactions.
In this post, we’ll explore how we can implement a Transactional Outbox Pattern to:
- Persist data to a supported transactional database (e.g., SQL or MongoDB).
- Send events to an external message broker like Kafka or RabbitMQ using Spring Cloud Stream.
- Leverage Spring Modulith Events transactional features.
- Use ZenWaveSDK Code Generator for AsyncAPI so you don’t need to write a single line of boilerplate code for the transactional outbox and event publishing.
Playground Project
Because working software is worth more than a thousand words, we’ll use a fully functional playground project that you can explore and test yourself.
We'll use the following project as our playground: EDA-TransactionalOutbox-Modulith-JPA.
This project provides a simple API for managing customer details, including REST endpoints for CRUD operations. These operations publish event notifications to a Kafka topic, using Avro as the payload format.
Key components of the project include:
- OpenAPI definition file: openapi.yml
- AsyncAPI definition file: asyncapi.yml
- Avro schemas for events: Avro schemas
Distributed Transactions Problem and The Outbox Pattern
When managing a Customer
entity, we encounter a significant challenge: ensuring the atomicity of persisting customer details in the database and publishing a related event to an external message broker. Since the database and the message broker do not share a transaction boundary, this can lead to potential inconsistencies.
Key Scenarios Highlighting the Problem:
-
Event Sent Before Database Transaction Commits: If the event is published before the database transaction is committed, a rollback of the transaction would leave the system in an inconsistent state, as the published event cannot be undone.
-
Event Sent After Database Transaction Commits: If the event is published after the database transaction is committed, there’s no guarantee the event will actually be sent. A service crash or network failure could prevent the event from being published, resulting in data inconsistency.
@Servicepublic class CustomerService {// ...@Transactionalpublic Customer createCustomer(Customer input) {log.debug("Request to save Customer: {}", input);var customer = mapper.update(new Customer(), input);customer = customerRepository.save(customer); // Persist to DBvar customerEvent = eventsMapper.asCustomerEvent(customer);eventsProducer.onCustomerEvent(customerEvent); // Emit Event to external Brokerreturn customer;}}
With the transactional outbox pattern, instead of directly sending the event to the external system, the call to eventsProducer.onCustomerEvent(customerEvent)
stores the event in a dedicated outbox table as part of the same transaction as the database update. An external process then reads from the outbox table and publishes the event to the message broker.
However, implementing this external process, managing the outbox table, and ensuring that events are published once and in the correct order can be complex and error-prone.
Fortunately, tools like Spring Modulith’s Event Publication Registry and the ZenWaveSDK Code Generator for AsyncAPI simplify this process, so you can focus only on implementing your business logic.
ZenWaveSDK Code Generation for AsyncAPI and Spring Cloud Streams
ZenWaveSDK generates all the boilerplate code needed to send and receive events using Spring Cloud Stream from an AsyncAPI definition file.
From your AsyncAPI definition file, ZenWaveSDK generates:
- DTO/Models for your event payloads.
- Typed Header Objects to ensure consistency.
- "Intention-revealing" Java interfaces with methods named explicitly after your operationIds.
- A lightweight implementation using Spring Cloud Streams that can be
@Autowire
d directly into your services.
You can then configure Spring Cloud Stream to send and receive messages using any of the supported binders, such as Kafka or RabbitMQ. With ZenWaveSDK, you don’t need to write any boilerplate code, allowing you to focus entirely on your business logic.
ZenWave SDK Maven Plugin Configuration
<plugin><groupId>io.github.zenwave360.zenwave-sdk</groupId><artifactId>zenwave-sdk-maven-plugin</artifactId><version>${zenwave.version}</version><configuration><inputSpec>${project.basedir}/src/main/resources/public/apis/asyncapi.yml</inputSpec><addCompileSourceRoot>true</addCompileSourceRoot><addTestCompileSourceRoot>true</addTestCompileSourceRoot></configuration><executions><!-- DTOs --><!-- we skip DTOs generation in this case b/c we are using Avro for that --><!-- Generate PROVIDER --><execution><id>generate-asyncapi</id><phase>generate-sources</phase><goals><goal>generate</goal></goals><configuration><generatorName>spring-cloud-streams3</generatorName><configOptions><role>provider</role><style>imperative</style><transactionalOutbox>modulith</transactionalOutbox><!-- using Spring Modulith implementation --><modelPackage>${asyncApiModelPackage}</modelPackage><producerApiPackage>${asyncApiProducerApiPackage}</producerApiPackage><consumerApiPackage>${asyncApiConsumerApiPackage}</consumerApiPackage></configOptions></configuration></execution></executions><dependencies>// ...</dependencies></plugin>
Avro Maven Plugin Configuration
<plugin><groupId>org.apache.avro</groupId><artifactId>avro-maven-plugin</artifactId><version>1.11.1</version><executions><execution><goals><goal>schema</goal></goals><phase>generate-sources</phase></execution></executions><configuration><sourceDirectory>${project.basedir}/src/main/resources/public/apis/avro</sourceDirectory><outputDirectory>${project.basedir}/target/generated-sources/avro</outputDirectory><imports><import>${project.basedir}/src/main/resources/public/apis/avro/PaymentMethodType.avsc</import><import>${project.basedir}/src/main/resources/public/apis/avro/PaymentMethod.avsc</import><import>${project.basedir}/src/main/resources/public/apis/avro/Address.avsc</import></imports></configuration></plugin>
To achieve this, we configure the zenwave-sdk-maven-plugin
and, in this case, also the avro-maven-plugin
in the pom.xml
file. This setup ensures that the necessary code is generated in the target/generated-sources
folder as part of your build process.
Since this configuration runs automatically during the build, every time you update your AsyncAPI definition file, you can be sure that your code remains in sync with your API definition.
And because we configured the transactionalOutbox
option to modulith
, ZenWaveSDK will generate the code to use Spring Modulith Events Publication Registry to manage the transactional outbox for you.
Spring Modulith Events Publication Registry
Because Event Externalization is enabled, and Message<?>
objects are configured for externalization (see below), these events are also stored transactionally in the Spring Modulith Event Publication Registry.
spring:modulith.events.externalization.enabled: truemodulith.events.jdbc.schema-initialization.enabled: truemodulith.events.republish-outstanding-events-on-restart: true
Using Spring Cloud Streams to Externalize Modulith Events
Now that Spring Modulith is managing our Message<?>
events, we need to configure one of the many supported event externalizers.
If we were publishing POJOs in JSON format, we could use spring-modulith-events-kafka
to externalize events to a Kafka topic. However, since we want to externalize Message<?>
objects with Avro payloads, we will use io.zenwave360.sdk:spring-modulith-events-scs
, which supports serializing and deserializing Message<?>
objects, with or without Avro payloads.
For more details, see ZenWave360 Spring Modulith Events for Spring Cloud Stream.
To enable this, add the following dependencies to your pom.xml
file:
<dependency><!-- Spring Cloud Stream Externalization for Message<?> --><groupId>io.zenwave360.sdk</groupId><artifactId>spring-modulith-events-scs</artifactId><version>${spring-modulith-events-scs.version}</version></dependency><dependency><!-- Needed for serializing Avro payloads to db storage as json --><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-avro</artifactId></dependency>
And @EnableSpringCloudStreamEventExternalization
to our Spring Boot Configuration:
import io.zenwave360.modulith.events.scs.config.EnableSpringCloudStreamEventExternalization;import org.springframework.context.annotation.Configuration;@Configuration@EnableSpringCloudStreamEventExternalizationclass ExternalizationConfiguration {}
This will route programmatically route our Message<?>
objects to the correct Spring Cloud Stream binding, but you don't need to worry about any of this as it is handled automatically behind the scenes.
// this is some of what you get when adding @EnableSpringCloudStreamEventExternalization@AutoConfiguration@AutoConfigureAfter(EventExternalizationAutoConfiguration.class)@ConditionalOnProperty(name = "spring.modulith.events.externalization.enabled", havingValue = "true",matchIfMissing = true)public class MessageExternalizationConfiguration {@BeanEventExternalizationConfiguration eventExternalizationConfiguration() {return EventExternalizationConfiguration.externalizing().select(event -> annotatedAsExternalized().test(event)|| event instanceof Message<?> && getTarget(event) != null).route(Message.class, event -> RoutingTarget.forTarget(getTarget(event)).withoutKey()).build();}private String getTarget(Object event) {if (event instanceof Message<?> message) {return message.getHeaders().get(SpringCloudStreamEventExternalizer.SPRING_CLOUD_STREAM_SENDTO_DESTINATION_HEADER, String.class);}return null;}}
Automagic Transactional OutBox Implementation
Our original code is now automagically implementing the Transactional OutBox pattern using Spring Modulith Events Publication Registry and Spring Cloud Stream, all made possible by the ZenWaveSDK AsyncAPI Code Generator. 🚀🚀🚀
@Servicepublic class CustomerService {// ...@Transactionalpublic Customer createCustomer(Customer input) {log.debug("Request to save Customer: {}", input);var customer = mapper.update(new Customer(), input);customer = customerRepository.save(customer); // Persist to DBvar customerEvent = eventsMapper.asCustomerEvent(customer);eventsProducer.onCustomerEvent(customerEvent); // Emit Event to external Broker via Tx OutBoxreturn customer;}}