3 min read

Create an Azure IoT Hub Stream Processor with Dapr

Create an Azure IoT Hub Stream Processor with Dapr
💡Check Out all Dapr Posts

Creating a stream processor is always an interesting task, you want to be able to take incoming message and process them one by one, executing some actions along the way. Typically, we would use Apache Spark or Apache Flink to process these incoming streams, but these tools can be quite complex. What if you just have to be able to take incoming messages, read them and write them to a sink while doing some data manipulation along the way?

Use Case

For this use case, we will be streaming events to Azure IoT Hub and pick up those events and sink them towards a CosmosDB.

Difficulties & Solution

When reading the above, developers instantly think: Damn, we have to integrate IoT Hub (which is annoying in Javascript due to the SDK ecosystem) and sink it towards CosmosDB (yet another API to integrate).

Now, there is a solution for this! By utilizing Dapr and the JS SDK, we can simply configure our 2 connections and utilize the Dapr SDK to connect to Azure IoT Hub and write those events to CosmosDB! If we want to utilize another PubSub or Sink, we can simply redefine our components and it will magically work!

Creating our Solution

Directory Layout

In this case start of by creating a Typescript project and adding a components/ folder, so that your layout will look like this:


Dapr Components

In the layout above, we configure 2 Dapr Components

PubSub - IoT Hub


# https://docs.dapr.io/reference/components-reference/supported-bindings/eventhubs/
# https://docs.dapr.io/reference/components-reference/supported-pubsub/setup-azure-eventhubs/#subscribing-to-azure-iot-hub-events
apiVersion: dapr.io/v1alpha1
kind: Component
  name: stream
  namespace: default
  type: pubsub.azure.eventhubs
  version: v1
  - name: connectionString      # Azure EventHubs connection string
  - name: consumerID         # EventHubs consumer group
    value: "$Default"
  - name: storageAccountName    # Azure Storage Account Name
  - name: storageAccountKey     # Azure Storage Account Key
  - name: storageContainerName  # Azure Storage Container Name

Binding - CosmosDB


# https://docs.dapr.io/reference/components-reference/supported-bindings/cosmosdb/
apiVersion: dapr.io/v1alpha1
kind: Component
  name: sink
  namespace: default
  type: bindings.azure.cosmosdb
  version: v1
  - name: url
    value: COSMOSDB_URL
  - name: masterKey
    value: COSMOSDB_KEY
  - name: database
  - name: collection
  - name: partitionKey


After creating the components are configured we can create our source code that will connect to the IoT Hub stream through the JS SDK await server.pubsub.receive() and sink it to CosmosDB with await client.binding.send()

import { DaprClient, DaprServer } from "@dapr/dapr";
import Config from "./config";

const client = new DaprClient(Config.dapr.sidecar.host, Config.dapr.sidecar.portHttp);

async function main() {
  console.log(`Initializing Server (App Port: ${Config.dapr.app.port}, Sidecar Port: ${Config.dapr.sidecar.portHttp}, Sidecar Protocol: HTTP)`);
  const server = new DaprServer(Config.dapr.app.host, Config.dapr.app.port, Config.dapr.sidecar.host, Config.dapr.sidecar.portHttp);

  // Initialize the server to subscribe (listen)
  await server.pubsub.subscribe(Config?.dapr?.components?.pubsub?.eventhub?.name, Config?.dapr?.components?.pubsub?.eventhub?.topic, async (data: any) => {
    await writeToSink(data);

  await server.start();

async function writeToSink(message: any) {
  // Add PartitionID
  const partitionId = message.body.my_partition_key; // @todo: set your partition key
  message.body.partitionID = partitionId;

  // Sink the body to CosmosDB
  await client.binding.send(Config?.dapr?.components?.bindings?.cosmos?.name, "create", message.body);

main().catch(e => console.error(e));


Finally, we can test the code above by adding some scripts to the package.json:

"start": "npm run build && node dist/index.js",
"start:dapr": "dapr run --app-id stream-processor --app-port 10000 --dapr-http-port 10001 --dapr-grpc-port 10002 --app-protocol http --components-path ./components/ npm run start",

This will start up dapr, and you will be able to see incoming messages!