
Sinking Events from MQTT to Timescale with Dapr and Rust

Learn how to forward events from MQTT to TimescaleDB (Postgres) at high speed and low latency with Dapr and Rust

Sinking events from a fast-moving MQTT broker into Timescale requires a processor that can keep up with the load. For this one, I decided to use Dapr over the gRPC protocol, together with Rust.

Important to note: Rust support in Dapr is still in "alpha", but it is enough to create the gRPC connection.

Identifying Components

The first thing we do is identify and configure our Dapr components. I chose MQTT as the pub/sub component and Postgres as an output binding:

components/pubsub-mqtt.yaml

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub-mqtt
spec:
  type: pubsub.mqtt
  version: v1
  metadata:
  - name: url
    value: "tcp://USER:PASSWORD@localhost:1883"
  - name: qos
    value: "1"
  - name: retain
    value: "false"
  - name: cleanSession
    value: "false"
  - name: consumerID
    value: "consumer-sink"

components/binding-postgres.yaml

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: binding-postgres
spec:
  type: bindings.postgresql
  version: v1
  metadata:
  - name: url # Required
    value: postgresql://USER:PASSWORD@localhost:5432/postgres?sslmode=verify-ca

Creating a Rust Application

Next, we create our Rust application. Luckily, Cargo makes this super simple; we can just execute:

cargo new <project>

To the generated Cargo.toml file, we then add our dependencies:

[package]
name = "event_sink"
version = "0.1.0"
authors = ["Xavier Geerinck"]
edition = "2021"
description = "Demo for event sinking"
readme = "README.md"

[dependencies]
tonic = "0.8"
prost = "0.11"
bytes = "1"
prost-types = "0.11"
dapr = "0.12.0"
tokio = { version = "1", features = ["full"] }
serde_json = { version = "1.0", default-features = false, features = ["alloc"] }

[build-dependencies]
tonic-build = "0.8"

[[example]]
name = "main"
path = "src/main.rs"

Adding the Code for Consuming Events

Dapr provides a small tutorial showing how to add code that consumes events:

use serde_json::{json, Value};
use tokio::sync::{Mutex, MutexGuard};
use tonic::{transport::Channel as TonicChannel, transport::Server, Request, Response, Status};

use dapr::{
    appcallback::*,
    dapr::dapr::proto::runtime::v1::{
        app_callback_server::{AppCallback, AppCallbackServer},
    },
};

pub struct AppCallbackService {}

#[tonic::async_trait]
impl AppCallback for AppCallbackService {
    /// Invokes service method with InvokeRequest.
    async fn on_invoke(
        &self,
        _request: Request<InvokeRequest>,
    ) -> Result<Response<InvokeResponse>, Status> {
        Ok(Response::new(InvokeResponse::default()))
    }

    /// Lists all topics subscribed by this app.
    ///
    /// NOTE: Dapr runtime will call this method to get
    /// the list of topics the app wants to subscribe to.
    /// In this example, the app is subscribing to topic `A`.
    async fn list_topic_subscriptions(
        &self,
        _request: Request<()>,
    ) -> Result<Response<ListTopicSubscriptionsResponse>, Status> {
        let topic = "events".to_string();
        let pubsub_name = "pubsub-mqtt".to_string();

        let list_subscriptions = ListTopicSubscriptionsResponse::topic(pubsub_name, topic);

        Ok(Response::new(list_subscriptions))
    }

    /// Subscribes events from Pubsub.
    async fn on_topic_event(
        &self,
        request: Request<TopicEventRequest>,
    ) -> Result<Response<TopicEventResponse>, Status> {
        let r = request.into_inner();
        let data = String::from_utf8_lossy(&r.data);
        let data_content_type = &r.data_content_type;

        let obj: Value = serde_json::from_str(&data).unwrap();
        println!("obj: {}", obj);

        Ok(Response::new(TopicEventResponse::default()))
    }

    /// Lists all input bindings subscribed by this app.
    async fn list_input_bindings(
        &self,
        _request: Request<()>,
    ) -> Result<Response<ListInputBindingsResponse>, Status> {
        Ok(Response::new(ListInputBindingsResponse::default()))
    }

    /// Listens events from the input bindings.
    async fn on_binding_event(
        &self,
        _request: Request<BindingEventRequest>,
    ) -> Result<Response<BindingEventResponse>, Status> {
        Ok(Response::new(BindingEventResponse::default()))
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let addr_server = "[::]:50051".parse().unwrap();

    let callback_service = AppCallbackService {};

    println!("AppCallback server listening on: {}", addr_server);

    // Create a gRPC server with the callback_service.
    Server::builder()
        .add_service(AppCallbackServer::new(callback_service))
        .serve(addr_server)
        .await?;

    Ok(())
}

OK, that was the easy part; now on to the more challenging part.

Forwarding Events

To forward events we need the DaprClient, as it lets us interact with the sidecar, which takes care of the actual sinking for us.

Normally, we would just create a DaprClient, pass it to our constructor, and store it so we can use it throughout the code (rather than creating a new DaprClient each time a method is invoked). However, Dapr does not expose mutable self references on the AppCallback implementation, so we have to work around this.

The Dapr App Callback class does not expose mutable self references.

Besides this, another issue arises while using it. Whenever we use the DaprClient and want to call the invoke_binding method, it turns out that even though the proto files allow us to pass the operation and metadata arguments, the SDK does not. So this is another issue we need to solve!

The invoke_binding method does not expose the operation and metadata proto fields

Let's first resolve these issues before moving on to the final code.

Issue: The Dapr App Callback class does not expose mutable self-references

Resolving this issue requires us to take a look at mutexes. Since the callback service can be invoked from multiple threads, we need to acquire a lock on the mutable reference so that two threads cannot issue a write at the same time.
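In isolation, the pattern looks like this. A minimal sketch with std::sync::Mutex and a plain counter standing in for the Dapr client; the real service below uses tokio's async Mutex, but the idea is identical:

```rust
use std::sync::Mutex;

// A shared service whose methods only take &self, yet still need to
// mutate inner state: wrap that state in a Mutex and lock on access.
struct Service {
    counter: Mutex<u32>, // stand-in for the Dapr client
}

impl Service {
    fn new() -> Self {
        Self { counter: Mutex::new(0) }
    }

    // Acquire the lock, mutate through the guard, release on drop.
    fn handle_event(&self) -> u32 {
        let mut guard = self.counter.lock().unwrap();
        *guard += 1;
        *guard
    }
}

fn main() {
    let svc = Service::new();
    svc.handle_event();
    println!("count: {}", svc.handle_event()); // → count: 2
}
```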

We then create a DaprClient initialization in our main code:

let port: u16 = std::env::var("DAPR_GRPC_PORT")?.parse()?;
let addr_client = format!("https://127.0.0.1:{}", port);
let client = DaprClient::connect(addr_client).await?;

And adapt our AppCallback to allow a client to be set. When setting this client, we wrap it in a Mutex and create a small helper method named get_dapr_client that acquires the lock on the mutex for us.

pub struct AppCallbackService {
    dapr_client: Mutex<DaprClient<TonicChannel>>,
}

impl AppCallbackService {
    pub fn new(client: DaprClient<TonicChannel>) -> Self {
        let dapr_client = Mutex::new(client);
        Self { dapr_client }
    }

    pub async fn get_dapr_client(&self) -> MutexGuard<'_, DaprClient<TonicChannel>> {
        self.dapr_client.lock().await
    }
}

Optionally, we could use a RefCell instead, but it is important to note that RefCell is not thread-safe. The above nicely illustrates how Rust's ownership model ensures we are working thread-safely!

Issue: Dapr does not expose the operation and metadata fields

For our final issue, we can simply use the generated DaprClient instead of the Client wrapper exposed by the Dapr Rust SDK. Since the generated client comes straight from the proto files, it exposes the request objects directly, including the fields the wrapper hides.

Simply initializing the client with let client = DaprClient::connect(addr_client).await?; allows us to call a method as such:

let mut req = dapr::client::InvokeBindingRequest::default();
req.operation = "exec".to_string();
req.name = "binding-postgres".to_string();
req.metadata = std::collections::HashMap::<String, String>::new();
req.metadata.insert("sql".to_string(), sql_stmt);

let mut client = self.get_dapr_client().await;
let _res = client.invoke_binding(req).await;
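The sql_stmt passed in above is never defined in the snippets. Here is a minimal sketch of building it from the raw event payload; the `events` table with a single `payload` column is an assumption, and real code should prefer parameterized statements over string formatting:

```rust
// Build a single-row INSERT for a raw JSON payload.
// NOTE: naive string formatting is for illustration only; the payload is
// minimally escaped here, but parameterized statements are the safe choice.
fn build_sql_stmt(payload: &str) -> String {
    let escaped = payload.replace('\'', "''");
    format!("INSERT INTO events (payload) VALUES ('{}')", escaped)
}

fn main() {
    let sql_stmt = build_sql_stmt(r#"{"key":"value"}"#);
    println!("{}", sql_stmt);
    // → INSERT INTO events (payload) VALUES ('{"key":"value"}')
}
```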

Finalizing our code

Knowing all of this, we can finalize our code, send the actual request as shown above, and parse the response:

match _res {
    Ok(res) => {
        let msg = res.into_inner();
        println!("Duration: {:?}", msg.metadata.get("duration"));
    }
    Err(e) => {
        println!("Error: {:?}", e.message());
    }
}

Executing

For the final part, let's execute all the above!

Starting an MQTT Broker and Postgres Database

# Start an EMQX MQTT Broker
docker run -d --rm --name emqx -p 1883:1883 -p 8081:8081 -p 8083:8083 -p 8883:8883 -p 8084:8084 -p 18083:18083 emqx/emqx

# Start a TimescaleDB 
docker run -d --rm --name timescaledb -p 5432:5432 -e POSTGRES_PASSWORD=password timescale/timescaledb:latest-pg15
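The exec binding needs a target table to insert into. A hedged example creating a simple one; the `events` table with a `payload` column is an assumption (matching whatever your SQL statements insert), as is turning it into a hypertable:

```shell
# Create a hypothetical events table inside the running container
docker exec timescaledb psql -U postgres -c \
  "CREATE TABLE IF NOT EXISTS events (time TIMESTAMPTZ NOT NULL DEFAULT now(), payload TEXT);"

# Optionally turn it into a Timescale hypertable partitioned on time
docker exec timescaledb psql -U postgres -c \
  "SELECT create_hypertable('events', 'time', if_not_exists => TRUE);"
```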

Starting our Application

Running our application can be done simply with the Dapr CLI:

dapr run --app-id mqtt-to-postgres \
    --app-protocol grpc \
    --app-port 50051 \
    --resources-path ./components \
    -- cargo run

Testing it with an event

Now we can send an event with the Dapr CLI:

dapr publish --publish-app-id mqtt-to-postgres \
    --pubsub pubsub-mqtt \
    --topic events \
    --data '{"key":"value"}' \
    --metadata '{"ttlInSeconds":"10"}'

Summary

When we run all of the above successfully, we should see something like the following:

== APP == obj: {"key":"value"}
== APP == Received event: {} of type application/json
== APP == Duration: Some("38.524ms")
== APP == obj: {"key":"value"}
== APP == Received event: {} of type application/json
== APP == Duration: Some("2.553ms")

As a final optimization, we should batch events and bulk-insert every X events instead of issuing single-row inserts, which are known to be slow. That should be fairly easy to add!
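A hedged sketch of what such batching could look like. The `events (payload)` table and a batch size of 3 are assumptions; a real implementation would also flush on a timer so the tail of a batch is not lost:

```rust
// Buffer incoming payloads and build one multi-row INSERT per batch
// instead of one statement per event.
const BATCH_SIZE: usize = 3; // illustrative; tune for your workload

struct Batcher {
    buffer: Vec<String>,
}

impl Batcher {
    fn new() -> Self {
        Self { buffer: Vec::new() }
    }

    // Returns Some(sql) when the batch is full; None while still buffering.
    fn push(&mut self, payload: &str) -> Option<String> {
        self.buffer.push(format!("('{}')", payload.replace('\'', "''")));
        if self.buffer.len() >= BATCH_SIZE {
            let sql = format!(
                "INSERT INTO events (payload) VALUES {}",
                self.buffer.join(", ")
            );
            self.buffer.clear();
            Some(sql)
        } else {
            None
        }
    }
}

fn main() {
    let mut batcher = Batcher::new();
    batcher.push("a");
    batcher.push("b");
    if let Some(sql) = batcher.push("c") {
        println!("{}", sql);
        // → INSERT INTO events (payload) VALUES ('a'), ('b'), ('c')
    }
}
```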