SpringBoot は Kafka を統合して高いデータスループットを実現します

SpringBoot は Kafka を統合して高いデータスループットを実現します

1. はじめに

前回の記事では、Kafka のアーキテクチャモデルについて詳しく紹介しました。クラスター環境では、Kafka はパーティションの数を設定することでデータの消費を高速化できます。

理論を知っているだけでは十分ではなく、実際に実践する必要があります。

次回は、本番環境の実例とSpringBoot技術フレームワークをもとに、Kafkaの使い方と高いデータスループットを実現する方法を紹介します!

2. 手順の練習

最近、同社のビッグデータチームは、顧客の注文データを毎朝計算し、パフォーマンスデータを私たちにプッシュして、営業スタッフが毎日昨日のパフォーマンスデータを確認できるようにしています。データ量は約1000万です。以下は私のドッキングプロセスです。

2.1. kafka依存パッケージを追加する

このプロジェクトの SpringBoot バージョンは 2.1.5.RELEASE で、依存する kafka バージョンは 2.2.6.RELEASE です。

 https : //back-media..com/editor ? id = 707646 / h6e90be6-7 EV6kJbV

2.2. Kafka設定変数を追加する

依存パッケージを追加した後は、application.properties に kafka 構成変数を追加するだけで、基本的に通常どおり使用できます。

 #Kafka サーバーのアドレスを指定します。クラスターには、カンマで区切られた複数のアドレスを含めることができます。
spring .kafka .bootstrap - サーバー= 197.168.25.196 : 9092
#再試行回数
spring .kafka .producer .retries = 3
#一括送信するメッセージの数
spring .kafka .producer .batch -サイズ= 1000
#32MB バッチバッファ
spring .kafka .producer .buffer -メモリ= 33554432
#デフォルトのコンシューマ グループ
spring .kafka .consumer .group - id = crm -マイクロサービス- newperformance
#最も古い未使用オフセット
spring .kafka .consumer .auto -オフセット-リセット=最も早い
#一度に取得できるデータの最大量
spring .kafka .consumer .max -ポーリング-レコード= 4000
#自動的に送信するかどうか
spring .kafka .consumer .enable - 自動コミット= true
#自動送信時間間隔、単位 ms
spring .kafka .consumer .auto -コミット-間隔= 1000

2.3.消費者を作成する

 @成分
パブリッククラス BigDataTopicListener {

プライベート静的最終 Logger ログ= LoggerFactory .getLogger ( BigDataTopicListener .class ) ;

/**
* kafkaデータを聴く
* @param コンシューマーレコード
* @param ack
*/
@KafkaListener (トピック= { "big_data_topic" } )
パブリック void コンシューマー( ConsumerRecord < ? ? >コンシューマー レコード) {
log.info ( "bigData によってプッシュされたデータ '{}' を受信しました" consumerRecord.toString ( ) ) ;
// ...
// db .save ( consumerRecord ) ; //データを挿入または更新する
}

}

2.4.相手をシミュレートしてデータテストをプッシュする

 @RunWith ( SpringRunner.クラス)
@SpringBootテスト
パブリッククラス KafkaProducerTest {

オートワイヤード
private KafkaTemplate < String , String > kafkaTemplate ;

@テスト
パブリックボイドテスト送信 {
( int i = 0 ; i < 5000 ; i ++ ) {
Map <文字列オブジェクト> map = new LinkedHashMap <> ( ) ;
マップ.put ( "datekey" , 20210610 ) ;
map .put ( "userid" i ) ;
map .put ( "salaryAmount" i ) ;
// Kafka の big_data_topic トピックにデータをプッシュします
kafkaTemplate .send ( "big_data_topic" JSONObject .toJSONString ( map ) ) ;
}
}
}

最初は、この単一のデータ消費方法を使用してプログラムをテストしても何も問題はありませんでした。

しかし、運用開始後に大きな問題が発覚しました。1,000 万個のデータを処理するのに少なくとも 3 時間かかり、データ ダッシュボードにデータが不足する結果となりました。

翌日、私は自分の失敗から学び、バッチ消費モデルに切り替えることにしました。どうやってやるんですか?以下をご覧ください!

2.5. Kafka の消費モードをバッチ消費に変更する

まず、次の内容の KafkaConfiguration 構成クラスを作成します。

 @構成
