メインコンテンツまでスキップ

Kafkaキュー

既存のKafkaインスタンスをSquidに接続する

Kafkaコネクタの設定

組み込みのキューではなくご自身のKafkaコネクタを使用する場合は、まずコネクタをSquidアプリケーションに追加してください。

  • Squid Consoleにアクセスし、Connectorsページに移動してKafkaコネクタを選択します。

  • 次の情報を入力してください:

    • Connector ID - 短く、コネクタの識別に役立つIDを選んでください。
    • Bootstrap servers - サーバーとポートのコンマ区切りのリスト。
    • Key - あなたのKafka API Key。
    • Secret - あなたのKafka API secret。これはSquid Secretsに安全に保存されます。
    • Avro schema registry - Avroメッセージの消費または生成を希望する場合は、Avro schema registryのURLを入力してください。この値はクラスタ設定で確認できます。
  • Test connectionをクリックしてコネクタ情報を検証します。検証後、Add connectorをクリックしてください。

Kafka connector

クライアントからキューにアクセスするには、トピック名とコネクタIDを渡してqueueを使用し、QueueManagerへの参照を作成します:

Client code
const queue = squid.queue('topic-name', 'kafka-connector-id');

キューからメッセージを読み取るには、consumeメソッドを使用します。このメソッドは、新しいメッセージがキューに投稿されるたびに更新される observable を返します。メッセージはstring型です。

Client code
const topicMessagesObs = queue.consume();

topicMessagesObs.subscribe((message: string) => {
console.log(message);
});
注意

トピックにサブスクライブすると、受け取ったobservableはサブスクリプション確立後に生成されたメッセージのみが含まれるように構成されています。この動作は、サブスクリプションプロセスがサーバー呼び出しを伴い、約100msの遅延が発生するためです。したがって、observableが新しいメッセージの配信を開始するまで、短い待機時間が発生することが予想されます。

メモリリークを防ぐため、observableの使用を終了したら完了させてください:

Client code
topicMessagesObs.unsubscribe();

キューにメッセージを追加するには、produceメソッドを使用します。このメソッドは、サブスクライブしているクライアントによって消費される文字列メッセージの配列を受け取ります。

Client code
queue.produce(['hello', 'world']);

Apache Kafkaトピックのセキュリティ強化

クライアントからApache Kafkaトピックにアクセスするには、セキュリティ機能が必要です。

Squidキューのトピックを保護するには、@secureTopicデコレーターを使用し、トピック名とアクションの種類を渡します。次のコードは、'topic-name'トピックに対して読み取りおよび書き込みアクセスを許可します:

Backend code
import { SquidService, secureTopic } from '@squidcloud/backend';

export class ExampleService extends SquidService {
@secureTopic('topic-name', 'all', 'kafka-connector-id')
allowTopicAccess(): boolean {
return true;
}
}

セキュリティ機能の詳細については、queue security docsを参照してください。