LF Edge eKuiper
As part of the Edge Xpert distribution, IOTech provides LF Edge eKuiper. This is a lightweight, SQL-based rules engine used for IoT data analytics and streaming applications at the edge.
- For further information on eKuiper configuration, see eKuiper Configuration.
- For an example of using eKuiper with Edge Xpert, see eKuiper Example.
- For further information on the provided components, refer to the eKuiper Rules Engine tutorial.
- For further information on eKuiper, refer to the eKuiper page.
eKuiper Configuration
The default eKuiper environment variables are configured in the docker-compose file as follows:
EDGEX__DEFAULT__PORT: 1883
EDGEX__DEFAULT__PROTOCOL: tcp
EDGEX__DEFAULT__SERVER: mqtt-broker
EDGEX__DEFAULT__TOPIC: edgex/events/#
EDGEX__DEFAULT__TYPE: mqtt
EDGEX__DEFAULT__OPTIONAL__KEEPALIVE: "50"
EDGEX__DEFAULT__MESSAGETYPE: request
KUIPER__BASIC__CONSOLELOG: "true"
KUIPER__BASIC__RESTPORT: 59720
KUIPER__BASIC__DEBUG: "false"
Field | Arguments | Description |
---|---|---|
EDGEX__DEFAULT__PORT | <port_number> | The port number of the Edge Xpert message bus. Default is 6379 |
EDGEX__DEFAULT__PROTOCOL | <protocol> | The protocol to use when connecting to the Edge Xpert message bus. Default is redis |
EDGEX__DEFAULT__SERVER | <core-data_address> | The address of the Core Data service from which to obtain the value descriptors |
EDGEX__DEFAULT__TOPIC | <topic_name> | The topic name of the Edge Xpert message bus to which to subscribe. Default is edgex/events/# |
EDGEX__DEFAULT__TYPE | <message_bus_type> | The message bus type. Three types of message bus are supported: zero, mqtt and redis. Default is mqtt |
EDGEX__DEFAULT__OPTIONAL__KEEPALIVE | <interval> (in seconds) | The interval between keepalive pings sent by the client to the broker |
EDGEX__DEFAULT__MESSAGETYPE | <message_model_type> | The message model type. To publish the message as an event like Application Service, use event. Otherwise, to publish the message as an event request like Device Service or Core Data, use request. Default is request |
KUIPER__BASIC__CONSOLELOG | true (default), false | Determines whether to print the log to console |
KUIPER__BASIC__RESTPORT | <port_number> | The port number of the eKuiper API server |
KUIPER__BASIC__DEBUG | true, false (default) | Determines if the log level is set to debug |
If you need to update any of the default values, you must create a local copy of the docker-compose.yml file before starting the eKuiper service, as described in Using Docker Compose. Once you have created a local file, you can update the environment section and start Edge Xpert from the directory containing your local docker-compose file.
eKuiper Overview
LF Edge eKuiper is a lightweight open-source software package (Apache 2.0 license) for IoT edge analytics and stream processing. It is implemented in Golang and can run on various resource-constrained edge devices. Users can realise fast data processing on the edge and write rules in SQL. The eKuiper rules engine is based on three components: Source, SQL and Sink.
- Source: The source of stream data, such as data from an MQTT server. For Edge Xpert, the data source is an EdgeX message bus, which can be implemented by ZeroMQ, Redis Pub/Sub or MQTT.
- SQL: Where the specified business logic is processed. eKuiper provides SQL statements to extract, filter and transform data.
- Sink: Sends the analysis results to a specific target, such as Core Command or an MQTT broker in the cloud.
The relationship between Source, SQL and Sink in eKuiper is shown below.
In the default Edge Xpert configuration, eKuiper analyses data coming from the Edge Xpert message bus. Edge Xpert provides an abstract message bus interface and implements the ZeroMQ, Redis Pub/Sub and MQTT protocols to support information exchange between different microservices.
In order to process the data from the Edge Xpert message bus, you have to complete the following steps:
- Create a stream which specifies the data Source
- Write a rule. This has two parts:
    - The SQL for data analysis
    - The Sink target where the results of the analysis will be sent
- Deploy and run the rule
eKuiper Example
In this example, we will use the Virtual Device Service and run rules against the data generated by this device service.
Starting the Edge Xpert Services
In this example we need to start the Virtual Device Service and eKuiper:
edgexpert up device-virtual kuiper
Create a Data Stream
Creation of the data stream is managed in two parts.
- Using the configuration to set up the IP address and port of the data source (see eKuiper Configuration)
- Creating a stream via the eKuiper REST API or CLI
Create a Stream Using the REST API
Info
If Edge Xpert is running in secure mode, you will need to replace localhost in the URL with the IP address for the service. See CLI Service Ports for details.
Note
The port of the eKuiper API is 59720, instead of the default port 9081.
To create a stream that will consume data from the Edge Xpert message bus, use the following command:
curl -X POST \
http://localhost:59720/streams \
-H 'Content-Type: application/json' \
-d '{
"sql": "create stream demo() WITH (FORMAT=\"JSON\", TYPE=\"edgex\")"
}'
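To confirm that the stream exists before writing a rule, the eKuiper REST API can be queried. The following is a minimal sketch, assuming the API is reachable at http://localhost:59720 as in the command above. The KUIPER_API variable and the function names are illustrative and are not part of Edge Xpert.

```shell
#!/bin/sh
# Assumption: the eKuiper REST API is reachable on localhost:59720.
KUIPER_API="http://localhost:59720"

# List the names of all streams known to eKuiper.
list_streams() {
    curl -s "${KUIPER_API}/streams"
}

# Show the definition of one stream; $1 is the stream name, for example demo.
describe_stream() {
    curl -s "${KUIPER_API}/streams/$1"
}
```

After loading these functions into your shell, the output of list_streams should include demo, and describe_stream demo should return the definition of the stream.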
Create a Stream Using eKuiper CLI
To enter the eKuiper Docker container and run the CLI, use the following command:
docker exec -it kuiper /bin/sh
Create a stream named demo by running the following command:
bin/kuiper create stream demo'() WITH (FORMAT="JSON", TYPE="edgex")'
Create a Rule
For this example, we will create a rule that sends result data to an MQTT broker. See this link for detailed information about the MQTT Sink.
Note
You can create the rule using either the REST API or the CLI.
We are going to create a rule that selects all of the values from the event topic. The rule has two sinks, which will:
- Publish the results to the topic result of the built-in MQTT broker
- Print the results to a log file
Create a Rule Using the REST API
To create a rule using the REST API use the following command:
curl -X POST \
http://localhost:59720/rules \
-H 'Content-Type: application/json' \
-d '{
"id": "rule1",
"sql": "SELECT * FROM demo",
"actions": [
{
"mqtt": {
"server": "tcp://mqtt-broker:1883",
"topic": "result",
"clientId": "demo_001"
}
},
{
"log":{}
}
]
}'
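Once the rule has been created, its state can be checked through the same REST API. This is a minimal sketch, assuming the API address used above and the rule ID rule1 from this example. The KUIPER_API variable and the function names are illustrative.

```shell
#!/bin/sh
# Assumption: the eKuiper REST API is reachable on localhost:59720.
KUIPER_API="http://localhost:59720"

# List all rules defined in eKuiper.
list_rules() {
    curl -s "${KUIPER_API}/rules"
}

# Show the runtime status of one rule; $1 is the rule ID, for example rule1.
rule_status() {
    curl -s "${KUIPER_API}/rules/$1/status"
}
```

Calling rule_status rule1 returns the status reported by eKuiper for the rule, which helps to confirm that it is running before looking for results on the MQTT broker.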
Create a Rule Using the eKuiper CLI
To create the rule, run the following command:
docker exec kuiper bin/kuiper create rule rule1 '
{
"id": "rule1",
"sql": "SELECT * FROM demo",
"actions": [
{
"mqtt": {
"server": "tcp://mqtt-broker:1883",
"topic": "result",
"clientId": "demo_001"
}
},
{
"log":{}
}
]
}'
Connecting to 127.0.0.1:20498...
Rule rule1 was created successfully, please use 'bin/kuiper getstatus rule rule1' command to get rule status.
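The stream and rule can also be inspected from the CLI, as the success message above suggests. The sketch below wraps the CLI in a small helper, assuming the container is named kuiper as in the commands above and that the stream demo and rule rule1 from this example exist. The function names kuiper_cli and check_example are illustrative.

```shell
#!/bin/sh
# Run an eKuiper CLI command inside the container.
# Assumption: the container is named kuiper, as in the example commands.
kuiper_cli() {
    docker exec kuiper bin/kuiper "$@"
}

# Inspect the stream and rule created in this example.
check_example() {
    kuiper_cli show streams          # list all streams
    kuiper_cli describe stream demo  # show the definition of the demo stream
    kuiper_cli show rules            # list all rules
    kuiper_cli getstatus rule rule1  # show the status of rule1
}
```

Running check_example from the host prints the output of each CLI command in turn.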
View the Analysis Result
To view the results, we can use Eclipse Mosquitto, which provides example MQTT publisher and subscriber clients. You can also use other MQTT client tools.
To view the results using Eclipse Mosquitto, you can use the mosquitto_sub command as shown below:
mosquitto_sub -t result
[{"Float64":-6.432173e+307}]
[{"Float32":2.815127058905047e+38}]
[{"Int16":-10981}]
[{"Int8":17}]
[{"Int32":-2025653700}]
[{"Int64":-2025046756228926158}]
[{"Bool":false}]
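If the subscriber runs on a different machine from the broker, the host and port need to be given explicitly. The following sketch assumes that the broker port 1883 is reachable from where the command is run. The MQTT_HOST and MQTT_PORT variables and the watch_results function are illustrative names.

```shell
#!/bin/sh
# Assumption: the MQTT broker is reachable at this host and port.
MQTT_HOST="localhost"
MQTT_PORT="1883"

# Subscribe to a topic (default: result) and print each message
# prefixed with its topic name (-v).
watch_results() {
    mosquitto_sub -h "${MQTT_HOST}" -p "${MQTT_PORT}" -t "${1:-result}" -v
}
```

Calling watch_results with no argument subscribes to the result topic used by rule1. Set MQTT_HOST to the broker's IP address when subscribing from another machine.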