5 min read

Creating a Pub/Sub system with 100.000 subscribers and 1 publisher

Creating a Pub/Sub system with 100.000 subscribers and 1 publisher

I decided to do something exciting and challenging for personal growth in my Data & AI area and posed myself the hypothesis: "Can I create a system that with a few publishers but a massive amount of subscribers?" with the constraints of:

  • Should be easy to maintain (as much PaaS as possible)
  • Should be easy to scale (can I autoscale? Container support? …)
  • Should be able to handle real-time data flows (a real pub/sub system!)

Thinking back this was something I actually encountered before in 2014 while working on the Feedient socket implementation, here however I was utilizing SockJS and only had a need for around 20k connected clients.

After doing some research on different Azure components (since it's my comfort zone), I came to the following conclusion:

  • Apache Kafka: We can go high here, but it seems to be a bad design. Next to that, it instantly starts off at 1.8k for a base HA cluster…
  • Redis: Might be possible! I could however not find anything about this (if anyone knows, please comment below :))
  • Azure Event Hubs: Not possible, it will scale well but would be too expensive + there is a limit of around 3200 connections
  • Note: this could be increased through dedicated event hubs or contacting support but this would grow too expensive
  • Azure Event Grid: 500 / topic
  • Note: this could potentially be increased by multiple chained Event Grids, but this would cause too much implementation work on the moment
  • Azure Service Bus: 2.000 subscriptions on the basic or standard tier
  • Note: Again, can be increased but not ideal for the use case of growing towards > 10k connections.

After checking all of the above, I came to the conclusion that nothing in my existing skill portfolio is going to cover this… so I started my search online. After a bit of searching I came on a product calledd emqx.io which sounded reasonable! And actually it was!

5 minutes later, I found and deployed the docker container through the following command:

Note: This is being deployed on a 1 GB RAM, 1 vCPU container instance on Azure
# Create container
az container create \
    -g General \
    --name emqx \
    --dns-name-label xemqx \
    --image emqx/emqx:latest \
    --cpu 1 \
    --memory 1 \
    --ports 1883 8883 8083 8084 8080 18083

With the port mapping being:

|Port|Name|isPrivate?| |-|-|-| |1883|MQTT|No| |8883|MQTT/SSL|No| |8083|MQTT/Websocket|No| |8084|MQTT/Websocket/SSL|No| |8080|Management API|No| |18083|Dashboard|No| |4369|Node Discovery|Yes| |5369|Cluster RPC|Yes| |6369|Cluster Channel|Yes|

After which I could open the dashboard by loading the FQDN:18083:

./dashboard.png

Stress Testing - 10k Connections

Ok, so we now got our broker done. How can we actually start stress testing? Well for this I wrote a small script that we can just execute locally:

client.js

const mqtt = require('mqtt')
const noOfClients = process.env.CLIENT_CONNECTIONS || 250; // How many clients to create?
const serverUrl = process.env.CLIENT_URL;

console.log(`Connecting ${noOfClients} to ${serverUrl}`);

for (let clientNo = 1; clientNo <= noOfClients; clientNo++) {
    // Unique MQTT Object per client
    const client = mqtt.connect(serverUrl)

    client.on('connect', () => {
        client.subscribe('test-message', (err) => {

            console.log(`Client Connected #${clientNo}`);
            if (err) {
                console.log(err);
                // client.publish('presence', 'Hello mqtt')
            }
        })
    })

    client.on('message', (topic, message) => {
        // message is Buffer
        console.log(`Client #${clientNo} Received: ${message.toString()}`)
        client.end(); // In this test we instantly end them
    })
}

console.log(`Connected ${noOfClients} clients`);

package.json

{
  "name": "mqtt-tester",
  "version": "1.0.0",
  "description": "Testing mqtt subscriptions",
  "main": "client.js",
  "scripts": {
    "test": "echo \"Error: no test specified\" && exit 1"
  },
  "author": "",
  "license": "ISC",
  "dependencies": {
    "async-mqtt": "^2.4.2",
    "mqtt": "^3.0.0"
  }
}

