メインコンテンツまでスキップ

Kafka キュー

既存の Kafka インスタンスを Squid に接続

Kafka コネクタの設定

組み込みキューの代わりにご自身の Kafka コネクタを使用する場合、最初にコネクタを Squid アプリケーションに追加してください。

  • In the Squid Console, navigate to the Connectors page and select the Kafka connector.

  • 以下の詳細情報を入力します:

    • Connector ID - 簡潔でコネクタを識別しやすい ID を選択してください。
    • Bootstrap servers - サーバーとポートのカンマ区切りリスト。
    • Key - あなたの Kafka API Key。
    • Secret - あなたの Kafka API secret。これは Squid Secrets に安全に保存されます。
    • Avro schema registry - Avro メッセージの消費または生成を希望する場合は、Avro schema registry の URL を入力してください。この値はクラスター設定で確認できます。
  • コネクタ情報を検証するために Test connection をクリックしてください。検証が完了したら、Add connector をクリックします。

Kafka connector

クライアントからキューにアクセスするには、queue を使用して QueueManager への参照を作成し、トピック名とコネクタ ID を渡します:

Client code
const queue = squid.queue('topic-name', 'kafka-connector-id');

キューからメッセージを読み取るには、consume メソッドを使用します。これは、キューに新しいメッセージが投稿されるたびに更新される observable を返します。メッセージは文字列型です。

Client code
const topicMessagesObs = queue.consume();

topicMessagesObs.subscribe((message: string) => {
console.log(message);
});
Note

トピックにサブスクライブすると、受け取る observable はサブスクリプション確立後に生成されたメッセージのみを含むように設定されます。この動作は、サブスクリプションプロセスでサーバーコールが行われるため、およそ100msの遅延が発生することによります。そのため、observable が新しいメッセージの配信を開始する前に短い待機が発生することが予想されます。

メモリリークを防ぐため、使用が終了したら observable を完了させてください:

Client code
topicMessagesObs.unsubscribe();

キューにメッセージを追加するには、produce メソッドを使用します。このメソッドは、クライアントによってサブスクライブされたキューで消費される文字列メッセージの配列を引数に取ります。

Client code
queue.produce(['hello', 'world']);

Apache Kafka トピックのセキュリティ確保

クライアントから Apache Kafka トピックにアクセスするためには、セキュリティ機能が必要です。

Squid キューのトピックをセキュアにするには、@secureTopic デコレーターを使用し、トピック名とアクションタイプを渡します。以下のコードは、'topic-name' トピックに対して読み取りおよび書き込みアクセスを許可します:

Backend code
import { SquidService, secureTopic } from '@squidcloud/backend';

export class ExampleService extends SquidService {
@secureTopic('topic-name', 'all', 'kafka-connector-id')
allowTopicAccess(): boolean {
return true;
}
}

セキュリティ機能の詳細については、queue security docs を参照してください。