How to integrate Dapr and Apache Kafka through Bindings (input and output)

For a new project I am working on, I want to use Dapr to integrate with Kafka so that a REST endpoint in my application is called automatically whenever a message arrives on a Kafka topic.

Of course, we want our development environment to be as simple as possible and set up in just a few minutes. That is why I chose the following:

  • Minikube (for my Kubernetes needs)
  • Kafka through Helm
  • dapr run ... locally when testing my application, for fast restarts

So let's get started!

Dapr LOCAL installation

The purpose of this set-up is to run the "difficult-to-set-up" applications in Kubernetes, while keeping development easy by running Dapr locally. Therefore we install Dapr locally and not in our Kubernetes cluster.

wget -q https://raw.githubusercontent.com/dapr/cli/master/install/install.sh -O - | /bin/bash
dapr init
Note: For a more detailed setup guide, follow this article
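To double-check that dapr init finished, we can look for the files it leaves behind in self-hosted mode. The sketch below is a hypothetical helper (check_dapr_init is not part of the Dapr CLI) and assumes the default layout, where dapr init creates $HOME/.dapr/config.yaml and a $HOME/.dapr/components/ folder:

```shell
# Hypothetical helper: check that `dapr init` left its default self-hosted files behind.
# Assumption: the default layout is <dir>/config.yaml plus a <dir>/components/ folder.
check_dapr_init() {
  local dir="${1:-$HOME/.dapr}"
  if [ ! -f "$dir/config.yaml" ]; then
    echo "missing $dir/config.yaml (did 'dapr init' run?)" >&2
    return 1
  fi
  if [ ! -d "$dir/components" ]; then
    echo "missing $dir/components/ folder" >&2
    return 1
  fi
  echo "dapr init looks complete in $dir"
}

# Usage, after the install script and `dapr init`:
#   dapr --version && check_dapr_init
```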

Minikube Installation

First, install Minikube with the following commands:

# Check if virtualization is supported (should NOT be empty)
grep -E --color 'vmx|svm' /proc/cpuinfo

# Download Minikube
curl -Lo minikube https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64 && chmod +x minikube

# Add it to your path
sudo mkdir -p /usr/local/bin/
sudo install minikube /usr/local/bin/

# Check the installation
minikube version

Once that is done, we can start a Kubernetes cluster:

# Create a cluster with 4 CPUs and 8 GB RAM (--memory is in MB)
minikube start --cpus=4 --memory=8192
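When no unit is given, minikube reads the --memory value as megabytes, so 8 GB is 8 × 1024 = 8192 MB. A tiny hypothetical helper makes the arithmetic explicit:

```shell
# Hypothetical helper: minikube reads a unit-less --memory value as megabytes,
# so convert whole gigabytes using 1 GB = 1024 MB (8 GB -> 8192).
gb_to_mb() {
  echo $(( $1 * 1024 ))
}

# Usage:
#   minikube start --cpus=4 --memory="$(gb_to_mb 8)"
```

Recent minikube versions should also accept a unit suffix such as --memory=8g, which avoids the conversion altogether.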

Installing Helm

Helm is a package manager for Kubernetes, so we don't have to install and wire up every component by hand. To install it, run:

curl -fsSL -o get_helm.sh https://raw.githubusercontent.com/helm/helm/master/scripts/get-helm-3
chmod 700 get_helm.sh
./get_helm.sh
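The helm install kafka bitnami/kafka syntax used below is Helm 3 syntax (Helm 2 expected the release name via --name), so it is worth confirming which major version the script installed. A minimal sketch, assuming helm version --short prints a string like v3.12.0+gc9f554d; is_helm3 is a hypothetical helper:

```shell
# Hypothetical helper: succeed only for a Helm 3.x version string,
# e.g. the output of `helm version --short` such as "v3.12.0+gc9f554d".
is_helm3() {
  case "$1" in
    v3.*) return 0 ;;
    *) return 1 ;;
  esac
}

# Usage:
#   is_helm3 "$(helm version --short)" && echo "Helm 3 installed"
```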

Installing Kafka with Helm

Now we're ready for the real stuff! Let's get a Kafka cluster installed on our Minikube:

Creating Kafka

# Get Minikube IP
export MINIKUBE_IP=$(minikube ip)

# Install Kafka and expose broker 0 externally on NodePort 30090
helm repo add bitnami https://charts.bitnami.com/bitnami
helm install kafka bitnami/kafka \
  --set "externalAccess.enabled=true,externalAccess.service.type=NodePort,externalAccess.service.domain=127.0.0.1,externalAccess.service.nodePorts[0]=30090"
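The chart needs a minute or two before the broker accepts connections. One way to wait for it is sketched below, under the assumption that the Bitnami chart labels its broker pods with app.kubernetes.io/name=kafka (check with kubectl get pods --show-labels); wait_for_kafka is a hypothetical wrapper around kubectl wait:

```shell
# Hypothetical wrapper around `kubectl wait`: block until the Kafka broker pods are Ready.
# Assumption: the Bitnami chart labels broker pods with app.kubernetes.io/name=kafka.
wait_for_kafka() {
  kubectl wait --for=condition=Ready pod \
    -l app.kubernetes.io/name=kafka \
    --timeout="${1:-300s}"
}

# Usage:
#   wait_for_kafka 300s && kubectl get svc kafka-0-external
```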

Port Forwarding Kafka to Localhost

Network-wise, Kafka runs inside the cluster's isolated network, which we can't reach from localhost. The external access settings from the helm install above (externalAccess.enabled=true) make the broker reachable from outside the cluster; now we just need to create a port-forward to it:

# Expose kafka
# Note: 9092 is used for the initial (bootstrap) connection, and 30090 because the broker advertises itself to clients as 127.0.0.1:30090
kubectl port-forward svc/kafka-0-external 30090:9094 9092:9094
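kubectl port-forward keeps running in the foreground, so from a second terminal it can help to wait until both forwarded ports answer before starting the application. A hypothetical polling helper, assuming bash (for its /dev/tcp redirection) and the coreutils timeout command are available:

```shell
# Hypothetical helper: poll until host:port accepts a TCP connection, or give up.
# Relies on bash's /dev/tcp redirection and coreutils `timeout` (assumptions).
wait_for_port() {
  local host="$1" port="$2" attempts="${3:-30}" i=1
  while [ "$i" -le "$attempts" ]; do
    # Try to open a TCP connection, giving each attempt at most 1 second.
    if timeout 1 bash -c "exec 3<>/dev/tcp/$host/$port" 2>/dev/null; then
      echo "$host:$port is reachable"
      return 0
    fi
    i=$((i + 1))
    sleep 1
  done
  echo "$host:$port not reachable after $attempts attempts" >&2
  return 1
}

# Usage, in a second terminal while the port-forward is running:
#   wait_for_port localhost 9092 && wait_for_port localhost 30090
```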

Configuring Kafka bindings in Dapr

The hard part is done! Kafka is set up (which should have taken around 5 minutes), so we can start running our applications. Let's configure our input and output bindings, but first a short refresher on what those are:

  • Input bindings: Dapr invokes a REST endpoint on your application when an event arrives
  • Example: if we set .metadata.name to kafka, Dapr will call POST /kafka on our application once a message appears in Kafka
  • Output bindings: your application invokes an external resource through Dapr (e.g. you send a request to the Dapr bindings endpoint, and Dapr sends the message to Kafka)
  • Example: curl -X POST http://localhost:3500/v1.0/bindings/kafka -d '{ "data": { "message": "Hi!" }, "operation": "create" }'
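The output-binding request from the example above can be wrapped in two small functions. Both names are hypothetical; the sketch assumes the sidecar's HTTP port is 3500 (overridable with DAPR_HTTP_PORT) and that the binding is named kafka, as in the component file below:

```shell
# Hypothetical helper: wrap a plain-text message in the body Dapr's bindings
# endpoint expects: {"data": {...}, "operation": "create"}.
binding_payload() {
  local escaped
  # Escape backslashes and double quotes so the message stays valid JSON
  # (newlines and other control characters are NOT handled by this sketch).
  escaped=$(printf '%s' "$1" | sed 's/\\/\\\\/g; s/"/\\"/g')
  printf '{"data":{"message":"%s"},"operation":"create"}' "$escaped"
}

# Hypothetical helper: POST that body to the local Dapr sidecar's "kafka" binding.
send_to_binding() {
  curl -s -X POST "http://localhost:${DAPR_HTTP_PORT:-3500}/v1.0/bindings/kafka" \
    -H 'Content-Type: application/json' \
    -d "$(binding_payload "$1")"
}
```

For example, send_to_binding 'Hi!' sends essentially the same request as the cURL command above, with an explicit JSON content type.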

With this in mind, create a file: components/binding-kafka.yaml with the following content:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka 
  namespace: default
spec:
  type: bindings.kafka
  metadata:
  - name: consumerGroup
    value: group1
  - name: authRequired
    value: "false"
  - name: brokers
    value: localhost:9092

  # Config Output Binding (invokes external resource)
  # This we can test by sending a CURL request
  # e.g. curl -X POST http://localhost:3500/v1.0/bindings/kafka -d '{ "data": { "message": "Hi!" }, "operation": "create" }'
  # note: start with `dapr run --port 3500` for the port
  - name: publishTopic # for output bindings (invokes external resource)
    value: stocks # the topic the output binding publishes to

  # Config Input Binding (invokes app on endpoint from .metadata.name)
  - name: topics # for input bindings (trigger app on endpoint - see .metadata.name)
    value: stocks # comma separated string of topics for an input binding 
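If you want to switch brokers or topics without editing the YAML by hand, the component can also be generated from a script. This is a hypothetical helper that writes the same fields as the file above, taking the output folder, broker list and topic as parameters (the same topic is used for publishTopic and topics, as in the original):

```shell
# Hypothetical helper: write <dir>/binding-kafka.yaml with the same fields as above.
# Arguments: output folder, broker list, topic (defaults match the article).
write_kafka_component() {
  local dir="${1:-./components}" brokers="${2:-localhost:9092}" topic="${3:-stocks}"
  mkdir -p "$dir"
  cat > "$dir/binding-kafka.yaml" <<EOF
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka
  namespace: default
spec:
  type: bindings.kafka
  metadata:
  - name: consumerGroup
    value: group1
  - name: authRequired
    value: "false"
  - name: brokers
    value: ${brokers}
  - name: publishTopic
    value: ${topic}
  - name: topics
    value: ${topic}
EOF
}

# Usage:
#   write_kafka_component ./components localhost:9092 stocks
```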

Running

We're now ready to boot up our application! Start it as follows (assuming your application listens on port 5000):

sudo dapr run --components-path ./components --port 3500 --app-port 5000 --app-id kafka -- dotnet run --urls=http://0.0.0.0:5000

If we now send an event through a cURL request to the output binding:

curl -X POST http://localhost:3500/v1.0/bindings/kafka -d '{ "data": { "message": "Hi!" }, "operation": "create" }'

If our application logs the requests it receives on POST /kafka, we can see the events coming in through the input binding in the console!
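To check the handler's logging without going through Kafka at all, we can send the application a request that imitates the one the sidecar makes on the input binding. This sketch assumes the app listens on port 5000 (overridable with an APP_PORT variable, an assumption of this sketch) and that the handler receives the Kafka message value, here {"message":"Hi!"}, as the request body; simulate_input_binding is a hypothetical name:

```shell
# Hypothetical helper: imitate the sidecar's input-binding call by POSTing a body
# straight to the app's /kafka endpoint (port 5000 by default, APP_PORT overrides it).
# Prints only the HTTP status code returned by the app.
simulate_input_binding() {
  local body="$1"
  [ -n "$body" ] || body='{"message":"Hi!"}'
  curl -s -o /dev/null -w '%{http_code}\n' -X POST \
    "http://localhost:${APP_PORT:-5000}/kafka" \
    -H 'Content-Type: application/json' \
    -d "$body"
}

# Usage, with the app running but without Kafka or the sidecar:
#   simulate_input_binding    # a 2xx status means the handler accepted the request
```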