パブリッククラス KafkaConfiguration {

@値( "${spring.kafka.bootstrap-servers}" )
プライベート文字列 bootstrapServers ;

@値( "${spring.kafka.producer.retries}" )
プライベート整数再試行;

@値( "${spring.kafka.producer.batch-size}" )
プライベート整数バッチサイズ;

@Value ( "${spring.kafka.producer.buffer-memory}" )
プライベート整数バッファメモリ;

@値( "${spring.kafka.consumer.group-id}" )
プライベート文字列グループID ;

@Value ( "${spring.kafka.consumer.auto-offset-reset}" )
プライベート文字列 autoOffsetReset ;

@Value ( "${spring.kafka.consumer.max-poll-records}" )
プライベート整数maxPollRecords ;

@値( "${spring.kafka.consumer.batch.concurrency}" )
プライベートInteger batchConcurrency ;

@値( "${spring.kafka.consumer.auto-commit}" )
プライベートブール値autoCommit ;

@値( "${spring.kafka.consumer.auto-commit-interval}" )
プライベート整数autoCommitInterval ;


/**
* プロデューサー設定情報
*/
@ビーン
パブリックマップ<文字列オブジェクト>プロデューサー構成( ) {
Map < String , Object > props = new HashMap <> ( ) ;
props .put ( ProducerConfig .ACKS_CONFIG "0" ) ;
props .put ( ProducerConfig .BOOTSTRAP_SERVERS_CONFIG bootstrapServers ) ;
props .put ( ProducerConfig .RETRIES_CONFIG 再試行) ;
props .put ( ProducerConfig .BATCH_SIZE_CONFIG batchSize ) ;
props .put ( ProducerConfig .LINGER_MS_CONFIG 1 ) ;
props .put ( ProducerConfig .BUFFER_MEMORY_CONFIG bufferMemory ) ;
props .put ( ProducerConfig .KEY_SERIALIZER_CLASS_CONFIG StringSerializer .class ) ;
props .put ( ProducerConfig .VALUE_SERIALIZER_CLASS_CONFIG StringSerializer .class ) ;
プロパティを返します
}

/**
* メーカー工場
*/
@ビーン
パブリックプロデューサーファクトリー<文字列,文字列>プロデューサーファクトリー( ) {
新しい DefaultKafkaProducerFactory <> ( producerConfigs ( ) )を返します
}

/**
* プロデューサーテンプレート
*/
@ビーン
パブリック KafkaTemplate <文字列,文字列> kafkaTemplate ( ) {
新しい KafkaTemplate <> ( producerFactory ( ) )を返します
}


/**
* 消費者設定情報
*/
@ビーン
パブリックマップ<文字列オブジェクト> consumerConfigs ( ) {
Map < String , Object > props = new HashMap <> ( ) ;
props .put ( ConsumerConfig .GROUP_ID_CONFIG groupId ) ;
props .put ( ConsumerConfig .AUTO_OFFSET_RESET_CONFIG autoOffsetReset ) ;
props .put ( ConsumerConfig .BOOTSTRAP_SERVERS_CONFIG bootstrapServers ) ;
props .put ( ConsumerConfig .MAX_POLL_RECORDS_CONFIG maxPollRecords ) ;
props .put ( ConsumerConfig .ENABLE_AUTO_COMMIT_CONFIG autoCommit ) ;
props .put ( ConsumerConfig .SESSION_TIMEOUT_MS_CONFIG 30000 ) ;
props .put ( ConsumerConfig .REQUEST_TIMEOUT_MS_CONFIG 30000 ) ;
props .put ( ConsumerConfig .KEY_DESERIALIZER_CLASS_CONFIG StringDeserializer .class ) ;
props .put ( ConsumerConfig .VALUE_DESERIALIZER_CLASS_CONFIG StringDeserializer .class ) ;
プロパティを返します
}

/**
* 消費者向けバッチ工場
*/
@ビーン
パブリック KafkaListenerContainerFactory < ? >バッチファクトリー( ) {
ConcurrentKafkaListenerContainerFactory < Integer String >ファクトリー= new ConcurrentKafkaListenerContainerFactory <> ( ) ;
ファクトリ.setConsumerFactory (新しい DefaultKafkaConsumerFactory <> ( consumerConfigs ( ) ) ) ;
//同時実行数をトピックパーティションの数以下に設定します
ファクトリ.setConcurrency ( batchConcurrency ) ;
ファクトリ.getContainerProperties ( ) . setPollTimeout ( 1500 ) ;
ファクトリ.getContainerProperties ( ) .setAckMode ( ContainerProperties .AckMode .MANUAL_IMMEDIATE ) ;
//バッチ消費に設定され、各バッチの数は Kafka 構成パラメータConsumerConfig.MAX_POLL_RECORDS_CONFIGで設定されます
ファクトリ.setBatchListener ( true ) ;
工場を返却します
}

}

同時に、同時実行数を設定するための新しい spring.kafka.consumer.batch.concurrency 変数が追加されました。このパラメータを通じて、消費を実現するために複数のスレッドを指定できます。

application.properties 構成ファイルに、次の変数を追加します。

 #バッチ消費の同時実行数、トピックパーティションの数以下
spring .kafka .consumer .batch .concurrency = 3

#バッチプルの最大数を4000に設定する
spring .kafka .consumer .max -ポーリング-レコード= 4000

#自動送信をfalseに設定する
spring .kafka .consumer .enable - 自動コミット= false

最後に、単一消費方式をバッチ消費方式モードに変更します。

 @成分
