技術的な乾物共有: HBase のデータから Kafka への移行の実践

技術的な乾物共有: HBase のデータから Kafka への移行の実践

1. 概要

実際のアプリケーション シナリオでは、データは HBase クラスターに保存されますが、何らかの特別な理由により、データを HBase から Kafka に移行する必要があります。通常の状況では、ソース データは Kafka に送信され、その後コンシューマーがデータを処理して HBase に書き込みます。しかし、逆のプロセスを実行する場合、HBase データを Kafka に移行するにはどうすればよいでしょうか?今日は具体的な実装プロセスについてお話しします。

2. コンテンツ

一般的なビジネス シナリオは次のとおりです。データ ソースがデータを生成し、Kafka に入り、その後コンシューマー (Flink、Spark、Kafka API など) によって処理されて HBase に入ります。これは典型的なリアルタイム処理フローです。フローチャートは次のとおりです。

上記のリアルタイム処理フローは、結局のところ、データフローが順次処理されるため、データの処理が比較的容易です。しかし、このプロセスを逆にすると、いくつかの問題が発生します。

2.1 膨大なデータ

HBase の分散性とクラスターの水平拡張により、HBase 内のデータは数百億、数千億、あるいはそれ以上になることがよくあります。このレベルのデータは、このタイプの逆データフローのシナリオで、データ取得の問題という非常に厄介な問題を引き起こします。この膨大な量のデータを HBase から取得するにはどうすればよいでしょうか?

2.2 データパーティションなし

HBase は高速かつ簡単にデータを取得したり一覧表示したりできることがわかっています。ただし、Hive のようなデータ ウェアハウスのパーティショニングの概念がなく、一定期間内のデータを提供できません。最新の週のデータを抽出する場合は、テーブル全体をスキャンし、タイムスタンプをフィルタリングしてその週のデータを取得する必要がある場合があります。数が少ない場合は大きな問題にならないかもしれませんが、データ量が多い場合、HBase のテーブル全体をスキャンするのは困難です。

3. 解決策

このような逆データフローをどのように処理するか。実際、これを実現するには、HBase の Get と List の機能を使用できます。 HBase は RowKey を通じてプライマリ インデックスを構築するため、RowKey レベルでのデータ取得速度は非常に高速です。実装プロセスの詳細は次のとおりです。

データフローは上の図に示されています。以下では、各プロセスの実装の詳細と注意事項を分析します。

3.1 行キーの抽出

HBase には Rowkey 取得用のプライマリ インデックスがあることがわかっているので、この機能を使用して拡張できます。 HBase テーブルから大量のデータ内の Rowkey を抽出し、設定した抽出および保存ルールに従って、抽出した Rowkey を HDFS に保存できます。

ここで注意が必要な問題の 1 つは、HBase Rowkey の抽出です。大規模なデータレベルの Rowkey 抽出の場合は、MapReduce を使用して実装することをお勧めします。これは、HBase が提供する TableMapReduceUtil クラスのおかげで実現されます。 MapReduce タスクを通じて、HBase 内の Rowkey はマップ フェーズで指定された時間範囲に従ってフィルタリングされ、reduce フェーズで Rowkey が複数のファイルに分割され、最終的に HDFS に保存されます。

ここで質問がある学生もいるかもしれません。 Rowkey の抽出には MapReduce が使用されているので、列クラスターの下の列データを直接スキャンして処理してみませんか?ここでは、MapReduce タスクを開始すると、HBase データをスキャンするときに Rowkey のみをフィルター処理し (FirstKeyOnlyFilter を使用)、列クラスター データは処理しません。これでかなり速くなります。 HBase RegionServer への負荷も大幅に軽減されます。

  • 行列行001情報:名前行001情報:年齢行001情報:性別行001情報:sn

ここに例があります。上記の表のデータの場合、Rowkey (row001) のみを抽出する必要があります。ただし、実際のビジネス データでは、HBase テーブルには、多くの特徴的な属性 (名前、性別、年齢、ID カードなど) を持つデータが記述される場合があります。一部のビジネス データでは、1 つの列クラスターの下に 12 を超える特性が存在する場合がありますが、Rowkey は 1 つだけであり、必要な Rowkey はこの 1 つだけです。その場合、FirstKeyOnlyFilter を使用して実装するのが適切です。

  1. /**
  2. * フィルター 各行最初KVを返します
  3. * <p>
  4. * このフィルターを使用するとカウント操作をより効率的に実行できます。
  5. */

