Kafka から Hadoop にデータを素早くインポートするにはどうすればよいでしょうか?

Kafka から Hadoop にデータを素早くインポートするにはどうすればよいでしょうか?

Kafka は、強力な分散機能とパフォーマンス機能により、データ パイプラインの重要な部分として急速に普及した分散パブリッシュ/サブスクライブ システムです。メッセージング、メトリック収集、ストリーム処理、ログ集約など、さまざまなことを実行します。 Kafka のもう 1 つの効果的な使用法は、データを Hadoop にインポートすることです。 Kafka を使用する主な理由は、データ プロデューサーとコンシューマーが分離され、複数の独立したプロデューサー (異なる開発チームによって記述される可能性あり) を使用できるようになることです。同様に、独立したコンシューマーも複数存在します (異なるチームによって記述されている可能性もあります)。さらに、コンシューマーはリアルタイム/同期またはバッチ/オフライン/非同期にすることができます。後者の特性は、RabbitMQ などの他の pub-sub ツールと比較すると大きな違いを生みます。

Kafka を使用するには、理解しておく必要がある概念がいくつかあります。

  • トピック - トピックは関連メッセージのサブスクリプション ソースです。
  • パーティション - 各トピックは 1 つ以上のパーティションで構成されます。パーティションは、ログ ファイルによってバックアップされる順序付けられたメッセージ キューです。
  • プロデューサーとコンシューマー - プロデューサーとコンシューマーは、パーティションにメッセージを書き込み、パーティションからメッセージを読み取ります。

ブローカー — ブローカーは、トピックとパーティションを管理し、プロデューサーとコンシューマーのリクエストに対応する Kafka プロセスです。

Kafka はトピックの「完全な」順序付けを保証するのではなく、トピックを構成するパーティションが順序付けられていることのみを保証します。コンシューマー アプリケーションは、必要に応じて「グローバル」トピックの順序付けを強制できます。

図5.14はKafkaの概念モデルを示している。

図5.15は、Kafkaで分散パーティションをデプロイする方法の例を示しています。

フォールト トレランスをサポートするために、トピックを複製することができます。つまり、各パーティションは、異なるホスト上に構成可能な数のレプリカを持つことができます。これにより、耐障害性が向上し、単一のサーバーが停止しても、データやプロデューサーとコンシューマーの可用性に壊滅的な影響が及ぶことはありません。

ここでは、Kafka バージョン 0.8 と Camus バージョン 0.8.X が使用されています。

練習: Camus を使用して Kafka から HDFS に Avro データをコピーする

この手法は、すでに他の目的で Kafka にデータをストリーミングしていて、そのデータを HDFS に格納したい場合に役立ちます。

質問

データを HDFS にインポートするためのデータ配信メカニズムとして Kafka を使用したいと考えています。

解決

Kafka のデータは、LinkedIn が開発したソリューションである Camus を使用して HDFS に複製されます。

話し合う

Camus は LinkedIn が開発したオープンソース プロジェクトです。 LinkedIn では Kafka が広く導入されており、Camus は Kafka から HDFS にデータをコピーするために使用されます。

Camus は、Kafka で JSON と Avro の 2 つのデータ形式をすぐにサポートします。この手法では、Camus を介して Avro データを使用します。 Camus の Avro の組み込みサポートでは、Kafka パブリッシャーが独自の方法で Avro データを記述する必要があるため、この手法では、Kafka で標準のシリアル化データを使用することを前提としています。

この手法を機能させるには、3 つの部分が必要です。まず、Avro データを Kafka に書き込み、次に Camus が Avro データを逆シリアル化できるようにする簡単なクラスを記述し、最後に Camus ジョブを実行してデータのインポートを実行します。

Avro レコードを Kafka に書き込むには、次のコードで、必要な Kafka プロパティを構成して Kafka プロデューサーを設定し、ファイルからいくつかの Avro レコードをロードして、それらを Kafka に書き出す必要があります。

次のコマンドを使用して、test という名前の Kafka トピックにサンプル データをロードできます。

Kafka コンソール コンシューマーを使用すると、データが Kafka に書き込まれたことを確認できます。これにより、バイナリ Avro データがコンソールにダンプされます。

それができたら、Camus でこれらの Avro レコードを読み取れるように Camus コードをいくつか記述します。

実践: カミュとスキーマレジストリの書き方

まず、カミュの 3 つの概念を理解する必要があります。

  • デコーダー — デコーダーの役割は、Kafka から抽出された生データを Camus 形式に変換することです。
  • エンコーダー — エンコーダーは、デコードされたデータを HDFS に保存される形式にシリアル化します。
  • スキーマ レジストリ - エンコードされる Avro データに関するスキーマ情報を提供します。

