ログストリーミング:Kafka

Fastly のリアルタイムログストリーミング機能により、Apache Kafka にログを送信することができます。Kafka は、リアルタイムデータフィードを処理するための、高スループットで低レイテンシのオープンソースプラットフォームです。

注意

Fastly ではサードパーティのサービスに関する直接のサポートは行っておりません。詳細については、Fastly のご利用規約をご覧ください。

設定前の注意点

Fastly サービスのログエンドポイントとして Apache Kafka を追加する前に、Kafka がリモートサーバー上で作動していることを確認してください。1つ以上のサーバー (Broker) のホスト名や IP アドレス、およびメッセージが保存されるカテゴリーやフィードの名称 (Topic) が必要になります。Kafka の設定に関する詳細情報は、Apache Kafka クイックスタートガイドをご参照ください。

出力された各々の Fastly ログは、個別の Kafka レコードで構成されており、Fastly では、バッチで複数の Kafka パーティションに複数のレコードを作成する標準の Kafka クライアントプロトコルに従っています。適切なログメッセージのスループットを確実にするには、ログ処理インフラストラクチャの機能に合わせて、最大バイト圧縮コーデックの設定を調整します。

Kafka はデフォルトで共有のリソースであり、環境内の Kafka の他のユーザーが、Fastly ログのスループットに影響を与える場合があります。独自の Kafka インフラストラクチャを使用している場合、Fastly では、ログを送信する Kafka トピックのパーティションとして、専用のディスクストレージボリュームを指定することを推奨しています。容量の拡張が必要になる場合に備えて、IOPS を監視し、ストレージの最大値に対して帯域幅を消費する必要もあります。

ログエンドポイントとして Kafka を追加

