メインコンテンツまでスキップ

キューメッセージハンドラー

バックエンドサービスメソッドでキュートピックからのメッセージを処理します。

Queue Message Handlers を使う理由

アプリケーションはイベントをキューに publish します — 注文の作成、ファイルのアップロード、支払いの完了など — そしてバックエンドでそれぞれを確実に処理する必要があります。

@onQueueMessage を使うと、メソッドを decorate して deploy するだけです。

Backend code
@onQueueMessage<OrderEvent>('order-events')
async handleOrderEvent(request: QueueMessageRequest<OrderEvent>): Promise<void> {
const { orderId, status } = request.message;
await this.squid
.collection<{ orderId: string; status: string }>('orders')
.doc(orderId)
.upsert({ orderId, status });
}

polling は不要です。routing logic も不要です。トピックにメッセージが到着した瞬間に、Squid がハンドラーを呼び出します。

概要

Queue message handlers は、指定したトピックにメッセージが publish されるたびに自動実行される、@onQueueMessage(Python では @on_queue_message)で decorate されたバックエンドメソッドです。組み込みの Squid queue と、Kafka のような外部キュー連携の両方をサポートします。

Queue message handlers を使うタイミング

Use CaseRecommendation
キュートピックに publish されたメッセージを処理する✅ Queue message handler
データベースの変更に反応するTriggers を使用
クライアントから関数を呼び出すExecutables を使用
スケジュールに従ってコードを実行するSchedulers を使用
外部サービス向けに HTTP endpoint を公開するWebhooks を使用

仕組み

  1. SquidService を継承したクラス内のメソッドに @onQueueMessage() を付けます
  2. トピック名と、必要に応じて integration ID を指定します
  3. deploy 時に Squid がハンドラーを検出して登録します
  4. トピックにメッセージが到着すると、Squid が QueueMessageRequest オブジェクトを渡してメソッドを呼び出します
  5. ハンドラーは synchronous でも Promise を返す形でも使えます

Quick Start

前提条件

  • squid init で初期化された Squid backend project
  • NPM からインストールされた @squidcloud/backend パッケージ

Step 1: ハンドラーを作成する

SquidService を継承する service class を作成し、ハンドラーメソッドを追加します。

Backend code
import { SquidService, onQueueMessage, QueueMessageRequest } from '@squidcloud/backend';

interface OrderEvent {
orderId: string;
status: 'placed' | 'shipped' | 'delivered';
}

export class OrderService extends SquidService {
@onQueueMessage<OrderEvent>('order-events')
async handleOrderEvent(request: QueueMessageRequest<OrderEvent>): Promise<void> {
const { orderId, status } = request.message;
await this.squid
.collection<{ orderId: string; status: string }>('orders')
.doc(orderId)
.upsert({ orderId, status });
}
}

Step 2: service を export する

service index file から service が export されていることを確認してください。

Backend code
export * from './example-service';

Step 3: バックエンドを起動または deploy する

ローカル開発では、Squid CLI を使ってバックエンドをローカルで実行します。

squid start

cloud に deploy する方法については、deploying your backend を参照してください。

Step 4: 確認する

クライアントまたは別の service からトピックにメッセージを publish し、Squid Console のログでハンドラーが呼び出されたことを確認します。

Core Concepts

decorator

decorator は 2 つのパラメータを受け取ります。

ParameterTypeRequiredDescription
topicNamestringYessubscribe 対象となるキュートピックの名前
integrationIdstringNoキューの integration ID。デフォルトは組み込みの Squid queue integration です
Backend code
// Built-in queue — integrationId defaults to 'built_in_queue'
@onQueueMessage('order-events')

// External integration (e.g. Kafka)
@onQueueMessage('order-events', 'kafka')

QueueMessageRequest

ハンドラーに渡される QueueMessageRequest オブジェクトには、次の内容が含まれます。

PropertyTypeDescription
messageT型付きのメッセージ payload
topicNamestringメッセージが送信されたトピック名
integrationIdstringキューの integration ID

TypeScript では、generic type parameter T によって publisher からハンドラーまで end-to-end の型安全性が得られます。Python では QueueMessageRequestTypedDict のため、フィールドには辞書形式の lookup でアクセスします。

Backend code
interface OrderEvent {
orderId: string;
status: 'placed' | 'shipped' | 'delivered';
}

@onQueueMessage<OrderEvent>('order-events')
async handleOrderEvent(request: QueueMessageRequest<OrderEvent>): Promise<void> {
// request.message is typed as OrderEvent
const { orderId, status } = request.message;
}

外部キュー連携を使う

integrationId を省略すると、Squid の組み込みキューが使われます。明示的に integration ID を指定することで、Kafka のような外部キューシステムでも同じハンドラーパターンを利用できます。

Backend code
import { SquidService, onQueueMessage, QueueMessageRequest } from '@squidcloud/backend';

interface OrderEvent {
orderId: string;
status: 'placed' | 'shipped' | 'delivered';
}

