Skip to main content

Queue message handlers

Process messages from queue topics in backend service methods.

Why Use Queue Message Handlers

Your application publishes events to a queue — an order placed, a file uploaded, a payment completed — and needs to act on each one reliably in the backend.

With @onQueueMessage, you decorate a method and deploy:

Backend code
@onQueueMessage<OrderEvent>('order-events')
async handleOrderEvent(request: QueueMessageRequest<OrderEvent>): Promise<void> {
const { orderId, status } = request.message;
await this.squid
.collection<{ orderId: string; status: string }>('orders')
.doc(orderId)
.upsert({ orderId, status });
}

No polling. No routing logic. Squid invokes your handler the moment a message arrives on the topic.

Overview

Queue message handlers are backend methods decorated with @onQueueMessage (@on_queue_message in Python) that run automatically whenever a message is published to a specified topic. They support both the built-in Squid queue and external queue integrations such as Kafka.

When to use queue message handlers

Use CaseRecommendation
Process messages published to a queue topic✅ Queue message handler
React to database changesUse Triggers
Call a function from the clientUse Executables
Run code on a scheduleUse Schedulers
Expose an HTTP endpoint to external servicesUse Webhooks

How it works

  1. You decorate a method with @onQueueMessage() in a class that extends SquidService
  2. You specify the topic name, and optionally an integration ID
  3. Squid discovers and registers the handler at deploy time
  4. When a message arrives on the topic, Squid invokes the method with a QueueMessageRequest object
  5. The handler can be synchronous or return a Promise

Quick Start

Prerequisites

  • A Squid backend project initialized with squid init
  • The @squidcloud/backend package installed from NPM

Step 1: Create a handler

Create a service class that extends SquidService and add a handler method:

Backend code
import { SquidService, onQueueMessage, QueueMessageRequest } from '@squidcloud/backend';

interface OrderEvent {
orderId: string;
status: 'placed' | 'shipped' | 'delivered';
}

export class OrderService extends SquidService {
@onQueueMessage<OrderEvent>('order-events')
async handleOrderEvent(request: QueueMessageRequest<OrderEvent>): Promise<void> {
const { orderId, status } = request.message;
await this.squid
.collection<{ orderId: string; status: string }>('orders')
.doc(orderId)
.upsert({ orderId, status });
}
}

Step 2: Export the service

Ensure your service is exported from the service index file:

Backend code
export * from './example-service';

Step 3: Start or deploy the backend

For local development, run the backend locally using the Squid CLI:

squid start

To deploy to the cloud, see deploying your backend.

Step 4: Verify

Publish a message to the topic from the client or another service and check the Squid Console logs to confirm the handler was invoked.

Core Concepts

The decorator

The decorator accepts two parameters:

ParameterTypeRequiredDescription
topicNamestringYesThe name of the queue topic to subscribe to
integrationIdstringNoThe integration ID of the queue. Defaults to the built-in Squid queue integration
Backend code
// Built-in queue — integrationId defaults to 'built_in_queue'
@onQueueMessage('order-events')

// External integration (e.g. Kafka)
@onQueueMessage('order-events', 'kafka')

QueueMessageRequest

The QueueMessageRequest object passed to your handler contains:

PropertyTypeDescription
messageTThe typed message payload
topicNamestringThe name of the topic the message was sent to
integrationIdstringThe integration ID of the queue

In TypeScript, the generic type parameter T gives you end-to-end type safety from the publisher through to the handler. In Python, QueueMessageRequest is a TypedDict so fields are accessed via dictionary-style lookups:

Backend code
interface OrderEvent {
orderId: string;
status: 'placed' | 'shipped' | 'delivered';
}

@onQueueMessage<OrderEvent>('order-events')
async handleOrderEvent(request: QueueMessageRequest<OrderEvent>): Promise<void> {
// request.message is typed as OrderEvent
const { orderId, status } = request.message;
}

Using external queue integrations

When integrationId is omitted, Squid's built-in queue is used. Providing an explicit integration ID allows the same handler pattern to work with external queue systems such as Kafka:

Backend code
import { SquidService, onQueueMessage, QueueMessageRequest } from '@squidcloud/backend';

interface OrderEvent {
orderId: string;
status: 'placed' | 'shipped' | 'delivered';
}

