Queue message handlers
Process messages from queue topics in backend service methods.
Why Use Queue Message Handlers
Your application publishes events to a queue — an order placed, a file uploaded, a payment completed — and needs to act on each one reliably in the backend.
With @onQueueMessage, you decorate a method and deploy:
TypeScript:

    @onQueueMessage<OrderEvent>('order-events')
    async handleOrderEvent(request: QueueMessageRequest<OrderEvent>): Promise<void> {
      const { orderId, status } = request.message;
      await this.squid
        .collection<{ orderId: string; status: string }>('orders')
        .doc(orderId)
        .upsert({ orderId, status });
    }

Python:

    @on_queue_message('order-events')
    async def handle_order_event(self, request: QueueMessageRequest) -> None:
        event = request['message']
        order_id = event['orderId']
        status = event['status']
        await self.squid.collection('orders').doc(order_id).upsert(
            {'orderId': order_id, 'status': status}
        )

No polling. No routing logic. Squid invokes your handler the moment a message arrives on the topic.
Overview
Queue message handlers are backend methods decorated with @onQueueMessage (@on_queue_message in Python) that run automatically whenever a message is published to a specified topic. They support both the built-in Squid queue and external queue integrations such as Kafka.
When to use queue message handlers
| Use Case | Recommendation |
|---|---|
| Process messages published to a queue topic | ✅ Queue message handler |
| React to database changes | Use Triggers |
| Call a function from the client | Use Executables |
| Run code on a schedule | Use Schedulers |
| Expose an HTTP endpoint to external services | Use Webhooks |
How it works
- You decorate a method with @onQueueMessage() in a class that extends SquidService
- You specify the topic name, and optionally an integration ID
- Squid discovers and registers the handler at deploy time
- When a message arrives on the topic, Squid invokes the method with a QueueMessageRequest object
- The handler can be synchronous or return a Promise
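
To make the flow above concrete, here is a toy in-memory model of registration and dispatch. It is not Squid's implementation, which runs inside the platform. HandlerRegistry, register, and dispatch are hypothetical names invented for this sketch. Only the request field names (message, topicName, integrationId) and the 'built_in_queue' default are taken from this page.

```typescript
// Toy model only: field names mirror the QueueMessageRequest properties documented on this page.
interface QueueMessageRequest<T> {
  message: T;
  topicName: string;
  integrationId: string;
}

// A handler may be synchronous or return a Promise.
type Handler<T> = (request: QueueMessageRequest<T>) => void | Promise<void>;

// Default integration ID for the built-in queue, as stated on this page.
const BUILT_IN_QUEUE = 'built_in_queue';

// Hypothetical registry keyed by integration ID and topic name.
class HandlerRegistry {
  private readonly handlers = new Map<string, Handler<unknown>[]>();

  private key(topicName: string, integrationId: string): string {
    return `${integrationId}:${topicName}`;
  }

  // Conceptually what a decorator does when the service is registered.
  register<T>(topicName: string, handler: Handler<T>, integrationId: string = BUILT_IN_QUEUE): void {
    const key = this.key(topicName, integrationId);
    const list = this.handlers.get(key) ?? [];
    list.push(handler as Handler<unknown>);
    this.handlers.set(key, list);
  }

  // Conceptually what happens when a message arrives on a topic.
  // Returns the number of handlers that were invoked.
  async dispatch<T>(topicName: string, message: T, integrationId: string = BUILT_IN_QUEUE): Promise<number> {
    const list = this.handlers.get(this.key(topicName, integrationId)) ?? [];
    for (const handler of list) {
      // Awaiting covers both synchronous and Promise-returning handlers.
      await handler({ message, topicName, integrationId });
    }
    return list.length;
  }
}
```

In this model, a handler registered for 'order-events' without an integration ID only receives messages dispatched to the built-in queue, while one registered with 'kafka' only receives messages dispatched with that integration ID.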
Quick Start
Prerequisites
TypeScript:

- A Squid backend project initialized with squid init
- The @squidcloud/backend package installed from NPM

Python:

- A Squid backend project initialized with squid init
- The squidcloud-backend package installed from PyPI
Step 1: Create a handler
Create a service class that extends SquidService and add a handler method:
TypeScript:

    import { SquidService, onQueueMessage, QueueMessageRequest } from '@squidcloud/backend';

    interface OrderEvent {
      orderId: string;
      status: 'placed' | 'shipped' | 'delivered';
    }

    export class OrderService extends SquidService {
      @onQueueMessage<OrderEvent>('order-events')
      async handleOrderEvent(request: QueueMessageRequest<OrderEvent>): Promise<void> {
        const { orderId, status } = request.message;
        await this.squid
          .collection<{ orderId: string; status: string }>('orders')
          .doc(orderId)
          .upsert({ orderId, status });
      }
    }

Python:

    from squidcloud_backend import SquidService, on_queue_message
    from squidcloud_backend.types import QueueMessageRequest
    from typing import TypedDict


    class OrderEvent(TypedDict):
        orderId: str
        status: str  # 'placed' | 'shipped' | 'delivered'


    class OrderService(SquidService):
        @on_queue_message('order-events')
        async def handle_order_event(self, request: QueueMessageRequest) -> None:
            event: OrderEvent = request['message']
            order_id = event['orderId']
            status = event['status']
            await self.squid.collection('orders').doc(order_id).upsert(
                {'orderId': order_id, 'status': status}
            )

Step 2: Export the service
Ensure your service is exported from the service index file.

TypeScript:

    export * from './example-service';

Python:

    from .example_service import OrderService

Step 3: Start or deploy the backend
For local development, run the backend locally using the Squid CLI:
squid start
To deploy to the cloud, see deploying your backend.
Step 4: Verify
Publish a message to the topic from the client or another service and check the Squid Console logs to confirm the handler was invoked.
Core Concepts
The decorator
The decorator accepts two parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| topicName | string | Yes | The name of the queue topic to subscribe to |
| integrationId | string | No | The integration ID of the queue. Defaults to the built-in Squid queue integration |
TypeScript:

    // Built-in queue: integrationId defaults to 'built_in_queue'
    @onQueueMessage('order-events')

    // External integration (e.g. Kafka)
    @onQueueMessage('order-events', 'kafka')

Python:

    # Built-in queue: integration_id defaults to 'built_in_queue'
    @on_queue_message('order-events')

    # External integration (e.g. Kafka)
    @on_queue_message('order-events', 'kafka')

QueueMessageRequest
The QueueMessageRequest object passed to your handler contains:
| Property | Type | Description |
|---|---|---|
| message | T | The typed message payload |
| topicName | string | The name of the topic the message was sent to |
| integrationId | string | The integration ID of the queue |
In TypeScript, the generic type parameter T gives you end-to-end type safety from the publisher through to the handler. In Python, QueueMessageRequest is a TypedDict so fields are accessed via dictionary-style lookups:
TypeScript:

    interface OrderEvent {
      orderId: string;
      status: 'placed' | 'shipped' | 'delivered';
    }

    @onQueueMessage<OrderEvent>('order-events')
    async handleOrderEvent(request: QueueMessageRequest<OrderEvent>): Promise<void> {
      // request.message is typed as OrderEvent
      const { orderId, status } = request.message;
    }

Python:

    class OrderEvent(TypedDict):
        orderId: str
        status: str


    @on_queue_message('order-events')
    async def handle_order_event(self, request: QueueMessageRequest) -> None:
        # access fields via dict-style lookups
        event: OrderEvent = request['message']
        order_id = event['orderId']
        topic = request['topicName']

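
TypeScript types are erased at compile time, so the generic parameter describes the payload but does not check it at runtime. If a topic can receive messages from publishers you do not control, a runtime check before use may be worthwhile. The following is a minimal sketch of a type guard for the OrderEvent shape used on this page. isOrderEvent and ORDER_STATUSES are hypothetical names, not part of the Squid SDK.

```typescript
interface OrderEvent {
  orderId: string;
  status: 'placed' | 'shipped' | 'delivered';
}

// The status values allowed by the OrderEvent type above.
const ORDER_STATUSES: readonly string[] = ['placed', 'shipped', 'delivered'];

// Hypothetical helper: narrows an unknown payload to OrderEvent at runtime.
function isOrderEvent(value: unknown): value is OrderEvent {
  if (typeof value !== 'object' || value === null) {
    return false;
  }
  const candidate = value as Record<string, unknown>;
  const orderId = candidate['orderId'];
  const status = candidate['status'];
  return (
    typeof orderId === 'string' &&
    orderId.length > 0 &&
    typeof status === 'string' &&
    ORDER_STATUSES.includes(status)
  );
}
```

Inside a handler, a guard like this could be called on request.message first, logging and returning early when it fails instead of writing a malformed document.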
Using external queue integrations
When integrationId is omitted, Squid's built-in queue is used. Providing an explicit integration ID allows the same handler pattern to work with external queue systems such as Kafka:
TypeScript:

    import { SquidService, onQueueMessage, QueueMessageRequest } from '@squidcloud/backend';

    interface OrderEvent {
      orderId: string;
      status: 'placed' | 'shipped' | 'delivered';
    }

    export class OrderService extends SquidService {
      // Built-in queue
      @onQueueMessage<OrderEvent>('order-events')
      async handleOrderEvent(request: QueueMessageRequest<OrderEvent>): Promise<void> {
        console.log('Built-in queue event:', request.message);
      }

      // External Kafka integration
      @onQueueMessage<OrderEvent>('order-events', 'kafka')
      async handleKafkaOrderEvent(request: QueueMessageRequest<OrderEvent>): Promise<void> {
        console.log(`Kafka event on topic "${request.topicName}":`, request.message);
      }
    }

Python:

    from squidcloud_backend import SquidService, on_queue_message
    from squidcloud_backend.types import QueueMessageRequest


    class OrderService(SquidService):
        # Built-in queue
        @on_queue_message('order-events')
        async def handle_order_event(self, request: QueueMessageRequest) -> None:
            print('Built-in queue event:', request['message'])

        # External Kafka integration
        @on_queue_message('order-events', 'kafka')
        async def handle_kafka_order_event(self, request: QueueMessageRequest) -> None:
            print(f"Kafka event on topic '{request['topicName']}':", request['message'])

Error Handling
If a handler throws an error, the error is logged in the Squid Console. Wrap your logic in a try/catch (or try/except) block to handle failures gracefully:
TypeScript:

    @onQueueMessage<OrderEvent>('order-events')
    async handleOrderEvent(request: QueueMessageRequest<OrderEvent>): Promise<void> {
      try {
        await this.processOrder(request.message);
      } catch (error) {
        console.error(`Failed to process order ${request.message.orderId}:`, error);
      }
    }

Python:

    @on_queue_message('order-events')
    async def handle_order_event(self, request: QueueMessageRequest) -> None:
        try:
            await self.process_order(request['message'])
        except Exception as error:
            print(f"Failed to process order {request['message']['orderId']}:", error)

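
This page does not say whether a message is redelivered after a handler fails, so a handler that needs a second attempt may have to retry in-process. The sketch below shows one way to do that under that assumption. runWithRetries and Outcome are hypothetical names, not part of the Squid SDK, and the sketch retries immediately without any delay between attempts.

```typescript
// Result of running a piece of work with bounded retries.
type Outcome =
  | { ok: true; attempts: number }
  | { ok: false; attempts: number; error: unknown };

// Hypothetical helper: runs `work` up to `maxAttempts` times (at least once) and
// reports the outcome instead of throwing, so the handler decides what to log.
async function runWithRetries(work: () => Promise<void>, maxAttempts: number): Promise<Outcome> {
  const limit = Math.max(1, Math.floor(maxAttempts));
  let lastError: unknown = undefined;
  for (let attempt = 1; attempt <= limit; attempt++) {
    try {
      await work();
      return { ok: true, attempts: attempt };
    } catch (error) {
      // Remember the most recent failure and try again if attempts remain.
      lastError = error;
    }
  }
  return { ok: false, attempts: limit, error: lastError };
}
```

A handler could wrap its processing call in this helper and log the final error only when the outcome is not ok. Retrying is only safe when the wrapped work is idempotent.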
Best Practices
- Type your message payload. Use the generic parameter QueueMessageRequest<MyType> in TypeScript, or a TypedDict subclass in Python, for type-safe access to the message body.
- Handle errors inside the handler. Wrap your logic in try/catch (or try/except) blocks and log errors so that a single bad message does not silently fail.
- Keep handlers focused. A handler should do one thing. If a message needs to trigger multiple workflows, delegate to other methods or services rather than putting all logic in the handler.
- Design for idempotency. Messages may occasionally be delivered more than once. Ensure your handler produces the same result when processing a duplicate message.
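
The upsert keyed by orderId used throughout this page is already idempotent, because writing the same document twice leaves the same state. Side effects that are not naturally idempotent, such as sending a notification, need an explicit duplicate check. The following is a minimal sketch of that idea. ProcessedKeys, dedupKey, and applyOnce are hypothetical names, and the in-memory Set stands in for a durable, shared store that a real deployment would need.

```typescript
interface OrderEvent {
  orderId: string;
  status: 'placed' | 'shipped' | 'delivered';
}

// Hypothetical in-memory record of events that have already been handled.
// Assumption: a real deployment would persist this so it survives restarts.
class ProcessedKeys {
  private readonly seen = new Set<string>();

  // Returns true the first time a key is claimed and false for duplicates.
  claim(key: string): boolean {
    if (this.seen.has(key)) {
      return false;
    }
    this.seen.add(key);
    return true;
  }
}

// One order can legitimately move through several statuses, so the key
// combines both fields rather than using orderId alone.
function dedupKey(event: OrderEvent): string {
  return `${event.orderId}:${event.status}`;
}

// Runs `apply` only for events that have not been seen before.
// Returns true if the side effect ran and false if the event was a duplicate.
function applyOnce(
  event: OrderEvent,
  processed: ProcessedKeys,
  apply: (event: OrderEvent) => void
): boolean {
  if (!processed.claim(dedupKey(event))) {
    return false;
  }
  apply(event);
  return true;
}
```

Claiming the key before running the side effect means a failure inside apply leaves the event marked as handled. Whether to claim before or after the side effect depends on whether a missed action or a repeated action is the worse outcome.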
Code Examples
Writing queue messages to the database
TypeScript:

    import { SquidService, onQueueMessage, QueueMessageRequest } from '@squidcloud/backend';

    interface OrderEvent {
      orderId: string;
      status: 'placed' | 'shipped' | 'delivered';
    }

    export class OrderService extends SquidService {
      @onQueueMessage<OrderEvent>('order-events')
      async handleOrderEvent(request: QueueMessageRequest<OrderEvent>): Promise<void> {
        const { orderId, status } = request.message;
        await this.squid
          .collection<{ orderId: string; status: string }>('orders')
          .doc(orderId)
          .upsert({ orderId, status });
      }
    }

Python:

    from squidcloud_backend import SquidService, on_queue_message
    from squidcloud_backend.types import QueueMessageRequest
    from typing import TypedDict


    class OrderEvent(TypedDict):
        orderId: str
        status: str


    class OrderService(SquidService):
        @on_queue_message('order-events')
        async def handle_order_event(self, request: QueueMessageRequest) -> None:
            event: OrderEvent = request['message']
            order_id = event['orderId']
            status = event['status']
            await self.squid.collection('orders').doc(order_id).upsert(
                {'orderId': order_id, 'status': status}
            )

Handling messages from multiple queue integrations
TypeScript:

    import { SquidService, onQueueMessage, QueueMessageRequest } from '@squidcloud/backend';

    interface OrderEvent {
      orderId: string;
      status: 'placed' | 'shipped' | 'delivered';
    }

    export class OrderService extends SquidService {
      @onQueueMessage<OrderEvent>('order-events')
      async handleOrderEvent(request: QueueMessageRequest<OrderEvent>): Promise<void> {
        await this.processOrder(request.message);
      }

      @onQueueMessage<OrderEvent>('order-events', 'kafka')
      async handleKafkaOrderEvent(request: QueueMessageRequest<OrderEvent>): Promise<void> {
        await this.processOrder(request.message);
      }

      private async processOrder(event: OrderEvent): Promise<void> {
        await this.squid
          .collection<{ orderId: string; status: string }>('orders')
          .doc(event.orderId)
          .upsert({ orderId: event.orderId, status: event.status });
      }
    }

Python:

    from squidcloud_backend import SquidService, on_queue_message
    from squidcloud_backend.types import QueueMessageRequest
    from typing import TypedDict


    class OrderEvent(TypedDict):
        orderId: str
        status: str


    class OrderService(SquidService):
        @on_queue_message('order-events')
        async def handle_order_event(self, request: QueueMessageRequest) -> None:
            await self._process_order(request['message'])

        @on_queue_message('order-events', 'kafka')
        async def handle_kafka_order_event(self, request: QueueMessageRequest) -> None:
            await self._process_order(request['message'])

        async def _process_order(self, event: OrderEvent) -> None:
            order_id = event['orderId']
            status = event['status']
            await self.squid.collection('orders').doc(order_id).upsert(
                {'orderId': order_id, 'status': status}
            )

Full-Stack Example
This example shows the complete flow: a client publishes an order event, and the backend handler processes it.
Backend: register the handler
TypeScript:

    import { SquidService, onQueueMessage, QueueMessageRequest, secureTopic } from '@squidcloud/backend';

    interface OrderEvent {
      orderId: string;
      status: 'placed' | 'shipped' | 'delivered';
    }

    export class OrderService extends SquidService {
      @secureTopic('order-events', 'produce')
      allowOrderEventPublish(): boolean {
        return !!this.getUserAuth();
      }

      @onQueueMessage<OrderEvent>('order-events')
      async handleOrderEvent(request: QueueMessageRequest<OrderEvent>): Promise<void> {
        const { orderId, status } = request.message;
        console.log(`Received order event: ${orderId} → ${status}`);
        await this.squid
          .collection<{ orderId: string; status: string }>('orders')
          .doc(orderId)
          .upsert({ orderId, status });
      }
    }

Python:

    from squidcloud_backend import SquidService, on_queue_message, secure_topic
    from squidcloud_backend.types import QueueMessageRequest
    from typing import TypedDict


    class OrderEvent(TypedDict):
        orderId: str
        status: str


    class OrderService(SquidService):
        @secure_topic('order-events', 'produce')
        def allow_order_event_publish(self) -> bool:
            return self.get_user_auth() is not None

        @on_queue_message('order-events')
        async def handle_order_event(self, request: QueueMessageRequest) -> None:
            event: OrderEvent = request['message']
            order_id = event['orderId']
            status = event['status']
            print(f'Received order event: {order_id} → {status}')
            await self.squid.collection('orders').doc(order_id).upsert(
                {'orderId': order_id, 'status': status}
            )

Client: publish a message
const orderEvent = { orderId: 'order-123', status: 'placed' };
await squid.queue('order-events').produce([orderEvent]);
When produce is called, Squid delivers the message to the topic and the backend handler runs automatically.
See Also
- Built-in queue - Set up and use the built-in Squid queue
- Kafka connector - Connect to an external Kafka integration
- Queue security - Secure access to queue topics
- Triggers - React to database changes
- Schedulers - Run code on a schedule