メインコンテンツまでスキップ

Kafka キュー

既存の Kafka インスタンスを Squid に接続する

Kafka connector をセットアップする

組み込みキューではなく独自の Kafka connector を使用する場合は、まず Squid アプリケーションにその connector を追加します。

  • Squid ConsoleConnectors ページに移動し、Kafka connector を選択します。

  • 次の詳細を入力します。

    • Connector ID - 簡潔で、connector を識別しやすい ID を選択します。
    • Bootstrap servers - サーバーとポートのカンマ区切りリスト。
    • Key - Kafka API Key。
    • Secret - Kafka API secret。これは Squid Secrets に安全に保存されます。
    • Avro schema registry - Avro メッセージを consume / produce したい場合は、Avro schema registry の URL を指定します。この値はクラスター設定で確認できます。
  • connector 情報を検証するために Test connection をクリックします。検証できたら Add connector をクリックします。

Kafka connector

クライアントからキューにアクセスするには、queue を使用して、topic 名と connector ID を渡し、QueueManager への参照を作成します。

Client code
const queue = squid.queue('topic-name', 'kafka-connector-id');

キューからメッセージを読み取るには、consume メソッドを使用します。これは、新しいメッセージがキューに投稿されるたびに更新される observable を返します。メッセージは string 型です。

Client code
const topicMessagesObs = queue.consume();

topicMessagesObs.subscribe((message: string) => {
console.log(message);
});
Note

topic を subscribe すると、受け取る observable は、subscription が確立された後に produce されたメッセージのみを含むように設定されています。この挙動は、subscription のプロセスにサーバー呼び出しが含まれ、約 ~100ms の遅延が発生するためです。そのため、observable が新しいメッセージの配信を開始するまでに、短い待ち時間が発生することが想定されます。

メモリリークを防ぐため、不要になったら observable を完了(complete)してください。

Client code
topicMessagesObs.unsubscribe();

キューにメッセージを追加するには、produce メソッドを使用します。このメソッドは string メッセージの配列を受け取り、キューを subscribe しているクライアントがそれらを consume します。

Client code
queue.produce(['hello', 'world']);

Apache Kafka topic のセキュリティ保護

クライアントで Apache Kafka topic にアクセスするには、security functions が必要です。

Squid の queue topic を保護するには、@secureTopic デコレーターを使用し、topic 名とアクションの種類を渡します。次のコードは、'topic-name' topic のキューに対する読み取りおよび書き込みアクセスを許可します。

Backend code
import { SquidService, secureTopic } from '@squidcloud/backend';

export class ExampleService extends SquidService {
@secureTopic('topic-name', 'all', 'kafka-connector-id')
allowTopicAccess(): boolean {
return true;
}
}

security functions の詳細については、queue security docs を参照してください。