Kafka キュー
既存の Kafka インスタンスを Squid に接続する
Kafka connector をセットアップする
組み込みキューではなく独自の Kafka connector を使用する場合は、まず Squid アプリケーションにその connector を追加します。
-
Squid Console で Connectors ページに移動し、Kafka connector を選択します。
-
次の詳細を入力します。
- Connector ID - 簡潔で、connector を識別しやすい ID を選択します。
- Bootstrap servers - サーバーとポートのカンマ区切りリスト。
- Key - Kafka API Key。
- Secret - Kafka API secret。これは Squid Secrets に安全に保存されます。
- Avro schema registry - Avro メッセージを consume / produce したい場合は、Avro schema registry の URL を指定します。この値はクラスター設定で確認できます。
-
connector 情報を検証するために Test connection をクリックします。検証できたら Add connector をクリックします。

クライアントからキューにアクセスするには、queue を使用して、topic 名と connector ID を渡し、QueueManager への参照を作成します。
const queue = squid.queue('topic-name', 'kafka-connector-id');
キューからメッセージを読み取るには、consume メソッドを使用します。これは、新しいメッセージがキューに投稿されるたびに更新される observable を返します。メッセージは string 型です。
const topicMessagesObs = queue.consume();
topicMessagesObs.subscribe((message: string) => {
console.log(message);
});
topic を subscribe すると、受け取る observable は、subscription が確立された後に produce されたメッセージのみを含むように設定されています。この挙動は、subscription のプロセスにサーバー呼び出しが含まれ、約 ~100ms の遅延が発生するためです。そのため、observable が新しいメッセージの配信を開始するまでに、短い待ち時間が発生することが想定されます。
メモリリークを防ぐため、不要になったら observable を完了(complete)してください。
topicMessagesObs.unsubscribe();
キューにメッセージを追加するには、produce メソッドを使用します。このメソッドは string メッセージの配列を受け取り、キューを subscribe しているクライアントがそれらを consume します。
queue.produce(['hello', 'world']);
Apache Kafka topic のセキュリティ保護
クライアントで Apache Kafka topic にアクセスするには、security functions が必要です。
Squid の queue topic を保護するには、@secureTopic デコレーターを使用し、topic 名とアクションの種類を渡します。次のコードは、'topic-name' topic のキューに対する読み取りおよび書き込みアクセスを許可します。
import { SquidService, secureTopic } from '@squidcloud/backend';
export class ExampleService extends SquidService {
@secureTopic('topic-name', 'all', 'kafka-connector-id')
allowTopicAccess(): boolean {
return true;
}
}
security functions の詳細については、queue security docs を参照してください。