Multiple Pipelines (Deprecated)
Overview
Alert
Since Edge Xpert v2.3.0, the feature of defining multiple pipelines triggered by different triggers has been deprecated, so this sample is obsolete and will be completely removed in v3.0. The purpose of this sample is to demonstrate two features of Edge Xpert Application Service:
1. defining multiple pipelines triggered by different triggers in a single Edge Xpert Application Service
2. subscribing MQTT messages from external MQTT broker to trigger a core command execution
However, since EdgeX Foundry Levski release adds support for an external MQTT connection on command service, this sample is no longer required and will be completely removed in Edge Xpert v3.0.
EdgeX Foundry provides Pipeline Per Topics feature to support use cases that require multiple functions pipelines to process data differently. However, there is a limitation with regard to the Pipeline Per Topics feature of EdgeX Foundry: all different pipelines can only be triggered by the same trigger. Edge Xpert application service also inherits the same feature and make additional enhancement to allow different pipelines can be triggered by different triggers as defined under [Trigger]
section. With such enhancement, a single Edge Xpert application service can be configured to further support use cases that involve bidirectional data communication, such as sending device data to Cloud vendors and then receive the response back from Cloud vendors.
This example will create an application service with three different pipelines:
- a default pipeline to be triggered by HTTP Trigger and then print out the incoming data in the log.
- a pipeline to be triggered by EdgeX MessageBus Trigger, transform the data to XML format, and then sets transformation result as the response data that the pipeline returns to the configured trigger.
- a pipeline to be triggered by External MQTT Trigger, parse the input JSON message to execute a core command, and then export the execution result to the external MQTT broker.
Prerequisite
- Edge Xpert CLI installation
- Understand Edge Xpert Application Service
- Launch Edge Xpert core services, device-virtual, and mqtt-broker
Configure an Application service with three different pipelines
- We will need a configuration file to start up an application service. The recommended way is to copy the sample configuration as provided by Edge Xpert CLI, and then revise the configuration to satisfy your needs. Copy
/usr/share/edgexpert/examples/app-configurable/multiple-pipelines-triggers/configuration.toml
sample configuration file to your working directory. -
Open the copied configuration.toml file in a text editor, and you will observe the first pipeline, or so-called default pipeline, is defined under
[Writable.Pipeline]
section as shown below:This default pipeline contains a single function[Writable.Pipeline] UseTargetTypeOfByteArray = true ExecutionOrder = "PrintDataToLog" # ExecutionOrder for the default pipeline
PrintDataToLog
in theExecutionOrder
, and the pipeline will be triggered by the first trigger as defined in theType
parameter of[Trigger]
, which will behttp
in the sample below:[Trigger] Type = "http, edgex-messagebus, external-mqtt"
Note
UseTargetTypeOfByteArray=true
set the TargetType to be a byte array/slice, i.e.[]byte
for the pipeline. Please refer to EdgeX Foundry documentation for more details about the usage ofUseTargetTypeOfByteArray
-
In the configuration.toml file, the second pipeline
Pipeline1
is defined under[Writable.Pipeline.PerTopicPipelines.Pipeline1]
:[Writable.Pipeline.PerTopicPipelines.Pipeline1] UseTargetTypeOfByteArray = false Id = "Pipeline1" Topics = "edgex/events/#/#/Random-Float-Device/#" ExecutionOrder = "Transform, SetResponseData" TriggerType = "edgex-messagebus"
Pipeline1
contains two functions,Transform
andSetResponseData
in theExecutionOrder
, and the pipeline will be triggered by theedgex-messagebus
trigger per the value ofTriggerType
parameter. Theedgex-messagebus
trigger configuration is defined under[Trigger.EdgexMessageBus]
in the sample below:
Per this configuration,[Trigger.EdgexMessageBus] Type = "mqtt" [Trigger.EdgexMessageBus.SubscribeHost] Host = "localhost" Port = 1883 Protocol = "mqtt" SubscribeTopics="edgex/events/#" [Trigger.EdgexMessageBus.PublishHost] Host = "localhost" Port = 1883 Protocol = "mqtt" PublishTopic="example" [Trigger.EdgexMessageBus.Optional] ClientId ="new-app-service" Qos = "0" # Quality of Sevice values are 0 (At most once), 1 (At least once) or 2 (Exactly once) KeepAlive = "10" # Seconds (must be 2 or greater) Retained = "false" AutoReconnect = "true" ConnectTimeout = "30" # Seconds SkipCertVerify = "false" authmode = "none" # change to "usernamepassword", "clientcert", or "cacert" for secure MQTT messagebus. secretname = "mqtt-bus"
Pipeline1
will be triggered when any new message arrives at thelocalhost:1883
MQTT broker via topics matching toedgex/events/#
-
In the configuration.toml file, the third pipeline
Pipeline2
is defined under[Writable.Pipeline.PerTopicPipelines.Pipeline2]
:[Writable.Pipeline.PerTopicPipelines.Pipeline2] UseTargetTypeOfByteArray = true Id = "Pipeline2" Topics = "sample/command" ExecutionOrder = "ExecuteCoreCommand, XpertMQTTExport" TriggerType = "external-mqtt"
Pipeline2
contains two functions,ExecuteCoreCommand
andXpertMQTTExport
in theExecutionOrder
, and the pipeline will be triggered by theexternal-mqtt
trigger per the value ofTriggerType
parameter. Theexternal-mqtt
trigger configuration is defined under[Trigger.ExternalMqtt]
in the sample below:Per this configuration,[Trigger.ExternalMqtt] Url = "tcp://localhost:1883" SubscribeTopics = "sample/#" PublishTopic = "" ClientId = "" ConnectTimeout = "30s" AutoReconnect = false KeepAlive = 60 QoS = 0 Retain = false SkipCertVerify = false SecretPath = "" AuthMode = "none"
Pipeline2
will be triggered when any new message arrives at thetcp://localhost:1883
MQTT broker via topics matching tosample/#
.Note
ExecuteCoreCommand
parses the incoming JSON message to determine a core command, issue the core command via Core Command service, and then return the execution result. The default TargetType for data flowing into the functions pipeline is an EdgeX Event DTO. To receive JSON data, the functions pipeline containingExecuteCoreCommand
function must be configured withUseTargetTypeOfByteArray=true
to set the TargetType to be a byte array. For more details about the usage ofExecuteCoreCommand
, please refer to ExecuteCoreCommand documentation. -
Now, the application service is ready to start up. As this sample configuration uses the embedded MQTT Broker for both
edgex-messagebus
trigger andexternal-mqtt
trigger, there is no authentication required and no secrets need to be specified. If your Edge Xpert is running under secure mode, enter the following command, replacing the path with your configuration file:If your Edge Xpert is running under insecure mode, enter the following command, replacing the path with your configuration file:edgexpert up --secret app-service --path=<path your confgiuration file>
edgexpert up app-service --path=<path your confgiuration file>
-
Once the application is running up, you should expect to observe similar logs as shown below to indicate that three different type of triggers has been used:
level=INFO ts=2022-02-25T14:08:49.330582152Z app=app-multipipelines-triggers source=xperttriggerfactory.go:27 msg="HTTP trigger selected" level=INFO ts=2022-02-25T14:08:49.330608304Z app=app-multipipelines-triggers source=xperttriggerfactory.go:30 msg="EdgeX MessageBus trigger selected" level=INFO ts=2022-02-25T14:08:49.330612937Z app=app-multipipelines-triggers source=xperttriggerfactory.go:33 msg="External MQTT trigger selected" level=INFO ts=2022-02-25T14:08:49.330617886Z app=app-multipipelines-triggers source=rest.go:65 msg="Initializing HTTP Trigger" level=INFO ts=2022-02-25T14:08:49.330634219Z app=app-multipipelines-triggers source=rest.go:67 msg="HTTP Trigger Initialized" level=INFO ts=2022-02-25T14:08:49.330642224Z app=app-multipipelines-triggers source=messaging.go:65 msg="Initializing Message Bus Trigger for 'mqtt'" level=INFO ts=2022-02-25T14:08:49.330813907Z app=app-multipipelines-triggers source=configupdates.go:55 msg="Waiting for App Service configuration updates..." level=INFO ts=2022-02-25T14:08:49.331157001Z app=app-multipipelines-triggers source=telemetry.go:78 msg="Starting CPU Usage Average loop" level=INFO ts=2022-02-25T14:08:49.332230656Z app=app-multipipelines-triggers source=messaging.go:97 msg="Subscribing to topic(s): 'edgex/events/#' @ mqtt://localhost:1883" level=INFO ts=2022-02-25T14:08:49.332278818Z app=app-multipipelines-triggers source=messaging.go:106 msg="Publishing to topic: 'example' @ mqtt://localhost:1883" level=INFO ts=2022-02-25T14:08:49.332366156Z app=app-multipipelines-triggers source=messaging.go:118 msg="Waiting for messages from the MessageBus on the 'edgex/events/#' topic" level=INFO ts=2022-02-25T14:08:49.332565334Z app=app-multipipelines-triggers source=mqtt.go:78 msg="Initializing MQTT Trigger" level=INFO ts=2022-02-25T14:08:49.332605989Z app=app-multipipelines-triggers source=mqtt.go:115 msg="Connecting to mqtt broker for MQTT trigger at: tcp://localhost:1883" level=INFO ts=2022-02-25T14:08:49.333088152Z app=app-multipipelines-triggers source=mqtt.go:121 msg="Connected to mqtt server for MQTT trigger" level=INFO ts=2022-02-25T14:08:49.333220554Z app=app-multipipelines-triggers source=service.go:189 msg="StoreAndForward disabled. Not running retry loop." level=INFO ts=2022-02-25T14:08:49.333229237Z app=app-multipipelines-triggers source=service.go:192 msg="Sample Application Service has started" level=INFO ts=2022-02-25T14:08:49.333253066Z app=app-multipipelines-triggers source=server.go:156 msg="Starting HTTP Web Server on address localhost:59704" level=INFO ts=2022-02-25T14:08:49.333409887Z app=app-multipipelines-triggers source=mqtt.go:149 msg="Subscribed to topic(s) 'sample/#' for MQTT trigger"
-
To verify the running application service, we have to obtain the IP addresses for both mqtt-broker and application service:
Ideally, the command result will list ip addresses of running services:edgexpert ip
Name |IP --------------------------------------------- core-command |172.19.0.6 core-data |172.19.0.8 core-keeper |172.19.0.4 core-metadata |172.19.0.5 device-virtual |172.19.0.7 mqtt-broker |172.19.0.2 redis |172.19.0.3 app-multipipelines-triggers |172.19.0.9
-
To verify the
default-pipeline
as defined in the step 2, we can send a POST request with a JSON message to the application service:Ideally, you should expect to observe similar log as shown below to indicate that thecurl -X POST http://<replace with the ip of application service as obtained from step 7>:59740/api/v2/trigger -d '{ "message": "this is a message to trigger default-pipeline"}'
default-pipeline
was triggered and then usedPrintDataToLog
function to print out the received message in the log:level=INFO ts=2022-02-25T14:27:18.509508671Z app=app-multipipelines-triggers source=printdatatolog.go:19 msg="Data: { \"message\": \"this is a message to trigger default-pipeline\"}"
-
To verify the
Pipeline1
as defined in the step 3, we can use mosquitto_sub command to subscribe for the data as published back toedgex-messagebus
bySetResponseData
function in the pipeline:Ideally, mosquitto_sub should expect to receive similar messages as shown below:mosquitto_sub -h <replace with the ip of mqtt-broker as obtained from step 7> -t example
{"ReceivedTopic":"","CorrelationID":"c1324fbb-e846-49fd-8e7d-0c7a989d79ad","Payload":"PEV2ZW50PjxBcGlWZXJzaW9uPnYyPC9BcGlWZXJzaW9uPjxJZD4zZWQ1MjgwZS1hNTdiLTQzMTYtOTNlMS04MzZhNmU2YTc4NzQ8L0lkPjxEZXZpY2VOYW1lPlJhbmRvbS1GbG9hdC1EZXZpY2U8L0RldmljZU5hbWU+PFByb2ZpbGVOYW1lPlJhbmRvbS1GbG9hdC1EZXZpY2U8L1Byb2ZpbGVOYW1lPjxTb3VyY2VOYW1lPkZsb2F0NjQ8L1NvdXJjZU5hbWU+PE9yaWdpbj4xNjU2NTU1Mzc0MDEzNzM5ODg2PC9PcmlnaW4+PFJlYWRpbmdzPjxJZD5iYTdlMjMzYy1hNThmLTQ1MjMtYTNkYy1kYWRlZTkyNjFkMmY8L0lkPjxPcmlnaW4+MTY1NjU1NTM3NDAxMzczOTg4NjwvT3JpZ2luPjxEZXZpY2VOYW1lPlJhbmRvbS1GbG9hdC1EZXZpY2U8L0RldmljZU5hbWU+PFJlc291cmNlTmFtZT5GbG9hdDY0PC9SZXNvdXJjZU5hbWU+PFByb2ZpbGVOYW1lPlJhbmRvbS1GbG9hdC1EZXZpY2U8L1Byb2ZpbGVOYW1lPjxWYWx1ZVR5cGU+RmxvYXQ2NDwvVmFsdWVUeXBlPjxVbml0cz48L1VuaXRzPjxCaW5hcnlWYWx1ZT48L0JpbmFyeVZhbHVlPjxNZWRpYVR5cGU+PC9NZWRpYVR5cGU+PFZhbHVlPjkuMzg0NDMwZSswMjwvVmFsdWU+PC9SZWFkaW5ncz48L0V2ZW50Pg==","ContentType":"application/xml"}
-
To verify the
Pipeline2
as defined in the step 4, we will need to publish a JSON message indicating the target core command to themqtt-broker
via topicsample/command
:The payload as defined in the above command will triggermosquitto_pub -h <replace with the ip of mqtt-broker as obtained from step 7> -t sample/command -m '{ "DeviceName": "Random-Integer-Device","CommandName":"Int8","RequestMethod":"GET"}'
Pipeline2
to issue aGET
request on theInt8
command of theRandom-Integer-Device
device.Pipeline2
will then export the command execution result tomqtt-broker
via topicresult
. To ensure the command execution result does arrive in themqtt-broker
, we can use following command to subscribe for the messages:Ideally, we should expect to observe similar message as shown below:mosquitto_sub -h <replace with the ip of mqtt-broker as obtained from step 7> -t result
{"apiVersion":"v2","statusCode":200,"event":{"apiVersion":"v2","id":"725b6b80-7e74-4db1-966b-6b1adc37614e","deviceName":"Random-Integer-Device","profileName":"Random-Integer-Device","sourceName":"Int8","origin":1645803920926367223,"readings":[{"id":"2b84db7e-1173-43f6-a1c2-9ae7c1a468aa","origin":1645803920926367223,"deviceName":"Random-Integer-Device","resourceName":"Int8","profileName":"Random-Integer-Device","valueType":"Int8","value":"123"}]}}
More configuration details about ExecuteCoreCommand function
ExecuteCoreCommand
pipeline function parses the input JSON message using GJSON library to obtain the following information:
- Device Name
- Command Name
- Request Method
- Request Body
Note
To understand the GJSON path syntax, please refer to Path Syntax.
Above information will be used by ExecuteCoreCommand
to issue a core command to Core Command service. If the core command invocation is successful, ExecuteCoreCommand
will pass the response result received from the Core Command service to the next function in the pipeline. If the core command invocation results in any error, ExecuteCoreCommand
will return the error to stop the pipeline execution by default.
Configuration | Description |
---|---|
DeviceNameJSONPath | Mandatory configuration to specify the GJSON path to get Device Name from the incoming JSON message. |
CommandNameJSONPath | Mandatory configuration to specify the GJSON path to get Command Name from the incoming JSON message. |
BodyJSONPath | Specify the GJSON path to get Request Body from the incoming JSON message. This configuration must be specified when the Request Method is either SET or PUT |
RequestMethodJSONPath | Specify the GJSON path to get Request Method from the incoming JSON message. Must be defined if DefaultRequestMethod is not defined. |
DefaultRequestMethod | Specify the default value for Request Method . Must be defined if RequestMethodJSONPath is not defined. Valid values are GET , and SET . |
PushEvent | Specify if a successful GET will result in the event being pushed to the EdgeX system. Valid values are yes , or no . Default value is no . Effective only when the identified value of Request Method is GET . |
ReturnEvent | Specify if a successful GET will return the event or not. Valid values are yes , or no . Default value is yes . Effective only when the identified value of Request Method is GET . |
ContinueOnError | Specify if ExecuteCoreCommand should terminate the pipeline when error occurs. Valid values are true , or false . Default value is false . This configuration is useful for chained export functions. If ContinueOnError=true , the functions pipeline continues after an error occurs so next export function executes. |
For example, if the ExecuteCoreCommand
pipeline function receives the following JSON message:
{
"payload": {
"DeviceName": "Random-Integer-Device",
"CommandName": "Int8",
"RequestMethod": "SET",
"Body": "{\"Int8\":\"123\"}"
}
}
[Writable.Pipeline.Functions.ExecuteCoreCommand] # See <ExecuteCoreCommand documentation url TBD> for more details.
[Writable.Pipeline.Functions.ExecuteCoreCommand.Parameters]
DeviceNameJSONPath = 'payload.DeviceName'
CommandNameJSONPath = 'payload.CommandName'
BodyJSONPath = 'payload.Body'
RequestMethodJSONPath = 'payload.RequestMethod'
ExecuteCoreCommand
will issue a SET
request to Int8
command of Random-Intger-Device
device, where the payload is in following format:
{
"Int8" : "123"
}