Sinking Events from MQTT to Timescale with Dapr and Rust
Sinking events from a busy, high-throughput MQTT broker into Timescale requires a processor that can keep up with the load. For this one, I decided to use Dapr with the gRPC protocol and Rust.
Note that Rust support in Dapr is still in "alpha", but the SDK does help us set up the gRPC connection.
Identifying Components
The first step is to identify and configure our Dapr components. I chose MQTT as the PubSub component and Postgres as an output binding:
components/pubsub-mqtt.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub-mqtt
spec:
  type: pubsub.mqtt
  version: v1
  metadata:
  - name: url
    value: "tcp://USER:PASSWORD@localhost:1883"
  - name: qos
    value: 1
  - name: retain
    value: "false"
  - name: cleanSession
    value: "false"
  - name: consumerID
    value: "consumer-sink"
components/binding-postgres.yaml
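apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: binding-postgres
spec:
  type: bindings.postgresql
  version: v1
  metadata:
  - name: url # Required
    # sslmode=disable matches the local Docker container below, which runs without TLS;
    # use verify-ca or verify-full against a TLS-enabled production database.
    value: postgresql://USER:PASSWORD@localhost:5432/postgres?sslmode=disable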
Creating a Rust Application
Next, we create our Rust application. Luckily, Cargo makes this super simple, and we can just execute:
cargo new <project>
We then add our dependencies to the generated Cargo.toml:
[package]
name = "event_sink"
version = "0.1.0"
authors = ["Xavier Geerinck"]
edition = "2021"
description = "Demo for event sinking"
readme = "README.md"
[dependencies]
tonic = "0.8"
prost = "0.11"
bytes = "1"
prost-types = "0.11"
dapr = "0.12.0"
tokio = { version = "1", features = ["full"] }
serde_json = { version = "1.0", default-features = false, features = ["alloc"] }
[build-dependencies]
tonic-build = "0.8"
[[example]]
name = "main"
path = "src/main.rs"
Adding the Code for consuming events
Dapr has a small tutorial showing how to consume events. Adapted to our pubsub-mqtt component and events topic, the code looks like this:
use serde_json::Value;
// Mutex, MutexGuard and TonicChannel are needed once we add the Dapr client later on.
use tokio::sync::{Mutex, MutexGuard};
use tonic::{transport::Channel as TonicChannel, transport::Server, Request, Response, Status};
use dapr::{
    appcallback::*,
    dapr::dapr::proto::runtime::v1::{
        app_callback_server::{AppCallback, AppCallbackServer},
    },
};

#[derive(Default)]
pub struct AppCallbackService {}

#[tonic::async_trait]
impl AppCallback for AppCallbackService {
    /// Invokes service method with InvokeRequest.
    async fn on_invoke(
        &self,
        _request: Request<InvokeRequest>,
    ) -> Result<Response<InvokeResponse>, Status> {
        Ok(Response::new(InvokeResponse::default()))
    }

    /// Lists all topics subscribed by this app.
    ///
    /// NOTE: Dapr runtime will call this method to get
    /// the list of topics the app wants to subscribe to.
    /// Here, the app subscribes to topic `events` on the `pubsub-mqtt` component.
    async fn list_topic_subscriptions(
        &self,
        _request: Request<()>,
    ) -> Result<Response<ListTopicSubscriptionsResponse>, Status> {
        let topic = "events".to_string();
        let pubsub_name = "pubsub-mqtt".to_string();
        let list_subscriptions = ListTopicSubscriptionsResponse::topic(pubsub_name, topic);
        Ok(Response::new(list_subscriptions))
    }

    /// Subscribes events from Pubsub.
    async fn on_topic_event(
        &self,
        request: Request<TopicEventRequest>,
    ) -> Result<Response<TopicEventResponse>, Status> {
        let r = request.into_inner();
        let data = String::from_utf8_lossy(&r.data);
        // Return a gRPC error for malformed JSON instead of panicking the handler.
        let obj: Value = serde_json::from_str(&data)
            .map_err(|e| Status::invalid_argument(format!("invalid JSON payload: {}", e)))?;
        println!("obj: {}", obj);
        Ok(Response::new(TopicEventResponse::default()))
    }

    /// Lists all input bindings subscribed by this app.
    async fn list_input_bindings(
        &self,
        _request: Request<()>,
    ) -> Result<Response<ListInputBindingsResponse>, Status> {
        Ok(Response::new(ListInputBindingsResponse::default()))
    }

    /// Listens events from the input bindings.
    async fn on_binding_event(
        &self,
        _request: Request<BindingEventRequest>,
    ) -> Result<Response<BindingEventResponse>, Status> {
        Ok(Response::new(BindingEventResponse::default()))
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // The Dapr sidecar calls us on this address; the port must match --app-port.
    let addr_server: std::net::SocketAddr = "[::]:50051".parse()?;
    let callback_service = AppCallbackService::default();
    println!("AppCallback server listening on: {}", addr_server);
    // Create a gRPC server with the callback_service.
    Server::builder()
        .add_service(AppCallbackServer::new(callback_service))
        .serve(addr_server)
        .await?;
    Ok(())
}
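The handler above decodes the payload with String::from_utf8_lossy, which silently replaces invalid UTF-8 bytes with U+FFFD before parsing. If you would rather reject such messages outright, a stricter check is easy to add. Below is a minimal sketch, assuming JSON payloads. decode_payload is a hypothetical helper, not part of the Dapr SDK, and accepting an empty content type is an assumption of this sketch.

```rust
/// Hypothetical helper (not part of the Dapr SDK): strictly decode a topic
/// event payload instead of using `String::from_utf8_lossy`.
pub fn decode_payload<'a>(data: &'a [u8], content_type: &str) -> Result<&'a str, String> {
    // Assumption of this sketch: an empty content type is treated as JSON.
    if !content_type.is_empty() && !content_type.starts_with("application/json") {
        return Err(format!("unsupported content type: {content_type}"));
    }
    // Unlike from_utf8_lossy, this fails instead of inserting U+FFFD characters.
    std::str::from_utf8(data).map_err(|e| format!("payload is not valid UTF-8: {e}"))
}
```

Inside on_topic_event it could be called as decode_payload(&r.data, &r.data_content_type), with its error mapped to a tonic Status::invalid_argument before handing the string to serde_json.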
OK, that was the easy part. Now on to the more challenging part!
Forwarding Events
To forward events, we need the DaprClient: it lets us talk to the Dapr sidecar, which performs the actual sinking into Postgres through the output binding.
Normally we would create a single DaprClient, pass it to our constructor and store it on the struct, so the whole service reuses it (rather than creating a new DaprClient each time a method is invoked). However, the methods of Dapr's AppCallback trait only receive &self, never &mut self, while calling the client requires a mutable reference. This is the first issue we have to solve.
Besides this, a second issue appears when we use the SDK client's invoke_binding method: even though the proto definitions allow us to pass the operation and metadata fields, the SDK method does not let us set them. So this is another issue we need to solve!
Let's first resolve these issues before moving on to the final code.
Issue: The Dapr AppCallback trait does not expose mutable self-references
Resolving this issue requires a mutex. tonic may run our handlers concurrently on multiple tasks, so any state we mutate through &self must be protected. A mutex grants exclusive access to one caller at a time: each caller must acquire the lock first, so two handlers can never use the client simultaneously.
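To see why a mutex solves the &self problem, here is a std-only sketch. A hypothetical FakeClient stands in for DaprClient, and OS threads stand in for concurrent gRPC handlers. It uses std::sync::Mutex for simplicity. The service in this post uses tokio::sync::Mutex instead, because holding a std MutexGuard across an .await makes the future !Send, and tonic handlers must be Send.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for DaprClient: its methods need `&mut self`.
#[derive(Default)]
pub struct FakeClient {
    pub calls: u32,
}

impl FakeClient {
    pub fn invoke(&mut self) {
        self.calls += 1;
    }
}

/// Like AppCallbackService: handlers only ever get `&self`.
#[derive(Default)]
pub struct FakeService {
    client: Mutex<FakeClient>,
}

impl FakeService {
    /// Takes `&self`, yet still mutates the client through the mutex.
    pub fn handle_event(&self) {
        // lock() blocks until no other thread holds the guard.
        let mut client = self.client.lock().unwrap();
        client.invoke();
    } // guard dropped here, releasing the lock

    pub fn total_calls(&self) -> u32 {
        self.client.lock().unwrap().calls
    }
}

/// Simulates `threads` concurrent handlers, each handling `per_thread` events.
pub fn run_concurrently(threads: usize, per_thread: usize) -> u32 {
    let service = Arc::new(FakeService::default());
    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let service = Arc::clone(&service);
            thread::spawn(move || {
                for _ in 0..per_thread {
                    service.handle_event();
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    service.total_calls()
}
```

Calling run_concurrently(4, 250) returns 1000. Every handler borrows the service immutably, yet no increment is lost because the lock serializes access to the client.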
We then initialize a DaprClient in our main function:
let client = DaprClient::connect(addr_client).await?;
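The line above assumes addr_client already holds the sidecar's gRPC endpoint. Below is a minimal sketch of how to build it. It assumes the app is started with dapr run, which passes the sidecar's gRPC port to the app in the DAPR_GRPC_PORT environment variable, and it falls back to Dapr's default gRPC port 50001. Both helper functions are hypothetical.

```rust
use std::env;

/// Dapr's default sidecar gRPC port, used when DAPR_GRPC_PORT is unset.
const DEFAULT_DAPR_GRPC_PORT: &str = "50001";

/// Hypothetical helper: builds the sidecar endpoint from an optional port value.
pub fn sidecar_address(port: Option<&str>) -> String {
    let port = port
        .map(str::trim)
        .filter(|p| !p.is_empty())
        .unwrap_or(DEFAULT_DAPR_GRPC_PORT);
    format!("http://127.0.0.1:{port}")
}

/// Hypothetical helper: reads DAPR_GRPC_PORT, which `dapr run` sets for the app.
pub fn sidecar_address_from_env() -> String {
    let port = env::var("DAPR_GRPC_PORT").ok();
    sidecar_address(port.as_deref())
}
```

With this in place, addr_client could be set to sidecar_address_from_env() before calling DaprClient::connect. Reading the variable matters because dapr run does not necessarily use port 50001 for the sidecar.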
We also adapt our AppCallbackService so that it accepts a client. When the client is set, we wrap it in a mutex and add a small helper method, get_dapr_client, which locks the mutex and returns a guard to the client.
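use dapr::dapr::dapr::proto::runtime::v1::dapr_client::DaprClient;

pub struct AppCallbackService {
    // tokio's Mutex: its guard may be held across `.await` inside async handlers.
    dapr_client: Mutex<DaprClient<TonicChannel>>,
}

impl AppCallbackService {
    pub fn new(client: DaprClient<TonicChannel>) -> Self {
        let dapr_client = Mutex::new(client);
        Self { dapr_client }
    }

    /// Locks the mutex and returns a guard that derefs to the client.
    pub async fn get_dapr_client(&self) -> MutexGuard<'_, DaprClient<TonicChannel>> {
        self.dapr_client.lock().await
    }
}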
We could also reach for a RefCell to get interior mutability, but RefCell is not thread-safe: it is not Sync, so the compiler rejects it in a service that has to be shared across threads.
This illustrates how Rust enforces thread safety at compile time!
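The compile-time guarantee can be made visible with a trait bound. Below is a small sketch using hypothetical stub types in place of DaprClient and AppCallbackService. It relies on the fact that tonic-generated service traits such as AppCallback are declared Send + Sync + 'static.

```rust
use std::sync::Mutex;

/// Hypothetical stand-in for DaprClient.
pub struct StubClient;

/// Mirrors AppCallbackService: the Mutex field keeps the struct Sync.
pub struct StubService {
    pub client: Mutex<StubClient>,
    // Swapping in `std::cell::RefCell<StubClient>` here would make StubService
    // !Sync, and `is_sync::<StubService>()` would then fail to compile.
}

/// The `T: Sync` bound does the work: this only compiles for types that are
/// safe to share between threads, as tonic's generated service traits require.
pub fn is_sync<T: Sync>() -> bool {
    true
}
```

The function body is trivial on purpose: the check happens at compile time, when the bound is evaluated, not at run time.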
Issue: Dapr does not expose the operation and metadata fields
For our final issue, we can use the generated gRPC DaprClient directly instead of the higher-level Client exposed by the Dapr Rust SDK. Because this client is generated from the proto files, its methods take the full request messages, so every proto field, including operation and metadata, is available to us.
Initializing the client with let client = DaprClient::connect(addr_client).await?; lets us build and send the full request ourselves:
// `sql_stmt` is the SQL statement to execute, built from the received event.
let req = dapr::client::InvokeBindingRequest {
    name: "binding-postgres".to_string(),
    operation: "exec".to_string(),
    metadata: std::collections::HashMap::from([("sql".to_string(), sql_stmt)]),
    ..Default::default()
};
let mut client = self.get_dapr_client().await;
let _res = client.invoke_binding(req).await;
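The snippet assumes sql_stmt already holds the statement to run. Below is a sketch of one way to build it from the event payload. It assumes a hypothetical events table with a single JSONB payload column (the post does not define a schema) and a table name that is a trusted constant. build_insert_sql is a hypothetical helper.

```rust
/// Hypothetical helper: builds the `sql` metadata value for the binding's
/// `exec` operation, assuming a table with a single `payload` JSONB column.
/// `table` is interpolated as-is, so it must be a trusted constant.
pub fn build_insert_sql(table: &str, payload_json: &str) -> String {
    // Double single quotes so the payload forms a valid SQL string literal.
    let escaped = payload_json.replace('\'', "''");
    format!("INSERT INTO {table} (payload) VALUES ('{escaped}')")
}
```

Hand-rolled escaping is easy to get wrong. If the PostgreSQL binding in your Dapr version supports parameterized queries, prefer those.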
Finalizing our code
With both issues solved, we can finalize our code: send the request as shown above and handle the response.
match _res {
    Ok(res) => {
        let msg = res.into_inner();
        println!("Duration: {:?}", msg.metadata.get("duration"));
    }
    Err(e) => {
        println!("Error: {:?}", e.message());
    }
}
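The duration in the response metadata is a plain string. To aggregate it (for example, to track insert latency), it needs to be parsed. Below is a minimal sketch, assuming the millisecond format seen in the sample output at the end of this post (e.g. "38.524ms"). Any other unit returns None. parse_millis is a hypothetical helper.

```rust
use std::time::Duration;

/// Hypothetical helper: parses a duration string such as "38.524ms".
/// Only the "ms" form is handled; "1.5s" or "950µs" return None.
pub fn parse_millis(value: &str) -> Option<Duration> {
    let millis: f64 = value.trim().strip_suffix("ms")?.parse().ok()?;
    // try_from_secs_f64 rejects negative, NaN and overflowing values instead of panicking.
    Duration::try_from_secs_f64(millis / 1000.0).ok()
}
```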
Executing
For the final part, let's execute all the above!
Starting an MQTT Broker and Postgres Database
# Start an EMQX MQTT Broker
docker run -d --rm --name emqx -p 1883:1883 -p 8081:8081 -p 8083:8083 -p 8883:8883 -p 8084:8084 -p 18083:18083 emqx/emqx
# Start a TimescaleDB
docker run -d --rm --name timescaledb -p 5432:5432 -e POSTGRES_PASSWORD=password timescale/timescaledb:latest-pg15
Starting our Application
We can run our application with the Dapr CLI, pointing it at our components folder:
dapr run --app-id mqtt-to-postgres --app-protocol grpc --app-port 50051 --resources-path ./components -- cargo run
Testing it with an event
Now we can send an event with the Dapr CLI:
dapr publish --publish-app-id mqtt-to-postgres \
--pubsub pubsub-mqtt \
--topic events \
--data '{"key":"value"}' \
--metadata '{"ttlInSeconds":"10"}'
Summary
If everything above worked, we should see output similar to the following:
== APP == obj: {"key":"value"}
== APP == Received event: {} of type application/json
== APP == Duration: Some("38.524ms")
== APP == obj: {"key":"value"}
== APP == Received event: {} of type application/json
== APP == Duration: Some("2.553ms")
As a final optimization, we should buffer events and bulk insert them every N events instead of inserting them one by one. Single-row inserts are known to be slow, but batching should be fairly easy to add.
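Below is a minimal sketch of such batching. It assumes a hypothetical events table with a single JSONB payload column and a batch size of your choosing. InsertBatcher is hypothetical and only builds the multi-row INSERT statement. That statement would then be sent through the binding's exec operation as before.

```rust
/// Hypothetical batching buffer: collects JSON payloads and yields one
/// multi-row INSERT once `batch_size` events have been buffered.
pub struct InsertBatcher {
    table: String,
    batch_size: usize,
    rows: Vec<String>,
}

impl InsertBatcher {
    pub fn new(table: &str, batch_size: usize) -> Self {
        assert!(batch_size > 0, "batch_size must be positive");
        Self {
            table: table.to_string(),
            batch_size,
            rows: Vec::with_capacity(batch_size),
        }
    }

    /// Buffers one event; returns a statement once the batch is full.
    pub fn push(&mut self, payload_json: &str) -> Option<String> {
        // Double single quotes so the payload forms a valid SQL string literal.
        self.rows.push(format!("('{}')", payload_json.replace('\'', "''")));
        if self.rows.len() >= self.batch_size {
            self.flush()
        } else {
            None
        }
    }

    /// Drains whatever is buffered (e.g. on a timer or at shutdown).
    pub fn flush(&mut self) -> Option<String> {
        if self.rows.is_empty() {
            return None;
        }
        let values = self.rows.join(", ");
        self.rows.clear();
        Some(format!("INSERT INTO {} (payload) VALUES {}", self.table, values))
    }
}
```

In the service, the batcher would sit behind a Mutex, just like the Dapr client. Pairing it with a periodic flush keeps events from waiting indefinitely when traffic is low.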