
Create an Azure IoT Hub Stream Processor with Dapr


Creating a stream processor is always an interesting task: you want to take incoming messages and process them one by one, executing some actions along the way. Typically, we would use Apache Spark or Apache Flink to process these incoming streams, but those tools can be quite complex. What if you just need to read incoming messages and write them to a sink, doing some data manipulation along the way?

Use Case

For this use case, we will stream events to Azure IoT Hub, pick those events up, and sink them to CosmosDB.

Difficulties & Solution

When reading the above, developers instantly think: damn, we have to integrate IoT Hub (which is annoying in JavaScript due to the SDK ecosystem) and sink the events to CosmosDB (yet another API to integrate).

Now, there is a solution for this! By utilizing Dapr and its JS SDK, we simply configure our two connections as components and use the Dapr SDK to read events from Azure IoT Hub and write them to CosmosDB! If we want to use another PubSub or sink, we simply redefine our components and it will magically work!

Creating our Solution

Directory Layout

In this case, start off by creating a TypeScript project and adding a components/ folder, so that your layout looks like this:

components/
src/
package.json
README.md
tsconfig.json
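
Since the processor imports @dapr/dapr and compiles to dist/, the project needs a couple of dependencies. A minimal package.json sketch could look like this (the version ranges are assumptions, pin whatever fits your project):

{
  "name": "stream-processor",
  "scripts": {
    "build": "tsc"
  },
  "dependencies": {
    "@dapr/dapr": "^2.0.0"
  },
  "devDependencies": {
    "typescript": "^4.7.0"
  }
}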

Dapr Components

In the layout above, we configure two Dapr components:

PubSub - IoT Hub

components/stream.yaml

# https://docs.dapr.io/reference/components-reference/supported-bindings/eventhubs/
# https://docs.dapr.io/reference/components-reference/supported-pubsub/setup-azure-eventhubs/#subscribing-to-azure-iot-hub-events
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: stream
  namespace: default
spec:
  type: pubsub.azure.eventhubs
  version: v1
  metadata:
  - name: connectionString      # IoT Hub's Event Hubs-compatible endpoint connection string
    value: "IOTHUB_EH_CONNECTION_STRING"
  - name: consumerID         # EventHubs consumer group
    value: "$Default"
  - name: storageAccountName    # Azure Storage Account Name
    value: "CHECKPOINT_ACCOUNT_NAME"
  - name: storageAccountKey     # Azure Storage Account Key
    value: "CHECKPOINT_ACCOUNT_KEY"
  - name: storageContainerName  # Azure Storage Container Name
    value: "CHECKPOINT_CONTAINER_NAME"
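
The connection string above is not a regular Event Hubs one, but the Event Hubs-compatible endpoint that every IoT Hub exposes (see the second documentation link in the component). Assuming you have the Azure CLI with the azure-iot extension installed, you can retrieve it like this:

az iot hub connection-string show --hub-name MY_IOT_HUB --default-eventhub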

Binding - CosmosDB

components/sink.yaml

# https://docs.dapr.io/reference/components-reference/supported-bindings/cosmosdb/
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: sink
  namespace: default
spec:
  type: bindings.azure.cosmosdb
  version: v1
  metadata:
  - name: url
    value: COSMOSDB_URL
  - name: masterKey
    value: COSMOSDB_KEY
  - name: database
    value: COSMOSDB_DATABASE
  - name: collection
    value: COSMOSDB_COLLECTION
  - name: partitionKey
    value: COSMOSDB_PARTITIONKEY
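
Note that partitionKey names a property that has to be present on every document written through the binding. Since the processing code below stamps a partitionID field onto each message, a document landing in CosmosDB would roughly match this (hypothetical) shape:

// Hypothetical shape of a document written through the sink binding.
// The property named by COSMOSDB_PARTITIONKEY (e.g. "partitionID") has to
// exist on every document, otherwise the write will fail.
interface SinkDocument {
  partitionID: string;       // matches the binding's partitionKey setting
  [field: string]: unknown;  // the remaining fields of the IoT Hub message body
}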

Coding

Once the components are configured, we can write the source code that subscribes to the IoT Hub stream through the JS SDK with await server.pubsub.subscribe() and sinks the events to CosmosDB with await client.binding.send():

import { DaprClient, DaprServer } from "@dapr/dapr";
import Config from "./config";

const client = new DaprClient(Config.dapr.sidecar.host, Config.dapr.sidecar.portHttp);

async function main() {
  console.log(`Initializing Server (App Port: ${Config.dapr.app.port}, Sidecar Port: ${Config.dapr.sidecar.portHttp}, Sidecar Protocol: HTTP)`);
  const server = new DaprServer(Config.dapr.app.host, Config.dapr.app.port, Config.dapr.sidecar.host, Config.dapr.sidecar.portHttp);

  // Initialize the server to subscribe (listen)
  await server.pubsub.subscribe(Config?.dapr?.components?.pubsub?.eventhub?.name, Config?.dapr?.components?.pubsub?.eventhub?.topic, async (data: any) => {
    await writeToSink(data);
  });

  await server.start();
}

async function writeToSink(message: any) {
  // Add PartitionID
  const partitionId = message.body.my_partition_key; // @todo: set your partition key
  message.body.partitionID = partitionId;

  // Sink the body to CosmosDB
  await client.binding.send(Config?.dapr?.components?.bindings?.cosmos?.name, "create", message.body);
}

main().catch(e => console.error(e));
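
The Config module imported at the top is project-specific and not part of the Dapr SDK. A minimal sketch of src/config.ts, assuming environment variables whose defaults match the dapr run script below:

// src/config.ts - a minimal sketch; every name and default here is an
// assumption, adjust it to your own environment.
export default {
  dapr: {
    app: {
      host: process.env.APP_HOST ?? "127.0.0.1",
      port: process.env.APP_PORT ?? "10000",
    },
    sidecar: {
      host: process.env.DAPR_HOST ?? "127.0.0.1",
      portHttp: process.env.DAPR_HTTP_PORT ?? "10001",
    },
    components: {
      pubsub: {
        // "stream" matches metadata.name in components/stream.yaml; for IoT Hub,
        // the topic to subscribe to is the hub's Event Hubs-compatible name
        // (by default, the IoT Hub name)
        eventhub: { name: "stream", topic: "MY_IOT_HUB_NAME" },
      },
      bindings: {
        // "sink" matches metadata.name in components/sink.yaml
        cosmos: { name: "sink" },
      },
    },
  },
};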

Testing

Finally, we can test the code above by adding some scripts to the package.json:

"start": "npm run build && node dist/index.js",
"start:dapr": "dapr run --app-id stream-processor --app-port 10000 --dapr-http-port 10001 --dapr-grpc-port 10002 --app-protocol http --components-path ./components/ npm run start",

This will start up Dapr alongside our application, and you will be able to see incoming messages!
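
To generate some test traffic, send a few device-to-cloud messages to the IoT Hub, for example with the Azure CLI (azure-iot extension; MY_DEVICE is a hypothetical device registered in the hub). Note that the payload carries the my_partition_key field that writeToSink expects:

az iot device send-d2c-message --hub-name MY_IOT_HUB --device-id MY_DEVICE --data '{"my_partition_key": "device-1", "temperature": 21.5}'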