Consuming and producing events using Apache Kafka
This document describes how you can configure a workflow to interact with Apache Kafka. Apache Kafka uses an abstraction called a topic to classify events: events are published to a topic and consumed from a topic.
SonataFlow uses Smallrye connectors to encapsulate access to several brokers, so SonataFlow can support different brokers by changing only the configuration and the classpath dependencies. Smallrye connectors also introduce an entity called a channel. Smallrye channels are unidirectional and must be declared as either input (incoming) or output (outgoing). When using Apache Kafka, the Smallrye channels must be mapped to Apache Kafka topics through configuration.
To enable a workflow to use Apache Kafka, add the Kafka Quarkus Smallrye connector dependency. If you use Apache Maven, add the following dependency to the pom.xml file of your project:
pom.xml
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
</dependency>
The messaging capabilities, although optional, are already included in the Quarkus Serverless Workflow extension. Therefore, you do not need to add the messaging add-on dependency explicitly when using SonataFlow.
Smallrye channels configuration for a workflow
You configure Smallrye channels for a workflow based on its event definitions. The channels themselves are defined using Quarkus configuration, with properties in the format mp.messaging.[incoming|outgoing].<channel_name>.<property_name>.
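To make the property format concrete, the following minimal Java sketch assembles a full key from a direction, a channel name, and a property name. The class ChannelPropertyKeys and its key method are hypothetical illustrations, not part of SonataFlow or Smallrye.

```java
import java.util.Locale;

// Hypothetical helper, not part of SonataFlow or Smallrye: it only illustrates
// the mp.messaging.[incoming|outgoing].<channel_name>.<property_name> format.
final class ChannelPropertyKeys {

    // Smallrye channels are unidirectional: either incoming or outgoing.
    enum Direction { INCOMING, OUTGOING }

    private ChannelPropertyKeys() {
    }

    // Builds the full configuration key for one property of one channel.
    static String key(Direction direction, String channelName, String propertyName) {
        String dir = direction.name().toLowerCase(Locale.ROOT); // "incoming" or "outgoing"
        return "mp.messaging." + dir + "." + channelName + "." + propertyName;
    }
}
```

For example, key(Direction.INCOMING, "wait", "connector") yields mp.messaging.incoming.wait.connector.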
SonataFlow supports the following channel mapping strategies:
- Define one default incoming channel to receive all incoming messages and one default outgoing channel to publish all outgoing messages.
- Define a channel for each CloudEvent type, so that every message type has a dedicated channel.
- Define channels for certain CloudEvent types only. CloudEvent types that are not mapped use the default incoming or outgoing channel.
SonataFlow first searches the properties for a channel whose name matches the CloudEvent type. If such a channel is found, SonataFlow uses it for that CloudEvent type. Otherwise, SonataFlow searches for the default channel definition. If no default channel is defined either, an error is reported.
The default incoming channel is kogito_incoming_stream and the default outgoing channel is kogito_outgoing_stream.
To change the default incoming and outgoing channel names, you can use the following properties:
kogito.addon.messaging.incoming.defaultName=<default channel name>
kogito.addon.messaging.outgoing.defaultName=<default channel name>
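The lookup order described above can be sketched in Java as follows. This is a simplified illustration, not the actual SonataFlow code: the class ChannelLookup is hypothetical, the configuration is modeled as a plain Map, and a channel is assumed to count as declared when at least one mp.messaging.<direction>.<channel_name>. property exists.

```java
import java.util.Map;

// Hypothetical sketch (not the SonataFlow implementation) of the channel lookup:
// 1) a channel named after the CloudEvent type, 2) the default channel, 3) an error.
final class ChannelLookup {

    static final String DEFAULT_INCOMING = "kogito_incoming_stream";
    static final String DEFAULT_OUTGOING = "kogito_outgoing_stream";

    private ChannelLookup() {
    }

    static String resolveChannel(Map<String, String> props, boolean incoming, String eventType) {
        String dir = incoming ? "incoming" : "outgoing";

        // 1. A channel whose name matches the CloudEvent type
        if (isDeclared(props, dir, eventType)) {
            return eventType;
        }

        // 2. The default channel; its name can be changed with
        //    kogito.addon.messaging.<incoming|outgoing>.defaultName
        String defaultName = props.getOrDefault(
                "kogito.addon.messaging." + dir + ".defaultName",
                incoming ? DEFAULT_INCOMING : DEFAULT_OUTGOING);
        if (isDeclared(props, dir, defaultName)) {
            return defaultName;
        }

        // 3. Neither a matching nor a default channel exists
        throw new IllegalStateException("No " + dir + " channel for CloudEvent type " + eventType);
    }

    // Assumption: a channel is declared if any mp.messaging.<dir>.<channel>. property is set.
    private static boolean isDeclared(Map<String, String> props, String dir, String channel) {
        String prefix = "mp.messaging." + dir + "." + channel + ".";
        return props.keySet().stream().anyMatch(k -> k.startsWith(prefix));
    }
}
```

With wait and kogito_incoming_stream both declared as incoming channels, the type wait resolves to its own channel, while any other incoming type, such as applicants, falls back to kogito_incoming_stream.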
The following properties are mandatory for each channel:
- connector: Set this property to smallrye-kafka.
- Depending on whether the channel is incoming or outgoing:
  - value.deserializer: Used for incoming channels. Unless you have specific marshaling requirements, set this property to org.apache.kafka.common.serialization.ByteArrayDeserializer or org.apache.kafka.common.serialization.StringDeserializer.
  - value.serializer: Used for outgoing channels. Unless you have specific marshaling requirements, set this property to org.apache.kafka.common.serialization.ByteArraySerializer or org.apache.kafka.common.serialization.StringSerializer.
For more information about the properties that you can use for a channel with the Apache Kafka connector, see the properties list.
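As a rough illustration of the mandatory properties, the following Java sketch checks a configuration map for one channel. The KafkaChannelCheck class is hypothetical and not part of SonataFlow; it only checks that the serializer or deserializer is present, because the text allows other values when you have specific marshaling requirements.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical validator, not part of SonataFlow: reports missing mandatory
// properties for a single Kafka channel.
final class KafkaChannelCheck {

    private KafkaChannelCheck() {
    }

    // Returns the problems found; an empty list means the mandatory properties are set.
    static List<String> check(Map<String, String> props, boolean incoming, String channel) {
        String prefix = "mp.messaging." + (incoming ? "incoming." : "outgoing.") + channel + ".";
        List<String> problems = new ArrayList<>();

        // connector must be smallrye-kafka for every channel
        if (!"smallrye-kafka".equals(props.get(prefix + "connector"))) {
            problems.add(prefix + "connector must be smallrye-kafka");
        }

        // incoming channels need a deserializer, outgoing channels need a serializer
        String codecKey = prefix + (incoming ? "value.deserializer" : "value.serializer");
        if (!props.containsKey(codecKey)) {
            problems.add(codecKey + " is missing");
        }
        return problems;
    }
}
```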
Examples of Smallrye channel mapping
The following examples show channel mappings that use the properties described in the previous section.
- One Smallrye channel per CloudEvent type
  The serverless-workflow-callback-quarkus example application uses two CloudEvent types: wait (incoming) and resume (outgoing). The Kafka topic names match the CloudEvent types, so the one-channel-per-CloudEvent-type strategy is a good fit, and two channels must be configured. You do not need to set the optional topic property, because each channel name matches its CloudEvent type, which is also the topic name.
  Example property configuration
  mp.messaging.incoming.wait.connector=smallrye-kafka
  mp.messaging.incoming.wait.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
  mp.messaging.outgoing.resume.connector=smallrye-kafka
  mp.messaging.outgoing.resume.value.serializer=org.apache.kafka.common.serialization.StringSerializer
- Default channel mapping
  The serverless-workflow-events-quarkus example application uses two CloudEvent types: applicants (incoming) and decisions (outgoing). As in the previous example, the Kafka topic names match the CloudEvent types. However, this example uses the default channel functionality, so kogito_incoming_stream and kogito_outgoing_stream are used as channel names. Because no channel is named after either CloudEvent type, the default channels are used. The default channels must also be mapped to the desired topic names by using the topic property.
  Example property configuration
  mp.messaging.incoming.kogito_incoming_stream.connector=smallrye-kafka
  mp.messaging.incoming.kogito_incoming_stream.topic=applicants
  mp.messaging.incoming.kogito_incoming_stream.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
  mp.messaging.outgoing.kogito_outgoing_stream.connector=smallrye-kafka
  mp.messaging.outgoing.kogito_outgoing_stream.topic=decisions
  mp.messaging.outgoing.kogito_outgoing_stream.value.serializer=org.apache.kafka.common.serialization.StringSerializer
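Both examples can be traced end to end with the following self-contained Java sketch, which maps a CloudEvent type to a channel and then to a Kafka topic. It relies on the Smallrye Kafka connector rule that a channel without a topic property uses its channel name as the topic. The EventRouting class is hypothetical, and it simplifies the lookup: default channel names are assumed not to be overridden, and a channel counts as declared when its connector property is set.

```java
import java.util.Map;

// Hypothetical end-to-end sketch: CloudEvent type -> Smallrye channel -> Kafka topic.
// Not the SonataFlow implementation.
final class EventRouting {

    private EventRouting() {
    }

    static String topicForEventType(Map<String, String> props, boolean incoming, String eventType) {
        String prefix = "mp.messaging." + (incoming ? "incoming." : "outgoing.");
        String fallback = incoming ? "kogito_incoming_stream" : "kogito_outgoing_stream";

        // Prefer a channel named after the CloudEvent type, then the default channel.
        String channel;
        if (props.containsKey(prefix + eventType + ".connector")) {
            channel = eventType;
        } else if (props.containsKey(prefix + fallback + ".connector")) {
            channel = fallback;
        } else {
            throw new IllegalStateException("No channel for CloudEvent type " + eventType);
        }

        // An explicit topic property wins; otherwise the channel name is the topic.
        return props.getOrDefault(prefix + channel + ".topic", channel);
    }
}
```

Note a consequence of the default mapping: every incoming CloudEvent type without a dedicated channel ends up on the same topic (applicants in the second example), because they all share kogito_incoming_stream.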
OnOverflow handling
Smallrye provides a way to manage emitter overflow through the OnOverflow annotation.
Kogito annotates the generated Smallrye emitters for a particular (outgoing) channel using the value of a property of the form kogito.addon.messaging.emitter.<channel_name>.overflow-strategy. The possible values are BUFFER, NONE, UNBOUNDED, FAIL, and DROP. If the strategy is BUFFER, you must also specify the buffer size by using a property of the form kogito.addon.messaging.emitter.<channel_name>.buffer-size.
For example, to buffer up to 100 events on the outgoing resume channel from the previous example, add the following two properties:
kogito.addon.messaging.emitter.resume.overflow-strategy=BUFFER
kogito.addon.messaging.emitter.resume.buffer-size=100
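The rule that BUFFER requires a buffer size can be sketched as a small Java parser for the per-channel properties. EmitterOverflow and its Settings record are hypothetical names, not the Kogito code generator; the enum lists the property values from the text, which are not necessarily the same as the Smallrye OnOverflow.Strategy constants. The record requires Java 16 or later.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical parser, not the Kogito code generator: reads the per-channel
// overflow properties and enforces that BUFFER comes with a buffer size.
final class EmitterOverflow {

    // Property values listed in the text.
    enum Strategy { BUFFER, NONE, UNBOUNDED, FAIL, DROP }

    // bufferSize is null unless the strategy is BUFFER.
    record Settings(Strategy strategy, Integer bufferSize) {
    }

    private EmitterOverflow() {
    }

    static Optional<Settings> parse(Map<String, String> props, String channel) {
        String base = "kogito.addon.messaging.emitter." + channel + ".";
        String value = props.get(base + "overflow-strategy");
        if (value == null) {
            return Optional.empty(); // no per-channel strategy configured
        }

        // valueOf throws IllegalArgumentException for values outside the list
        Strategy strategy = Strategy.valueOf(value.trim());
        if (strategy != Strategy.BUFFER) {
            return Optional.of(new Settings(strategy, null));
        }

        String size = props.get(base + "buffer-size");
        if (size == null) {
            throw new IllegalStateException(base + "buffer-size is required when the strategy is BUFFER");
        }
        int bufferSize = Integer.parseInt(size.trim());
        if (bufferSize <= 0) {
            throw new IllegalStateException(base + "buffer-size must be positive");
        }
        return Optional.of(new Settings(strategy, bufferSize));
    }
}
```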
If all your channels use the same strategy and that strategy is not BUFFER, you can set it globally by using kogito.addon.messaging.emitter.overflow-strategy=NONE|UNBOUNDED|FAIL|DROP. If all your channels use BUFFER, you can instead change the buffer size globally by using the mp.messaging.emitter.default-buffer-size Smallrye property.
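The following Java sketch shows one way the global and per-channel settings could combine. The precedence is an assumption, not something stated above: the sketch lets a per-channel overflow-strategy property override the global one, and returns an empty result when neither is set, meaning the emitter keeps the Smallrye default behavior. The OverflowPrecedence class is hypothetical.

```java
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical sketch, not the Kogito implementation.
// Assumption: a per-channel overflow-strategy overrides the global one.
final class OverflowPrecedence {

    // Values accepted by the global property according to the text.
    private static final Set<String> GLOBAL_VALUES = Set.of("NONE", "UNBOUNDED", "FAIL", "DROP");

    private OverflowPrecedence() {
    }

    static Optional<String> effectiveStrategy(Map<String, String> props, String channel) {
        String perChannel = props.get("kogito.addon.messaging.emitter." + channel + ".overflow-strategy");
        if (perChannel != null) {
            return Optional.of(perChannel.trim());
        }

        String global = props.get("kogito.addon.messaging.emitter.overflow-strategy");
        if (global == null) {
            // Nothing configured: the emitter keeps the Smallrye default behavior.
            return Optional.empty();
        }

        String value = global.trim();
        if (!GLOBAL_VALUES.contains(value)) {
            throw new IllegalArgumentException("Unsupported global overflow strategy: " + value);
        }
        return Optional.of(value);
    }
}
```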