Confluent キュー
既存の Confluent Cloud Apache Kafka インスタンスを Squid に接続する
Squid を通じて Confluent Cloud にアクセスするには、まず Squid Console でコネクタを追加してください。
-
In the Squid Console にて、Connectors ページに移動し、Confluent Kafka キューコネクタを選択します。
-
次の詳細を入力してください:
- Connector ID - 短く、かつコネクタの識別に役立つ ID を選択してください。
- Bootstrap servers - サーバーとポートをカンマで区切ったリストです。この値はクラスタ設定で確認できます。
- Key - あなたの Kafka API Key。クラスターをクリックし、API Keys をクリックして既存のキーを探すか、新しいキーを作成してください。
- Secret - あなたの Kafka API secret。これは Squid Secrets に安全に保存されています。
- Avro schema registry - Avro メッセージの consume または produce を行う場合、Avro schema registry の URL を入力してください。この値はクラスタ設定で確認できます。
-
Test connection をクリックしてコネクタ情報を検証し、検証後に Add connector をクリックします。
クライアントからキューにアクセスするには、queue
を使用して、QueueManager への参照を作成し、トピック名とコネクタ ID を渡します:
const queue = squid.queue('topic-name', 'confluent-connector-id');
キューからメッセージを読み取るには、consume
メソッドを使用します。これにより、キューに新しいメッセージが投稿されるたびに更新される observable が返されます。
const topicMessagesObs = queue.consume<MyType>();
topicMessagesObs.subscribe((message: MyType) => {
console.log(message);
});
トピックにサブスクライブすると、その時点から produce されたメッセージのみを含む observable が返されるように設定されています。この挙動は、サブスクリプションプロセスでサーバー呼び出しが行われ、約 100ms の遅延が発生するためです。したがって、observable が新しいメッセージの配信を開始する前に、短い待機時間が発生することが予想されます。
メモリリークを防ぐため、使用しなくなった observable は完了(unsubscribe)させてください:
topicMessagesObs.unsubscribe();
キューにメッセージを追加するには、produce
メソッドを使用します。このメソッドは配列形式のメッセージを受け取り、その後キューにサブスクライブしているクライアントによって consume されます。
queue.produce(['hello', 'world']);
Confluent トピックのセキュリティ
クライアントで Confluent トピックにアクセスするには、セキュリティ関数が必要です。
Squid キューのトピックを保護するには、@secureTopic
デコレータを使用し、トピック名とアクション種別を渡します。以下のコードは、'topic-name'
トピックに対して読み書きアクセスを許可します:
import { SquidService, secureTopic } from '@squidcloud/backend';
export class ExampleService extends SquidService {
@secureTopic('topic-name', 'all', 'confluent-connector-id')
allowTopicAccess(): boolean {
return true;
}
}
セキュリティ関数の詳細については、queue security docs を参照してください。