前述のように、Camus は Avro データをサポートしていますが、Kafka プロデューサーが Camus の KafkaAvroMessageEncoder クラスを使用してデータを書き込む必要があります。このクラスは、Avro シリアル化されたバイナリ データに独自のデータを追加します。これは、Camus のデコーダーが、そのクラスによって書き込まれたことを検証できるようにするためだと考えられます。

この例では、シリアル化に Avro シリアル化が使用されているため、独自のデコーダーを作成する必要があります。幸いなことに、これは簡単です:

Kafka では特定の Avro レコードを書き込んだのに、Camus ではそのレコードを特定の Avro レコードではなく汎用 Avro レコードとして読み取ることに気づいたかもしれません。これは、CamusWrapper クラスが汎用 Avro レコードのみをサポートしているためです。それ以外の場合、生成されたコードは、付属するすべてのセキュリティ機能とともに使用できるため、特定の Avro レコードの使用がはるかに簡単になります。

CamusWrapper オブジェクトは Kafka からデータを抽出します。このクラスが存在する理由は、タイムスタンプ、サーバー名、サービス詳細などのメタデータをエンベロープに貼り付けることができるようにするためです。使用されるデータには、各レコードに意味のあるタイムスタンプを関連付けることを強くお勧めします (通常、これはレコードが作成または生成された時刻になります)。次に、タイムスタンプをパラメーターとして受け入れる CamusWrapper コンストラクターを使用できます。

  1. public CamusWrapper(R レコード、長いタイムスタンプ) {...}

タイムスタンプが設定されていない場合、Camus はラッパーの作成時に新しいタイムスタンプを作成します。このタイムスタンプとその他のメタデータは、出力レコードの HDFS の場所を決定するときに Camus で使用されます。

次に、Camus Avro エンコーダーが HDFS に書き込まれる Avro レコードのスキーマの詳細を認識できるように、スキーマ レジストリを作成する必要があります。スキーマを登録するときに、Avro レコードを取得する Kafka トピックの名前も指定します。

ラン・カミュ

Camus は、Kafka データをインポートする Hadoop クラスター上で MapReduce ジョブとして実行されます。 Camus に提供する必要のあるプロパティが多数あります。これは、コマンド ラインまたはプロパティ ファイルを使用して実行できます。次の手法を使用してプロパティ ファイルを使用します。

プロパティからわかるように、Camus にどのトピックをインポートするかを明示的に指示する必要はありません。 Camus は Kafka と自動的に通信して、トピック (およびパーティション) と現在の開始オフセットと終了オフセットを検出します。

インポートされたトピックを正確に制御したい場合は、kafka.whitelist.topics と kafka.blacklist.topics を使用して、それぞれホワイトリスト (トピックを制限) とブラックリスト (トピックを除外) をリストできます。区切り文字としてカンマを使用して複数のトピックを指定できます。次の例に示すように、正規表現もサポートされており、トピック「topic1」または「abc」で始まりその後に 1 つ以上の数字が続く任意のトピックと一致します。値とまったく同じ構文を使用してブラックリストを指定できます。

  1. kafka.whitelist.topics=トピック1、abc[0-9]+

プロパティがすべて設定されたら、Camus ジョブを実行できます。

これにより、Avro データが HDFS に保存されることになります。 HDFS に何が含まれているか見てみましょう。

最初のファイルにはインポートされたデータが含まれ、他のファイルは Camus によって管理されます。

HDFS 内のデータ ファイルは、AvroDump ユーティリティを使用して表示できます。

では、Camus ジョブが実行されると、具体的に何が起こるのでしょうか? Camus インポート プロセスは、図 5.16 に示すように、MapReduce ジョブとして実行されます。

MapReduce の Camus タスクが成功すると、Camus OutputCommitter (タスクの完了時にカスタム作業を実行できるようにする MapReduce 構造) がタスクのデータ ファイルをアトミックに宛先ディレクトリに移動します。 OutputCommitter は、タスクによって処理されるすべてのパーティションのオフセット ファイルも作成します。同じジョブ内の他のタスクが失敗する可能性がありますが、これは成功したタスクの状態には影響しません。成功したタスクのデータとオフセット出力はそのまま残っているので、後続の Camus 実行では、最後に成功した状態から処理が再開されます。

次に、Camus がデータをインポートする場所と動作を制御する方法を見てみましょう。

データの分割

先ほど、Camus が Kafka にある Avro データをインポートすることを確認しました。図 5.17 に示す HDFS パス構造を詳しく見て、場所を特定するために何ができるかを見てみましょう。

図5.17 HDFSにエクスポートされたデータを解析するためのCamus出力パス