export class OrderService extends SquidService {
// Built-in queue
@onQueueMessage<OrderEvent>('order-events')
async handleOrderEvent(request: QueueMessageRequest<OrderEvent>): Promise<void> {
console.log('Built-in queue event:', request.message);
}

// External Kafka integration
@onQueueMessage<OrderEvent>('order-events', 'kafka')
async handleKafkaOrderEvent(request: QueueMessageRequest<OrderEvent>): Promise<void> {
console.log(`Kafka event on topic "${request.topicName}":`, request.message);
}
}

エラーハンドリング

ハンドラーが error を throw すると、その error は Squid Console に記録されます。失敗を適切に処理するために、ロジックを try/catch(または try/except)で囲んでください。

Backend code
@onQueueMessage<OrderEvent>('order-events')
async handleOrderEvent(request: QueueMessageRequest<OrderEvent>): Promise<void> {
try {
await this.processOrder(request.message);
} catch (error) {
console.error(`Failed to process order ${request.message.orderId}:`, error);
}
}

ベストプラクティス

  1. メッセージ payload を型付けする。 TypeScript では generic parameter QueueMessageRequest<MyType> を、Python では TypedDict の subclass を使い、メッセージ本文に型安全にアクセスしましょう。

  2. ハンドラー内で error を処理する。 ロジックを try/catch(または try/except)で囲み、error をログに記録して、1 つの不正なメッセージで静かに失敗しないようにしましょう。

  3. ハンドラーの責務を絞る。 ハンドラーは 1 つのことを行うべきです。1 つのメッセージで複数の workflow を起動する必要がある場合は、すべてのロジックをハンドラーに詰め込むのではなく、他のメソッドや service に委譲してください。

  4. idempotency を考慮して設計する。 メッセージはまれに複数回配信されることがあります。重複メッセージを処理しても同じ結果になるよう、ハンドラーを設計してください。

コード例

キューメッセージをデータベースに書き込む

Backend code
import { SquidService, onQueueMessage, QueueMessageRequest } from '@squidcloud/backend';

interface OrderEvent {
orderId: string;
status: 'placed' | 'shipped' | 'delivered';
}

export class OrderService extends SquidService {
@onQueueMessage<OrderEvent>('order-events')
async handleOrderEvent(request: QueueMessageRequest<OrderEvent>): Promise<void> {
const { orderId, status } = request.message;
await this.squid
.collection<{ orderId: string; status: string }>('orders')
.doc(orderId)
.upsert({ orderId, status });
}
}

複数のキュー連携からのメッセージを処理する

Backend code
import { SquidService, onQueueMessage, QueueMessageRequest } from '@squidcloud/backend';

interface OrderEvent {
orderId: string;
status: 'placed' | 'shipped' | 'delivered';
}

export class OrderService extends SquidService {
@onQueueMessage<OrderEvent>('order-events')
async handleOrderEvent(request: QueueMessageRequest<OrderEvent>): Promise<void> {
await this.processOrder(request.message);
}

@onQueueMessage<OrderEvent>('order-events', 'kafka')
async handleKafkaOrderEvent(request: QueueMessageRequest<OrderEvent>): Promise<void> {
await this.processOrder(request.message);
}

private async processOrder(event: OrderEvent): Promise<void> {
await this.squid
.collection<{ orderId: string; status: string }>('orders')
.doc(event.orderId)
.upsert({ orderId: event.orderId, status: event.status });
}
}

Full-Stack Example

この例では、クライアントが注文イベントを publish し、バックエンドハンドラーがそれを処理するまでの完全な流れを示します。

Backend: ハンドラーを登録する

Backend code
import { SquidService, onQueueMessage, QueueMessageRequest, secureTopic } from '@squidcloud/backend';

interface OrderEvent {
orderId: string;
status: 'placed' | 'shipped' | 'delivered';
}

export class OrderService extends SquidService {
@secureTopic('order-events', 'produce')
allowOrderEventPublish(): boolean {
return !!this.getUserAuth();
}

@onQueueMessage<OrderEvent>('order-events')
async handleOrderEvent(request: QueueMessageRequest<OrderEvent>): Promise<void> {
const { orderId, status } = request.message;
console.log(`Received order event: ${orderId}${status}`);
await this.squid
.collection<{ orderId: string; status: string }>('orders')
.doc(orderId)
.upsert({ orderId, status });
}
}

Client: メッセージを publish する

Client code
const orderEvent = { orderId: 'order-123', status: 'placed' };
await squid.queue('order-events').produce([orderEvent]);

produce が呼び出されると、Squid はメッセージをトピックに配信し、バックエンドハンドラーが自動的に実行されます。

関連項目

  • Built-in queue - 組み込みの Squid queue を設定して使用する
  • Kafka connector - 外部 Kafka integration に接続する
  • Queue security - キュートピックへのアクセスを保護する
  • Triggers - データベースの変更に反応する
  • Schedulers - スケジュールに従ってコードを実行する