Sinking events from Event Hub to CosmosDB with Dapr
One of the most common scenarios when working with a publish/subscribe system is to be able to take incoming events and store them into an application store that can be used for querying and displaying all your events on a simple frontend.
Normally we would then need to find the corresponding APIs and learn how to configure them, test it, ... a process taking a couple of hours / days to test it well. Luckily for us, Dapr comes to the rescue here! Offering an out-of-the box experience for PubSub systems.
Before we get started though, ensure that you have the following set-up:
Prerequisites
Azure Resources
- Azure EventHub (with an EventHub named: "main", which is our topic)
- Azure Storage Account (used for checkpoints)
- Azure CosmosDB for NoSQL
- A blank typescript project
Dapr Components
Once the above has been created, create 2 files in your directory to connect to the main resources:
components/pubsub-items.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: pubsub-items
spec:
type: pubsub.azure.eventhubs
version: v1
metadata:
- name: connectionString
value: "YOUR_CONNECTION_STRING"
- name: storageAccountName
value: "YOUR_ACCOUNT_NAME"
- name: storageAccountKey
value: "YOUR_STORAGE_ACCOUNT_KEY"
- name: storageContainerName
value: "YOUR_CONTAINER_NAME"
components/event-store.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: state-events
spec:
type: state.azure.cosmosdb
version: v1
metadata:
- name: url
value: "YOUR_COSMOSDB_URL"
- name: masterKey
value: "YOUR_COSMOSDB_KEY"
- name: database
value: "YOUR_COSMOSDB_DB"
- name: collection
value: "YOUR_COSMOSDB_COLL_OR_CONTAINER"
Creating our Application
Finally, let's create our application that will sink this all to the event store. As we are using Dapr, we can simplify the entire application to the code below:
import { DaprClient, DaprServer } from "@dapr/dapr";
import Config from "./config";
process.setMaxListeners(0);
const appHost = "127.0.0.1";
const appPort = 10010;
const sidecarHost = "127.0.0.1";
const sidecarPort = 10011;
const daprPubSub = "pubsub-items";
const daprPubSubTopic = "main;
const daprEventStore = "binding-events"
class Server {
daprClient: DaprClient;
daprServer: DaprServer;
constructor() {
this.daprServer = new DaprServer(appHost, appPort);
this.daprClient = new DaprClient(sidecarHost, sidecarPort);
}
async start() {
console.log(
`Subscribing to PubSub '${daprPubSub}' and topic '${daprPubSubTopic}'`,
);
await this.daprServer.pubsub.subscribe(
daprPubSub,
daprPubsubTopic,
async (data: any, headers: object) => this.handleMessage(data, headers),
);
console.log("Starting server");
await this.daprServer.start();
}
async handleMessage(data: any, headers: any) {
console.log(`Received message with data: ${JSON.stringify(data)} and headers: ${JSON.stringify(headers)}`);
await this.daprClient.binding.send(daprEventStore, "create", data);
}
}
async function start() {
const server = new Server();
await server.start();
}
start().catch((e) => console.error(e));
Starting up the application is then as simple as adding the following to your package.json
:
"scripts": {
"dev:watch": "nodemon --ext \".ts,.js\" --watch \"./src\" --exec \"ts-node --swc ./src/index.ts\" --ignore node_modules",
"dev": "dapr run --app-id '$Default' --app-port 10010 --dapr-http-port 10011 --dapr-grpc-port 10012 --app-protocol http --components-path ./components/ -- npm run dev:watch",
},
So that you can run:
pnpm run dev
Testing it out
To test this, we can use the Dapr CLI which offers a simple, but powerful command to test sending events to our pubsub system.
# Publish App ID: The Event Hub Consumer Group
# PubSub: The Dapr Component
# Topic: The Event Hub Name
dapr publish --publish-app-id '$Default' --pubsub "pubsub-items" --topic "main" --data '{"key":"value"}'
Once we perform a sent action, we will now see events coming in on our CosmosDB account!