LF Edge eKuiper

As part of the Edge Xpert distribution, IOTech provides LF Edge eKuiper, a lightweight, SQL-based rules engine for IoT data analytics and streaming applications at the edge.

eKuiper Configuration

The default eKuiper environment variables are configured in the docker-compose file as follows:

  EDGEX__DEFAULT__PORT: 1883
  EDGEX__DEFAULT__PROTOCOL: tcp
  EDGEX__DEFAULT__SERVER: mqtt-broker
  EDGEX__DEFAULT__TOPIC: edgex/events/#
  EDGEX__DEFAULT__TYPE: mqtt
  EDGEX__DEFAULT__OPTIONAL__KEEPALIVE: "50"
  EDGEX__DEFAULT__MESSAGETYPE: request
  KUIPER__BASIC__CONSOLELOG: "true"
  KUIPER__BASIC__RESTPORT: 59720
  KUIPER__BASIC__DEBUG: "false"

The parameters are described in the following table:

| Field | Arguments | Description |
|---|---|---|
| EDGEX__DEFAULT__PORT | <port_number> | The port number of the Edge Xpert message bus. Default is 1883 |
| EDGEX__DEFAULT__PROTOCOL | <protocol> | The protocol to use when connecting to the Edge Xpert message bus. Default is tcp |
| EDGEX__DEFAULT__SERVER | <server_address> | The address of the Edge Xpert message bus server. Default is mqtt-broker |
| EDGEX__DEFAULT__TOPIC | <topic_name> | The topic name of the Edge Xpert message bus to which to subscribe. Default is edgex/events/# |
| EDGEX__DEFAULT__TYPE | <message_bus_type> | The message bus type. Three types of message bus are supported: zero, mqtt and redis. Default is mqtt |
| EDGEX__DEFAULT__OPTIONAL__KEEPALIVE | <interval> (in seconds) | The interval at which the client sends a keepalive ping to the broker |
| EDGEX__DEFAULT__MESSAGETYPE | <message_model_type> | The message model type. To publish the message as an event, like an EdgeX application service, use event. To publish the message as an event request, like an EdgeX device service or the core data service, use request. Default is request |
| KUIPER__BASIC__CONSOLELOG | true (default), false | Determines whether to print the log to the console |
| KUIPER__BASIC__RESTPORT | <port_number> | The port number of the eKuiper API server |
| KUIPER__BASIC__DEBUG | true, false (default) | Determines whether the log level is set to debug |

If you need to change any of the default values, create a local copy of docker-compose.yml before starting the eKuiper service, as described in Using Docker Compose. You can then update the environment section of the local file and start Edge Xpert from the directory that contains it.

eKuiper Overview

LF Edge eKuiper is a lightweight, open-source (Apache 2.0 license) software package for IoT edge analytics and stream processing. It is implemented in Golang and can run on various resource-constrained edge devices. Users can realise fast data processing on the edge and write rules in SQL. The eKuiper rules engine is based on three components: Source, SQL and Sink.

  • Source: The source of the stream data, such as an MQTT server. For Edge Xpert, the data source is an EdgeX message bus, which can be implemented by ZeroMQ, Redis Pub/Sub or MQTT
  • SQL: Where the business logic is specified. eKuiper provides SQL statements to extract, filter and transform data
  • Sink: The target to which the analysis results are sent, such as Edge Xpert's command service or an MQTT broker in the cloud

The relationship between Source, SQL and Sink in eKuiper is shown below.

[Figure: eKuiper Source, SQL and Sink architecture diagram]

In the default Edge Xpert configuration, eKuiper analyses data coming from the Edge Xpert message bus. Edge Xpert provides an abstract message bus interface, with implementations for ZeroMQ, Redis Pub/Sub and MQTT, to support information exchange between different microservices.

In order to process the data from the Edge Xpert message bus we need the following three steps:

  • Create a stream which specifies the data Source
  • Write a rule. This has two parts:
    • The SQL for data analysis
    • The Sink target where the results of the analysis will be sent
  • Deploy and run the rule

eKuiper Example

In this example, we will use the Virtual Device Service and run rules against the data generated by this device service.

Starting the Edge Xpert Services

In this example we need to start the Virtual Device Service and eKuiper. Do this using the command:

edgexpert up device-virtual kuiper

Create a Data Stream

Creation of the data stream is managed in two parts.

  • Using the configuration to set up the IP address and port of the data source (see eKuiper Configuration)

  • Creating a stream via the eKuiper REST API or CLI

Create a Stream Using the REST API

Note: The Edge Xpert eKuiper API uses port 59720 instead of the default port 9081

To create a stream that will consume data from the Edge Xpert message bus use the following command:

curl -X POST \
http://localhost:59720/streams \
-H 'Content-Type: application/json' \
-d '{
    "sql": "create stream demo() WITH (FORMAT=\"JSON\", TYPE=\"edgex\")"
}'
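
The same request can also be scripted. The following is a minimal Python sketch, not part of the Edge Xpert tooling, that builds the POST /streams call shown above using only the standard library. The helper names build_stream_request and create_stream are hypothetical, and the sketch assumes that the eKuiper REST API is reachable on localhost:59720.

```python
import json
import urllib.request

# Assumption: the eKuiper REST API is exposed on localhost:59720.
EKUIPER_URL = "http://localhost:59720"


def build_stream_request(name, base_url=EKUIPER_URL):
    """Build (but do not send) the POST /streams request for an EdgeX stream."""
    sql = f'create stream {name}() WITH (FORMAT="JSON", TYPE="edgex")'
    body = json.dumps({"sql": sql}).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/streams",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def create_stream(name, base_url=EKUIPER_URL):
    """Send the request and return the response body as text."""
    request = build_stream_request(name, base_url)
    with urllib.request.urlopen(request, timeout=10) as response:
        return response.read().decode("utf-8")
```

Calling create_stream("demo") sends the request. Keeping the request construction separate from the network call lets you inspect the SQL statement before anything is sent.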

For other REST APIs, refer to the eKuiper REST API overview.

Create a Stream Using eKuiper CLI

To enter the eKuiper Docker container and run the CLI, use the following command:

docker exec -it edgex-kuiper /bin/sh

To create a stream named demo use the following command:

bin/kuiper create stream demo'() WITH (FORMAT="JSON", TYPE="edgex")'

For other command line tools, refer to the eKuiper CLI overview.

Create a Rule

For this example, we will create a rule that sends result data to an MQTT broker. For detailed information, see the eKuiper MQTT Sink documentation.

Again, we can create the rule using either the REST API or the CLI.

We are going to create a rule that selects all of the values from the event topic. The rule has two sinks, which:

  • Publish the results to the result topic of the built-in MQTT broker
  • Print the results to the log file

Create a Rule Using the REST API

To create a rule using the REST API use the following command:

curl -X POST \
http://localhost:59720/rules \
-H 'Content-Type: application/json' \
-d '{
    "id": "rule1",
    "sql": "SELECT * FROM demo",
    "actions": [
        {
            "mqtt": {
                "server": "tcp://mqtt-broker:1883",
                "topic": "result",
                "clientId": "demo_001"
            }
        },
        {
            "log": {}
        }
    ]
}'
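
The REST and CLI variants take the same rule definition, so it can help to generate it once. The following is a minimal Python sketch that assembles the rule JSON used in this example and, optionally, posts it to /rules. The helper names build_rule and create_rule are hypothetical, and the sketch assumes that the eKuiper REST API is reachable on localhost:59720.

```python
import json
import urllib.request

# Assumption: the eKuiper REST API is exposed on localhost:59720.
EKUIPER_URL = "http://localhost:59720"


def build_rule(rule_id, sql, mqtt_server, topic, client_id):
    """Return a rule definition with an MQTT sink and a log sink."""
    return {
        "id": rule_id,
        "sql": sql,
        "actions": [
            {"mqtt": {"server": mqtt_server, "topic": topic, "clientId": client_id}},
            {"log": {}},
        ],
    }


def create_rule(rule, base_url=EKUIPER_URL):
    """POST the rule to /rules and return the response body as text."""
    request = urllib.request.Request(
        f"{base_url}/rules",
        data=json.dumps(rule).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=10) as response:
        return response.read().decode("utf-8")


rule1 = build_rule(
    rule_id="rule1",
    sql="SELECT * FROM demo",
    mqtt_server="tcp://mqtt-broker:1883",
    topic="result",
    client_id="demo_001",
)
print(json.dumps(rule1, indent=2))  # inspect the payload before sending it
# create_rule(rule1)  # uncomment once the eKuiper service is running
```

The printed JSON can also be pasted into the curl or CLI commands, which keeps the two variants consistent.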

Create a Rule Using the eKuiper CLI

To create the rule run the command:

docker exec kuiper bin/kuiper create rule rule1 '
{
    "id": "rule1",
    "sql": "SELECT * FROM demo",
    "actions": [
        {
            "mqtt": {
                "server": "tcp://mqtt-broker:1883",
                "topic": "result",
                "clientId": "demo_001"
            }
        },
        {
            "log":{}
        }
    ]
}'

Connecting to 127.0.0.1:20498...
Rule rule1 was created successfully, please use 'bin/kuiper getstatus rule rule1' command to get rule status.
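
The rule status mentioned in the output above is also available through the eKuiper REST API at /rules/{id}/status. The following is a minimal Python sketch of that check. The helper names rule_status_url and get_rule_status are hypothetical, and the sketch assumes that the REST API is reachable on localhost:59720. The response is returned as plain text because its exact contents depend on the state of the rule.

```python
import urllib.parse
import urllib.request

# Assumption: the eKuiper REST API is exposed on localhost:59720.
EKUIPER_URL = "http://localhost:59720"


def rule_status_url(rule_id, base_url=EKUIPER_URL):
    """Return the status URL for a rule, escaping the rule id for use in a path."""
    return f"{base_url}/rules/{urllib.parse.quote(rule_id, safe='')}/status"


def get_rule_status(rule_id, base_url=EKUIPER_URL):
    """Fetch the rule status and return the raw response text."""
    with urllib.request.urlopen(rule_status_url(rule_id, base_url), timeout=10) as response:
        return response.read().decode("utf-8")
```

For example, print(get_rule_status("rule1")) shows the status of the rule created above once the services are running.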

View the Analysis Result

To view the results, we can use Eclipse Mosquitto, which provides example MQTT publisher and subscriber clients. You can also use other MQTT client tools.

To view the results using Eclipse Mosquitto, you can use the mosquitto_sub command as shown below:

mosquitto_sub -t result
[{"Float64":-6.432173e+307}]
[{"Float32":2.815127058905047e+38}]
[{"Int16":-10981}]
[{"Int8":17}]
[{"Int32":-2025653700}]
[{"Int64":-2025046756228926158}]
[{"Bool":false}]
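
Each message shown above is a JSON array of objects that map a reading name to its value. The following is a minimal Python sketch of how a consumer might decode these payloads, using a few of the sample lines from the output above. The helper name parse_result_line is hypothetical, and the sketch assumes that every payload has this array-of-objects shape.

```python
import json


def parse_result_line(line):
    """Parse one payload (a JSON array of objects) into (name, value) pairs."""
    records = json.loads(line)
    return [(name, value) for record in records for name, value in record.items()]


# Sample payloads copied from the mosquitto_sub output above.
sample_lines = [
    '[{"Int16":-10981}]',
    '[{"Int8":17}]',
    '[{"Bool":false}]',
]

for line in sample_lines:
    for name, value in parse_result_line(line):
        print(f"{name} = {value!r}")
```

In a real subscriber, the same function would be applied to each message payload received from the result topic.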