Consuming and producing events using Apache Kafka in Quarkus
This document describes how you can configure a workflow to interact with Apache Kafka. Apache Kafka uses an abstraction called a topic to classify events. Events are published to a topic, and events are consumed from a topic.
SonataFlow uses Smallrye connectors to encapsulate access to several brokers, enabling SonataFlow to support different brokers by changing only the configuration and classpath dependencies. However, Smallrye connectors also introduce an entity called a channel. Smallrye channels are unidirectional and must be declared as input (incoming) or output (outgoing). When using Apache Kafka, the Smallrye channels must be mapped to Apache Kafka topics through configuration.
To indicate that you are using Apache Kafka, you need to add the Kafka Quarkus Smallrye connector dependency. To enable a workflow to use the Apache Kafka Smallrye connector, if you are using Apache Maven, add the following dependency to the pom.xml file of your project:
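A minimal sketch of that dependency entry, assuming the standard Quarkus Kafka connector artifact quarkus-smallrye-reactive-messaging-kafka:

```xml
<!-- Quarkus Smallrye connector for Apache Kafka (artifact name assumed from the standard Quarkus extension) -->
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
</dependency>
```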
Although the messaging capabilities are optional, they are included in the Quarkus Serverless Workflow extension. This means you do not need to explicitly add the messaging add-on dependency when using SonataFlow.
You can configure Smallrye channels for a workflow using event definitions. The Smallrye channels are defined using Quarkus configuration. The format for Smallrye channel properties is mp.messaging.[incoming|outgoing].<channel name>.<property name>.
SonataFlow allows the following channel mapping strategies:
Define one default incoming channel to receive all the incoming messages and one default outgoing channel to store all the published messages.
Define a channel for each CloudEvent type so that every message type has a dedicated channel.
Define a channel for certain CloudEvent types only. The non-mapped CloudEvent types use the default incoming or outgoing channel.
SonataFlow first searches the properties for a channel whose name is the same as the CloudEvent type. If such a channel is found, SonataFlow uses it for that CloudEvent type. Otherwise, SonataFlow searches for the default channel definition. If the default channel definition does not exist, an error is reported.
The default incoming channel is kogito_incoming_stream and the default outgoing channel is kogito_outgoing_stream.
To change the default incoming and outgoing topic names, you can use the following properties:
kogito.addon.messaging.incoming.defaultName=<default channel name>
kogito.addon.messaging.outgoing.defaultName=<default channel name>
The following properties are mandatory for each channel:
connector: This property must be set to smallrye-kafka.
Depending on whether the channel is incoming or outgoing:
value.deserializer: This property is used for incoming channels. Unless you have specific marshaling requirements, set this property to org.apache.kafka.common.serialization.StringDeserializer.
value.serializer: This property is used for outgoing channels. Unless you have specific marshaling requirements, set this property to org.apache.kafka.common.serialization.StringSerializer.
For more information about the properties that you can use for a channel when using the Apache Kafka connector, see the properties list.
This section describes examples of channel mapping, using the properties mentioned in the previous section.
- One Smallrye channel per CloudEvent type
The serverless-workflow-callback-quarkus example application uses two CloudEvent types: wait and resume.
The Kafka topic names match the CloudEvent types. Therefore, it is suitable to select the mapping strategy of one channel per CloudEvent type, which means that two channels must be configured. Note that you do not need to set the optional topic property, because the channel name matches the CloudEvent type.
Example property configuration
mp.messaging.incoming.wait.connector=smallrye-kafka
mp.messaging.incoming.wait.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.outgoing.resume.connector=smallrye-kafka
mp.messaging.outgoing.resume.value.serializer=org.apache.kafka.common.serialization.StringSerializer
- Default channel mapping
The serverless-workflow-events-quarkus example application uses two CloudEvent types: applicants and decisions.
As mentioned before, the Kafka topic names match the CloudEvent types. However, to use the default channel functionality, kogito_incoming_stream and kogito_outgoing_stream are used as the channel names. As explained before, since there is no specific channel name for the CloudEvent type, the default channels are used. Also, the default channels must be mapped to the desired topic name using the topic property.
Example property configuration
mp.messaging.incoming.kogito_incoming_stream.connector=smallrye-kafka
mp.messaging.incoming.kogito_incoming_stream.topic=applicants
mp.messaging.incoming.kogito_incoming_stream.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.outgoing.kogito_outgoing_stream.connector=smallrye-kafka
mp.messaging.outgoing.kogito_outgoing_stream.topic=decisions
mp.messaging.outgoing.kogito_outgoing_stream.value.serializer=org.apache.kafka.common.serialization.StringSerializer
Kogito annotates the generated Smallrye emitters for a particular channel using the information provided by a property of the form kogito.addon.messaging.emitter.<channel_name>.overflow-strategy. The possible values are the Smallrye overflow strategies, including BUFFER and DROP. If the value of the strategy is BUFFER, you must also specify the buffer size by using a companion property under the same kogito.addon.messaging.emitter.<channel_name> prefix.
Therefore, for the wait channel in the previous example, if you want to buffer as many as 100 events, you add two properties: one setting the strategy to BUFFER and one setting the buffer size to 100.
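A sketch of those two properties for the wait channel; the overflow-strategy property form comes from the text above, while the buffer-size property name is an assumption based on the same prefix pattern:

```properties
# Strategy property form taken from the text above
kogito.addon.messaging.emitter.wait.overflow-strategy=BUFFER
# Property name "buffer-size" is assumed; it sets the maximum number of buffered events
kogito.addon.messaging.emitter.wait.buffer-size=100
```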
If all your channels use the same strategy and that strategy differs from the default BUFFER one (whose buffer size you can change globally by using the mp.messaging.emitter.default-buffer-size Smallrye property), you can set the strategy globally by omitting the channel name from the property.
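A sketch of such a global setup, assuming the channel-name segment is simply dropped from the per-channel property form described above:

```properties
# Global overflow strategy for all emitters; property form without the
# channel name is an assumption based on the per-channel pattern
kogito.addon.messaging.emitter.overflow-strategy=DROP
```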