次の手順に従って、ログエンドポイントとして Kafka を追加してください。

  1. リモートログストリーミングの設定に関するガイドをご確認ください。
  2. Apache Kafka Create endpoint ボタンをクリックします。Create an Apache Kafka endpoint ページが表示されます。
  3. Create an Apache Kafka endpoint フィールドに次のように入力します。
    • Name フィールドに分かりやすいエンドポイントの名前を入力します。
    • Placement セクションでは、生成される VCL にログコールが配置される場所を選択します。有効な値は Format Version Defaultwaf_debug (waf_debug_log)None です。詳細については、ログ配置の変更に関するガイドをご覧ください。
    • Log format フィールドには、ログ形式に使用する Apache 形式の文字列や VCL 変数を任意で入力します。詳細については、形式例のセクションをご覧ください。
    • Brokers フィールドには、1つ以上のサーバー (Kafka ブローカー) のホスト名または IP アドレスを入力します。デフォルトでは、ポート9092を使用します。デフォルトのポートと異なる場合、ホスト名にフォーマット :[port](例: :9093) を使用してポートを追加してください。複数のサーバーをコンマ区切りの文字列を使用して指定することができます。
    • Topic フィールドには、ログを送信するトピック名を入力します。
    • 最大バイトフィールドには、Kafka が生成するリクエストメッセージの最大サイズをバイト単位で入力します。
    • Parse key-values コントロールから、Kafka レコードヘッダへログ形式内のキーと値ペアを解析するかどうかを任意で選択します。キーと値のペアは、フォーマットされたログ行を key=val で導くもので、コンマで区切られていなければなりません。キーも値も二重引用符から始めることはできません。キーはの先頭には任意の数のスペースを置くことができますが、スペースを含めることはできず、少なくとも1文字が必要となります。値は、key= 内で空にすることができます。ログ行にキーと値のペア __record_key を含めて、パーティションするレコードキーを指定します。ログは、Kafka の統一したスティッキーパーティション戦略に従ってパーティション化されます。指定されたレコードキーがない場合、Fastly の Kafka クライアントは、トピックの利用可能なパーティションにログメッセージを均一に配信します。
    • Write acknowledgement エリアでは、リーダーの Kafka ブローカーが成功するために生成されるリクエストを受信する必要がある適切な書き込み確認を任意で選択します。Fastly の Kafka クライアントは、指数バックオフとジッタで失敗した生成リクエストの限定的再配信を試み、thundering herd シナリオを低減します。
    • Compression codec エリアには、ログの圧縮に使用する適切なコーデックを任意で選択します。
    • Use SASL 制御から、SASL 認証を有効にするかどうかを任意で選択します。SASL 認証は、TLS の暗号化と同時に有効にすることができます。Yes を選択すると、追加の SASL 認証フィールドが表示されます。
    • SASL authentication mechanism メニューから、SASL クライアント認証のユーザー名とパスワードの認証に使用する適切なチャレンジレスポンスを選択します。
    • User フィールドに、SASL クライアント認証ユーザー名を入力します。
    • Password フィールドに、SASL クライアント認証パスワードを入力します。
    • Use TLS 制御から、Kafka エンドポイントで TLS 暗号化を有効にするかどうかを任意で選択します。TLS 暗号化は、SASL 認証と同時に有効にすることができます。Yes を選択すると、追加の TLS フィールドが表示されます。
    • TLS CA certificate フィールドには、Kafka ブローカーの証明書が有効であることを確認するために使用する認証機関 (CA) の証明書を任意でコピー&ペーストしてください。アップロードする証明書は PEM 形式である必要があります。周知の認証機関によって署名されてない場合には、証明書のアップロードを熟慮してください。TLS 証明書が周知の機関によって署名されていれば、必要ありません。このフィールドは、Use TLS メニューから Yes を選択した場合にのみ表示されます。
    • TLS client certificate フィールドには、Kafka ブローカーへの認証に使用する TLS クライアント証明書を任意でコピー&ペーストしてください。アップロードする TLS クライアント証明書は、PEM 形式で、クライアントキーを添付する必要があります。TLS クライアント証明書により、Fastly が接続を実行していることを Kafka サーバーが認証できるようになります。このフィールドは、Use TLS メニューから Yes を選択した場合にのみ表示されます。
    • TLS client key フィールドには、Kafka ブローカーへの認証に使用する TLS クライアントキーを任意でコピー&ペーストしてください。アップロードする TLS クライアントキーは、PEM 形式で、TLS クライアント証明書を添付する必要があります。TLS クライアントキーにより、Fastly が接続を実行していることを Kafka ブローカーが認証できるようになります。このフィールドは、Use TLS メニューから Yes を選択した場合にのみ表示されます。
  4. Create ボタンをクリックすると、新しいログエンドポイントが作成されます。
  5. Activate ボタンをクリックして設定変更をデプロイします。

の形式例

以下に示すのは、Apache Kafka にデータを送信する書式の例です。書式の詳細については、こちらをご覧ください。

1{
2 "timestamp": "%{strftime(\{"%Y-%m-%dT%H:%M:%S%z"\}, time.start)}V",
3 "client_ip": "%{req.http.Fastly-Client-IP}V",
4 "geo_country": "%{client.geo.country_name}V",
5 "geo_city": "%{client.geo.city}V",
6 "host": "%{if(req.http.Fastly-Orig-Host, req.http.Fastly-Orig-Host, req.http.Host)}V",
7 "url": "%{json.escape(req.url)}V",
8 "request_method": "%{json.escape(req.method)}V",
9 "request_protocol": "%{json.escape(req.proto)}V",
10 "request_referer": "%{json.escape(req.http.referer)}V",
11 "request_user_agent": "%{json.escape(req.http.User-Agent)}V",
12 "response_state": "%{json.escape(fastly_info.state)}V",
13 "response_status": %{resp.status}V,
14 "response_reason": %{if(resp.response, "%22"+json.escape(resp.response)+"%22", "null")}V,
15 "response_body_size": %{resp.body_bytes_written}V,
16 "fastly_server": "%{json.escape(server.identity)}V",
17 "fastly_is_edge": %{if(fastly.ff.visits_this_service == 0, "true", "false")}V
18}

翻訳についての注意事項
このガイドは役に立ちましたか?

このフォームを使用して機密性の高い情報を送信しないでください。サポートが必要な場合は、サポートチームまでご連絡ください