メインコンテンツまでスキップ

Confluent キュー

既存の Confluent Cloud Apache Kafka インスタンスを Squid に接続する

Squid を介して Confluent Cloud にアクセスするには、まず Squid Console でコネクタを追加してください。

  • Squid Console にアクセスし、Connectors ページで Confluent Kafka キューコネクタを選択します。

  • 次の詳細を入力してください:

    • Connector ID - コネクタを識別するために、簡潔で分かりやすい ID を選んでください。
    • Bootstrap servers - サーバーとポートをコンマで区切ったリストです。この値はクラスタ設定で確認できます。
    • Key - Kafka API Key です。クラスタをクリックし、API Keys をクリックして、既存のキーを確認するか新たに作成してください。
    • Secret - Kafka API シークレットです。これは Squid Secrets に安全に保存されます。
    • Avro schema registry - Avro メッセージの消費または生成を希望する場合は、Avro schema registry の URL を入力してください。この値はクラスタ設定で確認できます。
  • Test connection をクリックして、コネクタ情報を確認します。確認後、Add connector をクリックしてください。

Confluent connector

クライアントからキューにアクセスするには、queue を使い、トピック名とコネクタ ID を渡して QueueManager の参照を作成します:

Client code
const queue = squid.queue('topic-name', 'confluent-connector-id');

キューからメッセージを読み取るには、consume メソッドを使用します。これにより、キューに新しいメッセージが投稿されるたびに更新される observable が返されます。

Client code
const topicMessagesObs = queue.consume<MyType>();

topicMessagesObs.subscribe((message: MyType) => {
console.log(message);
});
注意

トピックにサブスクライブすると、受け取る observable はサブスクリプションが確立された後に生成されたメッセージのみを含むように構成されます。この動作は、サブスクリプションプロセスでサーバー呼び出しが行われ、約 100ms の遅延が生じるためです。そのため、observable が新しいメッセージの配信を開始するまで、短い待ち時間が発生することがあります。

メモリリークを防ぐため、使用しなくなった observable は必ず解除してください:

Client code
topicMessagesObs.unsubscribe();

キューにメッセージを追加するには、produce メソッドを使用します。このメソッドは、メッセージの配列を受け取り、キューにサブスクライブしているクライアントによって消費されます。

Client code
queue.produce(['hello', 'world']);

Confluent トピックのセキュリティ

クライアント側から Confluent トピックにアクセスするには、セキュリティ機能が必要です。

Squid キュートピックのセキュリティを確保するには、@secureTopic デコレーターを使用し、トピック名とアクションの種類を渡してください。以下のコードは、'topic-name' トピックへの読み書きアクセスを許可します:

Backend code
import { SquidService, secureTopic } from '@squidcloud/backend';

export class ExampleService extends SquidService {
@secureTopic('topic-name', 'all', 'confluent-connector-id')
allowTopicAccess(): boolean {
return true;
}
}

セキュリティ機能の詳細については、queue security docsを参照してください。