AsyncAPI and Spring Cloud Stream 3

Note: This plugin has been superseded by AsyncAPI Generator and is scheduled for removal.

Generating Consumer & Producer APIs

With ZenWave's spring-cloud-streams3 and jsonschema2pojo generator plugins you can generate the following from AsyncAPI definitions:

  • Strongly typed business interfaces
  • Payload DTOs
  • Header objects

It uses Spring Cloud Streams as the default implementation, so it can connect to many different brokers through the provided binders.

And because everything is hidden behind interfaces we can encapsulate many Enterprise Integration Patterns:

  • Transactional Outbox: with Spring Modulith, MongoDB ChangeStreams, Plain SQL or a custom solution
  • Business Dead Letter Queues: route different business exceptions to different Dead Letter Queues for non-retryable errors.
  • Enterprise Envelope: when your organization uses a common Envelope for messages, you can still express your AsyncAPI definition in terms of your business payload.
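
As a rough illustration of why hiding the messaging code behind an interface makes these patterns interchangeable, here is a minimal plain-Java sketch. Every name in it (CustomerEvent, CustomerEventsProducerSketch, DirectProducerSketch, OutboxProducerSketch, CustomerServiceSketch) is hypothetical and is not the code the generator emits; the two implementations only record what they would do.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical payload DTO; real DTOs are generated from the AsyncAPI schemas. */
final class CustomerEvent {
    final String customerId;

    CustomerEvent(String customerId) {
        this.customerId = customerId;
    }
}

/** Hypothetical business interface, similar in spirit to a generated producer interface. */
interface CustomerEventsProducerSketch {
    boolean onCustomerEvent(CustomerEvent payload);
}

/** Stand-in for an implementation that sends straight to the broker. */
final class DirectProducerSketch implements CustomerEventsProducerSketch {
    final List<String> sent = new ArrayList<>();

    @Override
    public boolean onCustomerEvent(CustomerEvent payload) {
        return sent.add("broker:" + payload.customerId);
    }
}

/** Stand-in for an implementation that stores the message in an outbox first. */
final class OutboxProducerSketch implements CustomerEventsProducerSketch {
    final List<String> outbox = new ArrayList<>();

    @Override
    public boolean onCustomerEvent(CustomerEvent payload) {
        return outbox.add("outbox:" + payload.customerId);
    }
}

/** Business code depends only on the interface, never on the delivery strategy. */
final class CustomerServiceSketch {
    private final CustomerEventsProducerSketch producer;

    CustomerServiceSketch(CustomerEventsProducerSketch producer) {
        this.producer = producer;
    }

    boolean register(String customerId) {
        return producer.onCustomerEvent(new CustomerEvent(customerId));
    }

    public static void main(String[] args) {
        OutboxProducerSketch outbox = new OutboxProducerSketch();
        new CustomerServiceSketch(outbox).register("42");
        System.out.println(outbox.outbox);
    }
}
```

Swapping DirectProducerSketch for OutboxProducerSketch requires no change in CustomerServiceSketch, which is the property the patterns above rely on.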

It supports AsyncAPI v2 (publish/subscribe) and AsyncAPI v3 (send/receive) styles.

It also lets you reverse how the API is generated using the client role, so you don't need to define a new API definition just to consume an existing API.

Maven Plugin Configuration (API-First)

Configure ZenWave Maven Plugin to generate code during your build process:

  1. Add generator dependencies to zenwave-sdk-maven-plugin
  2. Configure <execution> blocks for each generator (jsonschema2pojo, spring-cloud-streams3, etc.)
  3. Reference AsyncAPI files from dependencies using the classpath: prefix
  4. Set generator options using <configOptions> (see SpringCloudStreams3Plugin Options)

Maven Plugin Base Configuration

Use this as base configuration:

<plugin>
    <groupId>io.zenwave360.sdk</groupId>
    <artifactId>zenwave-sdk-maven-plugin</artifactId>
    <version>${zenwave.version}</version>
    <configuration>
        <addCompileSourceRoot>true</addCompileSourceRoot><!-- default is true -->
        <addTestCompileSourceRoot>true</addTestCompileSourceRoot><!-- default is true -->
    </configuration>
    <executions>
        <!-- Add executions for each generation here: -->
        <execution>
            <id>generate-asyncapi-xxx</id>
            <phase>generate-sources</phase>
            <goals>
                <goal>generate</goal>
            </goals>
            <configuration>
                <generatorName>spring-cloud-streams3</generatorName>
                <inputSpec>classpath:model/asyncapi.yml</inputSpec>
                <configOptions>
                    <!-- ... -->
                </configOptions>
            </configuration>
        </execution>
    </executions>
    <!-- add any sdk plugin (custom or standard) as dependency here -->
    <dependencies>
        <dependency><!-- optional dependency containing AsyncAPI definition files -->
            <groupId>com.example.apis</groupId>
            <artifactId>asyncapis</artifactId>
            <version>${apis.version}</version>
        </dependency>
        <dependency>
            <groupId>io.zenwave360.sdk.plugins</groupId>
            <artifactId>asyncapi-spring-cloud-streams3</artifactId>
            <version>${zenwave.version}</version>
        </dependency>
        <dependency>
            <groupId>io.zenwave360.sdk.plugins</groupId>
            <artifactId>asyncapi-jsonschema2pojo</artifactId>
            <version>${zenwave.version}</version>
        </dependency>
    </dependencies>
</plugin>

Generate Model DTOs using jsonschema2pojo

Add this execution to generate model DTOs from AsyncAPI definitions:

<execution>
    <id>generate-asyncapi-producer-dtos</id>
    <phase>generate-sources</phase>
    <goals>
        <goal>generate</goal>
    </goals>
    <configuration>
        <generatorName>jsonschema2pojo</generatorName>
        <!-- ${project.basedir} replaces the deprecated ${pom.basedir} -->
        <inputSpec>${project.basedir}/src/main/resources/model/asyncapi.yml</inputSpec>
        <configOptions>
            <modelPackage>io.zenwave360.example.api.events.model</modelPackage>
        </configOptions>
    </configuration>
</execution>

Generate Spring Cloud Streams Provider Implementation with spring-cloud-streams3

Add this execution to generate Spring Cloud Streams producer/consumer classes from AsyncAPI definitions:

<execution>
    <id>generate-asyncapi-producer</id>
    <phase>generate-sources</phase>
    <goals>
        <goal>generate</goal>
    </goals>
    <configuration>
        <generatorName>spring-cloud-streams3</generatorName>
        <inputSpec>classpath:model/asyncapi.yml</inputSpec>
        <configOptions>
            <role>provider</role><!-- use `client` to reverse code generation -->
            <transactionalOutbox>none</transactionalOutbox><!-- `modulith` (preferred), `mongodb`, `jdbc` or `none` -->
            <modelPackage>io.zenwave360.example.api.events.model</modelPackage>
            <apiPackage>io.zenwave360.example.api.events</apiPackage>
            <!-- use <producerApiPackage></producerApiPackage> if you want to differentiate producer/consumer packages, it overrides apiPackage -->
            <!-- use <consumerApiPackage></consumerApiPackage> if you want to differentiate producer/consumer packages, it overrides apiPackage -->
        </configOptions>
    </configuration>
</execution>

Reverse Code Generation using Client role

If you want to generate the consumer side of an existing API, you can use the client role to reverse how the API is generated:

<execution>
    <id>generate-asyncapi-client-imperative</id>
    <phase>generate-sources</phase>
    <goals>
        <goal>generate</goal>
    </goals>
    <configuration>
        <generatorName>spring-cloud-streams3</generatorName>
        <!-- ${project.basedir} replaces the deprecated ${pom.basedir} -->
        <inputSpec>${project.basedir}/src/main/resources/model/asyncapi.yml</inputSpec>
        <configOptions>
            <role>client</role>
            <modelPackage>io.zenwave360.example.api.events.model</modelPackage>
            <apiPackage>io.zenwave360.example.api.events</apiPackage>
        </configOptions>
    </configuration>
</execution>

Because APIs mediated by a broker are inherently reciprocal, it is difficult to establish the roles of client and server: what is a publish operation from one side is a subscribe operation from the other. A given service can also act as both publisher and subscriber on the same API.

For these reasons, and to avoid defining the same API operations once per perspective, we propose defining the API only once, from the perspective of the provider of the functionality, which may be a producer, a consumer or both.

Some definitions:

  • SERVICE: An independent piece of software, typically a microservice, that provides a set of capabilities to other services.
  • PROVIDER: The service that implements the functionality of the API. It may be accepting asynchronous command requests or publishing business domain events.
  • CLIENT(S): The service or services that make use of the functionality of the API. They may be requesting asynchronous commands or subscribing to business domain events.
  • PRODUCER: A service that writes a given message.
  • CONSUMER: A service that reads a given message.

Use the following table to determine which section of the AsyncAPI definition (publish or subscribe) to use for each topic, and which role (provider or client) to set in the plugin configuration.

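|                              | Events               | Commands             |
|------------------------------|----------------------|----------------------|
| Provider                     | Produces (publish)   | Consumes (subscribe) |
| Client                       | Consumes (subscribe) | Produces (publish)   |
| OperationId Suggested Prefix | on<Event Name>       | do<Command Name>     |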
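
The table can also be read as a small lookup. The sketch below encodes it in plain Java so the symmetry between provider and client is explicit; the enum and method names (ApiRole, MessageKind, Direction, RoleTableSketch) are made up for this illustration and are not part of the SDK.

```java
/** Hypothetical enums mirroring the rows and columns of the table. */
enum ApiRole { PROVIDER, CLIENT }

enum MessageKind { EVENT, COMMAND }

enum Direction { PRODUCES, CONSUMES }

final class RoleTableSketch {

    /** Returns whether a service with the given role produces or consumes the message. */
    static Direction directionFor(ApiRole role, MessageKind kind) {
        // The provider produces events and consumes commands; the client is the mirror image.
        boolean providerProduces = (kind == MessageKind.EVENT);
        boolean produces = (role == ApiRole.PROVIDER) == providerProduces;
        return produces ? Direction.PRODUCES : Direction.CONSUMES;
    }

    /** Builds an operationId using the suggested prefix: "on" for events, "do" for commands. */
    static String suggestedOperationId(MessageKind kind, String name) {
        return (kind == MessageKind.EVENT ? "on" : "do") + name;
    }

    public static void main(String[] args) {
        System.out.println(directionFor(ApiRole.CLIENT, MessageKind.COMMAND));
        System.out.println(suggestedOperationId(MessageKind.EVENT, "CustomerOrderEvent"));
    }
}
```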

Getting Help

jbang zw -p io.zenwave360.sdk.plugins.SpringCloudStreams3Plugin --help

SpringCloudStreams3Plugin Options

| Option | Description | Type | Default | Values |
|--------|-------------|------|---------|--------|
| apiFile | API Specification File | URI | | |
| apiFiles | API Spec files to parse (comma separated) | List | | |
| role | Project role: provider/client | AsyncapiRoleType | provider | provider, client |
| style | Programming style | ProgrammingStyle | imperative | imperative, reactive |
| modelPackage | Java Models package name | String | | |
| producerApiPackage | Java API package name for outbound (producer) services. It can override apiPackage for producers. | String | {{apiPackage}} | |
| consumerApiPackage | Java API package name for inbound (consumer) services. It can override apiPackage for consumers. | String | {{apiPackage}} | |
| apiPackage | Java API package, if producerApiPackage and consumerApiPackage are not set. | String | | |
| transactionalOutbox | Transactional outbox type for message producers. | TransactionalOutboxType | none | none, modulith, mongodb, jdbc |
| bindingPrefix | SC Streams Binding Name Prefix (used in @Component name) | String | | |
| bindingSuffix | Spring-Boot binding suffix. It will be appended to the operation name kebab-cased. E.g. <operation-id>-in-0 | String | -0 | |
| generatedAnnotationClass | Annotation class to mark generated code (e.g. org.springframework.aot.generate.Generated). When retained at runtime, this prevents code coverage tools like Jacoco from including generated classes in coverage reports. | String | | |
| targetFolder | Target folder to generate code to. If left empty, it will print to stdout. | File | | |
| modelNamePrefix | Sets the prefix for model classes and enums | String | | |
| modelNameSuffix | Sets the suffix for model classes and enums | String | | |
| runtimeHeadersProperty | AsyncAPI extension property name for runtime auto-configuration of headers. | String | x-runtime-expression | |
| includeApplicationEventListener | Include ApplicationEvent listener for consuming messages within the modulith. | boolean | false | |
| skipProducerImplementation | Generate only the producer interface and skip the implementation. | boolean | false | |
| exposeMessage | Whether to expose underlying spring Message to consumers or not. | boolean | false | |
| useEnterpriseEnvelope | Include support for enterprise envelope wrapping/unwrapping. | boolean | false | |
| envelopeJavaTypeExtensionName | AsyncAPI Message extension name for the envelope java type for wrapping/unwrapping. | String | x-envelope-java-type | |
| methodAndMessageSeparator | To avoid method erasure conflicts, when using exposeMessage or reactive style this character will be used as separator to append the message payload type to method names in consumer interfaces. | String | $ | |
| consumerPrefix | SC Streams Binder class prefix | String | | |
| consumerSuffix | SC Streams Binder class suffix | String | Consumer | |
| consumerServicePrefix | Business/Service interface prefix | String | I | |
| consumerServiceSuffix | Business/Service interface suffix | String | ConsumerService | |
| includeKafkaCommonHeaders | Include Kafka common headers 'kafka_messageKey' as x-runtime-header | boolean | false | |
| bindingTypes | Binding names to include in code generation. Generates code for ALL bindings if left empty | List | | |
| operationIds | Operation ids to include in code generation. Generates code for ALL if left empty | List | [] | |
| excludeOperationIds | Operation ids to exclude in code generation. An operation is skipped if it is not included or if it is excluded. | List | [] | |
| formatter | Code formatter implementation | Formatters | palantir | palantir, spring, google |
| skipFormatting | Skip java sources output formatting | boolean | false | |
| haltOnFailFormatting | Halt on formatting errors | boolean | true | |
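
To make the bindingSuffix description concrete, the following sketch derives a binding name from an operation id. It assumes a simple camelCase to kebab-case conversion and the `-in`/`-out` segment seen in binding names such as do-create-customer-in-0; the class and method names are hypothetical and the generator's actual conversion rules may differ in edge cases.

```java
import java.util.Locale;

final class BindingNameSketch {

    /** Converts a camelCase operation id to kebab-case, e.g. doCreateCustomer -> do-create-customer. */
    static String kebabCase(String operationId) {
        // Insert a dash between a lowercase letter or digit and the uppercase letter that follows it.
        return operationId.replaceAll("([a-z0-9])([A-Z])", "$1-$2").toLowerCase(Locale.ROOT);
    }

    /** Appends the direction segment and the configured suffix (default "-0"). */
    static String bindingName(String operationId, boolean inbound, String bindingSuffix) {
        return kebabCase(operationId) + (inbound ? "-in" : "-out") + bindingSuffix;
    }

    public static void main(String[] args) {
        System.out.println(bindingName("doCreateCustomer", true, "-0"));
    }
}
```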

Advanced Features

Transactional Outbox Pattern

ZenWave SDK supports sending messages transactionally using the Transactional Outbox Pattern.

Transactional Outbox with AsyncAPI and Spring Modulith

See Implementing a Transactional OutBox With AsyncAPI, SpringModulith and ZenWaveSDK for complete details.

Populating Headers at Runtime Automatically

ZenWave SDK provides x-runtime-expression for automatic header population at runtime. Values for this extension property are:

  • $message.payload#/<json pointer fragment>: follows the same format as AsyncAPI Correlation ID object.
  • $message.payload#{ <SpEL expression> }: will use the SpEL expression to populate the header value.
  • $supplierBeanName: will use a bean named supplierBeanName (you can use any other name) of type java.util.function.Supplier configured in your Spring context.

CustomerEventMessage:
  name: CustomerEventMessage
  # [...] other properties omitted for brevity
  headers:
    type: object
    properties:
      kafka_messageKey:
        type: string
        description: This one will be populated automatically at runtime
        x-runtime-expression: $message.payload#/customer/id
      tracingId:
        type: string
        description: This one will be populated automatically at runtime
        x-runtime-expression: $supplierBeanName
      # CloudEvents Attributes:
      # these examples showcase how you can use SpEL expressions to populate runtime headers
      ce-id:
        type: string
        description: Unique identifier for the event
        x-runtime-expression: $message.payload#{#this.id}
      ce-source:
        type: string
        description: URI identifying the context where event happened
        x-runtime-expression: $message.payload#{"CustomersService"}
      ce-specversion:
        type: string
        description: CloudEvents specification version
        x-runtime-expression: $message.payload#{"1.0"}
      ce-type:
        type: string
        description: Event type
        x-runtime-expression: $message.payload#{#this.getClass().getSimpleName()}
      ce-time:
        type: string
        description: Timestamp of when the event happened
        x-runtime-expression: $message.payload#{T(java.time.Instant).now().toString()}
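
The three expression forms can be told apart by their prefix. The sketch below classifies an x-runtime-expression value and extracts the part that would then be evaluated. It only illustrates the syntax described above and is not the SDK's parser; the class RuntimeExpressionSketch and its members are hypothetical.

```java
final class RuntimeExpressionSketch {

    enum Kind { JSON_POINTER, SPEL, SUPPLIER_BEAN }

    private static final String PAYLOAD_PREFIX = "$message.payload#";

    /** Classifies an expression by its prefix. */
    static Kind kindOf(String expression) {
        if (expression.startsWith(PAYLOAD_PREFIX + "/")) {
            return Kind.JSON_POINTER;
        }
        if (expression.startsWith(PAYLOAD_PREFIX + "{") && expression.endsWith("}")) {
            return Kind.SPEL;
        }
        if (expression.startsWith("$")) {
            return Kind.SUPPLIER_BEAN;
        }
        throw new IllegalArgumentException("Unsupported runtime expression: " + expression);
    }

    /** Returns the JSON pointer, the SpEL source, or the supplier bean name. */
    static String body(String expression) {
        switch (kindOf(expression)) {
            case JSON_POINTER:
                return expression.substring(PAYLOAD_PREFIX.length());
            case SPEL:
                // Strip the surrounding "{" and "}" and any padding spaces.
                return expression.substring(PAYLOAD_PREFIX.length() + 1, expression.length() - 1).trim();
            default:
                return expression.substring(1);
        }
    }

    public static void main(String[] args) {
        String[] samples = {
            "$message.payload#/customer/id",
            "$message.payload#{#this.id}",
            "$supplierBeanName"
        };
        for (String sample : samples) {
            System.out.println(kindOf(sample) + " -> " + body(sample));
        }
    }
}
```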

You can also override the runtimeHeadersProperty extension property name (in the rare case you need to):

<configOptions>
    <runtimeHeadersProperty>x-custom-runtime-expression</runtimeHeadersProperty><!-- you can also override this extension property name -->
</configOptions>

And provide a bean of type java.util.function.Supplier in your Spring context:

@Bean("supplierBeanName")
public Supplier<String> supplierBeanName() {
    return () -> "some-value";
}

InMemory Events Producer for Tests (Mocks)

// autogenerated in: target/generated-sources/zenwave/src/test/java/.../InMemoryCustomerOrderEventsProducer.java
public class InMemoryCustomerOrderEventsProducer implements ICustomerOrderEventsProducer {

    protected Map<String, List<Message>> capturedMessages = new HashMap<>();

    public Map<String, List<Message>> getCapturedMessages() {
        return capturedMessages;
    }

    // other details omitted for brevity

    /**
     * CustomerOrder Domain Events
     */
    public boolean onCustomerOrderEvent(CustomerOrderEventPayload payload, CustomerOrderEventPayloadHeaders headers) {
        log.debug("Capturing message to topic: {}", onCustomerOrderEventBindingName);
        Message message = MessageBuilder.createMessage(payload, new MessageHeaders(headers));
        return appendCapturedMessage(onCustomerOrderEventBindingName, message);
    }
}

// autogenerated in: target/generated-sources/zenwave/src/test/java/.../ProducerInMemoryContext.java
public class ProducerInMemoryContext {

    public static final ProducerInMemoryContext INSTANCE = new ProducerInMemoryContext();

    private CustomerEventsProducerCaptor customerEventsProducerCaptor = new CustomerEventsProducerCaptor();

    public <T extends ICustomerEventsProducer> T customerEventsProducer() {
        return (T) customerEventsProducerCaptor;
    }
}
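
The capturing idea behind these in-memory producers can be reduced to a map of binding name to recorded messages. The sketch below is a self-contained approximation that uses plain Object instead of Spring's Message type; CapturingProducerSketch is a hypothetical class, and its appendCapturedMessage is only a guess at what the omitted generated method does.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class CapturingProducerSketch {

    private final Map<String, List<Object>> capturedMessages = new HashMap<>();

    /** Records a message under its binding name instead of sending it to a broker. */
    boolean appendCapturedMessage(String bindingName, Object message) {
        return capturedMessages.computeIfAbsent(bindingName, key -> new ArrayList<>()).add(message);
    }

    /** Returns the messages captured for a binding, or an empty list if none were sent. */
    List<Object> messagesFor(String bindingName) {
        return capturedMessages.getOrDefault(bindingName, List.of());
    }

    public static void main(String[] args) {
        CapturingProducerSketch producer = new CapturingProducerSketch();
        producer.appendCapturedMessage("on-customer-order-event-out-0", "order-1");
        System.out.println(producer.messagesFor("on-customer-order-event-out-0"));
    }
}
```

In a unit test, the code under test would receive the capturing producer in place of the broker-backed one, and assertions would then inspect the captured list.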

Routing Business Exceptions to Dead Letter Queues with Configuration

When consuming events you can route different business exceptions to different Dead Letter Queue bindings using the dead-letter-queue-error-map.

This mechanism is useful when you know an exception is not retryable and you want to route it to a different DLQ.

When no matching exception is found in dead-letter-queue-error-map, the exception will propagate up the call stack, allowing standard retry and error handling mechanisms to take effect.

# application.yml
spring:
  cloud:
    stream:
      bindings:
        do-create-customer-in-0:
          destination: customer.requests
          content-type: application/json
          dead-letter-queue-error-map: >
            {
              'jakarta.validation.ValidationException': 'do-create-customer-validation-error-out-0',
              'java.lang.Exception': 'do-create-customer-error-out-0'
            }
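
One way to picture the lookup is sketched below. It assumes that entries are checked in declaration order and that an entry matches when the thrown exception is an instance of the configured class; both are assumptions made for illustration, and the generated consumer may resolve matches differently. DeadLetterRoutingSketch is a hypothetical class, and JDK exception types stand in for the jakarta.validation one.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

final class DeadLetterRoutingSketch {

    // LinkedHashMap keeps declaration order, so more specific exceptions can be listed first.
    private final Map<Class<? extends Throwable>, String> errorMap = new LinkedHashMap<>();

    DeadLetterRoutingSketch route(Class<? extends Throwable> type, String dlqBinding) {
        errorMap.put(type, dlqBinding);
        return this;
    }

    /** Returns the DLQ binding for the error, or empty so the caller can rethrow it. */
    Optional<String> resolve(Throwable error) {
        for (Map.Entry<Class<? extends Throwable>, String> entry : errorMap.entrySet()) {
            if (entry.getKey().isInstance(error)) {
                return Optional.of(entry.getValue());
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        DeadLetterRoutingSketch router = new DeadLetterRoutingSketch()
                .route(IllegalArgumentException.class, "do-create-customer-validation-error-out-0")
                .route(Exception.class, "do-create-customer-error-out-0");
        System.out.println(router.resolve(new IllegalArgumentException("invalid payload")));
    }
}
```

An empty result corresponds to the case where no mapping matches and the exception propagates to the standard retry and error handling.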

Generating Consumer Adapters (Skeletons)

jbang zw -p io.zenwave360.sdk.plugins.SpringCloudStreams3AdaptersPlugin --help

jbang zw -p io.zenwave360.sdk.plugins.SpringCloudStreams3AdaptersPlugin \
    specFile=src/main/resources/model/asyncapi.yml \
    zdlFile=src/main/resources/model/orders-model.jdl \
    role=provider \
    style=imperative \
    basePackage=io.zenwave360.example \
    consumerApiPackage=io.zenwave360.example.adapters.events \
    modelPackage=io.zenwave360.example.core.domain.events \
    targetFolder=.

@Component
public class DoCustomerRequestConsumerServiceAdapter implements IDoCustomerRequestConsumerService {

    private EventEntityMapper mapper;
    // TODO: private EntityUseCases service;

    @Autowired
    public void setEventEntityMapper(EventEntityMapper mapper) {
        this.mapper = mapper;
    }

    /** Customer Async Requests */
    public void doCustomerRequest(CustomerRequestPayload payload, CustomerRequestPayloadHeaders headers) {
        // TODO: service.doCustomerRequest(mapper.asEntity(payload));
    }
}

Consumer Adapters API Tests

// generated and editable in: src/test/java/.../adapters/events/DoCustomerRequestConsumerServiceIT.java
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.MOCK)
@org.springframework.transaction.annotation.Transactional
public class DoCustomerRequestConsumerServiceIT extends BaseConsumerTest {

    @Autowired
    public IDoCustomerRequestConsumerService consumerService;

    /** Test for doCustomerRequest: */
    @Test
    public void doCustomerRequestTest() {
        CustomerRequestPayload payload = new CustomerRequestPayload();
        payload.setCustomerId(null);
        payload.setRequestType(null);
        payload.setCustomer(null);

        CustomerRequestPayloadHeaders headers = new CustomerRequestPayloadHeaders();

        // invoke the method under test
        consumerService.doCustomerRequest(payload, headers);

        // perform your assertions here
    }
}

Options for Consumer Adapters

| Option | Description | Type | Default | Values |
|--------|-------------|------|---------|--------|
| specFile | API Specification File | URI | | |
| specFiles | ZDL files to parse | String[] | [] | |
| targetFolder | Target folder to generate code to. If left empty, it will print to stdout. | File | | |
| style | Programming style | ProgrammingStyle | imperative | imperative, reactive |
| role | Project role: provider/client | AsyncapiRoleType | provider | provider, client |
| exposeMessage | Whether to expose underlying spring Message to consumers or not. | boolean | false | |
| apiId | Unique identifier of each AsyncAPI that you consume as a client or provider. It will become the last package token for generated adapters | String | commands | |
| basePackage | Applications base package | String | | |
| adaptersPackage | The package to generate Async Inbound Adapters in | String | {{basePackage}}.adapters.events.{{apiId}} | |
| inboundDtosPackage | Package where your inbound dtos are | String | {{basePackage}}.core.inbound.dtos | |
| servicesPackage | Package where your domain services/usecases interfaces are | String | {{basePackage}}.core.inbound | |
| apiPackage | Java API package, if producerApiPackage and consumerApiPackage are not set. | String | | |
| modelPackage | Java Models package name | String | | |
| producerApiPackage | Java API package name for outbound (producer) services. It can override apiPackage for producers. | String | {{apiPackage}} | |
| consumerApiPackage | Java API package name for inbound (consumer) services. It can override apiPackage for consumer. | String | {{apiPackage}} | |
| bindingTypes | Binding names to include in code generation. Generates code for ALL bindings if left empty | List | | |
| operationIds | Operation ids to include in code generation. Generates code for ALL if left empty | List | [] | |
| runtimeHeadersProperty | AsyncAPI extension property name for runtime auto-configuration of headers. | String | x-runtime-expression | |
| continueOnZdlError | Continue even when ZDL contains fatal errors | boolean | true | |
| inputDTOSuffix | Should use same value configured in BackendApplicationDefaultPlugin. Whether to use an input DTO for entities used as command parameter. | String | | |
| baseTestClassName | BaseConsumerTest class name | String | BaseConsumerTest | |
| baseTestClassPackage | BaseConsumerTest package | String | {{basePackage}}.adapters.events | |
| transactional | Annotate tests as @Transactional | boolean | true | |
| transactionalAnnotationClass | @Transactional annotation class name | String | org.springframework.transaction.annotation.Transactional | |
| methodAndMessageSeparator | To avoid method erasure conflicts, when exposeMessage or reactive style this character will be used as separator to append message payload type to method names in consumer interfaces. | String | $ | |
| consumerPrefix | SC Streams Binder class prefix | String | | |
| consumerSuffix | SC Streams Binder class suffix | String | Consumer | |
| bindingPrefix | SC Streams Binding Name Prefix (used in @Component name) | String | | |
| servicePrefix | Business/Service interface prefix | String | I | |
| serviceSuffix | Business/Service interface suffix | String | ConsumerService | |
| bindingSuffix | Spring-Boot binding suffix. It will be appended to the operation name kebab-cased. E.g. <operation-id>-in-0 | String | -0 | |
| formatter | Code formatter implementation | Formatters | spring | google, palantir, spring, eclipse |
| skipFormatting | Skip java sources output formatting | boolean | false | |
| haltOnFailFormatting | Halt on formatting errors | boolean | true | |