Custom functions for your SonataFlow service
The Cloud Native Computing Foundation (CNCF) Serverless Workflow specification supports the custom
function type, which enables implementations to extend the function definition capability.
SonataFlow supports several custom types, which are listed below.
Note that custom functions might not be portable across other implementations.
Sysout custom function
You can use the sysout function for logging, as shown in the following example:
Example of a sysout function definition
{
  "functions": [
    {
      "name": "logInfo",
      "type": "custom",
      "operation": "sysout:INFO"
    }
  ]
}
The string after the : is optional and indicates the log level. The possible values are TRACE, DEBUG, INFO, WARN, and ERROR. If not present, INFO is used by default.
In the state definition, you can call the same sysout function, as shown in the following example:
Example of a sysout function reference within a state
{
  "states": [
    {
      "name": "myState",
      "type": "operation",
      "actions": [
        {
          "name": "printAction",
          "functionRef": {
            "refName": "logInfo",
            "arguments": {
              "message": "\"Workflow model is \\(.)\""
            }
          }
        }
      ]
    }
  ]
}
The message argument can be a jq expression or a jq string using interpolation, as in the example above.
Java custom function
SonataFlow supports invoking java functions within the Apache Maven project in which you define your workflow service.
Function definition
The following example shows the declaration of a java function:
Example of a java function declaration
{
  "functions": [
    {
      "name": "myFunction", (1)
      "type": "custom", (2)
      "operation": "service:java:com.acme.MyInterfaceOrClass::myMethod" (3)
    }
  ]
}
1 | myFunction is the function name |
2 | custom is the function type |
3 | service:java:com.acme.MyInterfaceOrClass::myMethod is the custom operation definition. In the custom operation definition, service is the reserved operation keyword, followed by the java keyword. com.acme.MyInterfaceOrClass is the FQCN (Fully Qualified Class Name) of the interface or implementation class, followed by the method name (myMethod). |
Function arguments
Your method interface signature must match the arguments passed by the workflow.
For example, if you invoke a function with a single argument as follows, your method signature must accept that argument. In this example, the number model variable is assumed to be an integer:
Example of a java function reference with one argument
{
  "functionRef": {
    "refName": "myFunction",
    "arguments": {
      "number": "${.number}"
    }
  }
}
Example of a java function implementation
public class MyInterfaceOrClass {

    public void myMethod(int number) {
        if (number % 2 != 0) {
            throw new IllegalArgumentException("Odd situation");
        }
    }
}
As a particular case, if you provide no arguments in the workflow definition, the Java method signature can include a Jackson JsonNode parameter. This means that the Java method expects the entire workflow model as input.
When using the following example function reference with no arguments, if the method signature contains a JsonNode parameter, the entire workflow model is passed when the method call is performed.
Example of a java function reference with no arguments
{
  "functionRef": {
    "refName": "myFunction"
  }
}
Example of a java function implementation
public class MyInterfaceOrClass {

    public JsonNode myMethod(JsonNode workflowData) {
        // do whatever I want with the workflow model
        ......
        // return the modified content:
        return workflowData;
    }
}
Function return values
If your method returns a JsonNode, the content of that node is merged into the workflow model (you can use an action data filter to control what is merged).
The same occurs if your method returns any Java Object descendant that is not a primitive wrapper: the Java object is recursively converted to a JSON object, and the result is merged into the workflow model (again, you can use an action data filter to control what is merged).
If your method returns a primitive type or its corresponding wrapper object (int, boolean, long, and so on), the primitive value is added to the workflow model with the name response (you can change that name using an action data filter).
If your method returns a Java collection, it is converted to a JSON array and added to the workflow model with the name response (you can change that name using an action data filter).
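For instance, a minimal sketch of an implementation that returns a primitive might look like the following; its return value would be added to the workflow model under response unless an action data filter renames it:
public class MyInterfaceOrClass {

    // The returned primitive is added to the workflow model under the
    // default name "response" (an action data filter can change that name).
    public int myMethod(int number) {
        return number * 2;
    }
}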
Function accessing contextual data
If you need access to process contextual information (for example, the Kogito process instance ID) inside your Java service, you can add a KogitoProcessContext parameter as the last one in the method signature.
For example, the signatures of the methods from the previous sections can be updated as follows:
public class MyInterfaceOrClass {

    public JsonNode myMethod(JsonNode workflowData, KogitoProcessContext context) {
        // do whatever I want with the JsonNode and the Kogito process context
        ......
        // return the modified content:
        return workflowData;
    }
}
public class MyInterfaceOrClass {

    public void myMethod(int number, KogitoProcessContext context) {
        if (number % 2 != 0) {
            throw new IllegalArgumentException("Odd situation");
        }
    }
}
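As an illustration only, the following sketch shows how the context could be used to read the process instance ID mentioned above; the getProcessInstance() and getStringId() accessors are assumed from the Kogito context API:
public class MyInterfaceOrClass {

    public void myMethod(int number, KogitoProcessContext context) {
        // Assumption: KogitoProcessContext exposes the running process instance,
        // whose string ID identifies the workflow instance that invoked this function.
        String instanceId = context.getProcessInstance().getStringId();
        System.out.println("Called from workflow instance " + instanceId + " with number " + number);
    }
}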
Camel custom function
Kogito supports invoking Camel routes within the Apache Maven project in which you define your workflow service.
This section briefly exemplifies how to define and use Camel routes within your workflow application. For more information, see Integrating with Camel Routes.
Function definition
The following example shows the declaration of a Camel function:
{
  "functions": [
    {
      "name": "myCamelEndpoint", (1)
      "type": "custom", (2)
      "operation": "camel:direct:myendpoint" (3)
    }
  ]
}
1 | myCamelEndpoint is the function name |
2 | custom is the function type |
3 | camel:direct:myendpoint is the custom operation definition. In this definition, camel is the reserved keyword, followed by the direct endpoint. Camel Direct is the only consumer supported by SonataFlow. Finally, myendpoint is the endpoint URI name found in the route within your project’s context. |
Function arguments
The Camel function arguments must follow a specific structure when passing data from the workflow state to the route. The body and headers attributes are the only ones supported in the function arguments. Both are optional, so you can call a Camel route without arguments. The only constraint is that you cannot call a Camel route with only headers.
The following examples display valid argument structures for Camel function arguments:
Example with body and headers
{
  "functionRef": {
    "refName": "myCamelEndpoint",
    "arguments": {
      "body": "${ .my.body.data }", (1)
      "headers": { (2)
        "key1": "${ .my.value }",
        "key2": "${ .my.other.value }"
      }
    }
  }
}
1 | jq expression filtering the state data for the body argument. |
2 | JSON key/value pair for the headers argument. A jq expression returning the same JSON object is also valid. |
Example with body only
{
  "functionRef": {
    "refName": "myCamelEndpoint",
    "arguments": {
      "body": "${ .my.body.data }"
    }
  }
}
{
  "functionRef": {
    "refName": "myCamelEndpoint"
  }
}
{
  "functionRef": {
    "refName": "myCamelEndpoint",
    "arguments": "${ .my.body.data }"
  }
}
In this last example, the jq expression result is used as the body argument, which simplifies the definition.
Function return values
The Camel route is responsible for producing a return value that the workflow can understand. The following are considered valid return objects:
- A string that contains a valid JSON object
- A valid Java bean that can be serialized to JSON
- A Jackson JsonNode object
- Any primitive type (Integer, Float, Decimal, String, and so on)
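As an illustration only, a minimal Camel route matching the hypothetical camel:direct:myendpoint function above could return a JSON string like this (a sketch, not part of any referenced project):
import org.apache.camel.builder.RouteBuilder;

public class MyEndpointRoute extends RouteBuilder {

    @Override
    public void configure() {
        // The workflow's camel:direct:myendpoint function is consumed here;
        // returning a JSON string allows the workflow to merge it into its model.
        from("direct:myendpoint")
            .setBody(constant("{\"result\": \"ok\"}"));
    }
}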
Python custom function
SonataFlow implements a custom function to execute embedded Python scripts and functions. For more information, see Invoking Python from SonataFlow.
Knative custom function
SonataFlow provides an implementation of a custom function through the knative-serving add-on to invoke Knative services. It allows you to use a static URI that defines a Knative service, which is used to perform HTTP requests. The Knative service defined in the URI is queried in the current Knative cluster and translated to a valid URL.
This section briefly exemplifies how to define and use Knative custom functions within your workflow application. For more information, see Invoking Knative services from Serverless Workflow.
Function definition
Given the following deployed Knative service:
$ kn service list
NAME URL LATEST AGE CONDITIONS READY REASON
custom-function-knative-service http://custom-function-knative-service.default.10.109.169.193.sslip.io custom-function-knative-service-00001 3h16m 3 OK / 3 True
You can declare a SonataFlow custom function using the Knative service name, as follows:
"functions": [
  {
    "name": "greet", (1)
    "type": "custom", (2)
    "operation": "knative:services.v1.serving.knative.dev/custom-function-knative-service?path=/plainJsonFunction" (3)
  }
]
1 | greet is the function name |
2 | custom is the function type |
3 | In operation you set the coordinates of the Knative service. See Kubernetes Service Discovery for supported Scheme and Kubernetes GVK |
The above function will send a POST
request to the http://custom-function-knative-service.default.10.109.169.193.sslip.io/plainJsonFunction URL. If you don’t specify a path, SonataFlow will use the root path (/).
You can also send a GET request, as shown in the following example:
Example of a GET request
"functions": [
  {
    "name": "greet",
    "type": "custom",
    "operation": "knative:services.v1.serving.knative.dev/custom-function-knative-service?path=/plainJsonFunction&method=GET" (1)
  }
]
1 | GET request |
About namespaces
Note that in the above example, you declared only the name of the service you want to call, but not a namespace. In this case, SonataFlow looks for the Knative service in the same namespace in which the workflow service is running.
If you need to call a Knative service in a different namespace, you can declare the function as follows:
"functions": [
{
"name": "greet",
"type": "custom",
"operation": "knative:services.v1.serving.knative.dev/my_different_namespace/custom-function-knative-service?path=/plainJsonFunction"
}
]
In the above example, SonataFlow will look for the custom-function-knative-service
in the my_different_namespace
namespace.
Function arguments
If you need to send a payload in the request, you can add it to the arguments in the functionRef.
Sending a regular JSON object
Suppose you must send the following JSON object as the payload:
{
  "product_id": ".product_id",
  "customer_name": ".customer_name"
}
You must declare a functionRef like the following:
"states": [
{
"name": "invokeFunction",
"type": "operation",
"actions": [
{
"functionRef": {
"refName": "greet",
"arguments": { (1)
"product_id": ".product_id",
"customer_name": ".customer_name"
}
}
}
],
"end": true
}
]
1 | The request payload is set in arguments . |
Sending a CloudEvent
By default, SonataFlow sends the payload of a Knative function as a regular JSON object, with Content-Type set to application/json. However, you can tell SonataFlow to send the payload as a CloudEvent. In that case, SonataFlow checks whether the CloudEvent has all mandatory attributes set and uses application/cloudevents+json; charset=UTF-8 as the Content-Type.
To tell SonataFlow you want to send the payload as a CloudEvent, you must define your function as follows:
"functions": [
{
"name": "greet",
"type": "custom",
"operation": "knative:services.v1.serving.knative.dev/custom-function-knative-service?path=/plainJsonFunction&asCloudEvent=true" (1)
}
]
1 | Tells SonataFlow to send the payload as a CloudEvent. Default is false |
Suppose you must send the following CloudEvent as the payload:
{
  "specversion" : "1.0",
  "type" : "com.github.pull_request.opened",
  "source" : "https://github.com/cloudevents/spec/pull",
  "subject" : "123",
  "time" : "2018-04-05T17:31:00Z",
  "comexampleextension1" : "value",
  "comexampleothervalue" : 5,
  "datacontenttype" : "text/xml",
  "data" : "<much wow=\"xml\"/>"
}
You must declare a functionRef like the following (do not forget to set asCloudEvent to true in the function definition):
"states": [
{
"name": "invokeFunction",
"type": "operation",
"actions": [
{
"functionRef": {
"refName": "greet",
"arguments": { (1)
"specversion" : "1.0",
"type" : "com.github.pull_request.opened",
"source" : "https://github.com/cloudevents/spec/pull",
"subject" : "123",
"time" : "2018-04-05T17:31:00Z",
"comexampleextension1" : "value",
"comexampleothervalue" : 5,
"datacontenttype" : "text/xml",
"data" : "<much wow=\"xml\"/>"
}
}
}
],
"end": true
}
]
1 | The CloudEvent is set in arguments . |
SonataFlow generates a CloudEvent ID automatically. If you need to use a specific ID, you can set it in the arguments, as shown in the following example:
"arguments": {
"specversion" : "1.0",
"id": "a_unique_id_42", (1)
"type" : "com.github.pull_request.opened",
"source" : "https://github.com/cloudevents/spec/pull",
"subject" : "123",
"time" : "2018-04-05T17:31:00Z",
"comexampleextension1" : "value",
"comexampleothervalue" : 5,
"datacontenttype" : "text/xml",
"data" : "<much wow=\"xml\"/>"
}
1 | The CloudEvent ID. |
Configurations
Request timeout
By default, the Knative service must respond within 10 seconds. You can use the kogito.sw.functions.<function_name>.timeout
property to configure this value.
For instance, if you want to reduce the request timeout to 5 seconds, you must add the following to your application.properties
file:
kogito.sw.functions.greet.timeout=5000 (1)
1 | Time in milliseconds |
REST custom function
The Serverless Workflow specification defines the OpenAPI function type, which is the preferred way to interact with existing REST servers. However, sometimes a workflow must interact with REST APIs that are not described by an OpenAPI specification file. Since generating such files for these services might be tedious, SonataFlow offers the rest custom type as a shortcut.
When using the custom rest type, you specify, in the operation string of the function definition, the HTTP URI to be invoked and the HTTP method (get, post, patch, or put) to be used. When the function is invoked, you pass the request arguments as you do when using an OpenAPI function.
The following example shows the declaration of a rest function:
Example of a rest function declaration
{
  "functions": [
    {
      "name": "multiplyAllByAndSum", (1)
      "type": "custom", (2)
      "operation": "rest:post:/numbers/{multiplier}/multiplyByAndSum" (3)
    }
  ]
}
1 | multiplyAllByAndSum is the function name |
2 | custom is the function type |
3 | rest:post:/numbers/{multiplier}/multiplyByAndSum is the custom operation definition. In the custom operation definition, rest is the reserved operation keyword, post is the HTTP method, and /numbers/{multiplier}/multiplyByAndSum is the relative endpoint to be invoked; {multiplier} is a path parameter that is replaced at invocation time. |
When using relative endpoints, you must specify the host as a property. The format of the host property is kogito.sw.functions.<function_name>.host. Therefore, in this example, kogito.sw.functions.multiplyAllByAndSum.host is the host property key. You can override the default port (80) if needed by specifying the kogito.sw.functions.multiplyAllByAndSum.port property.
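For example, assuming the REST service is reachable at a hypothetical host myrestserver.example.com on port 8080, your application.properties file would contain:
# Hypothetical host and port for the multiplyAllByAndSum function
kogito.sw.functions.multiplyAllByAndSum.host=myrestserver.example.com
kogito.sw.functions.multiplyAllByAndSum.port=8080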
This particular endpoint expects, as its body, a JSON object whose numbers field is an array of integers. It multiplies each item in the array by multiplier and returns the sum of all the multiplied items. Therefore, to invoke it, assuming the input array is stored in the workflow model as the inputNumbers property, you write:
Example of a rest function call
{
  "functionRef": {
    "refName": "multiplyAllByAndSum",
    "arguments": {
      "numbers": ".inputNumbers",
      "multiplier": 3 (1)
    }
  }
}
1 | You replace path and query parameters by specifying an argument whose name is equal to the path or query parameter. The query or path parameter to be replaced must be enclosed within {} in the endpoint string. |
If inputNumbers contains 1, 2, and 3, the output of the call is 1*3+2*3+3*3=18.
If you want to specify headers in your HTTP request, you can do so by adding arguments starting with the HEADER_ prefix. Therefore, if you add "HEADER_ce_id": "123" to the previous argument set, a header named ce_id with the value 123 is added to your request. A similar approach can be used to add query parameters to a GET request; in that case, you must add arguments starting with the QUERY_ prefix. Note that you can also use the {} notation to replace query parameters included directly in the operation string.
For example, given the following function definition that performs a get request:
Example of a get request definition
{
  "functions": [
    {
      "name": "getProductList",
      "type": "custom",
      "operation": "rest:get:/products/search?category={category}"
    }
  ]
}
You can replace {category} by specifying an argument with that name, and add a sort query parameter:
Example of a get request invocation
{
  "functionRef": {
    "refName": "getProductList",
    "arguments": {
      "category": "electronics",
      "QUERY_sort": "asc"
    }
  }
}
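For illustration only, the same invocation with the ce_id header from the earlier example added through the HEADER_ prefix might look like this:
{
  "functionRef": {
    "refName": "getProductList",
    "arguments": {
      "category": "electronics",
      "QUERY_sort": "asc",
      "HEADER_ce_id": "123"
    }
  }
}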
Additional custom function types
You can add your own custom types by using the Kogito add-on mechanism. As with predefined custom types such as sysout or java, the custom type identifier is the prefix of the operation field of the function definition.
Kogito add-ons rely on the Quarkus extension mechanism, and an add-on consists of at least two Maven projects:
- The deployment module, which is responsible for generating the code required for the extension to work.
- The runtime module, which includes the non-generated classes that are required for the extension to work.
In the case of a Serverless Workflow custom type, the roles of the modules are as follows:
- The deployment project: it is expected to configure the work item handler used during runtime to perform the logic associated with the custom type. It must contain a Java class that inherits from WorkItemTypeHandler. Its responsibilities are to indicate the custom type identifier (the operation prefix, as indicated earlier) and to set up the WorkItemNodeFactory instance passed as a parameter of the fillWorkItemHandler method. That instance is included in the Kogito process definition for that workflow. As a part of this setup, you must indicate the name of the WorkItemNodeFactory. You might also provide any relevant metadata for that handler if needed.
- The runtime project: it consists of a WorkflowWorkItemHandler implementation, whose name must match the one provided to the WorkItemNodeFactory during the deployment phase, and a WorkItemHandlerConfig bean that registers that handler with that name.
When a Serverless Workflow function is called, Kogito identifies the proper WorkflowWorkItemHandler instance to be used for that function type (using the handler name associated with that type by the deployment project) and then invokes its internalExecute method. The Map parameter contains the function arguments defined in the workflow, and the WorkItem parameter contains the metadata information added to the handler by the deployment project. Hence, the internalExecute implementation has access to all the information needed to perform the computational logic intended for that custom type.
Custom function type example
Assume you want to interact, from a workflow file, with a legacy RPC server such as the one defined in this project. This legacy server supports four simple arithmetic operations: add, subtract, multiply, and divide, which can be invoked using a custom RPC protocol.
Since this is an uncommon protocol, the workflow cannot invoke it using any of the predefined Serverless Workflow function types. The available options are to use a Java service, which invokes a Java class that knows how to interact with the server, or to define a custom type that knows how to interact with the service.
Using the latter approach, you can write a workflow file defining this function:
"functions": [
{
"name": "division",
"type": "custom",
"operation": "rpc:division"
}
],
The operation starts with rpc, which is the custom type identifier, and continues with division, which denotes the operation that will be executed in the legacy server.
A Kogito add-on that defines the rpc custom type must be developed for this function definition to be recognized. It consists of a deployment project and a runtime project.
The deployment project is responsible for extending the WorkItemTypeHandler and setting up the WorkItemNodeFactory, as follows:
import static org.kie.kogito.examples.sw.custom.RPCCustomWorkItemHandler.NAME;
import static org.kie.kogito.examples.sw.custom.RPCCustomWorkItemHandler.OPERATION;

public class RPCCustomTypeHandler extends WorkItemTypeHandler {

    @Override
    public String type() {
        return "rpc";
    }

    @Override
    protected <T extends RuleFlowNodeContainerFactory<T, ?>> WorkItemNodeFactory<T> fillWorkItemHandler(Workflow workflow,
            ParserContext context,
            WorkItemNodeFactory<T> node,
            FunctionDefinition functionDef) {
        return node.workName(NAME).metaData(OPERATION, trimCustomOperation(functionDef));
    }
}
This example sets up the name of the KogitoWorkItemHandler, adds a metadata key with the name of the remote operation (extracted from the operation property of the Serverless Workflow function definition), and declares that the custom type is named rpc.
The runtime project contains the KogitoWorkItemHandler and WorkItemHandlerConfig implementations.
As expected, RPCCustomWorkItemHandler implements the internalExecute method as follows:
Example of the internalExecute method implementation
@Override
protected Object internalExecute(KogitoWorkItem workItem, Map<String, Object> parameters) {
    try {
        Iterator<?> iter = parameters.values().iterator();
        Map<String, Object> metadata = workItem.getNodeInstance().getNode().getMetaData();
        String operationId = (String) metadata.get(OPERATION);
        if (operationId == null) {
            throw new IllegalArgumentException("Operation is a mandatory parameter");
        }
        return CalculatorClient.invokeOperation((String) metadata.getOrDefault(HOST, "localhost"), (int) metadata.getOrDefault(PORT, 8082),
                OperationId.valueOf(operationId.toUpperCase()), (Integer) iter.next(), (Integer) iter.next());
    } catch (IOException io) {
        throw new UncheckedIOException(io);
    }
}
The implementation invokes CalculatorClient.invokeOperation, a static Java method that knows how to interact with the legacy service. The operation to execute is obtained from the WorkItem metadata. The dividend and divisor parameters are obtained from the Map parameter, which contains the function arguments defined in the workflow file, as shown below:
{
  "actions": [
    {
      "functionRef": {
        "refName": "division",
        "arguments": {
          "dividend": ".dividend",
          "divisor": ".divisor"
        }
      }
    }
  ]
}
The RPCCustomWorkItemHandlerConfig is a bean that registers the handler under its name. The class wrapper shown here assumes the standard Kogito DefaultWorkItemHandlerConfig base class, which provides the register method:
@ApplicationScoped
public class RPCCustomWorkItemHandlerConfig extends DefaultWorkItemHandlerConfig {

    @Inject
    RPCCustomWorkItemHandler handler;

    @PostConstruct
    void init() {
        // Assumption: register(name, handler) is inherited from DefaultWorkItemHandlerConfig
        register(handler.getName(), handler);
    }
}