Export EdgeX events to Kafka Broker using App Service
Overview
Apache Kafka is a popular event streaming platform for data pipelines, streaming analytics, and data integration. Edge Xpert provides a proprietary KafkaSend function to send data to Apache Kafka. This article walks you through the steps to create an Edge Xpert application service that sends data generated by the virtual device service to an Apache Kafka broker.
Prerequisites
- Understand Edge Xpert Application Service
- Launch core services
- If you would like to enable SSL client authentication for the Kafka broker, both the openssl utility (version 1.1.1) and the keytool utility are required
Enable two-way SSL authentication between Kafka Broker and Edge Xpert
With a standard Kafka setup, all data is sent in plain text between the Kafka broker and its producers and consumers, and external applications, such as Edge Xpert, can write any message to any topic without authentication. Kafka allows clients to connect over SSL so that data transferred between the broker and its producers and consumers is encrypted. SSL is disabled by default but can be enabled as needed. Kafka also supports certificate-based mutual authentication over SSL, so that the broker and its producers and consumers can authenticate each other. The Apache Kafka documentation details the steps to enable two-way SSL authentication for a Kafka broker; this section walks through those steps to enable SSL and two-way SSL authentication between the Kafka broker and Edge Xpert.
Generate certificates, keystore, truststore for Kafka Broker
Kafka uses a keystore to store each broker’s own identity and a truststore to store all the certificates that the machine should trust. Importing a certificate into the broker’s truststore means trusting all certificates that are signed by that certificate.
- Save the following listing into a file called openssl-ca.cnf and adjust the values for validity and common attributes as necessary:
HOME = .
RANDFILE = $ENV::HOME/.rnd
####################################################################
[ ca ]
default_ca = CA_default # The default ca section
[ CA_default ]
base_dir = .
certificate = $base_dir/cacert.pem # The CA certificate
private_key = $base_dir/cakey.pem # The CA private key
new_certs_dir = $base_dir # Location for new certs after signing
database = $base_dir/index.txt # Database index file
serial = $base_dir/serial.txt # The current serial number
default_days = 365 # How long to certify for
default_crl_days = 30 # How long before next CRL
default_md = sha256 # Use public key default MD
preserve = no # Keep passed DN ordering
x509_extensions = ca_extensions # The extensions to add to the cert
email_in_dn = no # Don't concat the email in the DN
copy_extensions = copy # Required to copy SANs from CSR to cert
####################################################################
[ req ]
default_bits = 4096
default_keyfile = cakey.pem
distinguished_name = ca_distinguished_name
x509_extensions = ca_extensions
string_mask = utf8only
####################################################################
[ ca_distinguished_name ]
countryName = Country Name (2 letter code)
countryName_default = TW
stateOrProvinceName = State or Province Name (full name)
stateOrProvinceName_default = Test Province
localityName = Locality Name (eg, city)
localityName_default = Test Town
organizationName = Organization Name (eg, company)
organizationName_default = Test Company
organizationalUnitName = Organizational Unit (eg, division)
organizationalUnitName_default = Test Unit
commonName = Common Name (e.g. server FQDN or YOUR name)
commonName_default = kafka
emailAddress = Email Address
emailAddress_default = test@test.com
####################################################################
[ ca_extensions ]
subjectKeyIdentifier = hash
authorityKeyIdentifier = keyid:always, issuer
basicConstraints = critical, CA:true
keyUsage = keyCertSign, cRLSign
subjectAltName = DNS:kafka
####################################################################
[ signing_policy ]
countryName = optional
stateOrProvinceName = optional
localityName = optional
organizationName = optional
organizationalUnitName = optional
commonName = supplied
emailAddress = optional
####################################################################
[ signing_req ]
subjectKeyIdentifier = hash
authorityKeyIdentifier = keyid,issuer
basicConstraints = CA:FALSE
keyUsage = digitalSignature, keyEncipherment
subjectAltName = DNS:kafka
- Create a database file and a serial number file. These are used to keep track of which certificates were signed with this CA. Both are plain text files that reside in the same directory as your CA keys:
$ echo 01 > serial.txt
$ touch index.txt
- With the above steps done, you are ready to generate the CA that will be used to sign certificates later:
openssl req -x509 -config openssl-ca.cnf -newkey rsa:4096 -sha256 -nodes -out cacert.pem -outform PEM
- Create the keystore kafka.server.keystore.jks with alias localhost and generate a key pair for the Kafka broker:
keytool -keystore kafka.server.keystore.jks -alias localhost -validity 365 -genkey -keyalg RSA
- Import the CA certificate cacert.pem created in step 3 into the truststore kafka.server.truststore.jks for the Kafka broker:
keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file cacert.pem
- Generate a certificate request kafka_cert-file:
keytool -keystore kafka.server.keystore.jks -alias localhost -certreq -file kafka_cert-file
- Based on the signing request kafka_cert-file, generate a signed certificate kafka_cert-signed:
openssl x509 -req -CA cacert.pem -CAkey cakey.pem -in kafka_cert-file -out kafka_cert-signed -days 365 -CAcreateserial
- Import the CA certificate created in step 3 into the keystore kafka.server.keystore.jks of the Kafka broker:
keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file cacert.pem
- Import the signed certificate created in step 7 into the keystore kafka.server.keystore.jks of the Kafka broker:
keytool -keystore kafka.server.keystore.jks -alias localhost -import -file kafka_cert-signed
Note
You will have to provide a password when creating the keystore and the truststore. Make a note of these passwords, as they will be needed later for the Kafka broker configuration.
Generate client key/certificate signed with CA certificate from Kafka broker
You have now created the keystore and truststore with the CA certificate and signed certificate for the Kafka broker. The next step is to create a client key and a client certificate signed with the CA certificate, so that the client, Edge Xpert, can be trusted and authenticated by the Kafka broker.
- Generate the client key kafka-client.key:
openssl genrsa -des3 -out kafka-client.key 2048
This documentation assumes that you use openssl 1.1.1 to generate kafka-client.key. If you use openssl 3.x or above, use the command below to generate kafka-client.key instead:
openssl genrsa -traditional -des3 -out kafka-client.key 2048
- Generate a certificate request:
openssl req -key kafka-client.key -new -out kafka-client.req
- Based on the request created in the previous step, generate the certificate kafka-client.pem signed with the CA certificate from the Kafka broker:
openssl x509 -req -in kafka-client.req -CA cacert.pem -CAkey cakey.pem -CAcreateserial -out kafka-client.pem -days 365
Note
When you configure the application service later, you will have to add the following secrets into the secret provider:
- cacert: specifies the content of cacert.pem.
- clientcert: specifies the content of kafka-client.pem.
- clientkey: specifies the content of kafka-client.key.
- decryptedpassword: specifies the password to decrypt the clientkey secret. This is the pass phrase you provided during the kafka-client.key creation.
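The sample clientkey shown later in this article uses the traditional PEM layout (a BEGIN RSA PRIVATE KEY line followed by a Proc-Type: 4,ENCRYPTED header), which is the layout the -traditional flag asks openssl 3.x to write. As a minimal sketch, the Python helper below classifies a key file by its PEM markers so you can check the layout before storing the file as the clientkey secret. The function name and the returned labels are hypothetical and not part of Edge Xpert; which layouts KafkaSend accepts beyond the one shown in this article is not covered here.

```python
def classify_private_key(pem_text):
    """Return a coarse label for the layout of a PEM private key.

    Hypothetical helper for illustration; the labels are not Edge Xpert terms.
    """
    lines = [line.strip() for line in pem_text.strip().splitlines()]
    if not lines:
        return "unknown"
    first = lines[0]
    if first == "-----BEGIN RSA PRIVATE KEY-----":
        # Traditional layout: encryption is signalled by a Proc-Type header.
        if any(line.startswith("Proc-Type: 4,ENCRYPTED") for line in lines[1:4]):
            return "traditional-encrypted"
        return "traditional-unencrypted"
    if first == "-----BEGIN ENCRYPTED PRIVATE KEY-----":
        return "pkcs8-encrypted"
    if first == "-----BEGIN PRIVATE KEY-----":
        return "pkcs8-unencrypted"
    return "unknown"


if __name__ == "__main__":
    # Assumes kafka-client.key sits in the current directory.
    with open("kafka-client.key", encoding="ascii") as handle:
        print(classify_private_key(handle.read()))
```

A key generated with the -des3 commands in this section would be expected to report traditional-encrypted.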
Configure Kafka broker with SSL enabled
At this point, all the mandatory certificates and keys are ready, and you can set up a testing Kafka broker. For testing purposes, this tutorial uses the Kafka Docker images provided by Confluent to deploy the Kafka broker and its dependency, ZooKeeper. To enable two-way SSL authentication for the Kafka broker, add the following lines to /etc/edgexpert/docker-compose.yml if you run Edge Xpert in insecure mode, or to /etc/edgexpert/docker-compose-security.yml if you run Edge Xpert in secure mode:
zookeeper:
  image: confluentinc/cp-zookeeper:latest
  container_name: zookeeper
  hostname: zookeeper
  networks:
    - edgex-network
  ports:
    - "2181:2181"
  environment:
    ZOOKEEPER_CLIENT_PORT: 2181
    ZOOKEEPER_TICK_TIME: 2000
kafka:
  image: confluentinc/cp-kafka:latest
  container_name: kafka
  hostname: kafka
  networks:
    - edgex-network
  ports:
    - '9093:9093'
  depends_on:
    - zookeeper
  volumes:
    - /home/user/kafka/secrets_tests:/secrets
  environment:
    KAFKA_BROKER_ID: 1
    KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: SSL_HOST:SSL
    KAFKA_INTER_BROKER_LISTENER_NAME: SSL_HOST
    KAFKA_ADVERTISED_LISTENERS: SSL_HOST://kafka:9093
    KAFKA_SSL_CLIENT_AUTH: "required"
    KAFKA_SSL_KEYSTORE_LOCATION: /secrets/kafka.server.keystore.jks
    KAFKA_SSL_KEYSTORE_PASSWORD: test123
    KAFKA_SSL_TRUSTSTORE_LOCATION: /secrets/kafka.server.truststore.jks
    KAFKA_SSL_TRUSTSTORE_PASSWORD: test123
    KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: ""
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
Note
For the kafka container, you can specify your preferred port number for the SSL protocol under the ports section. The sample above specifies 9093 as the port number.
Under the volumes section, you have to add the volume where the keystore and truststore were previously generated. The sample above assumes that both the keystore and the truststore exist in the local folder /home/user/kafka/secrets_tests, and that the local folder is mapped to /secrets in the kafka container.
Under the environment section, you have to specify several environment variables to enable SSL:
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: specifies that the listener SSL_HOST uses the SSL protocol.
- KAFKA_INTER_BROKER_LISTENER_NAME: specifies that the inter-broker communication uses the listener SSL_HOST.
- KAFKA_ADVERTISED_LISTENERS: specifies that external Kafka clients can communicate with the Kafka broker through kafka:9093.
- KAFKA_SSL_CLIENT_AUTH: specifies that SSL client authentication is required, so all external Kafka clients need a proper client certificate to communicate with the Kafka broker.
- KAFKA_SSL_KEYSTORE_LOCATION: specifies the keystore location.
- KAFKA_SSL_KEYSTORE_PASSWORD: specifies the password to access the keystore.
- KAFKA_SSL_TRUSTSTORE_LOCATION: specifies the truststore location.
- KAFKA_SSL_TRUSTSTORE_PASSWORD: specifies the password to access the truststore.
- KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: this tutorial intentionally disables server host name verification by setting this environment variable to an empty string for testing purposes.
For more details about these configuration settings, please refer to the Confluent documentation.
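The two listener variables above are easy to get out of step, because every listener name in KAFKA_ADVERTISED_LISTENERS needs a matching entry in KAFKA_LISTENER_SECURITY_PROTOCOL_MAP. The following Python sketch pairs the two values and reports which protocol, host, and port each listener ends up with. It only parses the strings in the shape used by the compose samples in this article; the function name is hypothetical and nothing here talks to a broker.

```python
def describe_listeners(protocol_map, advertised):
    """Pair each advertised listener with its security protocol.

    protocol_map: value of KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
    advertised:   value of KAFKA_ADVERTISED_LISTENERS
    """
    protocols = {}
    for pair in protocol_map.split(","):
        name, _, protocol = pair.strip().partition(":")
        protocols[name] = protocol

    listeners = []
    for entry in advertised.split(","):
        name, _, address = entry.strip().partition("://")
        host, _, port = address.rpartition(":")
        if name not in protocols:
            raise ValueError(f"listener {name!r} has no entry in the protocol map")
        listeners.append(
            {"listener": name, "protocol": protocols[name], "host": host, "port": int(port)}
        )
    return listeners


if __name__ == "__main__":
    # Values copied from the SSL-only compose sample above.
    for item in describe_listeners("SSL_HOST:SSL", "SSL_HOST://kafka:9093"):
        print(item)
```

Running the sketch against your own compose values before starting the container is a quick way to spot a listener name that is advertised but never mapped to a protocol.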
Configure Kafka Broker with standard setup without SSL enabled
So far, this tutorial has described the steps needed when SSL client authentication is enabled on the Kafka broker. If your Kafka broker does not require security, you can deploy it with a standard setup without SSL by adding the following lines to the Edge Xpert compose file:
zookeeper:
  image: confluentinc/cp-zookeeper:latest
  container_name: zookeeper
  hostname: zookeeper
  networks:
    - edgex-network
  ports:
    - "2181:2181"
  environment:
    ZOOKEEPER_CLIENT_PORT: 2181
    ZOOKEEPER_TICK_TIME: 2000
kafka:
  image: confluentinc/cp-kafka:latest
  container_name: kafka
  hostname: kafka
  networks:
    - edgex-network
  ports:
    - '9092:9092'
    - '29092:29092'
  depends_on:
    - zookeeper
  environment:
    KAFKA_BROKER_ID: 1
    KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
    KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
    KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://kafka:9092
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
Configure an Application service to produce events to Kafka Broker
- Before configuring an Edge Xpert application service to produce messages to the Kafka broker, obtain the Kafka broker address, the Kafka broker port number, and the topic and partition that messages will be sent to. If the Kafka broker has SSL client authentication enabled, you also need the client certificate, client key, client key decryption password, and CA certificate.
- Edge Xpert provides a proprietary built-in function, KafkaSend, to produce events to the Kafka broker. A sample configuration is provided with the Edge Xpert CLI utility. Once you have installed the Edge Xpert CLI utility, the sample configuration is available at /usr/share/edgexpert/examples/app-configurable/kafka-export/. Copy the sample configuration file and save it as a separate file, e.g. kafka-export.toml, and then revise it to match the Kafka broker address, Kafka broker port number, topic, and partition. For example, if you have Kafka Client ID: edgex, Kafka broker address: kafka, Kafka broker port: 9093, Kafka topic: test, and Kafka partition: 0, revise KafkaSend with the following parameters:
[Writable.Pipeline.Functions.KafkaSend]
[Writable.Pipeline.Functions.KafkaSend.Parameters]
ClientID = "edgex"
Address = "kafka"
Port = "9093"
Topic = "test"
Partition = "0"
PersistOnError = "false"
If the Kafka broker is configured with SSL client authentication, the additional parameters AuthMode, SkipVerify, and SecretPath must be added:
[Writable.Pipeline.Functions.KafkaSend]
[Writable.Pipeline.Functions.KafkaSend.Parameters]
ClientID = "edgex"
Address = "kafka"
Port = "9093"
Topic = "test"
Partition = "0"
PersistOnError = "false"
AuthMode = "clientcert"
SkipVerify = "true"
SecretPath = "kafka"
Note
You will find two sample configuration files under /usr/share/edgexpert/examples/app-configurable/kafka-export/. If the application service will run in insecure mode, use configuration-insecure.toml as the sample. If the application service will run in secure mode, use configuration-secretstore.toml as the sample.
- If the application service is launched in insecure mode, secrets are retrieved from the Writable.InsecureSecrets configuration. Update the Writable.InsecureSecrets.kafka configuration with the correct values of decryptedpassword, clientcert, clientkey, and cacert:
[Writable.InsecureSecrets.kafka]
path = 'kafka'
[Writable.InsecureSecrets.kafka.Secrets]
decryptedpassword = 'test123'
cacert = '''
-----BEGIN CERTIFICATE-----
Od/SkytPbja4JHnOp10CUwxVLndmc4k/i4HT9Cwqpopdb1DwkmHpd2GsyYrUXbyW
x5KfT/jCkhtopWgbUGqi
-----END CERTIFICATE-----'''
clientkey = '''
-----BEGIN RSA PRIVATE KEY-----
Proc-Type: 4,ENCRYPTED
DEK-Info: DES-EDE3-CBC,B0C11751BD827F58
1Sn0Mvrv9JvX/SFfwjt376JbQzfvaS8q24GElZaBtC+/r5JA9h6CcljXDBreqEU6
OdlipMxo0Mof0ze9yc0oqbDc1zQUM1IMe1VLv8v7dmMpJXNY/+yM8g==
-----END RSA PRIVATE KEY-----'''
clientcert = '''
-----BEGIN CERTIFICATE-----
MIIDETCCAfkCFFjUeThPf4gHVpH9jUeH3n9Q6Jk+MA0GCSqGSIb3DQEBCwUAMEUx
N1bzRPcBGQZ+wbvS4jLkS4zllOD6
-----END CERTIFICATE-----'''
Note
The cacert secret is required only when using KafkaSend with SkipVerify="false" and the target Kafka broker uses a self-signed certificate.
- Once the configuration is ready, you can launch the testing Kafka broker and the application service in secure mode using the command below:
$ edgexpert up --secret zookeeper kafka app-service --path=<path of the configuration file>
If you prefer to launch the testing Kafka broker and the application service in insecure mode, use the command below:
$ edgexpert up zookeeper kafka app-service --path=<path of the configuration file>
- If the application service was launched in secure mode in the previous step, secrets are retrieved from the Vault secret store. You need to add decryptedpassword, clientcert, clientkey, and cacert to the Vault secret store. Please refer to Edge Xpert Application Services for details about adding secrets to the Vault secret store.
Note
Since Edge Xpert V2.1, IOTech recommends that you add secrets through the POST secrets REST API of the application service. Follow the steps below to add secrets in secure mode.
1. Save the following script into a file called addKafkaSecrets.sh and adjust the file locations of your certificates and key and the decryption password as necessary:
#!/bin/sh
set -x
# Flatten each PEM file into one line, with a literal \n after every non-empty line
CLIENTCERT=$(awk 'NF {sub(/\r/, ""); printf "%s\\n",$0;}' ./kafka-client.pem)
CLIENTKEY=$(awk 'NF {sub(/\r/, ""); printf "%s\\n",$0;}' ./kafka-client.key)
CACERT=$(awk 'NF {sub(/\r/, ""); printf "%s\\n",$0;}' ./cacert.pem)
DECRYPTPASSWORD='test123'
# JSON body for the POST secrets REST API
PAYLOAD='{
"apiVersion": "v2",
"path":"kafka",
"secretData": [
{"key":"decryptedpassword", "value":"'$DECRYPTPASSWORD'" },
{"key":"clientcert", "value":"'$CLIENTCERT'" },
{"key":"clientkey", "value":"'$CLIENTKEY'" },
{"key":"cacert", "value":"'$CACERT'" }
]
}'
CONTAINER_NAME='secretposter'
# Post the payload from a temporary curl container on the Edge Xpert network, then remove the container
docker run -it --network="edgexpert_edgex-network" --name "$CONTAINER_NAME" curlimages/curl:latest -k -X POST http://app-configuration-secretstore:59700/api/v2/secret -d "$PAYLOAD"
docker rm "$CONTAINER_NAME"
2. Execute the shell script to add the secrets to the Vault secret store:
$ ./addKafkaSecrets.sh
- The application service is now up and ready to produce EdgeX events to the Kafka broker. To simulate a device sending readings from the southbound side, start the virtual device service in secure mode using the command below:
$ edgexpert up --secret device-virtual
If you prefer to launch the virtual device service in insecure mode, use the command below:
$ edgexpert up device-virtual
- To monitor events being sent to the Kafka broker, you will need a Kafka consumer. Please refer to the Kafka documentation for more details.
Info
When two-way SSL authentication is enabled on the Kafka broker, configuring a Kafka consumer to monitor events sent to the broker takes extra effort. For testing purposes, you can configure the Kafka broker with both two-way SSL authentication and PLAINTEXT on different ports, so that the KafkaSend function acts as the Kafka producer and sends events over SSL, while a separate Kafka consumer monitors events over PLAINTEXT without SSL authentication. To achieve this, configure your kafka container to communicate over SSL on port 9093 and over PLAINTEXT on port 9092:
kafka:
  image: confluentinc/cp-kafka:latest
  container_name: kafka
  hostname: kafka
  networks:
    - edgex-network
  ports:
    - '9092:9092'
    - '29092:29092'
    - '9093:9093'
  depends_on:
    - zookeeper
  volumes:
    - /home/user/kafka/secrets_tests:/secrets
  environment:
    KAFKA_BROKER_ID: 1
    KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,SSL_HOST:SSL
    KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
    KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://kafka:9092,SSL_HOST://kafka:9093
    KAFKA_SSL_CLIENT_AUTH: "required"
    KAFKA_SSL_KEYSTORE_LOCATION: /secrets/kafka.server.keystore.jks
    KAFKA_SSL_KEYSTORE_PASSWORD: zaq12wsx
    KAFKA_SSL_TRUSTSTORE_LOCATION: /secrets/kafka.server.truststore.jks
    KAFKA_SSL_TRUSTSTORE_PASSWORD: zaq12wsx
    KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: ""
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
With the above kafka configuration, you can launch a Kafka consumer inside the kafka container to monitor events using the command below:
$ docker exec -it kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic test
You should see messages similar to the following:
{"apiVersion":"v2","id":"2aef0331-28a1-42bc-bf28-cd6801553c2f","deviceName":"Random-Integer-Device","profileName":"Random-Integer-Device","sourceName":"Int16","origin":1646822882250638275,"readings":[{"id":"5ac1117f-48de-4e14-ab77-a388a9438ae5","origin":1646822882250638275,"deviceName":"Random-Integer-Device","resourceName":"Int16","profileName":"Random-Integer-Device","valueType":"Int16","value":"841"}]}
{"apiVersion":"v2","id":"456bdaa0-8dbb-4bf0-b773-4bcd2004263d","deviceName":"Random-Integer-Device","profileName":"Random-Integer-Device","sourceName":"Int32","origin":1646822882250284415,"readings":[{"id":"aa201794-a028-4130-8c78-eff02e360f42","origin":1646822882250284415,"deviceName":"Random-Integer-Device","resourceName":"Int32","profileName":"Random-Integer-Device","valueType":"Int32","value":"-3064"}]}
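Each message printed by the console consumer is one EdgeX event encoded as JSON, with the readings nested in a list. As a small sketch of how such output could be post-processed, the Python snippet below extracts the device name, resource name, value type, and value from one line. The field names are taken from the sample messages in this article; the function name is hypothetical, and the sketch assumes one JSON object per line on standard input.

```python
import json
import sys


def summarize_event(line):
    """Return (deviceName, resourceName, valueType, value) for each reading in one event."""
    event = json.loads(line)
    return [
        (event["deviceName"], reading["resourceName"], reading["valueType"], reading["value"])
        for reading in event["readings"]
    ]


if __name__ == "__main__":
    # Reads consumer output line by line and prints one row per reading.
    for raw in sys.stdin:
        raw = raw.strip()
        if raw:
            for row in summarize_event(raw):
                print(*row, sep="\t")
```

One way to use it is to pipe the kafka-console-consumer output into the script, dropping the -t flag from docker exec so that the output is a plain stream. Note that reading values arrive as strings, as in the samples above.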
Info
For development purposes, you may want to launch the app service as a binary on the local machine instead of as a Docker container. To let the binary app service connect to the Kafka container, revise the kafka entry in the compose files to:
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092,PLAINTEXT_HOST://localhost:9092,SSL_HOST://localhost:9093
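The addKafkaSecrets.sh script shown earlier in this article assembles its JSON payload by splicing shell variables into a string, which breaks if a value contains a double quote or a backslash. As an alternative sketch, the Python snippet below builds the same payload shape with json.dumps so that escaping is handled by the JSON library. The function names are hypothetical, the payload fields are copied from the script, and the snippet only prints the payload; posting it to the application service is left to the curl step in the script.

```python
import json


def normalize_pem(text):
    """Roughly mirror the awk filter in addKafkaSecrets.sh:
    drop carriage returns and blank lines, end every kept line with a newline."""
    lines = [line.replace("\r", "") for line in text.split("\n")]
    return "".join(line + "\n" for line in lines if line.strip())


def build_secret_payload(path, decrypted_password, client_cert, client_key, ca_cert):
    """Build the JSON body used by the script for the POST secrets REST API."""
    return json.dumps(
        {
            "apiVersion": "v2",
            "path": path,
            "secretData": [
                {"key": "decryptedpassword", "value": decrypted_password},
                {"key": "clientcert", "value": normalize_pem(client_cert)},
                {"key": "clientkey", "value": normalize_pem(client_key)},
                {"key": "cacert", "value": normalize_pem(ca_cert)},
            ],
        }
    )


def read_text(filename):
    with open(filename, encoding="ascii") as handle:
        return handle.read()


if __name__ == "__main__":
    # File names and password match the examples in this article; adjust as needed.
    print(
        build_secret_payload(
            "kafka",
            "test123",
            read_text("kafka-client.pem"),
            read_text("kafka-client.key"),
            read_text("cacert.pem"),
        )
    )
```

The printed payload could be saved to a file and passed to curl with -d @file in place of the inline "$PAYLOAD" variable.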
More configuration details about KafkaSend
The following code defines the pipeline function KafkaSend:
[Writable.Pipeline.Functions.KafkaSend]
[Writable.Pipeline.Functions.KafkaSend.Parameters]
ClientID = "edgex"
Address = "kafka"
Port = "9093"
Topic = "test"
Partition = "0"
PersistOnError = "false"
AuthMode = "clientcert"
SkipVerify = "true"
SecretPath = "kafka"
The following table describes the configuration parameters used to define the pipeline function KafkaSend to send data to the Kafka broker:
Configuration | Mandatory | Description |
---|---|---|
ClientID | Yes | Specifies a user-provided string sent with every request to the Kafka brokers for logging, debugging, and auditing purposes. |
Address | Yes | Specifies the Kafka broker address. |
Port | Yes | Specifies the port number of the Kafka broker. |
Topic | Yes | Specifies the Kafka topic that messages are sent to. |
Partition | No | Specifies the partition that messages are sent to. The default value is 0. |
SecretPath | No | Specifies the path in the secret provider from which to retrieve the certificate/private key pair for the target Kafka broker. SecretPath only takes effect when AuthMode is clientcert. Note: Secrets under this path are key-value pairs. By the current design of Application Service, you must add the secrets with the following key names: cacert, clientcert, clientkey, and decryptedpassword. |
AuthMode | No | Specifies the authentication mode to produce messages to the Kafka broker. At the time of publication, AuthMode only accepts clientcert for TLS client authentication or none for no authentication. The default value is none. |
SkipVerify | No | Specifies whether the Edge Xpert Application Service verifies the server's certificate chain and host name. If set to true, TLS accepts any certificate presented by the server and any host name in that certificate. This means that TLS is susceptible to man-in-the-middle attacks. Use this setting for testing purposes only. The default value is false. |
PersistOnError | No | Specifies whether the events are stored for further attempts if any error is encountered during the message publication. Note: To use PersistOnError, Store and Forward must be enabled in the Application Service configuration file. For further information on Store and Forward, refer to the Advanced Topics under the Application Functions SDK section of the EdgeX Foundry documentation. The default value is false. |
Note
The application service can run in either secure or insecure mode. When the app service runs in secure mode, secrets are retrieved from the Vault secret store; when the app service runs in insecure mode, secrets are retrieved from the Writable.InsecureSecrets configuration.
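The rules in the table above can be checked before the application service is started. The following Python sketch takes the KafkaSend parameters as a dictionary and returns a list of problems. It covers only what the table states: the four mandatory parameters and the two accepted AuthMode values. Treating a missing SecretPath under AuthMode clientcert as a problem is an assumption of this sketch, and the function name is hypothetical rather than part of Edge Xpert.

```python
MANDATORY = ("ClientID", "Address", "Port", "Topic")
AUTH_MODES = ("none", "clientcert")


def check_kafka_send(params):
    """Return a list of human-readable problems found in KafkaSend parameters."""
    problems = []
    for name in MANDATORY:
        if not params.get(name):
            problems.append(f"{name} is mandatory")

    auth_mode = params.get("AuthMode", "none")  # the table gives none as the default
    if auth_mode not in AUTH_MODES:
        problems.append(f"AuthMode must be one of {AUTH_MODES}, got {auth_mode!r}")

    # Assumption of this sketch: clientcert needs a SecretPath to locate the secrets.
    if auth_mode == "clientcert" and not params.get("SecretPath"):
        problems.append("SecretPath is expected when AuthMode is clientcert")
    return problems


if __name__ == "__main__":
    sample = {
        "ClientID": "edgex",
        "Address": "kafka",
        "Port": "9093",
        "Topic": "test",
        "Partition": "0",
        "PersistOnError": "false",
        "AuthMode": "clientcert",
        "SkipVerify": "true",
        "SecretPath": "kafka",
    }
    print(check_kafka_send(sample) or "no problems found")
```

The sample dictionary mirrors the KafkaSend parameters listed in this section; with Python 3.11 or later, the dictionary could instead be loaded from the TOML file with the standard tomllib module.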