export class OrderService extends SquidService {
// Built-in queue
@onQueueMessage<OrderEvent>('order-events')
async handleOrderEvent(request: QueueMessageRequest<OrderEvent>): Promise<void> {
console.log('Built-in queue event:', request.message);
}

// External Kafka integration
@onQueueMessage<OrderEvent>('order-events', 'kafka')
async handleKafkaOrderEvent(request: QueueMessageRequest<OrderEvent>): Promise<void> {
console.log(`Kafka event on topic "${request.topicName}":`, request.message);
}
}

Error Handling

If a handler throws an error, the error is logged in the Squid Console. Wrap your logic in a try/catch (or try/except) block to handle failures gracefully:

Backend code
@onQueueMessage<OrderEvent>('order-events')
async handleOrderEvent(request: QueueMessageRequest<OrderEvent>): Promise<void> {
try {
await this.processOrder(request.message);
} catch (error) {
console.error(`Failed to process order ${request.message.orderId}:`, error);
}
}

Best Practices

  1. Type your message payload. Use the generic parameter QueueMessageRequest<MyType> in TypeScript, or a TypedDict subclass in Python, for type-safe access to the message body.

  2. Handle errors inside the handler. Wrap your logic in try/catch (or try/except) blocks and log errors so that a single bad message does not silently fail.

  3. Keep handlers focused. A handler should do one thing. If a message needs to trigger multiple workflows, delegate to other methods or services rather than putting all logic in the handler.

  4. Design for idempotency. Messages may occasionally be delivered more than once. Ensure your handler produces the same result when processing a duplicate message.

Code Examples

Writing queue messages to the database

Backend code
import { SquidService, onQueueMessage, QueueMessageRequest } from '@squidcloud/backend';

interface OrderEvent {
orderId: string;
status: 'placed' | 'shipped' | 'delivered';
}

export class OrderService extends SquidService {
@onQueueMessage<OrderEvent>('order-events')
async handleOrderEvent(request: QueueMessageRequest<OrderEvent>): Promise<void> {
const { orderId, status } = request.message;
await this.squid
.collection<{ orderId: string; status: string }>('orders')
.doc(orderId)
.upsert({ orderId, status });
}
}

Handling messages from multiple queue integrations

Backend code
import { SquidService, onQueueMessage, QueueMessageRequest } from '@squidcloud/backend';

interface OrderEvent {
orderId: string;
status: 'placed' | 'shipped' | 'delivered';
}

export class OrderService extends SquidService {
@onQueueMessage<OrderEvent>('order-events')
async handleOrderEvent(request: QueueMessageRequest<OrderEvent>): Promise<void> {
await this.processOrder(request.message);
}

@onQueueMessage<OrderEvent>('order-events', 'kafka')
async handleKafkaOrderEvent(request: QueueMessageRequest<OrderEvent>): Promise<void> {
await this.processOrder(request.message);
}

private async processOrder(event: OrderEvent): Promise<void> {
await this.squid
.collection<{ orderId: string; status: string }>('orders')
.doc(event.orderId)
.upsert({ orderId: event.orderId, status: event.status });
}
}

Full-Stack Example

This example shows the complete flow: a client publishes an order event, and the backend handler processes it.

Backend: register the handler

Backend code
import { SquidService, onQueueMessage, QueueMessageRequest, secureTopic } from '@squidcloud/backend';

interface OrderEvent {
orderId: string;
status: 'placed' | 'shipped' | 'delivered';
}

export class OrderService extends SquidService {
@secureTopic('order-events', 'produce')
allowOrderEventPublish(): boolean {
return !!this.getUserAuth();
}

@onQueueMessage<OrderEvent>('order-events')
async handleOrderEvent(request: QueueMessageRequest<OrderEvent>): Promise<void> {
const { orderId, status } = request.message;
console.log(`Received order event: ${orderId}${status}`);
await this.squid
.collection<{ orderId: string; status: string }>('orders')
.doc(orderId)
.upsert({ orderId, status });
}
}

Client: publish a message

Client code
const orderEvent = { orderId: 'order-123', status: 'placed' };
await squid.queue('order-events').produce([orderEvent]);

When produce is called, Squid delivers the message to the topic and the backend handler runs automatically.

See Also