Kafka キュー
既存の Kafka インスタンスを Squid に接続
Kafka コネクタの設定
組み込みキューの代わりにご自身の Kafka コネクタを使用する場合、最初にコネクタを Squid アプリケーションに追加してください。
-
In the Squid Console, navigate to the Connectors page and select the Kafka connector.
-
以下の詳細情報を入力します:
- Connector ID - 簡潔でコネクタを識別しやすい ID を選択してください。
- Bootstrap servers - サーバーとポートのカンマ区切りリスト。
- Key - あなたの Kafka API Key。
- Secret - あなたの Kafka API secret。これは Squid Secrets に安全に保存されます。
- Avro schema registry - Avro メッセージの消費または生成を希望する場合は、Avro schema registry の URL を入力してください。この値はクラスター設定で確認できます。
-
コネクタ情報を検証するために Test connection をクリックしてください。検証が完了したら、Add connector をクリックします。
クライアントからキューにアクセスするには、queue
を使用して QueueManager への参照を作成し、トピック名とコネクタ ID を渡します:
const queue = squid.queue('topic-name', 'kafka-connector-id');
キューからメッセージを読み取るには、consume
メソッドを使用します。これは、キューに新しいメッセージが投稿されるたびに更新される observable を返します。メッセージは文字列型です。
const topicMessagesObs = queue.consume();
topicMessagesObs.subscribe((message: string) => {
console.log(message);
});
トピックにサブスクライブすると、受け取る observable はサブスクリプション確立後に生成されたメッセージのみを含むように設定されます。この動作は、サブスクリプションプロセスでサーバーコールが行われるため、およそ100msの遅延が発生することによります。そのため、observable が新しいメッセージの配信を開始する前に短い待機が発生することが予想されます。
メモリリークを防ぐため、使用が終了したら observable を完了させてください:
topicMessagesObs.unsubscribe();
キューにメッセージを追加するには、produce
メソッドを使用します。このメソッドは、クライアントによってサブスクライブされたキューで消費される文字列メッセージの配列を引数に取ります。
queue.produce(['hello', 'world']);
Apache Kafka トピックのセキュリティ確保
クライアントから Apache Kafka トピックにアクセスするためには、セキュリティ機能が必要です。
Squid キューのトピックをセキュアにするには、@secureTopic
デコレーターを使用し、トピック名とアクションタイプを渡します。以下のコードは、'topic-name'
トピックに対して読み取りおよび書き込みアクセスを許可します:
import { SquidService, secureTopic } from '@squidcloud/backend';
export class ExampleService extends SquidService {
@secureTopic('topic-name', 'all', 'kafka-connector-id')
allowTopicAccess(): boolean {
return true;
}
}
セキュリティ機能の詳細については、queue security docs を参照してください。