Multiple Pipelines

Overview

EdgeX Foundry provides the Pipeline Per Topics feature to support use cases that require multiple functions pipelines to process data differently. The feature has one limitation: all pipelines can only be triggered by the same trigger. The Edge Xpert application service inherits this feature and enhances it so that different pipelines can be triggered by different triggers, as defined under the [Trigger] section. With this enhancement, a single Edge Xpert application service can also support use cases that involve bidirectional data communication, such as sending device data to a cloud vendor and then receiving the response back from that vendor.

This example will create an application service with three different pipelines:

  1. A default pipeline that is triggered by the HTTP Trigger and prints the incoming data to the log.
  2. A pipeline that is triggered by the EdgeX MessageBus Trigger, transforms the data to XML format, and then sets the transformation result as the response data that the pipeline returns to the configured trigger.
  3. A pipeline that is triggered by the External MQTT Trigger, parses the input JSON message to execute a core command, and then exports the execution result to the external MQTT broker.
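
As a rough mental model of the layout described above, the following Python sketch maps each of the three pipelines to the trigger that starts it. It is purely illustrative: the dictionary layout and the helper name `pipelines_for_trigger` are assumptions made for this example and are not part of Edge Xpert, which reads the same information from configuration.toml.

```python
# Conceptual model only: which pipeline reacts to which trigger.
# Pipeline, trigger, and function names mirror the sample configuration;
# the data structure and helper function are hypothetical, not an Edge Xpert API.

PIPELINES = {
    "default-pipeline": {
        "trigger": "http",
        "functions": ["PrintDataToLog"],
    },
    "Pipeline1": {
        "trigger": "edgex-messagebus",
        "functions": ["Transform", "SetResponseData"],
    },
    "Pipeline2": {
        "trigger": "external-mqtt",
        "functions": ["ExecuteCoreCommand", "XpertMQTTExport"],
    },
}


def pipelines_for_trigger(trigger_type):
    """Return the names of the pipelines started by the given trigger type."""
    return [name for name, spec in PIPELINES.items() if spec["trigger"] == trigger_type]


if __name__ == "__main__":
    for trigger in ("http", "edgex-messagebus", "external-mqtt"):
        print(trigger, "->", pipelines_for_trigger(trigger))
```

In this model each trigger starts exactly one pipeline, which matches the sample; nothing in the sketch prevents several pipelines from sharing a trigger.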

Prerequisites

  1. Edge Xpert CLI installation
  2. Understand Edge Xpert Application Service
  3. Launch Edge Xpert core services, device-virtual, and mqtt-broker

Configure an application service with three different pipelines

  1. An application service needs a configuration file to start. The recommended approach is to copy the sample configuration provided by the Edge Xpert CLI and then revise it to suit your needs. Copy the sample configuration file /usr/share/edgexpert/examples/app-configurable/multiple-pipelines-triggers/configuration.toml to your working directory.
  2. Open the copied configuration.toml file in a text editor. The first pipeline, the so-called default pipeline, is defined under the [Writable.Pipeline] section as shown below:

    [Writable.Pipeline]
    UseTargetTypeOfByteArray = true
    ExecutionOrder = "PrintDataToLog" # ExecutionOrder for the default pipeline
    
    This default pipeline contains a single function, PrintDataToLog, in its ExecutionOrder. It is triggered by the first trigger listed in the Type parameter of [Trigger], which is http in the sample below:

    [Trigger]
    Type = "http, edgex-messagebus, external-mqtt"
    

    Note

    UseTargetTypeOfByteArray=true sets the TargetType of the pipeline to a byte array/slice, i.e. []byte. Refer to the EdgeX Foundry documentation for more details about the usage of UseTargetTypeOfByteArray.

  3. In the configuration.toml file, the second pipeline Pipeline1 is defined under [Writable.Pipeline.PerTopicPipelines.Pipeline1]:

    [Writable.Pipeline.PerTopicPipelines.Pipeline1]
    UseTargetTypeOfByteArray = false
    Id = "Pipeline1"
    Topics = "edgex/events/#/#/Random-Float-Device/#"
    ExecutionOrder = "Transform, SetResponseData"
    TriggerType = "edgex-messagebus"
    
    Pipeline1 contains two functions, Transform and SetResponseData, in its ExecutionOrder. It is triggered by the edgex-messagebus trigger, as specified by the TriggerType parameter. The edgex-messagebus trigger configuration is defined under [Trigger.EdgexMessageBus], as in the sample below:

    [Trigger.EdgexMessageBus]
    Type = "mqtt"
      [Trigger.EdgexMessageBus.SubscribeHost]
      Host = "localhost"
      Port = 1883
      Protocol = "mqtt"
      SubscribeTopics="edgex/events/#"
      [Trigger.EdgexMessageBus.PublishHost]
      Host = "localhost"
      Port = 1883
      Protocol = "mqtt"
      PublishTopic="example"
      [Trigger.EdgexMessageBus.Optional]
      ClientId ="new-app-service"
      Qos            = "0" # Quality of Service values are 0 (At most once), 1 (At least once) or 2 (Exactly once)
      KeepAlive      = "10" # Seconds (must be 2 or greater)
      Retained       = "false"
      AutoReconnect  = "true"
      ConnectTimeout = "30" # Seconds
      SkipCertVerify = "false"
      authmode = "none"  # change to "usernamepassword", "clientcert", or "cacert" for secure MQTT messagebus.
      secretname = "mqtt-bus"
    
    Per this configuration, the edgex-messagebus trigger receives every message published to the MQTT broker at localhost:1883 on topics matching edgex/events/#, and Pipeline1 runs for those messages whose topic also matches its Topics value.

  4. In the configuration.toml file, the third pipeline Pipeline2 is defined under [Writable.Pipeline.PerTopicPipelines.Pipeline2]:

    [Writable.Pipeline.PerTopicPipelines.Pipeline2]
    UseTargetTypeOfByteArray = true
    Id = "Pipeline2"
    Topics = "sample/command"
    ExecutionOrder = "ExecuteCoreCommand, XpertMQTTExport"
    TriggerType = "external-mqtt"
    
    Pipeline2 contains two functions, ExecuteCoreCommand and XpertMQTTExport, in its ExecutionOrder. It is triggered by the external-mqtt trigger, as specified by the TriggerType parameter. The external-mqtt trigger configuration is defined under [Trigger.ExternalMqtt], as in the sample below:

    [Trigger.ExternalMqtt]
    Url = "tcp://localhost:1883"
    SubscribeTopics = "sample/#"
    PublishTopic = ""
    ClientId = ""
    ConnectTimeout = "30s"
    AutoReconnect = false
    KeepAlive = 60
    QoS = 0
    Retain = false
    SkipCertVerify = false
    SecretPath = ""
    AuthMode = "none"
    
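    Per this configuration, the external-mqtt trigger receives every message published to the MQTT broker at tcp://localhost:1883 on topics matching sample/#, and Pipeline2 runs for those messages published to sample/command, the topic set in its Topics value.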

    Note

    ExecuteCoreCommand parses the incoming JSON message to determine a core command, issues the core command via the Core Command service, and then returns the execution result. The default TargetType for data flowing into a functions pipeline is an EdgeX Event DTO. To receive JSON data, the functions pipeline containing the ExecuteCoreCommand function must be configured with UseTargetTypeOfByteArray=true, which sets the TargetType to a byte array. For more details about the usage of ExecuteCoreCommand, refer to the ExecuteCoreCommand documentation.

  5. The application service is now ready to start. Because this sample configuration uses the embedded MQTT broker for both the edgex-messagebus trigger and the external-mqtt trigger, no authentication is required and no secrets need to be specified. If Edge Xpert is running in secure mode, enter the following command, replacing the path with that of your configuration file:

    edgexpert up --secret app-service --path=<path to your configuration file>
    
    If Edge Xpert is running in insecure mode, enter the following command, replacing the path with that of your configuration file:

    edgexpert up app-service --path=<path to your configuration file>
    

  6. Once the application service is up and running, you should see logs similar to those below, indicating that three different types of triggers are in use:

    level=INFO ts=2022-02-25T14:08:49.330582152Z app=app-multiple-pipelines-triggers-v21signoff source=xperttriggerfactory.go:27 msg="HTTP trigger selected"
    level=INFO ts=2022-02-25T14:08:49.330608304Z app=app-multiple-pipelines-triggers-v21signoff source=xperttriggerfactory.go:30 msg="EdgeX MessageBus trigger selected"
    level=INFO ts=2022-02-25T14:08:49.330612937Z app=app-multiple-pipelines-triggers-v21signoff source=xperttriggerfactory.go:33 msg="External MQTT trigger selected"
    level=INFO ts=2022-02-25T14:08:49.330617886Z app=app-multiple-pipelines-triggers-v21signoff source=rest.go:65 msg="Initializing HTTP Trigger"
    level=INFO ts=2022-02-25T14:08:49.330634219Z app=app-multiple-pipelines-triggers-v21signoff source=rest.go:67 msg="HTTP Trigger Initialized"
    level=INFO ts=2022-02-25T14:08:49.330642224Z app=app-multiple-pipelines-triggers-v21signoff source=messaging.go:65 msg="Initializing Message Bus Trigger for 'mqtt'"
    level=INFO ts=2022-02-25T14:08:49.330813907Z app=app-multiple-pipelines-triggers-v21signoff source=configupdates.go:55 msg="Waiting for App Service configuration updates..."
    level=INFO ts=2022-02-25T14:08:49.331157001Z app=app-multiple-pipelines-triggers-v21signoff source=telemetry.go:78 msg="Starting CPU Usage Average loop"
    level=INFO ts=2022-02-25T14:08:49.332230656Z app=app-multiple-pipelines-triggers-v21signoff source=messaging.go:97 msg="Subscribing to topic(s): 'edgex/events/#' @ mqtt://localhost:1883"
    level=INFO ts=2022-02-25T14:08:49.33227888Z app=app-multiple-pipelines-triggers-v21signoff source=messaging.go:106 msg="Publishing to topic: 'example' @ mqtt://localhost:1883"
    level=INFO ts=2022-02-25T14:08:49.332366156Z app=app-multiple-pipelines-triggers-v21signoff source=messaging.go:118 msg="Waiting for messages from the MessageBus on the 'edgex/events/#' topic"
    level=INFO ts=2022-02-25T14:08:49.332565334Z app=app-multiple-pipelines-triggers-v21signoff source=mqtt.go:78 msg="Initializing MQTT Trigger"
    level=INFO ts=2022-02-25T14:08:49.332605989Z app=app-multiple-pipelines-triggers-v21signoff source=mqtt.go:115 msg="Connecting to mqtt broker for MQTT trigger at: tcp://localhost:1883"
    level=INFO ts=2022-02-25T14:08:49.333088152Z app=app-multiple-pipelines-triggers-v21signoff source=mqtt.go:121 msg="Connected to mqtt server for MQTT trigger"
    level=INFO ts=2022-02-25T14:08:49.333220554Z app=app-multiple-pipelines-triggers-v21signoff source=service.go:189 msg="StoreAndForward disabled. Not running retry loop."
    level=INFO ts=2022-02-25T14:08:49.333229237Z app=app-multiple-pipelines-triggers-v21signoff source=service.go:192 msg="Sample Application Service has started"
    level=INFO ts=2022-02-25T14:08:49.333253066Z app=app-multiple-pipelines-triggers-v21signoff source=server.go:156 msg="Starting HTTP Web Server on address localhost:59704"
    level=INFO ts=2022-02-25T14:08:49.333409887Z app=app-multiple-pipelines-triggers-v21signoff source=mqtt.go:149 msg="Subscribed to topic(s) 'sample/#' for MQTT trigger"
    

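  7. To verify the default pipeline defined in step 2, send a POST request with a JSON message to the HTTP trigger endpoint. If the request cannot connect, check the port against the one reported in the "Starting HTTP Web Server" log line from step 6: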

    curl -X POST http://localhost:59740/api/v2/trigger -d '{ "message": "this is a message to trigger default-pipeline"}'
    
    You should see a log entry similar to the one below, indicating that the default pipeline was triggered and used the PrintDataToLog function to print the received message to the log:
    level=INFO ts=2022-02-25T14:27:18.509508671Z app=app-multiple-pipelines-triggers-v21signoff source=printdatatolog.go:19 msg="Data: { \"message\": \"this is a message to trigger default-pipeline\"}"
    

  8. To verify Pipeline1 defined in step 3, use the mosquitto_sub command to subscribe to the data that the SetResponseData function in the pipeline publishes back to the edgex-messagebus:

    mosquitto_sub -t example
    
    mosquitto_sub should receive messages similar to the one below. The Payload field holds the XML produced by Transform in Base64-encoded form:
    {"ReceivedTopic":"","CorrelationID":"68c6a37d-94cc-480e-b13f-9123656b9cd7","Payload":"PEV2ZW50PjxBcGlWZXJzaW9uPnYyPC9BcGlWZXJzaW9uPjxJZD45NDIxYTc2OS02MjIwLTRjYjUtOWNiYi1mMDQ0MGQ0MTU1YWM8L0lkPjxEZXZpY2VOYW1lPlJhbmRvbS1GbG9hdC1EZXZpY2U8L0RldmljZU5hbWU+PFByb2ZpbGVOYW1lPlJhbmRvbS1GbG9hdC1EZXZpY2U8L1Byb2ZpbGVOYW1lPjxTb3VyY2VOYW1lPkZsb2F0NjQ8L1NvdXJjZU5hbWU+PE9yaWdpbj4xNjQ1NzkzNzg1MDY5NjQyOTcyPC9PcmlnaW4+PFJlYWRpbmdzPjxJZD4yNDdhYzlkNi0zMzAxLTQwMDctOGI1Yi01YTEwNGQ5Yjg4NGE8L0lkPjxPcmlnaW4+MTY0NTc5Mzc4NTA2OTY0Mjk3MjwvT3JpZ2luPjxEZXZpY2VOYW1lPlJhbmRvbS1GbG9hdC1EZXZpY2U8L0RldmljZU5hbWU+PFJlc291cmNlTmFtZT5GbG9hdDY0PC9SZXNvdXJjZU5hbWU+PFByb2ZpbGVOYW1lPlJhbmRvbS1GbG9hdC1EZXZpY2U8L1Byb2ZpbGVOYW1lPjxWYWx1ZVR5cGU+RmxvYXQ2NDwvVmFsdWVUeXBlPjxCaW5hcnlWYWx1ZT48L0JpbmFyeVZhbHVlPjxNZWRpYVR5cGU+PC9NZWRpYVR5cGU+PFZhbHVlPi00LjYyNDg1OWUrMDI8L1ZhbHVlPjwvUmVhZGluZ3M+PC9FdmVudD4=","ContentType":"application/json"}
    

  9. To verify Pipeline2 defined in step 4, publish a JSON message that identifies the target core command to the mqtt-broker on the topic sample/command:

    mosquitto_pub -t sample/command -m '{ "DeviceName": "Random-Integer-Device","CommandName":"Int8","RequestMethod":"GET"}'
    
    The payload in the above command triggers Pipeline2 to issue a GET request for the Int8 command of the Random-Integer-Device device. Pipeline2 then exports the command execution result to the mqtt-broker on the topic sample/result. To confirm that the result arrives at the mqtt-broker, use the following command to subscribe to the messages:

    mosquitto_sub -t sample/result
    
    You should see a message similar to the one below:
    {"apiVersion":"v2","statusCode":200,"event":{"apiVersion":"v2","id":"725b6b80-7e74-4db1-966b-6b1adc37614e","deviceName":"Random-Integer-Device","profileName":"Random-Integer-Device","sourceName":"Int8","origin":1645803920926367223,"readings":[{"id":"2b84db7e-1173-43f6-a1c2-9ae7c1a468aa","origin":1645803920926367223,"deviceName":"Random-Integer-Device","resourceName":"Int8","profileName":"Random-Integer-Device","valueType":"Int8","value":"123"}]}}
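
The Payload field of the message received in step 8 looks opaque because it is Base64-encoded, which is how Go's JSON encoder serializes byte slices. The sketch below uses only Python's standard library to decode such a field back into text. The helper name `decode_payload` and the shortened sample envelope are made up for this example and are not taken from Edge Xpert.

```python
import base64
import json


def decode_payload(envelope_json):
    """Return the Base64-decoded Payload of a message envelope as text."""
    envelope = json.loads(envelope_json)
    return base64.b64decode(envelope["Payload"]).decode("utf-8")


if __name__ == "__main__":
    # Shortened, hypothetical envelope; a real message carries a much longer Payload.
    sample = '{"CorrelationID": "example-id", "Payload": "PEV2ZW50Pg=="}'
    print(decode_payload(sample))
```

To inspect a real message, pass one line captured from mosquitto_sub -t example to `decode_payload`; the result should be the XML document that starts with the Event element.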
    
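
Steps 3 and 4 rely on MQTT subscription filters such as edgex/events/# and sample/#. The sketch below implements the standard MQTT wildcard rules (`+` for exactly one level, `#` for all remaining levels) so that you can check which topics a SubscribeTopics value would receive. It models broker-side subscription filters only and assumes a well-formed filter with `#` in the last position. How Edge Xpert evaluates the per-pipeline Topics value is not modelled here, and the function name is an assumption made for this example.

```python
def mqtt_filter_matches(topic_filter, topic):
    """Check a topic against an MQTT subscription filter with + and # wildcards."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for index, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: matches the parent level and everything below it.
            return True
        if index >= len(topic_levels):
            # The filter has more levels than the topic.
            return False
        if level != "+" and level != topic_levels[index]:
            return False
    # Without a trailing '#', filter and topic must have the same depth.
    return len(filter_levels) == len(topic_levels)


if __name__ == "__main__":
    print(mqtt_filter_matches("sample/#", "sample/command"))
    print(mqtt_filter_matches("sample/#", "example"))
```

With these rules the external-mqtt trigger, which subscribes to sample/#, receives messages on sample/command, while the topic example used as the PublishTopic of the edgex-messagebus trigger falls outside that subscription.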

More configuration details about ExecuteCoreCommand function

The ExecuteCoreCommand pipeline function parses the input JSON message using the GJSON library to obtain the following information:

  • Device Name
  • Command Name
  • Request Method
  • Request Body

Note

To understand the GJSON path syntax, refer to the GJSON Path Syntax documentation.

ExecuteCoreCommand uses the above information to issue a core command to the Core Command service. If the core command invocation succeeds, ExecuteCoreCommand passes the response received from the Core Command service to the next function in the pipeline. If the invocation fails, ExecuteCoreCommand returns the error, which stops the pipeline execution by default.

| Configuration | Description |
| --- | --- |
| DeviceNameJSONPath | Mandatory. Specifies the GJSON path used to get the Device Name from the incoming JSON message. |
| CommandNameJSONPath | Mandatory. Specifies the GJSON path used to get the Command Name from the incoming JSON message. |
| BodyJSONPath | Specifies the GJSON path used to get the Request Body from the incoming JSON message. Must be specified when the Request Method is either SET or PUT. |
| RequestMethodJSONPath | Specifies the GJSON path used to get the Request Method from the incoming JSON message. Must be defined if DefaultRequestMethod is not defined. |
| DefaultRequestMethod | Specifies the default value for the Request Method. Must be defined if RequestMethodJSONPath is not defined. Valid values are GET and SET. |
| PushEvent | Specifies whether a successful GET results in the event being pushed to the EdgeX system. Valid values are yes and no. The default value is no. Effective only when the identified Request Method is GET. |
| ReturnEvent | Specifies whether a successful GET returns the event. Valid values are yes and no. The default value is yes. Effective only when the identified Request Method is GET. |
| ContinueOnError | Specifies whether the functions pipeline continues when ExecuteCoreCommand encounters an error. Valid values are true and false. The default value is false, which terminates the pipeline. This configuration is useful for chained export functions: with ContinueOnError=true, the functions pipeline continues after an error occurs so that the next export function executes. |
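
The rules in the parameter descriptions above can be summarized as a small consistency check. The following Python sketch is an illustration only: it assumes the parameters have already been loaded into a dictionary, it treats the method names as case-sensitive (an assumption), and the function name `check_execute_core_command_params` is invented for this example rather than provided by Edge Xpert.

```python
def check_execute_core_command_params(params):
    """Return a list of problems found in an ExecuteCoreCommand parameter set."""
    problems = []

    # Both paths are described as mandatory.
    for key in ("DeviceNameJSONPath", "CommandNameJSONPath"):
        if not params.get(key):
            problems.append(key + " is mandatory")

    # The request method must come from the message or from a default.
    if not params.get("RequestMethodJSONPath") and not params.get("DefaultRequestMethod"):
        problems.append("set RequestMethodJSONPath or DefaultRequestMethod")

    # Assumption: the valid values listed for DefaultRequestMethod are matched exactly.
    default_method = params.get("DefaultRequestMethod")
    if default_method and default_method not in ("GET", "SET"):
        problems.append("DefaultRequestMethod must be GET or SET")

    return problems


if __name__ == "__main__":
    print(check_execute_core_command_params({"DeviceNameJSONPath": "payload.DeviceName"}))
```

An empty list means the parameter set satisfies the documented rules. Whether BodyJSONPath is needed cannot be checked this way, because it depends on the request method found in each incoming message.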

For example, if the ExecuteCoreCommand pipeline function receives the following JSON message:

{
  "payload": {
    "DeviceName": "Random-Integer-Device",
    "CommandName": "Int8",
    "RequestMethod": "SET",
    "Body": "{\"Int8\":\"123\"}"
  }
}

The parameters could be defined as follows to obtain the necessary information:

[Writable.Pipeline.Functions.ExecuteCoreCommand] # See <ExecuteCoreCommand documentation url TBD> for more details.
  [Writable.Pipeline.Functions.ExecuteCoreCommand.Parameters]
  DeviceNameJSONPath = 'payload.DeviceName'
  CommandNameJSONPath = 'payload.CommandName'
  BodyJSONPath = 'payload.Body'
  RequestMethodJSONPath = 'payload.RequestMethod'

With the above sample configuration, at runtime the pipeline containing ExecuteCoreCommand issues a SET request to the Int8 command of the Random-Integer-Device device, with a payload in the following format:

{
  "Int8" : "123"
}
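
To see how the configured paths pick values out of the sample message, the sketch below resolves simple dotted paths in Python. It handles only plain key-by-key paths such as payload.DeviceName, which is a small subset of what GJSON supports, and `get_path` is a hypothetical helper written for this example rather than a GJSON or Edge Xpert function.

```python
import json

# The sample message from above; the raw string keeps the escaped quotes intact.
SAMPLE_MESSAGE = r'''
{
  "payload": {
    "DeviceName": "Random-Integer-Device",
    "CommandName": "Int8",
    "RequestMethod": "SET",
    "Body": "{\"Int8\":\"123\"}"
  }
}
'''


def get_path(document, path):
    """Follow a dotted path of object keys; return None if any key is missing."""
    current = document
    for key in path.split("."):
        if not isinstance(current, dict) or key not in current:
            return None
        current = current[key]
    return current


message = json.loads(SAMPLE_MESSAGE)
device_name = get_path(message, "payload.DeviceName")
command_name = get_path(message, "payload.CommandName")
request_method = get_path(message, "payload.RequestMethod")
# Body is itself a JSON document stored as a string, so it is parsed separately.
body = json.loads(get_path(message, "payload.Body"))

if __name__ == "__main__":
    print(device_name, command_name, request_method, body)
```

The four extracted values correspond to the Device Name, Command Name, Request Method, and Request Body that ExecuteCoreCommand needs in order to build the core command request.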
