Confluent キュー
既存の Confluent Cloud Apache Kafka インスタンスを Squid に接続する
Squid 経由で Confluent Cloud にアクセスするには、まず Squid Console でコネクタを追加します。
-
Squid Console で Connectors ページに移動し、Confluent Kafka queue connector を選択します。
-
次の詳細を入力します。
- Connector ID - 簡潔で、コネクタを識別しやすい ID を選択します。
- Bootstrap servers - サーバーとポートのカンマ区切りリスト。この値は cluster settings で確認できます。
- Key - Kafka API Key。cluster をクリックし、その後 API Keys をクリックして既存の key を見つけるか、新規作成します。
- Secret - Kafka API secret。これは Squid Secrets に安全に保存されます。
- Avro schema registry - Avro メッセージを consume または produce したい場合は、Avro schema registry の URL を指定します。この値は cluster settings で確認できます。
-
Test connection をクリックしてコネクタ情報を検証します。検証できたら Add connector をクリックします。

クライアントから queue にアクセスするには、queue を使用して、topic 名と connector ID を渡して QueueManager への参照を作成します。
const queue = squid.queue('topic-name', 'confluent-connector-id');
queue からメッセージを読み取るには、consume メソッドを使用します。これは、新しいメッセージが queue に投稿されるたびに更新される observable(オブザーバブル)を返します。
const topicMessagesObs = queue.consume<MyType>();
topicMessagesObs.subscribe((message: MyType) => {
console.log(message);
});
topic を subscribe すると、受け取る observable は subscribe の確立後に produce されたメッセージのみを含むように構成されています。この挙動は、subscription プロセスに server call が含まれており、約 ~100ms の遅延が発生するためです。そのため、observable が新しいメッセージの配信を開始するまで、短い待ち時間が発生することがあります。
メモリリークを防ぐため、使用しなくなったら observable を完了(unsubscribe)します。
topicMessagesObs.unsubscribe();
queue にメッセージを追加するには、produce メソッドを使用します。このメソッドはメッセージ配列を受け取り、その後 queue を subscribe しているクライアントにより consume されます。
queue.produce(['hello', 'world']);
Confluent topics のセキュリティ保護
クライアントで Confluent topics にアクセスするには、security functions が必要です。
Squid の queue topic を保護するには、@secureTopic デコレータを使用し、topic 名とアクションの種類を渡します。次のコードは、'topic-name' topic の queue に対する読み取りおよび書き込みアクセスを許可します。
import { SquidService, secureTopic } from '@squidcloud/backend';
export class ExampleService extends SquidService {
@secureTopic('topic-name', 'all', 'confluent-connector-id')
allowTopicAccess(): boolean {
return true;
}
}
security functions の詳細については、queue security docs を参照してください。