Multiple Pipelines (Deprecated)
Overview
Alert
Since Edge Xpert v2.3.0, the feature of defining multiple pipelines triggered by different triggers has been deprecated, so this sample is obsolete and will be completely removed in v3.0. The purpose of this sample is to demonstrate two features of Edge Xpert Application Service:
1. defining multiple pipelines triggered by different triggers in a single Edge Xpert Application Service
2. subscribing MQTT messages from external MQTT broker to trigger a core command execution
However, since EdgeX Foundry Levski release adds support for an external MQTT connection on command service, this sample is no longer required and will be completely removed in Edge Xpert v3.0.
EdgeX Foundry provides Pipeline Per Topics feature to support use cases that require multiple functions pipelines to process data differently. However, there is a limitation with regard to the Pipeline Per Topics feature of EdgeX Foundry: all different pipelines can only be triggered by the same trigger. Edge Xpert application service also inherits the same feature and make additional enhancement to allow different pipelines can be triggered by different triggers as defined under [Trigger] section. With such enhancement, a single Edge Xpert application service can be configured to further support use cases that involve bidirectional data communication, such as sending device data to Cloud vendors and then receive the response back from Cloud vendors.
This example will create an application service with three different pipelines:
- a default pipeline to be triggered by HTTP Trigger and then print out the incoming data in the log.
- a pipeline to be triggered by EdgeX MessageBus Trigger, transform the data to XML format, and then sets transformation result as the response data that the pipeline returns to the configured trigger.
- a pipeline to be triggered by External MQTT Trigger, parse the input JSON message to execute a core command, and then export the execution result to the external MQTT broker.
Prerequisite
- Edge Xpert CLI installation
- Understand Edge Xpert Application Service
- Launch Edge Xpert core services, device-virtual, and mqtt-broker
Configure an Application service with three different pipelines
- We will need a configuration file to start up an application service. The recommended way is to copy the sample configuration as provided by Edge Xpert CLI, and then revise the configuration to satisfy your needs. Copy
/usr/share/edgexpert/examples/app-configurable/multiple-pipelines-triggers/configuration.tomlsample configuration file to your working directory. -
Open the copied configuration.toml file in a text editor, and you will observe the first pipeline, or so-called default pipeline, is defined under
[Writable.Pipeline]section as shown below:This default pipeline contains a single function[Writable.Pipeline] UseTargetTypeOfByteArray = true ExecutionOrder = "PrintDataToLog" # ExecutionOrder for the default pipelinePrintDataToLogin theExecutionOrder, and the pipeline will be triggered by the first trigger as defined in theTypeparameter of[Trigger], which will behttpin the sample below:[Trigger] Type = "http, edgex-messagebus, external-mqtt"Note
UseTargetTypeOfByteArray=trueset the TargetType to be a byte array/slice, i.e.[]bytefor the pipeline. Please refer to EdgeX Foundry documentation for more details about the usage ofUseTargetTypeOfByteArray -
In the configuration.toml file, the second pipeline
Pipeline1is defined under[Writable.Pipeline.PerTopicPipelines.Pipeline1]:[Writable.Pipeline.PerTopicPipelines.Pipeline1] UseTargetTypeOfByteArray = false Id = "Pipeline1" Topics = "edgex/events/#/#/Random-Float-Device/#" ExecutionOrder = "Transform, SetResponseData" TriggerType = "edgex-messagebus"Pipeline1contains two functions,TransformandSetResponseDatain theExecutionOrder, and the pipeline will be triggered by theedgex-messagebustrigger per the value ofTriggerTypeparameter. Theedgex-messagebustrigger configuration is defined under[Trigger.EdgexMessageBus]in the sample below:
Per this configuration,[Trigger.EdgexMessageBus] Type = "mqtt" [Trigger.EdgexMessageBus.SubscribeHost] Host = "localhost" Port = 1883 Protocol = "mqtt" SubscribeTopics="edgex/events/#" [Trigger.EdgexMessageBus.PublishHost] Host = "localhost" Port = 1883 Protocol = "mqtt" PublishTopic="example" [Trigger.EdgexMessageBus.Optional] ClientId ="new-app-service" Qos = "0" # Quality of Sevice values are 0 (At most once), 1 (At least once) or 2 (Exactly once) KeepAlive = "10" # Seconds (must be 2 or greater) Retained = "false" AutoReconnect = "true" ConnectTimeout = "30" # Seconds SkipCertVerify = "false" authmode = "none" # change to "usernamepassword", "clientcert", or "cacert" for secure MQTT messagebus. secretname = "mqtt-bus"Pipeline1will be triggered when any new message arrives at thelocalhost:1883MQTT broker via topics matching toedgex/events/# -
In the configuration.toml file, the third pipeline
Pipeline2is defined under[Writable.Pipeline.PerTopicPipelines.Pipeline2]:[Writable.Pipeline.PerTopicPipelines.Pipeline2] UseTargetTypeOfByteArray = true Id = "Pipeline2" Topics = "sample/command" ExecutionOrder = "ExecuteCoreCommand, XpertMQTTExport" TriggerType = "external-mqtt"Pipeline2contains two functions,ExecuteCoreCommandandXpertMQTTExportin theExecutionOrder, and the pipeline will be triggered by theexternal-mqtttrigger per the value ofTriggerTypeparameter. Theexternal-mqtttrigger configuration is defined under[Trigger.ExternalMqtt]in the sample below:Per this configuration,[Trigger.ExternalMqtt] Url = "tcp://localhost:1883" SubscribeTopics = "sample/#" PublishTopic = "" ClientId = "" ConnectTimeout = "30s" AutoReconnect = false KeepAlive = 60 QoS = 0 Retain = false SkipCertVerify = false SecretPath = "" AuthMode = "none"Pipeline2will be triggered when any new message arrives at thetcp://localhost:1883MQTT broker via topics matching tosample/#.Note
ExecuteCoreCommandparses the incoming JSON message to determine a core command, issue the core command via Core Command service, and then return the execution result. The default TargetType for data flowing into the functions pipeline is an EdgeX Event DTO. To receive JSON data, the functions pipeline containingExecuteCoreCommandfunction must be configured withUseTargetTypeOfByteArray=trueto set the TargetType to be a byte array. For more details about the usage ofExecuteCoreCommand, please refer to ExecuteCoreCommand documentation. -
Now, the application service is ready to start up. As this sample configuration uses the embedded MQTT Broker for both
edgex-messagebustrigger andexternal-mqtttrigger, there is no authentication required and no secrets need to be specified. If your Edge Xpert is running under secure mode, enter the following command, replacing the path with your configuration file:If your Edge Xpert is running under insecure mode, enter the following command, replacing the path with your configuration file:edgexpert up --secret app-service --path=<path your confgiuration file>edgexpert up app-service --path=<path your confgiuration file> -
Once the application is running up, you should expect to observe similar logs as shown below to indicate that three different type of triggers has been used:
level=INFO ts=2022-02-25T14:08:49.330582152Z app=app-multipipelines-triggers source=xperttriggerfactory.go:27 msg="HTTP trigger selected" level=INFO ts=2022-02-25T14:08:49.330608304Z app=app-multipipelines-triggers source=xperttriggerfactory.go:30 msg="EdgeX MessageBus trigger selected" level=INFO ts=2022-02-25T14:08:49.330612937Z app=app-multipipelines-triggers source=xperttriggerfactory.go:33 msg="External MQTT trigger selected" level=INFO ts=2022-02-25T14:08:49.330617886Z app=app-multipipelines-triggers source=rest.go:65 msg="Initializing HTTP Trigger" level=INFO ts=2022-02-25T14:08:49.330634219Z app=app-multipipelines-triggers source=rest.go:67 msg="HTTP Trigger Initialized" level=INFO ts=2022-02-25T14:08:49.330642224Z app=app-multipipelines-triggers source=messaging.go:65 msg="Initializing Message Bus Trigger for 'mqtt'" level=INFO ts=2022-02-25T14:08:49.330813907Z app=app-multipipelines-triggers source=configupdates.go:55 msg="Waiting for App Service configuration updates..." level=INFO ts=2022-02-25T14:08:49.331157001Z app=app-multipipelines-triggers source=telemetry.go:78 msg="Starting CPU Usage Average loop" level=INFO ts=2022-02-25T14:08:49.332230656Z app=app-multipipelines-triggers source=messaging.go:97 msg="Subscribing to topic(s): 'edgex/events/#' @ mqtt://localhost:1883" level=INFO ts=2022-02-25T14:08:49.332278818Z app=app-multipipelines-triggers source=messaging.go:106 msg="Publishing to topic: 'example' @ mqtt://localhost:1883" level=INFO ts=2022-02-25T14:08:49.332366156Z app=app-multipipelines-triggers source=messaging.go:118 msg="Waiting for messages from the MessageBus on the 'edgex/events/#' topic" level=INFO ts=2022-02-25T14:08:49.332565334Z app=app-multipipelines-triggers source=mqtt.go:78 msg="Initializing MQTT Trigger" level=INFO ts=2022-02-25T14:08:49.332605989Z app=app-multipipelines-triggers source=mqtt.go:115 msg="Connecting to mqtt broker for MQTT trigger at: tcp://localhost:1883" level=INFO ts=2022-02-25T14:08:49.333088152Z app=app-multipipelines-triggers source=mqtt.go:121 msg="Connected to mqtt server for MQTT trigger" level=INFO ts=2022-02-25T14:08:49.333220554Z app=app-multipipelines-triggers source=service.go:189 msg="StoreAndForward disabled. Not running retry loop." level=INFO ts=2022-02-25T14:08:49.333229237Z app=app-multipipelines-triggers source=service.go:192 msg="Sample Application Service has started" level=INFO ts=2022-02-25T14:08:49.333253066Z app=app-multipipelines-triggers source=server.go:156 msg="Starting HTTP Web Server on address localhost:59704" level=INFO ts=2022-02-25T14:08:49.333409887Z app=app-multipipelines-triggers source=mqtt.go:149 msg="Subscribed to topic(s) 'sample/#' for MQTT trigger" -
To verify the running application service, we have to obtain the IP addresses for both mqtt-broker and application service:
Ideally, the command result will list ip addresses of running services:edgexpert ipName |IP --------------------------------------------- core-command |172.19.0.6 core-data |172.19.0.8 core-keeper |172.19.0.4 core-metadata |172.19.0.5 device-virtual |172.19.0.7 mqtt-broker |172.19.0.2 redis |172.19.0.3 app-multipipelines-triggers |172.19.0.9 -
To verify the
default-pipelineas defined in the step 2, we can send a POST request with a JSON message to the application service:Ideally, you should expect to observe similar log as shown below to indicate that thecurl -X POST http://<replace with the ip of application service as obtained from step 7>:59740/api/v2/trigger -d '{ "message": "this is a message to trigger default-pipeline"}'default-pipelinewas triggered and then usedPrintDataToLogfunction to print out the received message in the log:level=INFO ts=2022-02-25T14:27:18.509508671Z app=app-multipipelines-triggers source=printdatatolog.go:19 msg="Data: { \"message\": \"this is a message to trigger default-pipeline\"}" -
To verify the
Pipeline1as defined in the step 3, we can use mosquitto_sub command to subscribe for the data as published back toedgex-messagebusbySetResponseDatafunction in the pipeline:Ideally, mosquitto_sub should expect to receive similar messages as shown below:mosquitto_sub -h <replace with the ip of mqtt-broker as obtained from step 7> -t example{"ReceivedTopic":"","CorrelationID":"c1324fbb-e846-49fd-8e7d-0c7a989d79ad","Payload":"PEV2ZW50PjxBcGlWZXJzaW9uPnYyPC9BcGlWZXJzaW9uPjxJZD4zZWQ1MjgwZS1hNTdiLTQzMTYtOTNlMS04MzZhNmU2YTc4NzQ8L0lkPjxEZXZpY2VOYW1lPlJhbmRvbS1GbG9hdC1EZXZpY2U8L0RldmljZU5hbWU+PFByb2ZpbGVOYW1lPlJhbmRvbS1GbG9hdC1EZXZpY2U8L1Byb2ZpbGVOYW1lPjxTb3VyY2VOYW1lPkZsb2F0NjQ8L1NvdXJjZU5hbWU+PE9yaWdpbj4xNjU2NTU1Mzc0MDEzNzM5ODg2PC9PcmlnaW4+PFJlYWRpbmdzPjxJZD5iYTdlMjMzYy1hNThmLTQ1MjMtYTNkYy1kYWRlZTkyNjFkMmY8L0lkPjxPcmlnaW4+MTY1NjU1NTM3NDAxMzczOTg4NjwvT3JpZ2luPjxEZXZpY2VOYW1lPlJhbmRvbS1GbG9hdC1EZXZpY2U8L0RldmljZU5hbWU+PFJlc291cmNlTmFtZT5GbG9hdDY0PC9SZXNvdXJjZU5hbWU+PFByb2ZpbGVOYW1lPlJhbmRvbS1GbG9hdC1EZXZpY2U8L1Byb2ZpbGVOYW1lPjxWYWx1ZVR5cGU+RmxvYXQ2NDwvVmFsdWVUeXBlPjxVbml0cz48L1VuaXRzPjxCaW5hcnlWYWx1ZT48L0JpbmFyeVZhbHVlPjxNZWRpYVR5cGU+PC9NZWRpYVR5cGU+PFZhbHVlPjkuMzg0NDMwZSswMjwvVmFsdWU+PC9SZWFkaW5ncz48L0V2ZW50Pg==","ContentType":"application/xml"} -
To verify the
Pipeline2as defined in the step 4, we will need to publish a JSON message indicating the target core command to themqtt-brokervia topicsample/command:The payload as defined in the above command will triggermosquitto_pub -h <replace with the ip of mqtt-broker as obtained from step 7> -t sample/command -m '{ "DeviceName": "Random-Integer-Device","CommandName":"Int8","RequestMethod":"GET"}'Pipeline2to issue aGETrequest on theInt8command of theRandom-Integer-Devicedevice.Pipeline2will then export the command execution result tomqtt-brokervia topicresult. To ensure the command execution result does arrive in themqtt-broker, we can use following command to subscribe for the messages:Ideally, we should expect to observe similar message as shown below:mosquitto_sub -h <replace with the ip of mqtt-broker as obtained from step 7> -t result{"apiVersion":"v2","statusCode":200,"event":{"apiVersion":"v2","id":"725b6b80-7e74-4db1-966b-6b1adc37614e","deviceName":"Random-Integer-Device","profileName":"Random-Integer-Device","sourceName":"Int8","origin":1645803920926367223,"readings":[{"id":"2b84db7e-1173-43f6-a1c2-9ae7c1a468aa","origin":1645803920926367223,"deviceName":"Random-Integer-Device","resourceName":"Int8","profileName":"Random-Integer-Device","valueType":"Int8","value":"123"}]}}
More configuration details about ExecuteCoreCommand function
ExecuteCoreCommand pipeline function parses the input JSON message using GJSON library to obtain the following information:
- Device Name
- Command Name
- Request Method
- Request Body
Note
To understand the GJSON path syntax, please refer to Path Syntax.
Above information will be used by ExecuteCoreCommand to issue a core command to Core Command service. If the core command invocation is successful, ExecuteCoreCommand will pass the response result received from the Core Command service to the next function in the pipeline. If the core command invocation results in any error, ExecuteCoreCommand will return the error to stop the pipeline execution by default.
| Configuration | Description |
|---|---|
| DeviceNameJSONPath | Mandatory configuration to specify the GJSON path to get Device Name from the incoming JSON message. |
| CommandNameJSONPath | Mandatory configuration to specify the GJSON path to get Command Name from the incoming JSON message. |
| BodyJSONPath | Specify the GJSON path to get Request Body from the incoming JSON message. This configuration must be specified when the Request Method is either SET or PUT |
| RequestMethodJSONPath | Specify the GJSON path to get Request Method from the incoming JSON message. Must be defined if DefaultRequestMethod is not defined. |
| DefaultRequestMethod | Specify the default value for Request Method. Must be defined if RequestMethodJSONPath is not defined. Valid values are GET, and SET. |
| PushEvent | Specify if a successful GET will result in the event being pushed to the EdgeX system. Valid values are yes, or no. Default value is no. Effective only when the identified value of Request Method is GET. |
| ReturnEvent | Specify if a successful GET will return the event or not. Valid values are yes, or no. Default value is yes. Effective only when the identified value of Request Method is GET. |
| ContinueOnError | Specify if ExecuteCoreCommand should terminate the pipeline when error occurs. Valid values are true, or false. Default value is false. This configuration is useful for chained export functions. If ContinueOnError=true, the functions pipeline continues after an error occurs so next export function executes. |
For example, if the ExecuteCoreCommand pipeline function receives the following JSON message:
{
"payload": {
"DeviceName": "Random-Integer-Device",
"CommandName": "Int8",
"RequestMethod": "SET",
"Body": "{\"Int8\":\"123\"}"
}
}
[Writable.Pipeline.Functions.ExecuteCoreCommand] # See <ExecuteCoreCommand documentation url TBD> for more details.
[Writable.Pipeline.Functions.ExecuteCoreCommand.Parameters]
DeviceNameJSONPath = 'payload.DeviceName'
CommandNameJSONPath = 'payload.CommandName'
BodyJSONPath = 'payload.Body'
RequestMethodJSONPath = 'payload.RequestMethod'
ExecuteCoreCommand will issue a SET request to Int8 command of Random-Intger-Device device, where the payload is in following format:
{
"Int8" : "123"
}