これは、最初の KV データを返すために使用される FirstKeyOnlyFilter の機能の説明です。実際に役人が数えるのに使っています。ここで少し改善して、FirstKeyOnlyFilter を使用して Rowkey を抽出します。

3.2 行キー生成

抽出された Rowkey を生成するにはどうすればいいですか? Reduce の数は、実際の桁数に基づいて決定できます。 Rowkey ファイルを生成するときは、実際のデータ量に基づいて Reduce の数を計算することをお勧めします。使いやすさを優先して、HDFS ファイルを 1 つだけ使用しないようにしてください。後でメンテナンスが難しくなります。たとえば、HBase テーブルが 100 GB の場合、それを 100 個のファイルに分割できます。

3.3 データ処理

ステップ 1 では、抽出ルールと保存ルールに従って、MapReduce を介して HBase からデータが抽出され、RowKey が HDFS に保存されます。次に、MapReduce タスクを通じて HDFS 上の Rowkey ファイルを読み取り、List を通じて HBase からデータを取得します。分解の詳細は次のとおりです。

Map フェーズでは、HDFS から Rowkey データ ファイルを読み取り、バッチ Get を通じて HBase からデータを取得し、データを組み立てて Reduce フェーズに送信します。 Reduce フェーズでは、Map フェーズからのデータが取得され、Kafka に書き込まれます。 Kafka プロデューサー コールバック関数は、Kafka に書き込まれるデータのステータス情報を取得するために使用され、ステータス情報はデータが正常に書き込まれたかどうかを判断するために使用されます。成功した場合は、成功した進行状況の統計を容易にするために、成功した Rowkey を HDFS に記録します。失敗した場合は、失敗した進行状況の統計を容易にするために、失敗した Rowkey を HDFS に記録します。

3.4 失敗と再試行

MapReduce タスクを通じて Kafka にデータを書き込むときに、失敗が発生する可能性があります。障害が発生した場合は、HDFS に Rowkey を記録するだけで済みます。タスクが完了すると、プログラムは HDFS 上に失敗した Rowkey ファイルがあるかどうかを確認します。存在する場合は、手順 3 を再度開始します。つまり、HDFS 上の失敗した Rowkey ファイルを読み取り、HBase にデータをリストし、データを処理し、最後に Kafka に書き込み、HDFS 上の失敗した Rowkey が処理されるまでこれを繰り返します。

4. 実装コード

ここで実装されているコードの量は複雑ではありません。以下は、これに基づいて変更できる疑似コードです (たとえば、Rowkey を抽出し、MapReduce を使用して Rowkey を読み取り、HBase テーブルをバッチで取得してから、Kafka に書き込むなど)。サンプルコードは次のとおりです。

  1. パブリッククラスMRROW2HDFS {
  2. 公共 静的void main(String[] args)は例外をスローします{
  3. 構成 config = HBaseConfiguration。作成する(); // HBase 構成情報
  4. ジョブ job = Job.getInstance(config, "MRROW2HDFS" );
  5. ジョブにJarByClassを設定します(MRROW2HDFS.class);
  6. ROWReducer クラスをジョブに設定します。
  7. 文字列 hbaseTableName = "hbase_tbl_name" ;
  8. スキャン scan = new Scan();
  9. スキャンキャッシュの設定(1000);
  10. スキャン.setCacheBlocks( false );
  11. scan.setFilter(新しい FirstKeyOnlyFilter());
  12. TableMapReduceUtil.initTableMapperJob(hbaseTableName、スキャン、ROWMapper.class、Text.class、Text.class、ジョブ);
  13. FileOutputFormat.setOutputPath(ジョブ、新しいパス( "/tmp/rowkey.list" )); // ストレージの行キーの HDFs パスを入力します
  14. System.exit(job.waitForCompletion( true ) ? 0 : 1);
  15. }
  16. 公共 静的クラス ROWMapper は TableMapper<Text, Text> を拡張します {
  17. @オーバーライド
  18. protected void map(ImmutableBytesWritableキー, 結果値,
  19. Mapper<ImmutableBytesWritable, Result, Text, Text>.Context コンテキスト)
  20. IOException、InterruptedException をスローします {
  21. for (セル cell : value.rawCells()) {
  22. //日付範囲をフィルターする
  23. // コンテキストを書き込みます(...);
  24. }
  25. }
  26. }
  27.   
  28. 公共 静的クラス ROWReducer は Reducer<Text,Text,Text,Text> を拡張します{
  29. プライベートテキスト結果 = 新しいテキスト();
  30.   
  31. @オーバーライド
  32. protected void Reduce(Text key , Iterable<Text> values ,Context context) は IOException、InterruptedException をスローします {
  33. for (テキスト値:​​){
  34. 結果を設定します(val);
  35. context.write(キー, 結果 );
  36. }
  37. }
  38. }
  39. }

