Example of SonataFlow application using OpenAPI callback events with Quarkus

To understand the OpenAPI Callback, see the serverless-workflow-callback-events-over-http-quarkus example application in GitHub repository.

This example contains a simple workflow-service that illustrates callback state using OpenAPI callbacks functionality. A callback is a state that invokes an action and wait for an event (event that will be eventually fired by the external service notified by the action). This example consists of a callback state that waits for an event to arrive at the wait channel. Its action calls an external service named callback-event-service that publishes the wait event over HTTP. After consuming the wait event, the workflow prints the message received in the wait event and ends the workflow.

The serverless-workflow-callback-events-over-http-quarkus application is initiated with the following request to http://localhost:8080/callback:

{
  "message": "Hello"
}

Once the workflow is started, it makes an external service call with the callback URL and the workflow instance ID in the request body to callback-event-service. Then, as configured in the OpenAPI file, the callback URL is invoked to send a CloudEvent to the workflow.

Once the wait type CloudEvent is received by the callback-workflow-service, the workflow moves to the next state and ends successfully. The following figure shows the serverless-workflow-callback-events-over-http-quarkus image:

openapi callback

To use the OpenAPI callback in a workflow, the OpenAPI YAML file is configured with the callback as specified in the OpenAPI file.

To use the Callback state in a workflow, first CloudEvent type wait is declared that the workflow uses. Following is an example of CloudEvents declaration in a workflow definition:

Example of CloudEvent declaration in a workflow definition
 "events": [
    {
      "name": "waitEvent",
      "source": "",
      "type": "wait"
    }

After that, a Callback state is declared, which waits for a CloudEvent with the wait type. Following is an example of declaring a Callback state that handles the wait type CloudEvent:

Example of a Callback State declaration handling the wait CloudEvent
 {
      "name": "waitForEvent",
      "type": "callback",
      "action":
        {
        "functionRef": {
          "refName": "callBack",
          "arguments": {
            "uri": "http://localhost:8080/wait",
            "processInstanceId": "$WORKFLOW.instanceId"
          }
        }
      },
      "eventRef": "waitEvent",
      "transition": "finish"
    }

Please refer configure openapi service endpoints document to set the URL dynamically using an environment variable.

An event listener publishes a new wait type CloudEvent. Following is an example of a Java method that publishes the wait type CloudEvent:

Example of a Java method that makes a call to Callback URL and publishes the wait CloudEvent
    @POST
    @Consumes(MediaType.APPLICATION_JSON)
    public void wait(EventInput eventInput) throws JsonProcessingException {
        logger.info("About to generate event for {}",eventInput);
        CloudEventBuilder builder = CloudEventBuilder.v1()
                .withId(UUID.randomUUID().toString())
                .withSource(URI.create(""))
                .withType("wait")
                .withTime(OffsetDateTime.now())
                .withExtension(CloudEventExtensionConstants.PROCESS_REFERENCE_ID, eventInput.getProcessInstanceId())
                .withData(objectMapper.writeValueAsBytes(Collections.singletonMap("message", "New Event")));

        webClient.postAbs(eventInput.getUri()).sendJson(builder.build()).toCompletionStage();
    }

The callback-workflow-service consumes the CloudEvent, it contains an attribute named kogitoprocrefid, which holds the instance ID of the workflow.

The kogitoprocrefid attribute is crucial because when the correlation is not used, then this attribute is the only way for the Callback state to identify that the related CloudEvent needs to be used to resume the workflow. For more information about correlation, see Event correlation in SonataFlow.

Note that each workflow is identified by a unique instance ID, which is automatically included in any published CloudEvent, as kogitoprocinstanceid CloudEvent extension.

HTTP transport configuration

The serverless-workflow-callback-events-over-http-quarkus example application consumes the Cloudevents using Knative Eventing. For more information about incoming and outgoing CloudEvents oer HTTP, see Consuming and Producing CloudEvents over HTTP.

The HTTP path where the workflow application will listen for the CloudEvents in the serverless-workflow-callback-events-over-http-quarkus example application, is configured in the application.properties file as shown below:

mp.messaging.incoming.wait.connector=quarkus-http
mp.messaging.incoming.wait.path=/wait

Found an issue?

If you find an issue or any misleading information, please feel free to report it here. We really appreciate it!