Consuming and producing events on Knative Eventing in Quarkus
This document describes how you can configure a workflow to interact with Knative Eventing.
Knative Eventing abstracts event consumption through event source and sink components. An event source is a Kubernetes object that produces events, and a sink is a Kubernetes object that receives them. The workflow application can act as a source, a sink, or both on the Knative Eventing platform.
To enable a workflow to use Knative Eventing, add the Kogito Knative Eventing add-on dependency to the pom.xml file of your project:
Using Maven:

mvn quarkus:add-extension -Dextensions="kie-addons-quarkus-knative-eventing"

Using the Quarkus CLI:

quarkus extension add kie-addons-quarkus-knative-eventing

Or add the dependency directly to pom.xml:

<dependency>
  <groupId>org.kie</groupId>
  <artifactId>kie-addons-quarkus-knative-eventing</artifactId>
</dependency>
If you have used the Knative workflow CLI to create your project, then the Kogito Knative Eventing extension is already present. For more information about creating a project using the Knative workflow CLI, see SonataFlow plug-in for Knative CLI.
The Kogito Knative Eventing add-on takes care of the required dependencies and the additional configuration that the workflow application needs to interact with the Knative Eventing platform.
Although the default configuration provided by the add-on is sufficient for most use cases, you might sometimes need additional configuration for a specific scenario.
Knative Eventing add-on source configuration
The configuration described in this section is useful if your workflow contains at least one event definition of the produced kind. In this scenario, the workflow application produces events and acts as a Knative source.
HTTP transport configuration

Knative injects the K_SINK environment variable into the workflow application when you deploy the application in the cluster. SonataFlow uses the K_SINK environment variable to address the produced events to the correct Knative sink. For more information, see SinkBinding in the Knative Eventing documentation.

The following table lists the HTTP transport configuration properties:

Table 1. HTTP transport configuration properties

| Property | Default value | Description |
|---|---|---|
| mp.messaging.outgoing.kogito_outgoing_stream.url | ${K_SINK:http://localhost:9090} | This property indicates where to POST the HTTP CloudEvent message. |
| mp.messaging.outgoing.kogito_outgoing_stream.connector | quarkus-http | This property indicates the Quarkus SmallRye channel implementation. You typically do not need to change this property. |
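For example, if you need the application to POST events to a different URL during local development, you can override the outgoing channel property in your application.properties file. The following fragment is a sketch; the localhost port 8181 is illustrative:

```properties
# Override the outgoing CloudEvents URL for local development.
# Falls back to the K_SINK variable whenever Knative injects it;
# the localhost URL and port are illustrative.
mp.messaging.outgoing.kogito_outgoing_stream.url=${K_SINK:http://localhost:8181}
```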
If the K_SINK variable is not present, the default value is http://localhost:9090. You can override the default value in development environments.

Health check configuration
By default, the workflow application generates a health check probe to verify whether the Knative platform injected a valid K_SINK variable. If the K_SINK variable is not present, the pod is not ready to receive requests.

The following table lists the health check probe configuration property:

Table 2. Health check probe configuration property

| Property | Default value | Description |
|---|---|---|
| org.kie.kogito.addons.knative.eventing.health_enabled | true | This property indicates whether the health check that verifies the K_SINK variable is injected into the environment is enabled. |

Knative target sink generation configuration
The Kogito Knative Eventing add-on generates a few Knative objects during build time. By default, the add-on generates a Knative Broker named default if the workflow application acts as an event source.

The following table lists the configuration properties related to Knative sink generation:

Table 3. Knative sink generation configuration properties

| Property | Default value | Description |
|---|---|---|
| org.kie.kogito.addons.knative.eventing.auto_generate_broker | true | This property indicates whether the add-on generates a default in-memory Knative Broker to sink and dispatch the messages. Set this property to false if a broker is already installed in your namespace. You can use the org.kie.kogito.addons.knative.eventing.sink.* properties to configure a custom sink. If no custom sink is set, the auto-generated broker works as the sink. |
| org.kie.kogito.addons.knative.eventing.sink.namespace | | This property indicates the namespace where the generated Knative sink is deployed. |
| org.kie.kogito.addons.knative.eventing.sink.api_version | eventing.knative.dev/v1 | This property indicates the API group and version of the generated Knative sink. |
| org.kie.kogito.addons.knative.eventing.sink.name | default | This property indicates the name of the generated Knative sink. |
| org.kie.kogito.addons.knative.eventing.sink.kind | Broker | This property indicates the Kubernetes kind of the generated Knative sink. |
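As a sketch of how these properties fit together, the following application.properties fragment disables the auto-generated broker and points the generated objects at an existing Broker. The name my-broker and namespace my-namespace are illustrative placeholders:

```properties
# Do not auto-generate an in-memory Broker; use an existing sink instead.
org.kie.kogito.addons.knative.eventing.auto_generate_broker=false

# Target sink: an existing Knative Broker (name and namespace are illustrative).
org.kie.kogito.addons.knative.eventing.sink.api_version=eventing.knative.dev/v1
org.kie.kogito.addons.knative.eventing.sink.kind=Broker
org.kie.kogito.addons.knative.eventing.sink.name=my-broker
org.kie.kogito.addons.knative.eventing.sink.namespace=my-namespace
```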
Knative Eventing add-on sink configuration
The configuration described in this section is useful if your workflow contains at least one event definition of the consumed kind. In this scenario, the workflow application consumes events and acts as a Knative sink.
When the workflow application needs to consume events, the Knative Eventing add-on generates Knative triggers. The Knative triggers are configured to listen to a broker with the required event type, which is defined in your workflow definition.
The following table lists the configuration properties related to Knative sink generation:

| Property | Default value | Description |
|---|---|---|
| | | This property indicates the name of the default Knative broker that is deployed in the Kubernetes namespace. This broker is used as the reference to create the Knative triggers, which are responsible for delegating the events that the workflow service consumes. |
| | | This property indicates the HTTP path where the workflow application listens for CloudEvents on the default incoming channel. |
| | | This property indicates the HTTP path where the workflow application listens for CloudEvents on the given channel name. The channel name is the event |
Manually sending events to an HTTP endpoint
You can send HTTP CloudEvents to the workflow application endpoint by using any tool capable of producing HTTP requests. The only requirement is that the request conforms to the CloudEvents specification.
For example, with curl, you can send an event to the workflow using the following command:
curl -X POST \
-H 'Content-Type: application/cloudevents+json' \
-d '{"datacontenttype": "application/json", "specversion":"1.0","id":"41495513-a9ef-4a81-8479-21bb14db61f0","source":"/local/curl","type":"kogito.serverless.loanbroker.aggregated.quotes.response","data": { "amount": 300000, "term": 30, "credit": { "score": 700, "history": 15 }, "quotes": [{ "bankId": "Bank1", "rate": 12.2 }, {"bankId": "Bank2", "rate": 10}] } } ' \
http://localhost:8080
This example uses the CloudEvents structured format, which includes all the event information within the request payload. Note that the Content-Type header is application/cloudevents+json.
Alternatively, you can use the CloudEvents binary format, which carries the event metadata in HTTP headers. For example, using the same event as before:
curl -X POST -i \
-H 'Content-Type: application/json' \
-H 'ce-specversion: 1.0' \
-H 'ce-id: 41495513-a9ef-4a81-8479-21bb14db61f0' \
-H 'ce-source: /local/curl' \
-H 'ce-type: kogito.serverless.loanbroker.aggregated.quotes.response' \
-d '{ "amount": 300000, "term": 30, "credit": { "score": 700, "history": 15 }, "quotes": [{ "bankId": "Bank1", "rate": 12.2 }, {"bankId": "Bank2", "rate": 10}] }' \
http://localhost:8080/
You can use these requests to test your SonataFlow application locally and verify that the workflow consumes the events correctly.
For more information about testing incoming and outgoing CloudEvents over HTTP, see Mocking HTTP CloudEvents sink using WireMock.
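As a quick local sanity check before posting, you can verify that a structured-mode payload declares the four required CloudEvents context attributes (specversion, id, source, type). The following shell sketch uses the event from the examples above with the data trimmed for brevity; it is an illustration, not part of the add-on:

```shell
# Structured-mode CloudEvent payload (data trimmed for brevity).
EVENT='{"specversion":"1.0","id":"41495513-a9ef-4a81-8479-21bb14db61f0","source":"/local/curl","type":"kogito.serverless.loanbroker.aggregated.quotes.response","data":{}}'

# The CloudEvents specification requires these four context attributes.
for attr in specversion id source type; do
  echo "$EVENT" | grep -q "\"$attr\"" && echo "$attr: present"
done
```

Note that grep performs only a rough textual check; a JSON-aware tool gives stricter validation.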
Generating Knative objects during build time
SonataFlow can generate Knative objects during the workflow application build time to facilitate the deployment in a Kubernetes cluster. However, you do not need to use the generated objects if you plan to create and deploy the Knative objects by yourself.
Prerequisite: A workflow application with the Knative Eventing add-on is created.
- Add the Quarkus Kubernetes and container image Jib extension dependencies to the pom.xml file of your project.

Using Maven:

mvn quarkus:add-extension -Dextensions="quarkus-kubernetes,quarkus-container-image-jib"

Using the Quarkus CLI:

quarkus extension add quarkus-kubernetes quarkus-container-image-jib

Or add the dependencies directly to pom.xml:

<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-kubernetes</artifactId>
</dependency>
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-container-image-jib</artifactId>
</dependency>
- Add the quarkus.kubernetes.deployment-target=knative property to your application.properties file.

- Build your workflow application using one of the following commands:

Using Maven:

mvn clean install

Using the Quarkus CLI:

quarkus build

Using the Knative workflow CLI:

kn workflow build --image=<name>
The target/kubernetes directory contains two files: knative.yml and kogito.yml. The knative.yml file contains the Knative service representing the workflow application. The kogito.yml file contains the required objects to connect the workflow application to the Knative Eventing platform.
You can use the generated files to deploy the workflow application in the Kubernetes cluster using the following commands:

kubectl apply -f target/kubernetes/kogito.yml
kubectl apply -f target/kubernetes/knative.yml

Alternatively, you can deploy using the Knative workflow CLI:

kn workflow deploy
For more information about building and deploying the workflow application, see Building workflow images using Quarkus CLI.
Example of workflow event definition in Knative
A workflow must contain at least one event definition for the Knative Eventing add-on to generate the event binding objects. The following is an example of a workflow containing produced and consumed events:
{
  "events": [
    {
      "name": "requestQuote",
      "type": "kogito.sw.request.quote",
      "kind": "produced"
    },
    {
      "name": "aggregatedQuotesResponse",
      "type": "kogito.loanbroker.aggregated.quotes.response",
      "kind": "consumed",
      "source": "/kogito/serverless/loanbroker/aggregator"
    }
  ]
}
A workflow application with event definitions needs a Knative SinkBinding to configure the target sink. The target sink is where the produced events (the kogito.sw.request.quote event in the previous example) are dispatched. In this case, the Knative Eventing add-on generates an object as shown in the following example:
SinkBinding generated by the add-on:

apiVersion: sources.knative.dev/v1
kind: SinkBinding
metadata:
name: sb-loanbroker-flow
spec:
sink:
ref:
apiVersion: eventing.knative.dev/v1
kind: Broker
name: default
namespace: ""
subject:
apiVersion: serving.knative.dev/v1
kind: Service
name: loanbroker-flow
Regardless of the number of produced events in the workflow definition, only one SinkBinding object is generated.
For the kogito.loanbroker.aggregated.quotes.response event in the previous example, the Knative Eventing platform must be configured with a Knative trigger using an appropriate CloudEvent filter. The following example shows the Trigger generated by the Knative Eventing add-on:
Trigger generated by the add-on:

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
name: kogito-serverless-loanbroker-aggregated-quotes-response-trigger
spec:
broker: default
filter:
attributes:
type: kogito.loanbroker.aggregated.quotes.response
subscriber:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: loanbroker-flow
For each consumed event definition, the Knative Eventing add-on generates one Knative Trigger.