パスの日付/時刻は、CamusWrapper から抽出されたタイムスタンプによって決定されます。 MessageDecoder の Kafka レコードからタイムスタンプを抽出し、CamusWrapper に渡すことが可能です。これにより、データは、デフォルトの Kafka レコードが MapReduce で読み取られた時刻ではなく、意味のある日付でパーティション分割されるようになります。

Camus はプラグ可能なパーティショナーをサポートしており、図 5.18 に示すパスの一部を制御できます。

図5.18 Camusパーティションパス

Camus Partitioner インターフェースには、実装する必要がある 2 つのメソッドが用意されています。

たとえば、カスタム パーティショナーは Hive パーティショニング用のパスを作成できます。

要約する

Camus は、HDFS 内の Kafka からデータを取得するための完全なソリューションを提供し、問題が発生した場合の状態の維持とエラー処理を担当します。 Azkaban または Oozie と統合することで、簡単に自動化し、メッセージ時間に基づいて HDFS データを整理することでシンプルなデータ管理を実行できます。 ETL に関しては、Flume と比較してその機能が比類のないものであることは特筆に値します。

Kafka には、データを HDFS に取り込むためのメカニズムがバンドルされています。 MapReduce ジョブで Kafka からデータを抽出するために使用できる KafkaETLInputFormat 入力形式クラスがあります。インポートを実行するには MapReduce ジョブを記述する必要がありますが、データの中間ストレージとして HDFS を使用する代わりに、データを MapReduce フロー内で直接使用できるという利点があります。次に、Hadoop にあるデータをファイルシステムなどの他のシステムに転送する方法について説明します。

<<:  クラウドデータ管理を解き放つ4つの鍵

>>:  NetEase Cloudのリアルタイムオーディオフレームワークの背後にあるアルゴリズムの最適化により、製品エクスペリエンスが全面的に向上

推薦する

新しい SEO 時代: SEO 担当者の皆さん、まだインターネットのゴミを作成し続けていますか?

2003年頃、Baiduが設立されたばかりで、ネット上に情報がほとんどなかったため、ウェブサイトを構...

モノのインターネットは、今後10年間でテクノロジー大手にとってホットな話題となるだろう

現在、エッジ コンピューティング市場はまだ開発の初期段階にあります。クラウドコンピューティング市場を...

スパイダーの気質と原則を分析してウェブサイトの掲載をガイドする

検索エンジンの巡回ロボットであるスパイダーは、毎日あらゆる場所を歩き回り、制限なくあちこちをつかんで...

6月28日の百度騒動でウェブマスターはどうすれば安心できるか

最近、Baidu のアルゴリズムが頻繁に調整および更新されたため、多くの Web サイトのランキング...

企業ウェブサイトランキングの不安定さはウェブサイトのキーワードに関連している

ショートビデオ、セルフメディア、インフルエンサーのためのワンストップサービス今日の検索エンジンは、最...

Baiduオリジナル記事と転載検出アルゴリズム

インターネットの急速な発展に伴い、ネットワーク上には重複したリソース ファイルが大量に存在します。た...

中小企業に利益をもたらすクラウド コンピューティングの 5 つの活用方法

あなたのビジネスでクラウドを利用すべきかどうかお悩みですか?クラウド コンピューティングの最大のメリ...

tmhhostはどうですか?米国AS9929ハイエンドネットワークVPSの簡単なレビュー

tmhhostはどうですか? tmhhostのUS VPSは、AS4809(CN2 GIAネットワー...

ウェブサイトのランキングの「可愛さ」から判断すると、コンテンツが多いだけでは十分ではありません。

QQタイプのウェブサイトを運営している友人なら誰でも、「Kwai Dian」というウェブサイトに注目...

サービスメッシュと OpenTelemetry の連携を探る: 分散トレース

この記事が公開された後、一部の読者から javaagent の「非侵入性」についてコメントがありまし...

第12回ウェブサイトはすぐにホームページ日記を​​集めました

多くのインターネット企業や、自社のウェブサイトを構築し最適化している友人は、実はキーワードは顧客によ...

コンテンツが王様の時代:検索エンジンはブログをターゲットにするでしょうか?

コンテンツこそが王様だということは、SEO の初期の頃から言われてきました。これまでの SEO に関...

デジタル変革は必須です。企業はどのようにこれを活用できるでしょうか?

[[398721]] COVID-19の流行の影響を経験した後、ほぼすべての企業がデジタル変革の重要...

2018年の中国のオンラインオーディオコンテンツ消費市場の分析

現在、国内のオンラインオーディオコンテンツ消費市場は急速な発展段階にあります。 Analysys I...

悪魔は細部に宿る:インタラクティブなウェブボタンのサイズ

製品がコア要件を満たしたら、詳細の作業をゆっくりと開始できます。製品のほぼすべてのレベルで、表面上見...