Sinking events from Event Hub to CosmosDB with Dapr

A common scenario when working with a publish/subscribe system is taking incoming events and storing them in an application store, so that they can be queried and displayed on a simple frontend.

Normally we would need to find the corresponding APIs, learn how to configure them, and test everything, a process that can take hours or even days to get right. Luckily for us, Dapr comes to the rescue here by offering an out-of-the-box experience for pub/sub systems.

Before we get started though, make sure you have the following set up:

Prerequisites

Azure Resources

  • Azure Event Hubs namespace (with an event hub named "main", which is our topic)
  • Azure Storage Account (used for checkpoints)
  • Azure CosmosDB for NoSQL
  • A blank TypeScript project

Dapr Components

Once the above has been created, create two files in your project directory to connect to these resources:

components/pubsub-items.yaml

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub-items
spec:
  type: pubsub.azure.eventhubs
  version: v1
  metadata:
    - name: connectionString
      value: "YOUR_CONNECTION_STRING"
    - name: storageAccountName
      value: "YOUR_ACCOUNT_NAME"
    - name: storageAccountKey
      value: "YOUR_STORAGE_ACCOUNT_KEY"
    - name: storageContainerName
      value: "YOUR_CONTAINER_NAME"

components/event-store.yaml

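# The application writes through daprClient.binding.send, so this component
# has to be an output binding whose name matches the one used in the code.
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: binding-events
spec:
  type: bindings.azure.cosmosdb
  version: v1
  metadata:
    - name: url
      value: "YOUR_COSMOSDB_URL"
    - name: masterKey
      value: "YOUR_COSMOSDB_KEY"
    - name: database
      value: "YOUR_COSMOSDB_DB"
    - name: collection
      value: "YOUR_COSMOSDB_COLL_OR_CONTAINER"
    # Name of the document field that holds the partition key value
    - name: partitionKey
      value: "YOUR_COSMOSDB_PARTITION_KEY_FIELD"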

Creating our Application

Finally, let's create the application that sinks the incoming events to the event store. Because we are using Dapr, the entire application boils down to the code below:

import { DaprClient, DaprServer } from "@dapr/dapr";

process.setMaxListeners(0);

const appHost = "127.0.0.1";
const appPort = 10010;
const sidecarHost = "127.0.0.1";
const sidecarPort = 10011;
const daprPubSub = "pubsub-items";
const daprPubSubTopic = "main";
const daprEventStore = "binding-events";

class Server {
  daprClient: DaprClient;
  daprServer: DaprServer;

  constructor() {
    this.daprServer = new DaprServer(appHost, appPort);
    this.daprClient = new DaprClient(sidecarHost, sidecarPort);
  }

  async start() {
    console.log(
      `Subscribing to PubSub '${daprPubSub}' and topic '${daprPubSubTopic}'`,
    );
    await this.daprServer.pubsub.subscribe(
      daprPubSub,
      daprPubSubTopic,
      async (data: any, headers: object) => this.handleMessage(data, headers),
    );

    console.log("Starting server");
    await this.daprServer.start();
  }

  async handleMessage(data: any, headers: any) {
    console.log(`Received message with data: ${JSON.stringify(data)} and headers: ${JSON.stringify(headers)}`);
    await this.daprClient.binding.send(daprEventStore, "create", data);
  }
}

async function start() {
  const server = new Server();
  await server.start();
}

start().catch((e) => console.error(e));
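
The handler above forwards the event payload to the store unchanged. Cosmos DB identifies each document by an `id` field, which a payload such as `{"key":"value"}` does not carry, and the container's partition key field may need to be present as well. A minimal sketch of a helper that adds an `id` and a receive timestamp before the event is handed to `binding.send` could look like this. The names `toEventDocument`, `EventDocument`, `newId` and `receivedAt` are assumptions made for illustration and are not part of the Dapr SDK:

```typescript
// Hypothetical helper (not part of the Dapr SDK): shape an incoming event
// into a document that always carries an `id` and a receive timestamp.
interface EventDocument {
  id: string;
  receivedAt: string;
  [field: string]: unknown;
}

function toEventDocument(
  data: unknown,
  newId: () => string,
  now: Date = new Date(),
): EventDocument {
  // Wrap primitives and arrays so the result is always a JSON object.
  const payload: Record<string, unknown> =
    typeof data === "object" && data !== null && !Array.isArray(data)
      ? (data as Record<string, unknown>)
      : { value: data };

  // Keep an id supplied by the publisher, otherwise generate a new one.
  const existingId = payload["id"];
  const id =
    typeof existingId === "string" && existingId.length > 0
      ? existingId
      : newId();

  return { ...payload, id, receivedAt: now.toISOString() };
}
```

Inside `handleMessage`, the call would then become something like `toEventDocument(data, randomUUID)`, with `randomUUID` imported from `node:crypto`, and the result passed to `binding.send` in place of `data`. The id generator is passed in as a parameter so that the helper stays free of dependencies and easy to test.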

Starting up the application is then as simple as adding the following scripts to your package.json:

"scripts": {
  "dev:watch": "nodemon --ext \".ts,.js\" --watch \"./src\" --exec \"ts-node --swc ./src/index.ts\" --ignore node_modules",
  "dev": "dapr run --app-id '$Default' --app-port 10010 --dapr-http-port 10011 --dapr-grpc-port 10012 --app-protocol http --components-path ./components/ -- npm run dev:watch"
},

You can then run:

pnpm run dev

Testing it out

To test this, we can use the Dapr CLI, which offers a simple but powerful command for publishing events to our pub/sub system.

# Publish App ID: The Event Hub Consumer Group
# PubSub: The Dapr Component
# Topic: The Event Hub Name
dapr publish --publish-app-id '$Default' --pubsub "pubsub-items" --topic "main" --data '{"key":"value"}'

Once an event has been published, we will see it arrive in our CosmosDB account!
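
The same test can also be driven from code through the sidecar's HTTP API, which exposes publishing at `POST /v1.0/publish/<pubsub>/<topic>`. The sketch below only builds the request, assuming the sidecar listens on the `--dapr-http-port` of 10011 used earlier. The names `buildPublishRequest` and `PublishRequest` are hypothetical helpers introduced for illustration:

```typescript
// Hypothetical description of an HTTP call to the Dapr sidecar.
interface PublishRequest {
  url: string;
  method: "POST";
  headers: Record<string, string>;
  body: string;
}

// Build a request for the sidecar's publish endpoint:
// POST http://<host>:<port>/v1.0/publish/<pubsub>/<topic>
function buildPublishRequest(
  host: string,
  port: number,
  pubsub: string,
  topic: string,
  data: unknown,
): PublishRequest {
  const path = `/v1.0/publish/${encodeURIComponent(pubsub)}/${encodeURIComponent(topic)}`;
  return {
    url: `http://${host}:${port}${path}`,
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify(data),
  };
}

// Values taken from the components and the `dapr run` command above.
const request = buildPublishRequest("127.0.0.1", 10011, "pubsub-items", "main", {
  key: "value",
});

// To actually send it (Node 18+ ships a global fetch):
// await fetch(request.url, { method: request.method, headers: request.headers, body: request.body });
```

Keeping the request construction separate from the network call makes it possible to check the URL and payload without a running sidecar.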