Enabling Serverless AI with Azure Container Apps and Dapr
In my previous post I explained how you can get started with Container Apps. The one missing piece of that puzzle is how you can utilize a queue to dynamically scale your workload based on the items in it! This has been one of my most difficult blog posts so far, taking me through a range of different issues with Container Apps and Dapr, but it definitely helped those services (and myself) mature.
The following issues and pull requests were created along the way:
- Dapr: Python SDK - Input Bindings for HTTP
- Log Analytics: Difference between TimeGenerated and _timestamp_d #79
- Support for setting Dapr App Protocol to gRPC
- Container Apps seems to be missing appProtocol
- Azure Container Apps not Updating Docker Container
- Metadata (and queryString?) should use an Array Type instead of Tuple Type
- Azure Container App Demonstration
This entire process took me around 2 months to complete, but once figured out it's super simple!
Source Code
The entire source code for this project can be found in my public GitHub repository.
Using a Queue - Made easy through Dapr
We will be using a queue in our source code. Luckily for us, Dapr makes this easy: it abstracts away most of the implementation work for the different queueing systems and their retry logic. For this project, I decided to utilize an Azure Service Bus queue.
To set this up, we create a component spec such as the below:
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: my-queue
  namespace: default
spec:
  type: bindings.azure.servicebusqueues
  version: v1
  metadata:
  - name: connectionString # Required
    value: "YOUR_CONNECTION_STRING"
  - name: queueName
    value: worker-items
  - name: ttlInSeconds
    value: 60
Note: The component spec for Container Apps is defined differently, but more about that soon.
Once this is created, we can hook up our application to the queue by creating an Input Binding that will monitor the Service Bus queue and call the defined POST endpoint. Looking at the component spec above, we defined the name my-queue, which will result in a POST call to /my-queue.
The code below shows us how we can process items from this binding.
from pydantic import BaseModel
from fastapi import FastAPI
from dapr.ext.fastapi import DaprApp

# Create the FastAPI app and wire it up to Dapr
app = FastAPI()
dapr_app = DaprApp(app)

# Define Request DTO
class WorkerItemRequestDto(BaseModel):
    id: str
    image_url: str

# Add our Routes
@app.options("/my-queue")
async def processor_worker_item_options():
    # Dapr performs an OPTIONS call on startup to verify the binding endpoint exists
    return

@app.post("/my-queue")
async def processor_worker_item(item: WorkerItemRequestDto):
    return
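To verify the endpoint locally without going through the binding, we can mimic the POST call Dapr would make. This is a minimal sketch, assuming the app is served on port 5000 (as in the dapr run command later) and using a made-up payload:

import requests

# Mimic the POST call the Dapr input binding would make (hypothetical payload)
response = requests.post(
    "http://localhost:5000/my-queue",
    json={"id": "1", "image_url": "https://example.com/dog.jpg"},
)
print(response.status_code)  # 200 means the handler accepted the item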
Implementing YoloX
Now that we know how to get items in, we just have to initialize our model and run each item through it. As we did before, we initialize the model, download our image and print the result:
import io
import json

import requests
import torch
from PIL import Image

# First load the model as we will always need this
print("Loading Model", flush=True)
model = torch.hub.load("ultralytics/yolov5", "yolov5s", pretrained=True)
model.eval()

# ... Other code

@app.post("/my-queue")
async def processor_worker_item(item: WorkerItemRequestDto):
    return handler_image_url(item.image_url)

# Handling the Image URL
def handler_image_url(image_url):
    print(f'Received: image_url="{image_url}"', flush=True)

    if not image_url:
        return

    # Download the image
    response = requests.get(image_url)
    img = Image.open(io.BytesIO(response.content))

    # Infer result
    results = model(img, size=640)
    bboxes = results.pandas().xyxy[0].to_json(orient="records")
    bboxes = json.loads(bboxes)

    # Count the detections per class and print the result
    items = {}
    for bbox in bboxes:
        items[bbox["name"]] = 1 if bbox["name"] not in items else items[bbox["name"]] + 1

    print("Results:", flush=True)
    for key, value in items.items():
        print(f"- {key}: {value}", flush=True)

    return
Running Locally
We can run the above locally by utilizing the dapr run command:
# Send 5 Events to Service Bus Queue
dapr run --app-id=send-events --app-protocol http --components-path=./example/components python example/send.py 5
# Start Server to Process the Events
cd app/code/src
dapr run --app-id=yolox --app-protocol http --components-path=../../../example/components -- uvicorn main:app --host 0.0.0.0 --port 5000
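The send.py script referenced above lives in the repository; a minimal sketch of what it roughly does (the exact payload and image URL here are illustrative) could look like this, using the Dapr Python SDK to publish messages through the same binding:

import json
import sys

from dapr.clients import DaprClient

# Number of events to send, passed as the first CLI argument
count = int(sys.argv[1]) if len(sys.argv) > 1 else 1

with DaprClient() as client:
    for i in range(count):
        # operation="create" publishes a message through the my-queue output binding
        client.invoke_binding(
            binding_name="my-queue",
            operation="create",
            data=json.dumps({"id": str(i), "image_url": "https://example.com/dog.jpg"}),
        )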
Resulting in our application receiving events and processing them! 🤩
Running in Cloud with Container Apps
Now the power of all of this is that we can enable Serverless AI! To achieve this, we will utilize Azure Container Apps, which runs our container whenever there is an item in our queue. When there are no items in the queue, a KEDA scaler scales us down to 0 instances!
The snippet below illustrates how we utilize secrets to configure the Dapr component as well as the KEDA scaler:
const containerApp = new web.ContainerApp("app", {
  resourceGroupName: resourceGroup.name,
  location: glbLocation,
  kubeEnvironmentId: kubeEnv.id,
  configuration: {
    // ... see source code
    activeRevisionsMode: ActiveRevisionsMode.Single,
    secrets: [
      {
        name: "acr-pwd",
        value: outAcrAdminPassword,
      },
      {
        name: "sb-connection-string",
        value: serviceBusNamespaceKeys.primaryConnectionString,
      },
    ],
  },
  template: {
    revisionSuffix: `${new Date().getTime()}`,
    containers: [
      {
        name: `container-${GLB_APP_ID}`,
        image: image.name,
        env: [
          // ... see source code
        ],
      },
    ],
    // https://keda.sh/docs/2.5/scalers/azure-service-bus/
    scale: {
      minReplicas: 0,
      maxReplicas: 3,
      rules: [
        {
          name: "rule-servicebus-scaler",
          custom: {
            type: "azure-servicebus",
            metadata: {
              queueName: "worker-items",
              messageCount: "2",
            },
            auth: [
              {
                secretRef: "sb-connection-string",
                triggerParameter: "connection",
              },
            ],
          },
        },
      ],
    },
    dapr: {
      enabled: true,
      appId: GLB_APP_ID,
      appPort: GLB_APP_PORT,
      // appProtocol: 'http', // Disabled for now since unsupported by ARM CLI
      components: [
        {
          name: "my-queue",
          type: "bindings.azure.servicebusqueues",
          version: "v1",
          metadata: [
            {
              name: "connectionString",
              secretRef: "sb-connection-string",
            },
            {
              name: "queueName",
              value: "worker-items",
            },
            {
              name: "ttlInSeconds",
              value: "60",
            },
          ],
        },
      ],
    },
  },
});
To set up this infrastructure, we can simply run the following command:
pulumi up \
--stack demo-cae-yolox \
-c glb-location=northeurope \
-c glb-project-name=demo-cae-yolox \
-c glb-project-env=prd \
-c sb-topics="worker-items"
For the full infrastructure setup, I refer to the Pulumi file in the source code.
We can now view the logs of our application in Log Analytics with the query:
ContainerAppConsoleLogs_CL
| project TimeCreated = unixtime_milliseconds_todatetime(_timestamp_d * 1000), ContainerAppName_s, Log_s
| order by TimeCreated desc
| take 150
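If you prefer the command line over the portal, the same kind of query can be run through the Azure CLI. This is a sketch that assumes a recent Azure CLI; YOUR_WORKSPACE_ID stands in for the Log Analytics workspace GUID:

az monitor log-analytics query \
  --workspace "YOUR_WORKSPACE_ID" \
  --analytics-query "ContainerAppConsoleLogs_CL | order by _timestamp_d desc | take 150"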
Conclusion
This article might not sound like something worth spending 2 months on! However, looking at the benefits Container Apps brings to the field of Serverless AI, it was well worth the time! We can now finally run a model, infer a result, and shut everything down again, without needing a compute instance running 24/7, which can get expensive when using GPUs.
I hope you enjoyed this article. Feel free to leave a comment below or contact me!