パブリッククラス BigDataTopicListener {

プライベート静的最終 Logger ログ= LoggerFactory .getLogger ( BigDataTopicListener .class ) ;

/**
* kafka データの監視 (バッチ消費)
* @param コンシューマーレコード
* @param ack
*/
@KafkaListener (トピック= { "big_data_topic" } コンテナファクトリー= "batchFactory" )
パブリック void batchConsumer (リスト< ConsumerRecord < ? ? >> consumerRecords 確認応答 ack ) {
長い開始= System .currentTimeMillis ( ) ;

// ...
// db .batchSave ( consumerRecords ) ; //データの一括挿入または一括更新

//手動送信
ack .acknowledge ( ) ;
log .info ( "bigData によってプッシュされたデータを受信しました。プルされたデータの量: {}、消費時間: {} ミリ秒" consumerRecords .size ( ) ( System .currentTimeMillis ( ) - start ) ) ;
}

}

このとき、消費パフォーマンスが大幅に向上し、データ処理が非常に高速になります。最大30分で500万個のデータが消費される可能性があります。

この例では、コンシューマー マイクロサービスには本番環境に 3 つのサーバーがデプロイされており、big_data_topic トピックのパーティション数は 3 であるため、同時実行数を 3 に設定する方が適切です。

プッシュされるデータ量が増え続けるにつれて、消費速度が十分に速くないと感じた場合は、毎回プルされるバッチの最大数をリセットするか、マイクロサービスのクラスターインスタンス数とトピックパーティション数を水平に拡張して、データ消費を高速化することができます。

ただし、1 台のマシンで一度にプルされるバッチの最大数が大きすぎると、大きなオブジェクトも大きくなり、GC 警告が頻繁に発生します。

したがって、実際の使用においては、1 回あたりのバッチ プルの最大数は、多ければ多いほど良いというわけではありません。現在のサーバーのハードウェア構成に応じて、適切なしきい値に調整するのが最善の選択です。

3. まとめ

この記事では、主に SpringBoot 技術フレームワークを背景として、実際のビジネスニーズを組み合わせ、データ消費に kafka を使用して高いデータ スループットを実現します。次回は、消費不良の処理フローについて紹介します。

<<:  クラウド コンピューティングの進化: 「分散型クラウド」が最終形態となるか?

>>:  SaaSビジネスの成長に関する真実

推薦する

360 と Baidu の検索ボリュームの違いを確認するために 360 Index がリリースされました

最近、360はひっそりと360 Indexをリリースしました。その全体的なレイアウトと機能はBaid...

ローカルポータルの利益を最大限活用する方法 - 重要なポイント - ブランド広告

簡単な説明: 中国のすべてのローカルポータルの収益分析によると、収益の約 60% ~ 80% はロー...

いくつかの簡単な手順でDigitalOceanにスワップパーティションを追加します

統計を調べてみると、digitalocean swapを検索している人が何人かいることがわかりました...

競合サイトのSEOデータを分析し、SEO戦略を立てる方法

著者は、ウェブサイトの最適化に長年携わってきました。SEO プロジェクトを実施するたびに、競合他社の...

9月22日の百度インデックスボリュームの全体的な減少の理由の分析

9月22日、多くのウェブマスターが、Baiduのサイトコマンドを使用したところ、自分のウェブサイトの...

velocihost - $199 / GPU サーバー / ハイエンド グラフィック カード サーバー / デュアル カード クロスファイア

velocihost は、米国フロリダ州のホスティング プロバイダーです。主な業務は、通常のサーバー...

Baidu 入札で除外する必要がある IP アドレスはどれですか?

ご存知のとおり、Baidu 入札バックグラウンドには「IP 除外」という機能があります。この機能は主...

AIが新たな応用シナリオを切り開き、ファーウェイクラウドModelArts Proが新たな開発パラダイムを創出

私たちは、すべてが感知され、すべてがつながり、すべてがインテリジェントになる時代に入りつつあります。...

ウェブサイト構築後の重要なテストタスク14選

ウェブサイトが完成してオンラインになる前に、私たちはさまざまなバグのテストと処理に忙しくなります。こ...

テンセントの唐道生氏:クラウドコンピューティングの究極の価値は人々へのサービス提供

11月3日から4日にかけて、2021年テンセントデジタルエコシステムカンファレンスが武漢で正式に開催...

My Sky Media|志科の「武器」が企業のマーケティング向上を支援

2018年最もホットなプロジェクト:テレマーケティングロボットがあなたの参加を待っています中国インタ...

#12.12# 六易クラウド:香港CN2/ロサンゼルス高防御/CDN、「リベート+割引」ダブル割引、割引プラス割引で実物をゲット

Liuyi Cloudは「12.12」プロモーションを先行して開始しました:(1)全製品とも年間払い...

ターゲットの 10 年にわたるクラウドへの取り組みから学んだ教訓

企業がビジネスをクラウドに移行することを決定する場合、アプリケーションの再設計、計画、およびどのワー...

Baidu の大規模なアップデートからどのような教訓を得たのでしょうか?

今回の百度のアップデートは規模が大きく、多くのウェブサイトに歴史的な混乱を招いた。4、5年運営されて...

ホストオンはどうですか?ソルトレイクシティデータセンターのVPSレビューを共有する

2017年9月3日、ホステオンズは米国西海岸近郊のソルトレイクシティデータセンターでVPS事業を開始...