キューメッセージハンドラー
バックエンドサービスメソッドでキュートピックからのメッセージを処理します。
Queue Message Handlers を使う理由
アプリケーションはイベントをキューに publish します — 注文の作成、ファイルのアップロード、支払いの完了など — そしてバックエンドでそれぞれを確実に処理する必要があります。
@onQueueMessage を使うと、メソッドを decorate して deploy するだけです。
- TypeScript
- Python
@onQueueMessage<OrderEvent>('order-events')
async handleOrderEvent(request: QueueMessageRequest<OrderEvent>): Promise<void> {
const { orderId, status } = request.message;
await this.squid
.collection<{ orderId: string; status: string }>('orders')
.doc(orderId)
.upsert({ orderId, status });
}
@on_queue_message('order-events')
async def handle_order_event(self, request: QueueMessageRequest) -> None:
event = request['message']
order_id = event['orderId']
status = event['status']
await self.squid.collection('orders').doc(order_id).upsert(
{'orderId': order_id, 'status': status}
)
polling は不要です。routing logic も不要です。トピックにメッセージが到着した瞬間に、Squid がハンドラーを呼び出します。
概要
Queue message handlers は、指定したトピックにメッセージが publish されるたびに自動実行される、@onQueueMessage(Python では @on_queue_message)で decorate されたバックエンドメソッドです。組み込みの Squid queue と、Kafka のような外部キュー連携の両方をサポートします。
Queue message handlers を使うタイミング
| Use Case | Recommendation |
|---|---|
| キュートピックに publish されたメッセージを処理する | ✅ Queue message handler |
| データベースの変更に反応する | Triggers を使用 |
| クライアントから関数を呼び出す | Executables を使用 |
| スケジュールに従ってコードを実行する | Schedulers を使用 |
| 外部サービス向けに HTTP endpoint を公開する | Webhooks を使用 |
仕組み
SquidServiceを継承したクラス内のメソッドに@onQueueMessage()を付けます- トピック名と、必要に応じて integration ID を指定します
- deploy 時に Squid がハンドラーを検出して登録します
- トピックにメッセージが到着すると、Squid が
QueueMessageRequestオブジェクトを渡してメソッドを呼び出します - ハンドラーは synchronous でも
Promiseを返す形でも使えます
Quick Start
前提条件
- TypeScript
- Python
squid initで初期化された Squid backend project- NPM からインストールされた
@squidcloud/backendパッケージ
squid initで初期化された Squid backend project- PyPI からインストールされた
squidcloud-backendパッケージ
Step 1: ハンドラーを作成する
SquidService を継承する service class を作成し、ハンドラーメソッドを追加します。
- TypeScript
- Python
import { SquidService, onQueueMessage, QueueMessageRequest } from '@squidcloud/backend';
interface OrderEvent {
orderId: string;
status: 'placed' | 'shipped' | 'delivered';
}
export class OrderService extends SquidService {
@onQueueMessage<OrderEvent>('order-events')
async handleOrderEvent(request: QueueMessageRequest<OrderEvent>): Promise<void> {
const { orderId, status } = request.message;
await this.squid
.collection<{ orderId: string; status: string }>('orders')
.doc(orderId)
.upsert({ orderId, status });
}
}
from squidcloud_backend import SquidService, on_queue_message
from squidcloud_backend.types import QueueMessageRequest
from typing import TypedDict
class OrderEvent(TypedDict):
orderId: str
status: str # 'placed' | 'shipped' | 'delivered'
class OrderService(SquidService):
@on_queue_message('order-events')
async def handle_order_event(self, request: QueueMessageRequest) -> None:
event: OrderEvent = request['message']
order_id = event['orderId']
status = event['status']
await self.squid.collection('orders').doc(order_id).upsert(
{'orderId': order_id, 'status': status}
)
Step 2: service を export する
- TypeScript
- Python
service index file から service が export されていることを確認してください。
export * from './example-service';
service index file から service が export されていることを確認してください。
from .example_service import OrderService
Step 3: バックエンドを起動または deploy する
ローカル開発では、Squid CLI を使ってバックエンドをローカルで実行します。
squid start
cloud に deploy する方法については、deploying your backend を参照してください。
Step 4: 確認する
クライアントまたは別の service からトピックにメッセージを publish し、Squid Console のログでハンドラーが呼び出されたことを確認します。
Core Concepts
decorator
decorator は 2 つのパラメータを受け取ります。
| Parameter | Type | Required | Description |
|---|---|---|---|
topicName | string | Yes | subscribe 対象となるキュートピックの名前 |
integrationId | string | No | キューの integration ID。デフォルトは組み込みの Squid queue integration です |
- TypeScript
- Python
// Built-in queue — integrationId defaults to 'built_in_queue'
@onQueueMessage('order-events')
// External integration (e.g. Kafka)
@onQueueMessage('order-events', 'kafka')
# Built-in queue — integration_id defaults to 'built_in_queue'
@on_queue_message('order-events')
# External integration (e.g. Kafka)
@on_queue_message('order-events', 'kafka')
QueueMessageRequest
ハンドラーに渡される QueueMessageRequest オブジェクトには、次の内容が含まれます。
| Property | Type | Description |
|---|---|---|
message | T | 型付きのメッセージ payload |
topicName | string | メッセージが送信されたトピック名 |
integrationId | string | キューの integration ID |
TypeScript では、generic type parameter T によって publisher からハンドラーまで end-to-end の型安全性が得られます。Python では QueueMessageRequest は TypedDict のため、フィールドには辞書形式の lookup でアクセスします。
- TypeScript
- Python
interface OrderEvent {
orderId: string;
status: 'placed' | 'shipped' | 'delivered';
}
@onQueueMessage<OrderEvent>('order-events')
async handleOrderEvent(request: QueueMessageRequest<OrderEvent>): Promise<void> {
// request.message is typed as OrderEvent
const { orderId, status } = request.message;
}
class OrderEvent(TypedDict):
orderId: str
status: str
@on_queue_message('order-events')
async def handle_order_event(self, request: QueueMessageRequest) -> None:
# access fields via dict-style lookups
event: OrderEvent = request['message']
order_id = event['orderId']
topic = request['topicName']
外部キュー連携を使う
integrationId を省略すると、Squid の組み込みキューが使われます。明示的に integration ID を指定することで、Kafka のような外部キューシステムでも同じハンドラーパターンを利用できます。
- TypeScript
- Python
import { SquidService, onQueueMessage, QueueMessageRequest } from '@squidcloud/backend';
interface OrderEvent {
orderId: string;
status: 'placed' | 'shipped' | 'delivered';
}
export class OrderService extends SquidService {
// Built-in queue
@onQueueMessage<OrderEvent>('order-events')
async handleOrderEvent(request: QueueMessageRequest<OrderEvent>): Promise<void> {
console.log('Built-in queue event:', request.message);
}
// External Kafka integration
@onQueueMessage<OrderEvent>('order-events', 'kafka')
async handleKafkaOrderEvent(request: QueueMessageRequest<OrderEvent>): Promise<void> {
console.log(`Kafka event on topic "${request.topicName}":`, request.message);
}
}
from squidcloud_backend import SquidService, on_queue_message
from squidcloud_backend.types import QueueMessageRequest
class OrderService(SquidService):
# Built-in queue
@on_queue_message('order-events')
async def handle_order_event(self, request: QueueMessageRequest) -> None:
print('Built-in queue event:', request['message'])
# External Kafka integration
@on_queue_message('order-events', 'kafka')
async def handle_kafka_order_event(self, request: QueueMessageRequest) -> None:
print(f"Kafka event on topic '{request['topicName']}':", request['message'])
エラーハンドリング
ハンドラーが error を throw すると、その error は Squid Console に記録されます。失敗を適切に処理するために、ロジックを try/catch(または try/except)で囲んでください。
- TypeScript
- Python
@onQueueMessage<OrderEvent>('order-events')
async handleOrderEvent(request: QueueMessageRequest<OrderEvent>): Promise<void> {
try {
await this.processOrder(request.message);
} catch (error) {
console.error(`Failed to process order ${request.message.orderId}:`, error);
}
}
@on_queue_message('order-events')
async def handle_order_event(self, request: QueueMessageRequest) -> None:
try:
await self.process_order(request['message'])
except Exception as error:
print(f"Failed to process order {request['message']['orderId']}:", error)
ベストプラクティス
-
メッセージ payload を型付けする。 TypeScript では generic parameter
QueueMessageRequest<MyType>を、Python ではTypedDictの subclass を使い、メッセージ本文に型安全にアクセスしましょう。 -
ハンドラー内で error を処理する。 ロジックを try/catch(または try/except)で囲み、error をログに記録して、1 つの不正なメッセージで静かに失敗しないようにしましょう。
-
ハンドラーの責務を絞る。 ハンドラーは 1 つのことを行うべきです。1 つのメッセージで複数の workflow を起動する必要がある場合は、すべてのロジックをハンドラーに詰め込むのではなく、他のメソッドや service に委譲してください。
-
idempotency を考慮して設計する。 メッセージはまれに複数回配信されることがあります。重複メッセージを処理しても同じ結果になるよう、ハンドラーを設計してください。
コード例
キューメッセージをデータベースに書き込む
- TypeScript
- Python
import { SquidService, onQueueMessage, QueueMessageRequest } from '@squidcloud/backend';
interface OrderEvent {
orderId: string;
status: 'placed' | 'shipped' | 'delivered';
}
export class OrderService extends SquidService {
@onQueueMessage<OrderEvent>('order-events')
async handleOrderEvent(request: QueueMessageRequest<OrderEvent>): Promise<void> {
const { orderId, status } = request.message;
await this.squid
.collection<{ orderId: string; status: string }>('orders')
.doc(orderId)
.upsert({ orderId, status });
}
}
from squidcloud_backend import SquidService, on_queue_message
from squidcloud_backend.types import QueueMessageRequest
from typing import TypedDict
class OrderEvent(TypedDict):
orderId: str
status: str
class OrderService(SquidService):
@on_queue_message('order-events')
async def handle_order_event(self, request: QueueMessageRequest) -> None:
event: OrderEvent = request['message']
order_id = event['orderId']
status = event['status']
await self.squid.collection('orders').doc(order_id).upsert(
{'orderId': order_id, 'status': status}
)
複数のキュー連携からのメッセージを処理する
- TypeScript
- Python
import { SquidService, onQueueMessage, QueueMessageRequest } from '@squidcloud/backend';
interface OrderEvent {
orderId: string;
status: 'placed' | 'shipped' | 'delivered';
}
export class OrderService extends SquidService {
@onQueueMessage<OrderEvent>('order-events')
async handleOrderEvent(request: QueueMessageRequest<OrderEvent>): Promise<void> {
await this.processOrder(request.message);
}
@onQueueMessage<OrderEvent>('order-events', 'kafka')
async handleKafkaOrderEvent(request: QueueMessageRequest<OrderEvent>): Promise<void> {
await this.processOrder(request.message);
}
private async processOrder(event: OrderEvent): Promise<void> {
await this.squid
.collection<{ orderId: string; status: string }>('orders')
.doc(event.orderId)
.upsert({ orderId: event.orderId, status: event.status });
}
}
from squidcloud_backend import SquidService, on_queue_message
from squidcloud_backend.types import QueueMessageRequest
from typing import TypedDict
class OrderEvent(TypedDict):
orderId: str
status: str
class OrderService(SquidService):
@on_queue_message('order-events')
async def handle_order_event(self, request: QueueMessageRequest) -> None:
await self._process_order(request['message'])
@on_queue_message('order-events', 'kafka')
async def handle_kafka_order_event(self, request: QueueMessageRequest) -> None:
await self._process_order(request['message'])
async def _process_order(self, event: OrderEvent) -> None:
order_id = event['orderId']
status = event['status']
await self.squid.collection('orders').doc(order_id).upsert(
{'orderId': order_id, 'status': status}
)
Full-Stack Example
この例では、クライアントが注文イベントを publish し、バックエンドハンドラーがそれを処理するまでの完全な流れを示します。
Backend: ハンドラーを登録する
- TypeScript
- Python
import { SquidService, onQueueMessage, QueueMessageRequest, secureTopic } from '@squidcloud/backend';
interface OrderEvent {
orderId: string;
status: 'placed' | 'shipped' | 'delivered';
}
export class OrderService extends SquidService {
@secureTopic('order-events', 'produce')
allowOrderEventPublish(): boolean {
return !!this.getUserAuth();
}
@onQueueMessage<OrderEvent>('order-events')
async handleOrderEvent(request: QueueMessageRequest<OrderEvent>): Promise<void> {
const { orderId, status } = request.message;
console.log(`Received order event: ${orderId} → ${status}`);
await this.squid
.collection<{ orderId: string; status: string }>('orders')
.doc(orderId)
.upsert({ orderId, status });
}
}
from squidcloud_backend import SquidService, on_queue_message, secure_topic
from squidcloud_backend.types import QueueMessageRequest
from typing import TypedDict
class OrderEvent(TypedDict):
orderId: str
status: str
class OrderService(SquidService):
@secure_topic('order-events', 'produce')
def allow_order_event_publish(self) -> bool:
return self.get_user_auth() is not None
@on_queue_message('order-events')
async def handle_order_event(self, request: QueueMessageRequest) -> None:
event: OrderEvent = request['message']
order_id = event['orderId']
status = event['status']
print(f'Received order event: {order_id} → {status}')
await self.squid.collection('orders').doc(order_id).upsert(
{'orderId': order_id, 'status': status}
)
Client: メッセージを publish する
const orderEvent = { orderId: 'order-123', status: 'placed' };
await squid.queue('order-events').produce([orderEvent]);
produce が呼び出されると、Squid はメッセージをトピックに配信し、バックエンドハンドラーが自動的に実行されます。
関連項目
- Built-in queue - 組み込みの Squid queue を設定して使用する
- Kafka connector - 外部 Kafka integration に接続する
- Queue security - キュートピックへのアクセスを保護する
- Triggers - データベースの変更に反応する
- Schedulers - スケジュールに従ってコードを実行する