After executing this locally with CLIENT_CONNECTIONS=10000 CLIENT_URL=mqtt://xemqx.northeurope.azurecontainer.io node client.js I got to 10k connections! With a stable running EMQX service.

./10k.png

Next to that, I could also send to my subscribers by utilizing this small script (nothing fancy, I know):

sender.js

const mqtt = require('mqtt')
const client = mqtt.connect('mqtt://xemqx.northeurope.azurecontainer.io')

client.on('connect', () => {
    let i = 1;

    setInterval(() => {
        i++;
        client.publish('test-message', `Hello World: #${i}`);
        console.log(`Sent Message ${i}`);
    }, 1000)
});

However when wanting to go higher, I started noticing delays on my computer not being able to handle the amount of subscriptions open… so back to the drawing board!

Stress Testing - 28k Connections

How can we improve this? Well one simple way is to create a docker container that we can put on a cloud host and simply spin up more instances! So let's do just that and create a Dockerfile + build the image.

Dockerfile

# Note: build with `docker build -t thebillkidy/mqtt-tester:1.0 .`
# Note: push to docker hub with: `docker push thebillkidy/mqtt-test:1.0`
FROM node:latest
WORKDIR /usr/src/app

# Install package.json dependencies
COPY package*.json ./
RUN npm install

# Install app
COPY . .

# Default ENV variables
# Note: specify those while running `docker run -e CLIENT_URL=mqtt://localhost`
ENV CLIENT_CONNECTIONS=1000
ENV CLIENT_URL='mqtt://localhost'

CMD [ "node", "client_massive.js" ]

Finishing with building the container and pushing it so that we can use it.

But how can we now deploy this on cloud? Well in Azure we can use the az cli to create out containers after being pushed. So creating a small line we can multiply this * 50!

for i in {1..50}; do az container create -g POC-EMQX --name subscriber-$RANDOM --image thebillkidy/mqtt-tester:1.0 --cpu 1 --memory 1 --environment-variables CLIENT_CONNECTIONS=10000 CLIENT_URL=mqtt://xemqx.northeurope.azurecontainer.io --no-wait; echo "Deployed Container $i"; done

Allowing a couple of minutes to make sure the containers are being spun up, we can now check the dashboard again. However! it seems that it's crashing at around 28k connections…

./28k.png

Looking at our statistics of the machine, we see that the memory cap is close to being hit - so let's see if we can work on this.

./28k-stats.png

Stress Testing - 100k Connections

Rather than deploying our emqx container with 1GB of RAM and 1 CPU, let's take 4 of both and retry.

# Create container
az container create \
    -g General \
    --name emqx \
    --dns-name-label xemqx \
    --image emqx/emqx:latest \
    --cpu 4 \
    --memory 4 \
    --ports 1883 8883 8083 8084 8080 18083

With a script to create 50 x 2k connections

for i in {1..50}; do az container create -g POC-EMQX --name subscriber-$RANDOM --image thebillkidy/mqtt-tester:1.0 --cpu 1 --memory 1 --environment-variables CLIENT_CONNECTIONS=2000 CLIENT_URL=mqtt://xemqx.northeurope.azurecontainer.io --no-wait; echo "Deployed Container $i"; done

Which after a while of waiting to allow the subscriptions to come up shows us!

./100k.png
./100k-stats.png

So we did it, we scaled it to 100k subscribers!

Further

So the above is by no means a production system, it's merely a POC showing that we can scale to a certain amount of users. For further research I would like to scale this up towards 1M and even 10M subscribers. To do this, I would need to adapt the tooling written to make sure it scales better (there is a container limit and I can only open 2k subscribers per container in a stable way on this size). Next to that, does it not show how to run this in production and maintain it for a longer amount of time.