メインコンテンツまでスキップ

Confluent キュー

既存の Confluent Cloud Apache Kafka インスタンスを Squid に接続する

Squid 経由で Confluent Cloud にアクセスするには、まず Squid Console でコネクタを追加します。

  • Squid ConsoleConnectors ページに移動し、Confluent Kafka キューコネクタを選択します。

  • 次の詳細を入力します。

    • Connector ID - 簡潔で、コネクタを識別しやすい ID を選択します。
    • Bootstrap servers - サーバーとポートのカンマ区切りリスト。この値はクラスター設定で確認できます。
    • Key - Kafka API Key。クラスターをクリックし、次に API Keys をクリックして既存のキーを確認するか、新しく作成します。
    • Secret - Kafka API secret。これは Squid Secrets に安全に保存されます。
    • Avro schema registry - Avro メッセージを consume または produce したい場合は、Avro schema registry の URL を指定します。この値はクラスター設定で確認できます。
  • Test connection をクリックしてコネクタ情報を検証します。検証できたら Add connector をクリックします。

Confluent connector

クライアントからキューにアクセスするには、queue を使用して QueueManager への参照を作成し、topic 名と connector ID を渡します。

Client code
const queue = squid.queue('topic-name', 'confluent-connector-id');

キューからメッセージを読み取るには、consume メソッドを使用します。これは、キューに新しいメッセージが投稿されるたびに更新される observable を返します。

Client code
const topicMessagesObs = queue.consume<MyType>();

topicMessagesObs.subscribe((message: MyType) => {
console.log(message);
});
Note

topic を subscribe すると、受け取る observable は subscribe が確立された後に produce されたメッセージのみを含むように構成されています。この挙動は、subscribe プロセスに server call が含まれており、約 ~100ms の遅延が発生するためです。そのため、observable が新しいメッセージの配信を開始するまで、短い待ち時間が発生することがあります。

メモリリークを防ぐため、使用しなくなったら observable を完了(complete)します。

Client code
topicMessagesObs.unsubscribe();

キューにメッセージを追加するには、produce メソッドを使用します。このメソッドはメッセージの配列を受け取り、それらはキューを subscribe しているクライアントによって consume されます。

Client code
queue.produce(['hello', 'world']);

Confluent topics のセキュリティ保護

クライアントで Confluent topics にアクセスするには security functions が必要です。

Squid のキュー topic を保護するには、@secureTopic デコレーターを使用し、topic 名と action の種類を渡します。次のコードは、'topic-name' topic のキューに対して read と write のアクセスを許可します。

Backend code
import { SquidService, secureTopic } from '@squidcloud/backend';

export class ExampleService extends SquidService {
@secureTopic('topic-name', 'all', 'confluent-connector-id')
allowTopicAccess(): boolean {
return true;
}
}

security functions の詳細は、queue security docs を参照してください。