Custom functions for your SonataFlow service

The Cloud Native Computing Foundation (CNCF) Serverless Workflow specification supports the custom function type, which enables implementations to extend the function definition capability.

SonataFlow supports several custom types, which are listed below.

Custom functions might not be portable across other implementations.

Sysout custom function

You can use the sysout function for logging as shown in the following example:

Example of sysout function definition
{
  "functions": [
    {
      "name": "logInfo",
      "type": "custom",
      "operation": "sysout:INFO"
    }
  ]
}

The string after the : is optional and is used to indicate the log level. The possible values are TRACE, DEBUG, INFO, WARN, and ERROR. If not present, INFO is considered by default.
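The level rule described above can be sketched in plain Java. This is only an illustration of the documented behavior, not SonataFlow's actual parser: the SysoutOperation class and its logLevel method are hypothetical names, and the case-insensitive matching and rejection of unknown levels are assumptions.

```java
import java.util.Locale;
import java.util.Set;

/** Illustrative only: mirrors the documented rule, not SonataFlow's real parser. */
class SysoutOperation {

    private static final Set<String> LEVELS = Set.of("TRACE", "DEBUG", "INFO", "WARN", "ERROR");

    /** Returns the log level encoded in an operation such as "sysout:WARN". */
    static String logLevel(String operation) {
        if (!operation.equals("sysout") && !operation.startsWith("sysout:")) {
            throw new IllegalArgumentException("Not a sysout operation: " + operation);
        }
        int colon = operation.indexOf(':');
        if (colon < 0 || colon == operation.length() - 1) {
            return "INFO"; // documented default when no level is given
        }
        // Assumption: levels are matched case-insensitively and unknown levels are rejected.
        String level = operation.substring(colon + 1).trim().toUpperCase(Locale.ROOT);
        if (!LEVELS.contains(level)) {
            throw new IllegalArgumentException("Unknown log level: " + level);
        }
        return level;
    }
}
```

With this sketch, "sysout" and "sysout:INFO" resolve to the same level.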

In the state definition, you can call the same sysout function as shown in the following example:

Example of a sysout function reference within a state
{
  "states": [
    {
      "name": "myState",
      "type": "operation",
      "actions": [
        {
          "name": "printAction",
          "functionRef": {
            "refName": "logInfo",
            "arguments": {
              "message": "\"Workflow model is \\(.)\""
            }
          }
        }
      ]
    }
  ]
}

The message argument can be a jq expression or a jq string using interpolation, as in the example above.

Java custom function

SonataFlow supports java functions within an Apache Maven project in which you define your workflow service.

Function Definition

The following example shows the declaration of a java function:

Example of a java function declaration
{
  "functions": [
    {
      "name": "myFunction", (1)
      "type": "custom", (2)
      "operation": "service:java:com.acme.MyInterfaceOrClass::myMethod" (3)
    }
  ]
}
1 myFunction is the function name
2 custom is the function type
3 service:java:com.acme.MyInterfaceOrClass::myMethod is the custom operation definition. In the custom operation definition, service is the reserved operation keyword followed by the java keyword. com.acme.MyInterfaceOrClass is the FQCN (Fully Qualified Class Name) of the interface or implementation class followed by the method name (myMethod).
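To make the structure of the operation string concrete, the following minimal sketch splits it into the class name and the method name. The JavaOperation class is a hypothetical helper written for this illustration. It does not reflect how SonataFlow resolves the operation internally.

```java
/** Illustrative only: splits an operation of the form service:java:<FQCN>::<method>. */
class JavaOperation {

    private static final String PREFIX = "service:java:";

    final String className;
    final String methodName;

    private JavaOperation(String className, String methodName) {
        this.className = className;
        this.methodName = methodName;
    }

    static JavaOperation parse(String operation) {
        if (!operation.startsWith(PREFIX)) {
            throw new IllegalArgumentException("Expected prefix " + PREFIX);
        }
        String target = operation.substring(PREFIX.length());
        // The class name and the method name are separated by "::".
        int separator = target.lastIndexOf("::");
        if (separator <= 0 || separator + 2 >= target.length()) {
            throw new IllegalArgumentException("Expected <FQCN>::<method> but got " + target);
        }
        return new JavaOperation(target.substring(0, separator), target.substring(separator + 2));
    }
}
```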

Function Arguments

Your method signature must match the arguments passed by the workflow.

For example, if you invoke a function using one argument as follows, then your method signature assumes that the number model variable is an integer:

Example of a java function reference with one argument
{
 "functionRef": {
   "refName": "myFunction",
   "arguments": {
     "number": "${.number}"
   }
 }
}
Example of a java function implementation
public class MyInterfaceOrClass {

    public void myMethod(int number) {
        if (number % 2 != 0) {
            throw new IllegalArgumentException("Odd situation");
        }
    }
}

As a particular case, if you provide no argument in the workflow definition, the signature of the Java method might include a Jackson’s JsonNode parameter. This means that the Java method expects the entire workflow model as input.

When using the following example function reference with no arguments, and if the method signature contains a JsonNode parameter, the entire workflow model is passed when the method call is performed.

Example of a java function reference with no arguments
{
  "functionRef": {
    "refName": "myFunction"
   }
}
Example of a java function implementation
import com.fasterxml.jackson.databind.JsonNode;

public class MyInterfaceOrClass {

    public JsonNode myMethod(JsonNode workflowData) {
        // do whatever I want with the Workflow model
        // ...
        // return the modified content:
        return workflowData;
    }
}

Function return values

If your method returns a JsonNode, the content of that node is merged into the workflow model (you can use an action data filter to control what is merged).

The same occurs if your method returns any Java Object descendant that is not a primitive wrapper: the Java object is recursively converted to a JSON object, and the result is merged into the workflow model (you can use an action data filter to control what is merged).

If your method returns a primitive type or its corresponding wrapper object (int, boolean, long, and so on), then the primitive value is added to the workflow model with the name response (you can change that name using an action data filter).

If your method returns a Java collection, it is converted to a JSON array and added to the workflow model with the name response (you can change that name using an action data filter).
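The return value rules in this section can be summarized with a small sketch. It uses a plain Map as a stand-in for the JSON workflow model, so it runs without Jackson. The ReturnValueMerge class is hypothetical. The sketch assumes that objects have already been converted to a Map, that the merge is shallow, and that a null result leaves the model unchanged. None of these details are confirmed by the text above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative only: a Map stands in for the JSON workflow model. */
class ReturnValueMerge {

    /** Returns a copy of the model updated with the function result. */
    static Map<String, Object> merge(Map<String, Object> model, Object result) {
        Map<String, Object> merged = new LinkedHashMap<>(model);
        if (result == null) {
            return merged; // assumption: nothing to merge
        }
        if (result instanceof Map) {
            // JSON objects (here: maps) are merged into the model; shallow merge is an assumption.
            for (Map.Entry<?, ?> entry : ((Map<?, ?>) result).entrySet()) {
                merged.put(String.valueOf(entry.getKey()), entry.getValue());
            }
        } else {
            // Primitives, wrappers, and collections are stored under the name "response".
            merged.put("response", result);
        }
        return merged;
    }
}
```

For example, merging the integer 42 into a model containing only a yields a model with both a and response.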

Function accessing contextual data

If you need access to process contextual information (for example, Kogito process instance ID) inside your Java service, you can add a KogitoProcessContext parameter as the last one in the method signature.

Therefore, if you need to do so, you can update the signature of methods from previous sections.

Example of a function accessing Kogito context
public class MyInterfaceOrClass {

    public JsonNode myMethod(JsonNode workflowData, KogitoProcessContext context) {
        // do whatever I want with the JsonNode and the Kogito process context
        // ...
        // return the modified content:
        return workflowData;
    }
}
Example of a function accessing Kogito context
public class MyInterfaceOrClass {

    public void myMethod(int number, KogitoProcessContext context) {
        if (number % 2 != 0) {
            throw new IllegalArgumentException("Odd situation");
        }
    }
}

Avoid using java functions to call external services. Instead, use the services orchestration features.

Camel custom function

Kogito supports Camel route functions within an Apache Maven project in which you define your workflow service.

This section briefly shows how to define and use Camel routes within your workflow application. For more information, see Integrating with Camel Routes.

Function definition

The following example shows the declaration of a Camel function:

{
  "functions": [
    {
      "name": "myCamelEndpoint", (1)
      "type": "custom",  (2)
      "operation": "camel:direct:myendpoint" (3)
    }
  ]
}
1 myCamelEndpoint is the function name
2 custom is the function type
3 camel:direct:myendpoint is the custom operation definition. In this definition, camel is the reserved keyword followed by the direct endpoint. Camel Direct is the only consumer supported by SonataFlow. Finally, myendpoint is the endpoint URI name found in the route within your project’s context.

Function arguments

The Camel function arguments must follow a specific structure when passing data from the workflow state to the route. The body and headers attributes are the only ones supported in the function arguments. Both are optional, so you can call a Camel route without arguments. The only constraint is that you cannot call a Camel route with only headers.

The following examples display valid argument structures for Camel function arguments:

Example calling a Camel route using body and headers
{
  "functionRef": {
    "refName": "myCamelEndpoint",
    "arguments": {
      "body": "${ .my.body.data }", (1)
      "headers": { (2)
        "key1": "${ .my.value }",
        "key2": "${ .my.other.value }"
      }
    }
  }
}
1 jq expression filtering the state data for the body argument.
2 JSON key/value pair for the headers argument. A jq expression returning the same JSON object is also valid.
Example calling a Camel route using body
{
  "functionRef": {
    "refName": "myCamelEndpoint",
    "arguments": {
      "body": "${ .my.body.data }"
    }
  }
}
Example calling a Camel route without arguments
{
  "functionRef": {
    "refName": "myCamelEndpoint"
  }
}
Example calling a Camel route with only one argument
{
  "functionRef": {
    "refName": "myCamelEndpoint",
    "arguments": "${ .my.body.data }"
  }
}

In this last example, the result of the jq expression is used as the body argument, which simplifies the definition.
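The constraints on structured Camel function arguments described in this section can be expressed as a short check. The CamelArguments class below is a hypothetical helper, not part of SonataFlow or Camel. It only covers the structured form with body and headers, not the single-expression shortcut.

```java
import java.util.Map;

/** Illustrative only: validates the structured form of Camel function arguments. */
class CamelArguments {

    /** Returns true when the arguments follow the rules stated in this section. */
    static boolean isValid(Map<String, ?> arguments) {
        if (arguments == null || arguments.isEmpty()) {
            return true; // calling a route without arguments is allowed
        }
        for (String key : arguments.keySet()) {
            // Only body and headers are supported attributes.
            if (!"body".equals(key) && !"headers".equals(key)) {
                return false;
            }
        }
        // Headers without a body are not allowed.
        return arguments.containsKey("body") || !arguments.containsKey("headers");
    }
}
```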

Function return values

The Camel route is responsible for producing the return value in a form that the workflow can understand. The following are considered valid objects:

  1. A string that contains a valid JSON object

  2. A valid Java bean that can be serialized to JSON

  3. A Jackson’s JsonNode object

  4. Any primitive type (Integer, Float, Decimal, String, etc.)

Python custom function

SonataFlow implements a custom function to execute embedded Python scripts and functions. See Invoking Python from SonataFlow.

Knative custom function

SonataFlow provides an implementation of a custom function through the knative-serving add-on to invoke Knative services. It allows you to have a static URI, defining a Knative service, which is used to perform HTTP requests. The Knative service defined in the URI is queried in the current Knative cluster and translated to a valid URL.

This section briefly exemplifies how to define and use Knative custom functions within your workflow application. For more information, see Invoking Knative services from Serverless Workflow.

Function definition

Given the following deployed Knative service:

$ kn service list
NAME                              URL                                                                      LATEST                                  AGE     CONDITIONS   READY   REASON
custom-function-knative-service   http://custom-function-knative-service.default.10.109.169.193.sslip.io   custom-function-knative-service-00001   3h16m   3 OK / 3     True

You can declare a SonataFlow custom function using the Knative service name, like the following:

  "functions": [
    {
      "name": "greet", (1)
      "type": "custom", (2)
      "operation": "knative:services.v1.serving.knative.dev/custom-function-knative-service?path=/plainJsonFunction" (3)
    }
  ]
1 greet is the function name
2 custom is the function type
3 In operation, you set the coordinates of the Knative service. See Kubernetes Service Discovery for the supported scheme and Kubernetes GVK.

The above function will send a POST request to the http://custom-function-knative-service.default.10.109.169.193.sslip.io/plainJsonFunction URL. If you don’t specify a path, SonataFlow will use the root path (/).

You can also send GET requests by setting method=GET in operation. In this case, the arguments are forwarded over a query string.

Example of a GET request
  "functions": [
    {
      "name": "greet",
      "type": "custom",
      "operation": "knative:services.v1.serving.knative.dev/custom-function-knative-service?path=/plainJsonFunction&method=GET" (1)
    }
  ]
1 GET request

About namespaces

Note that in the above example, you declared only the name of the service you wanted to call, but not a namespace. In this case, SonataFlow looks for a Knative service in the same namespace in which the workflow service is running.

If you need to call a Knative service in a different namespace, you can declare the function as follows:

  "functions": [
    {
      "name": "greet",
      "type": "custom",
      "operation": "knative:services.v1.serving.knative.dev/my_different_namespace/custom-function-knative-service?path=/plainJsonFunction"
    }
  ]

In the above example, SonataFlow will look for the custom-function-knative-service in the my_different_namespace namespace.
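As a rough illustration of how the parts of a knative operation string fit together, the following sketch extracts the GVK, the optional namespace, the service name, the path, and the HTTP method, applying the defaults described in this section (root path and POST). The KnativeOperation class is a hypothetical helper. It only handles the forms shown in this section and is not SonataFlow's own parsing logic.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative only: parses knative:<gvk>/[<namespace>/]<service>?path=...&method=... */
class KnativeOperation {

    final String gvk;
    final String namespace; // null means "same namespace as the workflow service"
    final String serviceName;
    final String path;
    final String method;

    private KnativeOperation(String gvk, String namespace, String serviceName, String path, String method) {
        this.gvk = gvk;
        this.namespace = namespace;
        this.serviceName = serviceName;
        this.path = path;
        this.method = method;
    }

    static KnativeOperation parse(String operation) {
        String prefix = "knative:";
        if (!operation.startsWith(prefix)) {
            throw new IllegalArgumentException("Not a knative operation: " + operation);
        }
        String rest = operation.substring(prefix.length());
        int questionMark = rest.indexOf('?');
        String coordinates = questionMark < 0 ? rest : rest.substring(0, questionMark);
        String query = questionMark < 0 ? "" : rest.substring(questionMark + 1);

        // Assumption: coordinates are either <gvk>/<service> or <gvk>/<namespace>/<service>.
        String[] parts = coordinates.split("/");
        if (parts.length < 2 || parts.length > 3) {
            throw new IllegalArgumentException("Unexpected coordinates: " + coordinates);
        }
        String namespace = parts.length == 3 ? parts[1] : null;

        Map<String, String> parameters = new LinkedHashMap<>();
        for (String pair : query.split("&")) {
            int equals = pair.indexOf('=');
            if (equals > 0) {
                parameters.put(pair.substring(0, equals), pair.substring(equals + 1));
            }
        }
        // Documented defaults: root path and POST.
        return new KnativeOperation(parts[0], namespace, parts[parts.length - 1],
                parameters.getOrDefault("path", "/"), parameters.getOrDefault("method", "POST"));
    }
}
```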

Function arguments

In case you need to send a payload in the request, you can add it to arguments in functionRef.

Sending a regular JSON object

Suppose you must send the following JSON object as the payload:

{
  "product_id": ".product_id",
  "customer_name": ".customer_name"
}

You must declare a functionRef like the following:

  "states": [
    {
      "name": "invokeFunction",
      "type": "operation",
      "actions": [
        {
          "functionRef": {
            "refName": "greet",
            "arguments": { (1)
              "product_id": ".product_id",
              "customer_name": ".customer_name"
            }
          }
        }
      ],
      "end": true
    }
  ]
1 The request payload is set in arguments.

Sending a CloudEvent

By default, SonataFlow sends the payload of a Knative function as a regular JSON object and Content-Type as application/json. However, you can tell SonataFlow to send the payload as a CloudEvent. In that case, SonataFlow will check if the CloudEvent has all mandatory attributes set and use application/cloudevents+json; charset=UTF-8 in Content-Type.

To tell SonataFlow you want to send the payload as a CloudEvent, you must define your function as follows:

  "functions": [
    {
      "name": "greet",
      "type": "custom",
      "operation": "knative:services.v1.serving.knative.dev/custom-function-knative-service?path=/plainJsonFunction&asCloudEvent=true" (1)
    }
  ]
1 Tells SonataFlow to send the payload as a CloudEvent. Default is false

If the asCloudEvent attribute is not set to true, you cannot send a CloudEvent. If you try to do so, SonataFlow throws an error.

You must send the following CloudEvent as the payload:

{
  "specversion" : "1.0",
  "type" : "com.github.pull_request.opened",
  "source" : "https://github.com/cloudevents/spec/pull",
  "subject" : "123",
  "time" : "2018-04-05T17:31:00Z",
  "comexampleextension1" : "value",
  "comexampleothervalue" : 5,
  "datacontenttype" : "text/xml",
  "data" : "<much wow=\"xml\"/>"
}

You must declare a functionRef like the following (do not forget to set asCloudEvent to true in the function definition):

  "states": [
    {
      "name": "invokeFunction",
      "type": "operation",
      "actions": [
        {
          "functionRef": {
            "refName": "greet",
            "arguments": { (1)
                "specversion" : "1.0",
                "type" : "com.github.pull_request.opened",
                "source" : "https://github.com/cloudevents/spec/pull",
                "subject" : "123",
                "time" : "2018-04-05T17:31:00Z",
                "comexampleextension1" : "value",
                "comexampleothervalue" : 5,
                "datacontenttype" : "text/xml",
                "data" : "<much wow=\"xml\"/>"
            }
          }
        }
      ],
      "end": true
    }
  ]
1 The CloudEvent is set in arguments.

SonataFlow generates a CloudEvent ID based on the source and the workflow instance ID. In case you decide to set an ID, SonataFlow will use it and you must ensure it’s unique. Refer to the following example on how to set a CloudEvent ID:

Setting a CloudEvent ID
"arguments": {
    "specversion" : "1.0",
    "id": "a_unique_id_42", (1)
    "type" : "com.github.pull_request.opened",
    "source" : "https://github.com/cloudevents/spec/pull",
    "subject" : "123",
    "time" : "2018-04-05T17:31:00Z",
    "comexampleextension1" : "value",
    "comexampleothervalue" : 5,
    "datacontenttype" : "text/xml",
    "data" : "<much wow=\"xml\"/>"
}
1 The CloudEvent ID.
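The text states that SonataFlow checks the mandatory CloudEvent attributes but does not list which ones. As a hedged sketch, the following check uses the attributes that the CloudEvents 1.0 specification marks as required (id, source, specversion, and type), leaving out id because it is generated when absent. The CloudEventCheck class is hypothetical and is not SonataFlow's validation code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Illustrative only: reports required CloudEvent attributes missing from the arguments. */
class CloudEventCheck {

    // Required by CloudEvents 1.0, minus "id", which is generated when not provided.
    private static final List<String> REQUIRED = List.of("specversion", "type", "source");

    static List<String> missingAttributes(Map<String, ?> arguments) {
        List<String> missing = new ArrayList<>();
        for (String name : REQUIRED) {
            Object value = arguments.get(name);
            if (value == null || value.toString().isEmpty()) {
                missing.add(name);
            }
        }
        return missing;
    }
}
```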

Configurations

Request timeout

By default, the Knative service must respond within 10 seconds. You can use the kogito.sw.functions.<function_name>.timeout property to configure this value.

For instance, if you want to reduce the request timeout to 5 seconds, you must add the following to your application.properties file:

kogito.sw.functions.greet.timeout=5000 (1)
1 Time in milliseconds
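The relationship between the property key, the function name, and the default value can be sketched as follows. The KnativeTimeout class is a hypothetical helper that reads from java.util.Properties. In a real application, the value is resolved by the runtime configuration rather than by code like this.

```java
import java.time.Duration;
import java.util.Properties;

/** Illustrative only: resolves the request timeout for a given function name. */
class KnativeTimeout {

    static final Duration DEFAULT = Duration.ofSeconds(10); // documented default

    static Duration requestTimeout(Properties config, String functionName) {
        // Key format: kogito.sw.functions.<function_name>.timeout (value in milliseconds)
        String raw = config.getProperty("kogito.sw.functions." + functionName + ".timeout");
        return raw == null ? DEFAULT : Duration.ofMillis(Long.parseLong(raw.trim()));
    }
}
```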

REST custom function

Serverless Workflow Specification defines the OpenAPI function type, which is the preferred way to interact with existing REST servers. However, sometimes a workflow should interact with several REST APIs that are not described using an OpenAPI specification file. Since generating such files for these services might be tedious, SonataFlow offers REST custom type as a shortcut.

When using custom rest, in the function definition, you specify the HTTP URI to be invoked and the HTTP method (get, post, patch, or put) to be used, using the operation string. When the function is invoked, you pass the request arguments as you do when using an OpenAPI function.

The following example shows the declaration of a rest function:

Example of a rest function declaration
{
  "functions": [
    {
      "name": "multiplyAllByAndSum", (1)
      "type": "custom", (2)
      "operation": "rest:post:/numbers/{multiplier}/multiplyByAndSum" (3)
    }
  ]
}
1 multiplyAllByAndSum is the function name
2 custom is the function type
3 rest:post:/numbers/{multiplier}/multiplyByAndSum is the custom operation definition. In the custom operation definition:
  • rest is the reserved operation keyword that indicates this is a REST call.

  • post is the HTTP method.

  • /numbers/{multiplier}/multiplyByAndSum is the relative endpoint.

When using relative endpoints, you must specify the host as a property. The format of the host property is kogito.sw.functions.<function_name>.host. Therefore, in this example, kogito.sw.functions.multiplyAllByAndSum.host is the host property key. You can override the default port (80) if needed by specifying the kogito.sw.functions.multiplyAllByAndSum.port property.

This particular endpoint expects as body a JSON object whose field numbers is an array of integers, multiplies each item in the array by multiplier and returns the sum of all the multiplied items. Therefore, to invoke it, assuming the input array is stored in the workflow model as property inputNumbers, you should write:

Example of a rest function call
{
 "functionRef": {
   "refName": "multiplyAllByAndSum",
   "arguments": {
     "numbers": "$.inputNumbers",
     "multiplier": 3 (1)
   }
 }
}
1 You replace path and query parameters by specifying an argument whose name is equal to the path or query parameter. The query or path parameter to be replaced must be enclosed within {} in the endpoint string.

If inputNumbers contains 1, 2, and 3, the output of the call will be 1*3+2*3+3*3=18.

If you want to specify headers in your HTTP request, you can do so by adding arguments starting with the HEADER_ prefix. Therefore, if you add "HEADER_ce_id": "123" to the previous argument set, you add a header named ce_id with the value 123 to your request. You can use a similar approach to add query parameters to a GET request: in that case, add arguments starting with the QUERY_ prefix. Note that you can also use {} notation for replacements of query parameters included directly in the operation string.

For example, consider the following function definition, which performs a get request:

Example of a get request definition
{
  "functions": [
    {
      "name": "getProductList",
      "type": "custom",
      "operation": "rest:get:/products/search?category={category}"
    }
  ]
}

You can replace {category} by specifying an argument with that name, and add a sort query parameter, as follows:

Example of a get request invocation
{
 "functionRef": {
   "refName": "getProductList",
   "arguments": {
     "category": "electronics",
     "QUERY_sort": "asc"
   }
 }
}
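The way arguments are distributed over a rest call can be sketched in plain Java. The RestCall class below is a hypothetical illustration, not SonataFlow's implementation. It assumes that arguments matching a {} placeholder are substituted into the endpoint, that HEADER_ and QUERY_ arguments become headers and query parameters, and that all remaining arguments form the request body. URL encoding is deliberately left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative only: distributes function arguments over a rest:<method>:<endpoint> call. */
class RestCall {

    private static final String HEADER_PREFIX = "HEADER_";
    private static final String QUERY_PREFIX = "QUERY_";

    final String method;
    final String uri;
    final Map<String, String> headers;
    final Map<String, Object> body;

    private RestCall(String method, String uri, Map<String, String> headers, Map<String, Object> body) {
        this.method = method;
        this.uri = uri;
        this.headers = headers;
        this.body = body;
    }

    static RestCall build(String operation, Map<String, ?> arguments) {
        String[] parts = operation.split(":", 3); // rest, method, endpoint
        if (parts.length != 3 || !parts[0].equals("rest")) {
            throw new IllegalArgumentException("Expected rest:<method>:<endpoint>");
        }
        String uri = parts[2];
        Map<String, String> headers = new LinkedHashMap<>();
        Map<String, String> query = new LinkedHashMap<>();
        Map<String, Object> body = new LinkedHashMap<>();
        for (Map.Entry<String, ?> argument : arguments.entrySet()) {
            String name = argument.getKey();
            String placeholder = "{" + name + "}";
            if (name.startsWith(HEADER_PREFIX)) {
                headers.put(name.substring(HEADER_PREFIX.length()), String.valueOf(argument.getValue()));
            } else if (name.startsWith(QUERY_PREFIX)) {
                query.put(name.substring(QUERY_PREFIX.length()), String.valueOf(argument.getValue()));
            } else if (uri.contains(placeholder)) {
                uri = uri.replace(placeholder, String.valueOf(argument.getValue()));
            } else {
                body.put(name, argument.getValue()); // assumption: the rest goes into the body
            }
        }
        StringBuilder fullUri = new StringBuilder(uri);
        for (Map.Entry<String, String> parameter : query.entrySet()) {
            // Append with '?' for the first query parameter, '&' for any later one.
            fullUri.append(fullUri.indexOf("?") < 0 ? '?' : '&')
                    .append(parameter.getKey()).append('=').append(parameter.getValue());
        }
        return new RestCall(parts[1], fullUri.toString(), headers, body);
    }
}
```

Under these assumptions, the getProductList invocation above resolves to the endpoint /products/search?category=electronics&sort=asc.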

Additional custom function types

You can add your own custom types by using the Kogito add-on mechanism. As with predefined custom types such as sysout or java, the custom type identifier is the prefix of the operation field of the function definition.

Kogito add-ons rely on the Quarkus extensions mechanism. An add-on consists of at least two Maven projects:

  • The deployment module, which is responsible for generating the code required for the extension to work.

  • The runtime module, which includes the non-generated classes that are required for the extension to work.

In the case of a Serverless Workflow custom type, the roles of the modules are as follows:

  • The deployment project

    The deployment project is expected to configure the work item handler used during runtime to perform the logic associated with the custom type. It must contain a Java class that inherits from WorkItemTypeHandler. Its responsibilities are to indicate the custom type identifier (the operation prefix, as indicated earlier) and to set up the WorkItemNodeFactory instance passed as a parameter of the fillWorkItemHandler method. That instance is included in the Kogito process definition for that Workflow. As a part of this setup, you must indicate the name of the WorkItemNodeFactory. You might also provide any relevant metadata for that handler if needed.

  • The runtime project

    The runtime project consists of a WorkflowWorkItemHandler implementation, whose name must match the one provided to WorkItemNodeFactory during the deployment phase, and a WorkItemHandlerConfig bean that registers that handler with that name.

    When a Serverless Workflow function is called, Kogito identifies the proper WorkflowWorkItemHandler instance to be used for that function type (using the handler name associated with that type by the deployment project) and then invokes the internalExecute method. The Map parameter contains the function arguments defined in the workflow, and the WorkItem parameter contains the metadata information added to the handler by the deployment project. Hence, the internalExecute implementation has access to all the information needed to perform the computational logic intended for that custom type.

Custom function type example

Assume that you want to interact, from a workflow file, with a legacy RPC server such as the one defined in this project. This legacy server supports four simple arithmetic operations (add, minus, multiply, and divide), which can be invoked using a custom RPC protocol.

Since this is an uncommon protocol, the workflow cannot handle it by using any of the predefined Serverless Workflow function types. The available options are to use a Java service, which invokes a Java class that knows how to interact with the server, or to define a custom type that knows how to interact with the service.

Using the latter approach, you can write a workflow file defining this function.

RPC Custom function definition example
 "functions": [
    {
      "name": "division",
      "type": "custom",
      "operation": "rpc:division"
    }
  ],

The operation starts with rpc, which is the custom type identifier, and continues with division, which denotes the operation that will be executed in the legacy server.

A Kogito add-on that defines the rpc custom type must be developed for this function definition to be identified. It consists of a deployment project and a runtime project.

The deployment project is responsible for extending WorkItemTypeHandler and setting up the WorkItemNodeFactory as follows:

Example of the RPC function Java implementation
import static org.kie.kogito.examples.sw.custom.RPCCustomWorkItemHandler.NAME;
import static org.kie.kogito.examples.sw.custom.RPCCustomWorkItemHandler.OPERATION;

public class RPCCustomTypeHandler extends WorkItemTypeHandler {

    @Override
    public String type() {
        return "rpc";
    }

    @Override
    protected <T extends RuleFlowNodeContainerFactory<T, ?>> WorkItemNodeFactory<T> fillWorkItemHandler(Workflow workflow,
                                                                                                        ParserContext context,
                                                                                                        WorkItemNodeFactory<T> node,
                                                                                                        FunctionDefinition functionDef) {
        return node.workName(NAME).metaData(OPERATION, trimCustomOperation(functionDef));
    }
}

This example sets up the name of the KogitoWorkItemHandler, adds a metadata key with the name of the remote operation (extracted from the Serverless Workflow function definition operation property), and declares that the custom type is named rpc.

The runtime project contains the KogitoWorkItemHandler and the WorkItemHandlerConfig implementations.

As expected, RPCCustomWorkItemHandler implements the internalExecute method as follows:

Example of implementation of the internalExecute method
@Override
protected Object internalExecute(KogitoWorkItem workItem, Map<String, Object> parameters) {
    try {
        Iterator<?> iter = parameters.values().iterator();
        Map<String, Object> metadata = workItem.getNodeInstance().getNode().getMetaData();
        String operationId = (String) metadata.get(OPERATION);
        if (operationId == null) {
            throw new IllegalArgumentException("Operation is a mandatory parameter");
        }
        return CalculatorClient.invokeOperation(
                (String) metadata.getOrDefault(HOST, "localhost"),
                (int) metadata.getOrDefault(PORT, 8082),
                OperationId.valueOf(operationId.toUpperCase()),
                (Integer) iter.next(),
                (Integer) iter.next());
    } catch (IOException io) {
        throw new UncheckedIOException(io);
    }
}

The implementation invokes CalculatorClient.invokeOperation, a static Java method that knows how to interact with the legacy service. The operation parameter is obtained from the WorkItem metadata. The dividend and the divisor parameters are obtained from the Map parameter, which contains the function arguments defined in the workflow file.

Example of the custom function call from the workflow definition
{
  "actions":[
    {
      "functionRef":{
        "refName":"division",
        "arguments":{
          "dividend":".dividend",
          "divisor":".divisor"
        }
      }
    }
  ]
}

The RPCCustomWorkItemHandlerConfig is a bean that registers the handler name.

Example of injecting the custom WorkItemHandler
@Inject
RPCCustomWorkItemHandler handler;

@PostConstruct
void init () {
    register(handler.getName(),handler);
}
