Create an Azure IoT Hub Stream Processor with Dapr
Creating a stream processor is always an interesting task: you want to take incoming messages and process them one by one, executing some actions along the way. Typically, we would use Apache Spark or Apache Flink to process these incoming streams, but those tools can be quite complex. What if you just need to read incoming messages and write them to a sink, with some data manipulation along the way?
Use Case
For this use case, we will stream events to Azure IoT Hub, pick those events up, and sink them to CosmosDB.
Difficulties & Solution
When reading the above, developers instantly think: Damn, we have to integrate IoT Hub (which is annoying in JavaScript due to the SDK ecosystem) and sink the events to CosmosDB (yet another API to integrate).
Now, there is a solution for this! By utilizing Dapr and its JS SDK, we simply configure our 2 connections and use the Dapr SDK to read from Azure IoT Hub and write those events to CosmosDB! If we want to use another pub/sub source or sink, we simply redefine our components and it will magically work!
Creating our Solution
Directory Layout
In this case, start off by creating a TypeScript project and adding a components/
folder, so that your layout looks like this:
components/
src/
package.json
README.md
tsconfig.json
Dapr Components
In the components/ folder above, we configure 2 Dapr components:
PubSub - IoT Hub
components/stream.yaml
# https://docs.dapr.io/reference/components-reference/supported-bindings/eventhubs/
# https://docs.dapr.io/reference/components-reference/supported-pubsub/setup-azure-eventhubs/#subscribing-to-azure-iot-hub-events
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: stream
  namespace: default
spec:
  type: pubsub.azure.eventhubs
  version: v1
  metadata:
    - name: connectionString # Azure EventHubs connection string
      value: "IOTHUB_EH_CONNECTION_STRING"
    - name: consumerID # EventHubs consumer group
      value: "$Default"
    - name: storageAccountName # Azure Storage Account Name
      value: "CHECKPOINT_ACCOUNT_NAME"
    - name: storageAccountKey # Azure Storage Account Key
      value: "CHECKPOINT_ACCOUNT_KEY"
    - name: storageContainerName # Azure Storage Container Name
      value: "CHECKPOINT_CONTAINER_NAME"
Binding - CosmosDB
components/sink.yaml
# https://docs.dapr.io/reference/components-reference/supported-bindings/cosmosdb/
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: sink
  namespace: default
spec:
  type: bindings.azure.cosmosdb
  version: v1
  metadata:
    - name: url
      value: COSMOSDB_URL
    - name: masterKey
      value: COSMOSDB_KEY
    - name: database
      value: COSMOSDB_DATABASE
    - name: collection
      value: COSMOSDB_COLLECTION
    - name: partitionKey
      value: COSMOSDB_PARTITIONKEY
Coding
Once the components are configured, we can write the source code that connects to the IoT Hub stream through the JS SDK with await server.pubsub.subscribe()
and sinks it to CosmosDB with await client.binding.send()
import { DaprClient, DaprServer } from "@dapr/dapr";
import Config from "./config";

const client = new DaprClient(Config.dapr.sidecar.host, Config.dapr.sidecar.portHttp);

async function main() {
  console.log(`Initializing Server (App Port: ${Config.dapr.app.port}, Sidecar Port: ${Config.dapr.sidecar.portHttp}, Sidecar Protocol: HTTP)`);
  const server = new DaprServer(Config.dapr.app.host, Config.dapr.app.port, Config.dapr.sidecar.host, Config.dapr.sidecar.portHttp);

  // Subscribe to the IoT Hub stream (the "stream" pubsub component)
  await server.pubsub.subscribe(Config?.dapr?.components?.pubsub?.eventhub?.name, Config?.dapr?.components?.pubsub?.eventhub?.topic, async (data: any) => {
    await writeToSink(data);
  });

  await server.start();
}

async function writeToSink(message: any) {
  // Add PartitionID so CosmosDB knows where to place the document
  const partitionId = message.body.my_partition_key; // @todo: set your partition key
  message.body.partitionID = partitionId;

  // Sink the body to CosmosDB (the "sink" binding component)
  await client.binding.send(Config?.dapr?.components?.bindings?.cosmos?.name, "create", message.body);
}

main().catch((e) => console.error(e));
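The Config module imported above is not included in the post. A minimal sketch could look like the following; note that the structure simply mirrors what the code accesses, and the environment-variable names, defaults, and the topic value are assumptions you should adapt to your setup (for IoT Hub via the Event Hubs component, the topic is the Event Hub-compatible name of your hub):

```typescript
// src/config.ts — hypothetical sketch; adjust hosts, ports, and names to your environment
const Config = {
  dapr: {
    app: {
      host: process.env.APP_HOST ?? "127.0.0.1",
      port: process.env.APP_PORT ?? "10000",
    },
    sidecar: {
      host: process.env.DAPR_HOST ?? "127.0.0.1",
      portHttp: process.env.DAPR_HTTP_PORT ?? "10001",
    },
    components: {
      pubsub: {
        eventhub: {
          name: "stream", // matches metadata.name in components/stream.yaml
          topic: process.env.IOTHUB_TOPIC ?? "my-iot-hub", // assumed: Event Hub-compatible name
        },
      },
      bindings: {
        cosmos: {
          name: "sink", // matches metadata.name in components/sink.yaml
        },
      },
    },
  },
};

export default Config;
```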
Testing
Finally, we can test the code above by adding some scripts to the package.json:
"start": "npm run build && node dist/index.js",
"start:dapr": "dapr run --app-id stream-processor --app-port 10000 --dapr-http-port 10001 --dapr-grpc-port 10002 --app-protocol http --components-path ./components/ npm run start",
Running npm run start:dapr will start up Dapr together with our app, and you will see incoming messages being processed!
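If you want to sanity-check the transformation step without a live IoT Hub, you can exercise the same enrichment that writeToSink performs on a fabricated message. The envelope shape and the my_partition_key field below are just the placeholders from the code above, not a real IoT Hub payload:

```typescript
// Hypothetical local check of the partition-key enrichment done in writeToSink
type SinkMessage = { body: Record<string, any> };

function enrichWithPartitionId(message: SinkMessage): SinkMessage {
  // Mirrors writeToSink: copy the chosen partition key into partitionID
  message.body.partitionID = message.body.my_partition_key;
  return message;
}

// Fabricated IoT Hub-style event
const sample: SinkMessage = {
  body: { deviceId: "device-001", temperature: 21.5, my_partition_key: "device-001" },
};

const enriched = enrichWithPartitionId(sample);
console.log(enriched.body.partitionID); // → "device-001"
```

Because the enrichment is a pure function, it is easy to swap in your real partition-key logic and verify it before wiring everything up to the sidecar.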