Confluent キュー
既存の Confluent Cloud Apache Kafka インスタンスを Squid に接続する
Squid を介して Confluent Cloud にアクセスするには、まず Squid Console でコネクタを追加してください。
-
Squid Console にアクセスし、Connectors ページで Confluent Kafka キューコネクタを選択します。
-
次の詳細を入力してください:
- Connector ID - コネクタを識別するために、簡潔で分かりやすい ID を選んでください。
- Bootstrap servers - サーバーとポートをコンマで区切ったリストです。この値はクラスタ設定で確認できます。
- Key - Kafka API Key です。クラスタをクリックし、API Keys をクリックして、既存のキーを確認するか新たに作成してください。
- Secret - Kafka API シークレットです。これは Squid Secrets に安全に保存されます。
- Avro schema registry - Avro メッセージの消費または生成を希望する場合は、Avro schema registry の URL を入力してください。この値はクラスタ設定で確認できます。
-
Test connection をクリックして、コネクタ情報を確認します。確認後、Add connector をクリックしてください。

クライアントからキューにアクセスするには、queue を使い、トピック名とコネクタ ID を渡して QueueManager の参照を作成します:
const queue = squid.queue('topic-name', 'confluent-connector-id');
キューからメッセージを読み取るには、consume メソッドを使用します。これにより、キューに新しいメッセージが投稿されるたびに更新される observable が返されます。
const topicMessagesObs = queue.consume<MyType>();
topicMessagesObs.subscribe((message: MyType) => {
console.log(message);
});
トピックにサブスクライブすると、受け取る observable はサブスクリプションが確立された後に生成されたメッセージのみを含むように構成されます。この動作は、サブスクリプションプロセスでサーバー呼び出しが行われ、約 100ms の遅延が生じるためです。そのため、observable が新しいメッセージの配信を開始するまで、短い待ち時間が発生することがあります。
メモリリークを防ぐため、使用しなくなった observable は必ず解除してください:
topicMessagesObs.unsubscribe();
キューにメッセージを追加するには、produce メソッドを使用します。このメソッドは、メッセージの配列を受け取り、キューにサブスクライブしているクライアントによって消費されます。
queue.produce(['hello', 'world']);
Confluent トピックのセキュリティ
クライアント側から Confluent トピックにアクセスするには、セキュリティ機能が必要です。
Squid キュートピックのセキュリティを確保するには、@secureTopic デコレーターを使用し、トピック名とアクションの種類を渡してください。以下のコードは、'topic-name' トピックへの読み書きアクセスを許可します:
import { SquidService, secureTopic } from '@squidcloud/backend';
export class ExampleService extends SquidService {
@secureTopic('topic-name', 'all', 'confluent-connector-id')
allowTopicAccess(): boolean {
return true;
}
}
セキュリティ機能の詳細については、queue security docsを参照してください。