
Enabling Serverless AI with Azure Container Apps and Dapr

In my previous post I explained how you can get started with Container Apps. The missing piece of that puzzle is how to put a queue behind your application and dynamically scale your workload based on the number of items in it. This has been one of my most difficult blog posts so far, taking me through a range of issues with both Container Apps and Dapr, but working through them definitely helped those services (and me) become more mature.

This entire process took me around two months to complete, but once figured out it's actually quite simple!

Source Code

The entire source code for this project can be found in my public GitHub repository.

Using a Queue - Made easy through Dapr

We will be using a queue in our source code. Luckily for us, this is easy with Dapr, which abstracts away much of the implementation work for the different queueing systems and their retry logic. For this project, I decided to use an Azure Service Bus queue.

To set this up, we create a component spec such as the below:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: my-queue
  namespace: default
spec:
  type: bindings.azure.servicebusqueues
  version: v1
  metadata:
    - name: connectionString # Required
      value: "YOUR_CONNECTION_STRING"
    - name: queueName
      value: worker-items
    - name: ttlInSeconds
      value: "60"

Note: The component spec for Container Apps is defined differently, but more about that soon.

Once this is created, we can hook our application up to the queue by creating an Input Binding that monitors the Service Bus queue and calls the defined POST endpoint. Looking at the component spec above, we defined the name my-queue, which results in a POST call to /my-queue.

The code below shows us how we can process items from this binding.

from pydantic import BaseModel
from fastapi import FastAPI
from dapr.ext.fastapi import DaprApp

# Create the FastAPI app and wrap it with the Dapr extension
app = FastAPI()
dapr_app = DaprApp(app)

# Define Request DTO
class WorkerItemRequestDto(BaseModel):
    id: str
    image_url: str

# Add our Routes
# Dapr calls OPTIONS on the binding route at startup to verify that it exists
@app.options("/my-queue")
async def processor_worker_item_options():
    return

# Every message on the queue is delivered as a POST to /my-queue
@app.post("/my-queue")
async def processor_worker_item(item: WorkerItemRequestDto):
    return
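
If you want to test the endpoint without Dapr in the loop, you can simulate what the binding does by POSTing a payload to the route yourself. Below is a minimal sketch, assuming the app runs locally on port 5000 (as in the Running Locally section further down) and using a placeholder image URL:

import requests

# Simulate the Dapr input binding: POST a message payload to /my-queue.
# The port (5000) matches the uvicorn command used later; the image URL is a placeholder.
payload = {
    "id": "test-1",
    "image_url": "https://example.com/some-image.jpg",
}

response = requests.post("http://localhost:5000/my-queue", json=payload)
print(response.status_code)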

Implementing YoloX

Now that we know how to get items in, we just have to initialize our model and run each item through it. As we did before, we initialize the model, download the image, and print the result:

import io
import json

import requests
import torch
from PIL import Image

# First load the model as we will always need this
print("Loading Model", flush=True)
model = torch.hub.load("ultralytics/yolov5", "yolov5s", pretrained=True)
model.eval()

# ... Other code

@app.post("/my-queue")
async def processor_worker_item(item: WorkerItemRequestDto):
    return handler_image_url(item.image_url)

# Handling the Image URL
def handler_image_url(image_url):
    print(f'Received: image_url="{image_url}"', flush=True)

    if not image_url:
        return

    # Download the image
    response = requests.get(image_url)
    img = Image.open(io.BytesIO(response.content))

    # Infer result
    results = model(img, size=640)
    bboxes = results.pandas().xyxy[0].to_json(orient="records")
    bboxes = json.loads(bboxes)

    # Print the Result
    items = {}
    for bbox in bboxes:
        items[bbox["name"]] = 1 if bbox["name"] not in items else items[bbox["name"]] + 1

    print('Results:', flush=True)
    for key, value in items.items():
        print(f'- {key}: {value}', flush=True)

    return
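
For a quick sanity check you can also call the handler directly, bypassing the queue entirely. A minimal sketch, where the image URL is just a placeholder for any publicly reachable image:

# Run a single image through the model without going through the binding
handler_image_url("https://example.com/some-image.jpg")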

Running Locally

We can run the above locally by utilizing the dapr run command:

# Send 5 Events to Service Bus Queue
dapr run --app-id=send-events --app-protocol http --components-path=./example/components python example/send.py 5

# Start Server to Process the Events
cd app/code/src
dapr run --app-id=yolox --app-protocol http --components-path=../../../example/components -- uvicorn main:app --host 0.0.0.0 --port 5000

Resulting in our application receiving events and processing them! 🤩
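
The example/send.py script used above lives in the repository; as a rough sketch of what such a sender could look like (assuming it uses the Dapr Python SDK to invoke the output side of the my-queue binding; the actual file may differ), it boils down to:

import json
import sys
import uuid

from dapr.clients import DaprClient

# Push N messages onto the Service Bus queue through the "my-queue" binding
count = int(sys.argv[1]) if len(sys.argv) > 1 else 1

with DaprClient() as client:
    for _ in range(count):
        payload = {
            "id": str(uuid.uuid4()),
            "image_url": "https://example.com/some-image.jpg",  # placeholder URL
        }
        client.invoke_binding(
            binding_name="my-queue",
            operation="create",
            data=json.dumps(payload),
        )
        print(f"Sent: {payload}", flush=True)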

Running in Cloud with Container Apps

Now, the power of all of this is that we can enable Serverless AI! To achieve this, we utilize Azure Container Apps to run our container whenever there are items in our queue. When the queue is empty, a KEDA scaler scales us down to 0 instances!

The snippet below illustrates how we use secrets to configure both the Dapr component and the KEDA scaler:

const containerApp = new web.ContainerApp("app", {
  resourceGroupName: resourceGroup.name,
  location: glbLocation,
  kubeEnvironmentId: kubeEnv.id,
  configuration: {
    // ... see source code
    activeRevisionsMode: ActiveRevisionsMode.Single,
    secrets: [
      {
        name: "acr-pwd",
        value: outAcrAdminPassword,
      },
      {
        name: "sb-connection-string",
        value: serviceBusNamespaceKeys.primaryConnectionString,
      },
    ],
  },
  template: {
    revisionSuffix: `${new Date().getTime()}`,
    containers: [
      {
        name: `container-${GLB_APP_ID}`,
        image: image.name,
        env: [
          // ... see source code
        ],
      },
    ],
    // https://keda.sh/docs/2.5/scalers/azure-service-bus/
    scale: {
      minReplicas: 0,
      maxReplicas: 3,
      rules: [
        {
          name: "rule-servicebus-scaler",
          custom: {
            type: "azure-servicebus",
            metadata: {
              queueName: "worker-items",
              messageCount: "2",
            },
            auth: [
              {
                secretRef: "sb-connection-string",
                triggerParameter: "connection",
              },
            ],
          },
        },
      ],
    },
    dapr: {
      enabled: true,
      appId: GLB_APP_ID,
      appPort: GLB_APP_PORT,
      // appProtocol: 'http', // Disabled for now since unsupported by ARM CLI
      components: [
        {
          name: "my-queue",
          type: "bindings.azure.servicebusqueues",
          version: "v1",
          metadata: [
            {
              name: "connectionString",
              secretRef: "sb-connection-string",
            },
            {
              name: "queueName",
              value: "worker-items",
            },
            {
              name: "ttlInSeconds",
              value: "60",
            },
          ],
        },
      ],
    },
  },
});

To set up this infrastructure, we can simply run the following command:

pulumi up \
    --stack demo-cae-yolox \
    -c glb-location=northeurope \
    -c glb-project-name=demo-cae-yolox \
    -c glb-project-env=prd \
    -c sb-topics="worker-items"

For the full infrastructure setup, I refer to the Pulumi file in the source code.

We can now view the logs of our application in Log Analytics with the query:

ContainerAppConsoleLogs_CL
    | project TimeCreated = unixtime_milliseconds_todatetime(_timestamp_d * 1000), ContainerAppName_s, Log_s
    | order by TimeCreated desc
    | take 150

Conclusion

This article might not sound like something worth spending two months on! However, looking at the benefits Container Apps brings to the field of Serverless AI, it was well worth the time. We can now finally spin up a model, run inference, and shut it down again without needing a compute instance running 24/7, which can get expensive when using GPUs.

I hope you enjoyed this article; feel free to leave a comment below or contact me!