5. まとめ

逆データ処理プロセス全体は複雑ではなく、実装は論理処理が複雑すぎない非常に基本的な MapReduce ロジックです。処理中は、いくつかの詳細を考慮する必要があります。 Rowkey が HDFS 上で生成される場合、行内にスペースが含まれることがあります。 HDFS 上の Rowkey ファイルをリストに読み込むときは、データごとにスペースをフィルタリングするのが最適です。さらに、失敗したタスクの再実行とデータの調整を容易にするために、成功した Rowkey 処理と失敗した Rowkey 処理の記録が保持されます。データ移行の進行状況と完了状況を知ることができます。同時に、Kafka Eagle 監視ツールを使用して、Kafka の書き込みの進行状況を表示することもできます。

<<:  IoTデータ管理には重要なタスクを処理するためのエッジコンピューティングが必要

>>:  ネットワーク + ストレージ + 仮想化: 新しいネットワークを構築するための 3 つの要素

推薦する

オンラインショッピング、グループ購入、ショッピングガイドから始まるクローズドビジネスループの価値と課題

多くの人が「クローズドループは誤った命題だ」と話すとき、多くの本当の業界関係者は「ははは」と反応する...

「どのVPSが優れているか」についてみんなでチャットしましょう。

どの VPS が優れていますか?どのブランドの VPS が良いですか?どの VPS の方が安いですか...

上海は「両会」のオンライン相談プラットフォームの設立を先導し、ガバナンスのデジタル変革を加速

上海はガバナンスのデジタル変革をさらに加速します。本日、第13期市政協第4回会議が開幕し、上海は正式...

未来のスマート製造業:中国とドイツが協力してスマート製造業の革新と向上を推進

【朗報】SAP中国研究所と中国科学院瀋陽オートメーション研究所が共同で提出した「適応型再構成可能生産...

ウェブマスターの皆さん、年末はキーワードの最適化に最適な時期ですので、お見逃しなく。

今日、偶然クライアントのサイトを検査したところ、非常に奇妙な現象が見つかりました。2ページ目と3ペー...

2017年第3四半期モバイルインターネット業界四半期データ調査レポート

Sogou 入力方式は、41.2% の普及率で入力方式分野をリードしており、米国でのIPOにおけるS...

ウェブサイトのブランド価値体系を言葉で表現する方法

最近では、特に草の根のウェブマスターの間で、製品について語りながらブランド価値体系を結びつけることを...

個人ウェブマスターの告白

ショートビデオ、セルフメディア、インフルエンサーのためのワンストップサービス私はウェブサイトのストア...

2022 年のクラウド コンピューティングにはどのような新しいトレンドが生まれるでしょうか?

クラウド コンピューティングは、COVID-19 パンデミックの間も企業とリモート ワーカーのつなが...

#格安 VPS# VMISS: 18 元/月、100M 帯域幅、香港 CN2、韓国 CN2、米国 CN2/米国 AS9929、日本 IIJ

カナダの会社 vmiss は主に安価な VPS サービスを提供しており、香港 VPS、韓国 VPS、...

クラウド コンピューティングの指標をアジリティの指標に変える方法

クラウド コンピューティングは進化を続ける科学であり、クラウド コンピューティングのビジネス上の利点...

国家電網清毓線超高圧送電が開始、百度スマートクラウドと連携しスマートグリッド検査ソリューションを実装

UHV とBaidu AI が融合すると、電気高速道路の安全性が確実に保証されます。 7月15日、ク...

hosteons: 5周年、すべてのVPSのトラフィックが2倍、米国に5つのデータセンター、100Gの高防御を内蔵、年間わずか16ドル

5周年を記念して、hosteonsは特別な記念イベントを開始しました。すべてのVPSのトラフィックが...

hosthatch: 米国 VPS、40% 割引 + トラフィック 4 倍、AMD シリーズ VPS - 年間 26 ドル、1T 大容量ハードディスク VPS - 年間 33 ドル

Hosthatchは、米国(ロサンゼルス、シカゴ、ニューヨークのみ)の通常のAMDシリーズおよび大容...

株主はヤフーに月曜までにCEO解任を要求

ロイター通信によると、ヘッジファンドのサード・ポイントはヤフーの投資家として、学歴を偽ったスコット・...