メインコンテンツまでスキップ

Confluent キュー

既存の Confluent Cloud Apache Kafka インスタンスを Squid に接続する

Squid を通じて Confluent Cloud にアクセスするには、まず Squid Console でコネクタを追加してください。

  • In the Squid Console にて、Connectors ページに移動し、Confluent Kafka キューコネクタを選択します。

  • 次の詳細を入力してください:

    • Connector ID - 短く、かつコネクタの識別に役立つ ID を選択してください。
    • Bootstrap servers - サーバーとポートをカンマで区切ったリストです。この値はクラスタ設定で確認できます。
    • Key - あなたの Kafka API Key。クラスターをクリックし、API Keys をクリックして既存のキーを探すか、新しいキーを作成してください。
    • Secret - あなたの Kafka API secret。これは Squid Secrets に安全に保存されています。
    • Avro schema registry - Avro メッセージの consume または produce を行う場合、Avro schema registry の URL を入力してください。この値はクラスタ設定で確認できます。
  • Test connection をクリックしてコネクタ情報を検証し、検証後に Add connector をクリックします。

Confluent connector

クライアントからキューにアクセスするには、queue を使用して、QueueManager への参照を作成し、トピック名とコネクタ ID を渡します:

Client code
const queue = squid.queue('topic-name', 'confluent-connector-id');

キューからメッセージを読み取るには、consume メソッドを使用します。これにより、キューに新しいメッセージが投稿されるたびに更新される observable が返されます。

Client code
const topicMessagesObs = queue.consume<MyType>();

topicMessagesObs.subscribe((message: MyType) => {
console.log(message);
});
注意

トピックにサブスクライブすると、その時点から produce されたメッセージのみを含む observable が返されるように設定されています。この挙動は、サブスクリプションプロセスでサーバー呼び出しが行われ、約 100ms の遅延が発生するためです。したがって、observable が新しいメッセージの配信を開始する前に、短い待機時間が発生することが予想されます。

メモリリークを防ぐため、使用しなくなった observable は完了(unsubscribe)させてください:

Client code
topicMessagesObs.unsubscribe();

キューにメッセージを追加するには、produce メソッドを使用します。このメソッドは配列形式のメッセージを受け取り、その後キューにサブスクライブしているクライアントによって consume されます。

Client code
queue.produce(['hello', 'world']);

Confluent トピックのセキュリティ

クライアントで Confluent トピックにアクセスするには、セキュリティ関数が必要です。

Squid キューのトピックを保護するには、@secureTopic デコレータを使用し、トピック名とアクション種別を渡します。以下のコードは、'topic-name' トピックに対して読み書きアクセスを許可します:

Backend code
import { SquidService, secureTopic } from '@squidcloud/backend';

export class ExampleService extends SquidService {
@secureTopic('topic-name', 'all', 'confluent-connector-id')
allowTopicAccess(): boolean {
return true;
}
}

セキュリティ関数の詳細については、queue security docs を参照してください。