Consuming Async Commands
Listening for Async Requests and Third-Party Events
When consuming Event-Driven messages we need to differentiate between:
- Consuming Third-Party Async Domain Events
- Listening for Async Commands/Requests that we own
Because the semantics of a Command and an Event are quite different, we need to handle them differently.
- A Command is a request to start an action. It may be rejected so the action may not start at all.
- An Event is a notification that something has happened. It is not a request but a statement of a fact that has already happened on a given system.
This distinction is paramount because a Command can be requested by many different applications but should be listened to by only one application. A Domain Event, on the other hand, can be produced by only one application but can be listened to by an unlimited number of applications.
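The cardinality rule above can be sketched with a small in-memory bus. This is only an illustration under stated assumptions: `InMemoryBus` and its methods are hypothetical names invented here, not part of ZenWave or of any message broker API, and payloads are plain strings for brevity.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Predicate;

/** Hypothetical in-memory bus; not part of ZenWave or any broker API. */
class InMemoryBus {
    // A command channel has exactly one owner, which may accept or reject the request.
    private final Map<String, Predicate<String>> commandHandlers = new HashMap<>();
    // An event channel can have any number of listeners.
    private final Map<String, List<Consumer<String>>> eventListeners = new HashMap<>();

    void registerCommandHandler(String channel, Predicate<String> handler) {
        if (commandHandlers.putIfAbsent(channel, handler) != null) {
            throw new IllegalStateException("Command channel already has an owner: " + channel);
        }
    }

    void subscribeToEvent(String channel, Consumer<String> listener) {
        eventListeners.computeIfAbsent(channel, key -> new ArrayList<>()).add(listener);
    }

    /** Any application may send a command; returns false when the owner rejects it. */
    boolean sendCommand(String channel, String payload) {
        Predicate<String> handler = commandHandlers.get(channel);
        if (handler == null) {
            throw new IllegalStateException("No owner registered for command channel: " + channel);
        }
        return handler.test(payload);
    }

    /** Publishes a fact that already happened; returns how many listeners were notified. */
    int publishEvent(String channel, String payload) {
        List<Consumer<String>> listeners = eventListeners.getOrDefault(channel, List.of());
        listeners.forEach(listener -> listener.accept(payload));
        return listeners.size();
    }
}
```

Registering a second handler on the same command channel fails, while subscribing a second listener to an event channel simply adds another recipient.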
So we need to distinguish between the provider and the client of a given functionality, which is not the same as the producer and the consumer of a given message.
A provider produces Domain Events and consumes Command Requests. A client consumes Domain Events and produces Command Requests.
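The four combinations of role and message kind can be written down as a tiny lookup. This is a sketch for illustration only; the enum and class names (`Role`, `MessageKind`, `Direction`, `RoleMatrix`) are made up here and do not come from ZenWave.

```java
// Hypothetical names, used only to spell out the provider/client rule.
enum Role { PROVIDER, CLIENT }

enum MessageKind { COMMAND, DOMAIN_EVENT }

enum Direction { PRODUCES, CONSUMES }

final class RoleMatrix {
    private RoleMatrix() {}

    /** Returns whether an application in the given role produces or consumes the message. */
    static Direction directionFor(Role role, MessageKind kind) {
        boolean provider = role == Role.PROVIDER;
        boolean event = kind == MessageKind.DOMAIN_EVENT;
        // Provider + event and client + command are the producing sides;
        // the other two combinations are the consuming sides.
        return provider == event ? Direction.PRODUCES : Direction.CONSUMES;
    }
}
```

For example, `RoleMatrix.directionFor(Role.CLIENT, MessageKind.COMMAND)` yields `PRODUCES`: a client is a producer of command messages even though it is not the provider of the functionality.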
We recommend not mixing, in the same AsyncAPI definition, functionality that you provide with functionality that you merely consume from a third party.
With the ZenWave AsyncAPI API-First Plugin you can write the AsyncAPI definition just once, from the perspective of the provider, and then use the provider's AsyncAPI definition to generate the client's AsyncAPI definition using the client role.
Alternatively, create two different AsyncAPI definitions, one for the provider and one for the client. Just don't mix them together in the same AsyncAPI definition file.
Using ZenWave ZDL as Definition Language for AsyncAPI
You can use ZDL To AsyncAPI Generator to generate AsyncAPI definitions from ZDL Models with a command like this:
jbang zw -p io.zenwave360.sdk.plugins.ZDLToAsyncAPIPlugin \
    specFile=orders-model.zdl \
    idType=integer \
    idTypeFormat=int64 \
    targetFile=src/main/resources/apis/asyncapi.yml
Before that, you need to do the following in your ZDL Model:
- The APIs you are using, and whether you are the provider or the client of the API.
- Define a Service for your Domain Aggregate holding inbound commands and outbound events.
- Annotate with @asyncapi your inbound command documenting the API and the channel you will be listening to.
  - If you specify an api it will be used to determine if you are the provider (listening for a Command) or the client of the API (listening for third-party Events). No api means you are the provider.
- Annotate with @asyncapi your outbound events documenting the channel you will be publishing to.
- You can specify the topic for messages of APIs you are the provider of. It will be added as the channel's address in your AsyncAPI definition (only for version 3).
// apis section at the beginning of your ZDL Model
apis {
    asyncapi(provider) default {
        uri "orders/src/main/resources/apis/asyncapi.yml"
    }
    asyncapi(client) RestaurantsAsyncAPI {
        uri "restaurants/src/main/resources/apis/asyncapi.yml"
    }
    asyncapi(client) DeliveryAsyncAPI {
        uri "delivery/src/main/resources/apis/asyncapi.yml"
    }
}
Notice how we specify third-party events (1), async commands (2) and domain events (3):
service OrdersService for (CustomerOrder) {
    // 1) we are listening to a third-party event, because we are a client of 'api: RestaurantsAsyncAPI'
    @asyncapi({api: RestaurantsAsyncAPI, channel: "KitchenOrdersStatusChannel"})
    updateKitchenStatus(id, KitchenStatusInput) CustomerOrder withEvents OrderStatusUpdated

    // 2) we are listening for a command, because we didn't specify an api (or if the api specified was of type 'provider')
    @asyncapi({channel: "CancelOrdersChannel", topic: "orders.cancel_orders"})
    cancelOrder(id, CancelOrderInput) CustomerOrder withEvents OrderStatusUpdated
}

// 3) here we are producing events, informing of a fact that happened on our system
@asyncapi({channel: "OrderUpdatesChannel", topic: "orders.order_updates"})
event OrderStatusUpdated {}
From the above ZDL definition, the ZenWave ZDLToAsyncAPI Plugin will generate operations only for those messages for which OrdersService is the provider:
operations:
  doCancelOrder:
    action: receive
    channel:
      $ref: '#/channels/CancelOrdersChannel'
  onOrderStatusUpdated:
    action: send
    channel:
      $ref: '#/channels/OrderUpdatesChannel'
It will not generate operations for messages where OrdersService is acting as a client, such as listening for third-party events like { api: RestaurantsAsyncAPI, channel: "KitchenOrdersStatusChannel" }.
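The selection rule described in this section (no api means provider, and operations are generated only for the provider role) can be sketched as follows. This mirrors the rule as documented here and is not the plugin's actual code; `ApiRoleResolver`, `ApiRole` and both methods are hypothetical names.

```java
import java.util.Map;

/** Hypothetical helper; it mirrors the documented rule, not the plugin's real code. */
final class ApiRoleResolver {
    enum ApiRole { PROVIDER, CLIENT }

    private ApiRoleResolver() {}

    /** apiName is the value of the 'api' attribute in @asyncapi, or null when it is omitted. */
    static ApiRole resolve(String apiName, Map<String, ApiRole> declaredApis) {
        if (apiName == null) {
            return ApiRole.PROVIDER; // no api means we are the provider
        }
        ApiRole role = declaredApis.get(apiName);
        if (role == null) {
            throw new IllegalArgumentException("Unknown api: " + apiName);
        }
        return role;
    }

    /** Operations are generated only for messages where the service is the provider. */
    static boolean generatesOperation(String apiName, Map<String, ApiRole> declaredApis) {
        return resolve(apiName, declaredApis) == ApiRole.PROVIDER;
    }
}
```

With the apis declared in the example model, `cancelOrder` (no api) resolves to the provider role and gets an operation, while `updateKitchenStatus` (api: RestaurantsAsyncAPI) resolves to the client role and does not.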
API-First Code Generator from AsyncAPI
You can use API-First AsyncAPI Maven Plugin to generate models (DTOs) and a Listener implementation with error handling and other features.
By default, ZenWave generates a Spring Cloud Streams implementation, which offers binders for virtually any message broker.
With this schema you just need to:
- Provide an implementation for the generated IOnOrderEventConsumer interface.
- Add OnOrderEventConsumer to your Spring Boot context and set any required configuration for spring.cloud.stream.bindings in application.yml.
target/generated-sources/zenwave
└─ src/main/java
   ├─ <modelPackage> models (DTOs)
   │  └─ OrderDTO.java
   └─ <consumerPackage>
      ├─ IOnOrderEventConsumer (interface you need to implement)
      └─ OnOrderEventConsumer (spring-cloud-streams consumer)
IOnOrderEventConsumer is just a business interface with no dependencies on any framework or library, retaining the nomenclature of your AsyncAPI definition (and Domain):
@jakarta.annotation.Generated(value = "io.zenwave360.sdk.plugins.SpringCloudStreams3Plugin")
public interface IOnOrderEventConsumerService {

    void onOrderEvent(OrderEvent payload, OrderEventHeaders headers);
}
Once you provide an implementation of this interface (in your Spring Boot context), you just need to set any required configuration for spring.cloud.stream.bindings in application.yml:
spring:
  cloud:
    stream:
      bindings:
        do-cancel-order-in-0: ## you can find this name in OnOrderEventConsumer
          destination: orders.cancel_orders
          group: orders.consumer
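The binding name in the configuration above follows Spring Cloud Stream's functional naming convention, `<functionName>-in-<index>`. The sketch below shows how a name like `do-cancel-order-in-0` could be derived from an operation id. The kebab-case step is an assumption inferred from this one example, and `BindingNames` is a hypothetical helper, so always check the name declared in the generated consumer class rather than relying on this derivation.

```java
import java.util.Locale;

/** Hypothetical helper; the kebab-case rule is inferred from the example, not from ZenWave's code. */
final class BindingNames {
    private BindingNames() {}

    /** Builds an input binding name of the form <function-name>-in-<index>. */
    static String inputBinding(String operationId, int index) {
        // Insert a hyphen wherever a lower-case letter or digit is followed by an upper-case letter.
        String kebab = operationId
                .replaceAll("([a-z0-9])([A-Z])", "$1-$2")
                .toLowerCase(Locale.ROOT);
        return kebab + "-in-" + index;
    }
}
```

Under these assumptions, `BindingNames.inputBinding("doCancelOrder", 0)` gives the binding key used in the configuration above.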
See the Producing Domain Events section for how to configure the API-First AsyncAPI Maven Plugin.
Regarding the distinction between commands and events, and between providers and clients, note that you can configure as many plugin <execution/> elements as you need, each in either the provider or the client role.
Generating Async Listeners and Tests
You can use AsyncAPI Spring Cloud Streams 3 Adapter Generator to generate Async Adapters (stubs) and Tests from AsyncAPI definitions with a command like this:
jbang zw -p io.zenwave360.sdk.plugins.SpringCloudStreams3AdaptersPlugin --help
jbang zw -p io.zenwave360.sdk.plugins.SpringCloudStreams3AdaptersPlugin \
    specFile=src/main/resources/apis/asyncapi.yml \
    zdlFile=src/main/resources/model/orders-model.jdl \
    role=provider \
    style=imperative \
    basePackage=io.zenwave360.example \
    consumerApiPackage=io.zenwave360.example.adapters.events \
    modelPackage=io.zenwave360.example.core.domain.events \
    targetFolder=.
This will generate Command Listeners and a Test for each operation in your AsyncAPI definition. Depending on the configured role, it will generate Command Listeners for subscribe/receive operations for role provider, and for publish/send operations for role client.
@Slf4j
@Service
public class DoCancelOrderConsumerService implements IDoCancelOrderConsumerService {

    @Override
    public void doCancelOrder(CancelOrderInput payload, CancelOrderInputHeaders headers) {
        log.debug("Received CancelOrderInput: {}", payload);
        // TODO: implement this functionality
    }
}
/**
 * Integration tests for {@link IOnOrderEventConsumerService}.
 */
public class OnOrderEventConsumerService extends BaseConsumerTest {

    @Autowired
    public IOnOrderEventConsumerService consumerService;

    /**
     * Test for onOrderEvent:
     */
    @Test
    public void onOrderEventTest() {
        OrderEvent payload = new OrderEvent();
        OrderEventHeaders headers = new OrderEventHeaders();
        // invoke the method under test
        consumerService.onOrderEvent(payload, headers);
        // perform your assertions here
    }
}
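The role rule stated earlier in this section (listeners for subscribe/receive operations in role provider, and for publish/send operations in role client) can be encoded literally as a small predicate. This is a reading of that sentence only, not the generator's implementation; `ListenerSelection`, `GeneratorRole` and `OperationAction` are hypothetical names.

```java
/** Hypothetical encoding of the role rule as stated in this section. */
final class ListenerSelection {
    enum GeneratorRole { PROVIDER, CLIENT }

    // subscribe/publish are the AsyncAPI v2 operation kinds, send/receive the v3 actions.
    enum OperationAction { SUBSCRIBE, RECEIVE, PUBLISH, SEND }

    private ListenerSelection() {}

    /** Returns true when a listener would be generated for this operation under the given role. */
    static boolean generatesListener(GeneratorRole role, OperationAction action) {
        boolean subscribeOrReceive =
                action == OperationAction.SUBSCRIBE || action == OperationAction.RECEIVE;
        // Provider listens on subscribe/receive; client listens on publish/send.
        return role == GeneratorRole.PROVIDER ? subscribeOrReceive : !subscribeOrReceive;
    }
}
```

For the `doCancelOrder` operation (action: receive) generated earlier, this predicate is true for role provider, which matches the generated DoCancelOrderConsumerService listener.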