Kafka にまた問題が発生しました!

Kafka にまた問題が発生しました!

[[384383]]

著者は、正確にスケジュールされたタスクと遅延キュー処理機能を備えた、高同時実行シナリオ向けのシンプルで安定したスケーラブルな遅延メッセージ キュー フレームワークを個人的に開発しました。半年以上前にオープンソース化されて以来、10 社を超える中小企業に正確でタイムリーなスケジューリング ソリューションを提供することに成功し、実稼働環境でのテストにも耐えてきました。より多くの人々の利益のために、オープンソース フレームワークのアドレスが提供されるようになりました: https://github.com/sunshinelyz/mykit-delay

序文

運営・保守が新年を迎える前にサーバーを拝んでいなかったものと推定されます。 Nginx の問題は修正されましたが、Kafka は再び動作しません。今日はもう少し寝たかったのですが、また電話が鳴りました。それはまだ操作でした、「こんにちは、Binghe、会社に到着しましたか?すぐにサーバーを確認してください、再び問題が発生しています。」 「今向かっています。運用保守担当者はまだ出勤していないのですか?」 「まだ休暇中…」、私:「…」おい、こいつは逃げたのか?今のところ彼を無視しましょう。問題はまだ解決する必要があります。

問題の再現

会社に到着後、私は専用のバックパックを置き、武器であるノートパソコンを取り出し、電源を入れてすぐに監視システムにログインし、メインの業務システムに問題がないことを確認しました。コア以外のサービスがアラームを発行し、監視システムではこのサービスが次の例外を頻繁にスローしていることが示されました。

  1. 2021-02-28 22:03:05 131 プール-7-スレッド-3 エラー [] -
  2. コミットに失敗しました
  3. org.apache.kafka.clients.consumer.CommitFailedException:グループがすでに再バランス調整されパーティションが別のメンバー割り当てられているため、コミットを完了できません。これは、時間 後続のpoll()呼び出し間隔が、設定された最大値.poll.interval.ms よりも長くなっていました。これは通常、ポーリング ループがメッセージ処理に時間がかかりすぎることを意味ます。これを解決するにはセッションタイムアウトを増やすか、  最大サイズを縮小することで  poll()返されるバッチ 最大ポーリングレコード数。
  4. org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest (ConsumerCoordinator.java:713) ~[MsgAgent-jar- with -dependencies.jar:na]
  5. org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:596)~[MsgAgent-jar- with -dependencies.jar:na]
  6. org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1218)~[MsgAgent-jar- with -dependencies.jar:na]
  7. com.today.eventbus.common.MsgConsumer.run(MsgConsumer.java:121)~[MsgAgent-jar- with -dependencies.jar:na]
  8. java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_161]
  9. java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_161]
  10. java.lang.Thread.run(Thread.java:748) [na:1.8.0_161]

上記の例外情報出力から、システムの問題を大まかに判断できます。ポーリング メッセージのバッチを処理した後、Kafka コンシューマーはブローカーにオフセットを同期的に送信するときにエラーを報告しました。これはおそらく、Kafka がコンシューマーがクラッシュしたと判断し、現在のコンシューマー スレッドのパーティションがブローカーによって再利用されたためであり、これは以下の出力情報から確認できます。

  1. グループはすでに再バランス調整されパーティションが別のメンバー割り当てられているため、コミットを完了できません。これは、時間 後続のpoll()呼び出し間隔が、設定された最大値.poll.interval.ms よりも長くなっていました。これは通常、ポーリング ループがメッセージ処理に時間がかかりすぎることを意味ます。これを解決するにはセッションタイムアウトを増やすか、  最大サイズを縮小することで  poll()返されるバッチ 最大ポーリングレコード数。

Kafka 内で再バランス メカニズムが内部的にトリガーされ、問題が特定されます。次に、問題の分析を始めます。

問題を分析する

Kafka は Rebalance メカニズムをトリガーするので、Kafka が Rebalance をトリガーするタイミングについて説明します。

リバランスとは

より具体的な例を挙げると、グループの下に 10 個の Consumer インスタンスがあり、このグループは 50 個のパーティションを持つトピックをサブスクライブします。通常、Kafka は各コンシューマーに 5 つのパーティションを割り当てます。この分配プロセスはリバランスと呼ばれます。

リバランスをトリガーするタイミング

Kafka で次の条件が満たされると、再バランスがトリガーされます。

  • グループ内のメンバー数が変更されました。たとえば、新しい消費者が消費者グループに参加したり、消費者グループから脱退したりしました。コンシューマー グループを離れるグループ メンバーには、クラッシュしたグループ メンバーや積極的にコンシューマー グループを離れるグループ メンバーが含まれます。
  • 購読されているトピックの数が変更されました。
  • サブスクライブされたトピックのパーティション数が変更されました。

後者の 2 つの状況は人為的に回避できます。実際の作業では、Kafka リバランスの最も一般的な理由は、コンシューマー グループ メンバーの変更です。

消費者メンバーの通常の追加と削除は、避けられないリバランスにつながります。ただし、場合によっては、コンシューマー インスタンスがコーディネーターによって誤って「停止」されたと判断され、グループから「追い出され」、再バランスが発生する可能性があります。

コンシューマー グループが再バランスを完了すると、各コンシューマー インスタンスは、まだ稼働していることを示すハートビート要求をコーディネーターに定期的に送信します。コンシューマー インスタンスがこれらのハートビート要求をタイムリーに送信できない場合、コーディネーターはコンシューマーを「デッド」と見なし、グループから削除して、新しいラウンドの再バランスを開始します。この時間は、コンシューマー側のパラメータ session.timeout.ms を通じて設定できます。デフォルト値は 10 秒です。

このパラメータに加えて、Consumer はハートビート要求の送信頻度を制御するパラメータ (heartbeat.interval.ms) も提供します。この値が小さいほど、コンシューマー インスタンスがハートビート要求を送信する頻度が高くなります。ハートビート要求を頻繁に送信すると追加の帯域幅リソースが消費されますが、コーディネーターが各コンシューマー インスタンスにリバランスを有効にするように通知する現在の方法は、ハートビート要求の応答本文に REBALANCE_NEEDED フラグをカプセル化することであるため、リバランスが現在有効になっているかどうかをより迅速に知ることができるという利点があります。

上記の 2 つのパラメータに加えて、コンシューマー側には、コンシューマーの実際の消費容量がリバランスに与える影響を制御するために使用されるパラメータ、つまり max.poll.interval.ms パラメータもあります。これは、コンシューマー アプリケーションによるポーリング メソッドへの 2 回の呼び出し間の最大時間間隔を制限します。デフォルト値は 5 分です。つまり、コンシューマー プログラムが 5 分以内にポーリング メソッドによって返されたすべてのメッセージを消費できない場合、コンシューマーは「グループを離れる」要求を積極的に開始し、コーディネーターは新しいラウンドの再バランスを開始します。

上記の分析により、どのリバランスを回避できるかがわかります。

最初のタイプの不要なリバランスは、ハートビートを時間内に送信できないことが原因で発生し、その結果、コンシューマーがグループから「追い出され」ます。この場合、再バランスをできるだけ回避するために、session.timeout.ms と heartbeat.interval.ms の値を設定できます。 (以下の構成はインターネットで見つかったベストプラクティスであり、まだテストされていません)

  • session.timeout.ms = 6s に設定します。
  • heartbeat.interval.ms = 2s に設定します。
  • Consumer インスタンスが「デッド」と判断される前に少なくとも 3 ラウンドのハートビート要求を送信できるようにします。つまり、session.timeout.ms >= 3 * heartbeat.interval.ms です。

session.timeout.ms を 6 秒に設定する主な目的は、コーディネーターが電話を切ったコンシューマーをより迅速に見つけ、できるだけ早くグループから追い出せるようにすることです。

2 番目のタイプの不必要な再バランスは、消費者が長期間支出を続けることで発生します。このとき、max.poll.interval.ms パラメータ値の設定が特に重要になります。予期しない再バランスを回避するには、このパラメータ値を、ダウンストリームの最大処理時間よりもわずかに長い大きな値に設定することをお勧めします。

つまり、ビジネス処理ロジックに十分な時間を残しておくということです。この方法では、これらのメッセージの処理に時間がかかりすぎるため、コンシューマーはリバランスをトリガーしません。

オフセットのプルとコミット

Kafka のオフセットはコンシューマーによって管理されます。オフセットには、プル オフセット (位置) とコミット オフセット (コミット済み) の 2 種類があります。プル オフセットは、現在のコンシューマー パーティションの消費の進行状況を表します。各メッセージが消費された後、オフセットを送信する必要があります。オフセットをコミットするとき、Kafka はプルされたオフセットの値をパーティションのコミットされたオフセットとして使用し、それをコーディネータに送信します。

オフセットが送信されない場合、コンシューマーが次にブローカーに再接続したときに、現在のコンシューマー グループによってブローカーに送信されたオフセットから消費を開始します。

つまり、問題はここにあります。メッセージの処理時間が長すぎると、ブローカーによってメッセージが削除され、オフセットを送信するとエラーが発生します。したがって、プルされたオフセットはブローカーに送信されず、パーティションは再バランスされます。次にパーティションが再割り当てされると、コンシューマーは最新のコミットされたオフセットから消費を開始します。ここで重複消費の問題が発生します。

異常なログプロンプトの解決方法

実際、ここまで述べたように、対応する解決策は、Kafka コンシューマーによって出力される例外ログにも示されています。

次に、Kafka のプル オフセットとコミット オフセットについて説明します。

実際、出力されたログ情報からも、問題の大まかな解決策がわかります。簡単に言えば、max.poll.interval.ms と session.timeout.ms の期間を増やし、max.poll.records の構成値を減らし、コンシューマーはメッセージを処理した後に時間内にオフセットを送信する必要があります。

問題解決

これまでの分析を通じて、この問題をどのように解決するかがわかるはずです。ここで言及する必要があるのは、Kafka を統合したときに、SpringBoot と Kafka コンシューマー リスナーを使用したことです。コンシューマー側の主なコード構造は次のとおりです。

  1. @KafkaListener(topicPartitions = {@TopicPartition(topic = KafkaConstants.TOPIC_LOGS、パーティション = { "0" }) }、グループ ID = "kafka-consumer" 、コンテナ ファクトリ = "kafkaListenerContainerFactory" )
  2. パブリックvoid consumerReceive (ConsumerRecord<?, ?> レコード、確認応答 ack){
  3. logger.info( "トピックは{}、オフセットは{}、値は{} n" 、record.topic()、record.offset()、record.value());
  4. 試す {
  5. オブジェクト値 = record.value();
  6. logger.info(値.toString());
  7. ack.acknowledge();
  8. } キャッチ (例外 e) {
  9. logger.error( "ログコンシューマー例外: {}" , e);
  10. }
  11. }

上記のコードのロジックは比較的単純で、Kafka 内のメッセージを取得した後、それをログ ファイルに直接出力します。

解決してみる

ここでは、まず例外ログのプロンプト情報に合わせて設定を行うため、SpringBoot の application.yml ファイルに以下の設定情報を追加しました。

  1. 春:
  2. カフカ:
  3. 消費者:
  4. プロパティ:
  5. 最大ポーリング間隔(ミリ秒): 3600000
  6. 最大投票レコード数: 50
  7. セッションタイムアウト: 60000
  8. ハートビート間隔: 3000

構成が完了したら、コンシューマー ロジックを再度テストし、Rebalance 例外が引き続きスローされることを確認します。

最終解決策

Kafka コンシューマーの問題を別の観点から見てみましょう。1 つのコンシューマーがメッセージを生成し、別のコンシューマーがそのメッセージを消費します。同じグループ ID の下に配置することはできません。いずれかのグループ ID を変更するだけです。

ここでは、ビジネス プロジェクトはモジュールとサブシステムで開発されます。たとえば、モジュール A がメッセージを生成し、モジュール B がモジュール A によって生成されたメッセージを消費します。この時点で、session.timeout.ms: 60000 などの構成パラメータを変更してもまったく効果はなく、依然として Rebalance 例外がスローされます。

この時点で、コンシューマグループのgroupIdを変更しようとし、次のコードを変更しました。

  1. @KafkaListener(topicPartitions = {@TopicPartition(topic = KafkaConstants.TOPIC_LOGS、パーティション = { "0" }) }、グループ ID = "kafka-consumer" 、コンテナ ファクトリ = "kafkaListenerContainerFactory" )
  2. パブリックvoid consumerReceive (ConsumerRecord<?, ?> レコード、確認応答 ack){

コードを次のように変更します。

  1. @KafkaListener(topicPartitions = {@TopicPartition(topic = KafkaConstants.TOPIC_LOGS、パーティション = { "0" }) }、グループ ID = "kafka-consumer-logs" 、コンテナ ファクトリ = "kafkaListenerContainerFactory" )
  2. パブリックvoid consumerReceive (ConsumerRecord<?, ?> レコード、確認応答 ack){

もう一度テストすると、問題は解決しました~~

この記事はWeChatの公開アカウント「Glacier Technology」から転載したものです。下のQRコードからフォローできます。この記事を転載する場合は、Glacier Technology 公式アカウントまでご連絡ください。

<<:  Kubernetes をバックアップするための 5 つのベスト プラクティス

>>:  図解分散システム - 上級プログラマーへの道

推薦する

Baidu のクリック原理を使用してキーワードランキングを向上させる

Baidu のクリック原則は、2012 年初頭のアルゴリズム更新後に導入されたアルゴリズム ルールで...

ウェブサイトのプロモーションをより効果的にするために、さまざまなものを活用する

皆さんは、このことわざを聞いたことがあると思います。「車と馬を借りた者は、足は速くないが、千里を行く...

QingCloud が CNCF に参加: オープンソース エコシステムに貢献し、クラウドネイティブ アプリケーションの推進を継続

エンタープライズレベルのフルスタッククラウドICTサービスプロバイダーであるQingCloudは最近...

オンライン教育のもう一つのルート: Hujiang.com のサンプル

インターネット教育の核となるのは、テクノロジーだけではなく、人間同士の交流です。 Hujiang.c...

SEOで見落とされがちな4つの運用エラー

おそらく、多くの初心者SEO担当者は、最適化の役割はウェブサイトへのトラフィックを増やすことだという...

中国越境電子商取引産業調査レポート

中国の越境電子商取引は急速に成長しており、過去5年間の取引量の年平均成長率は16.2%に達し、対外貿...

年間100元以下の最も人気の海外格安クラウドサーバーベンダー

国内のクラウドサーバーは比較的高価なので、より安価なクラウドサーバーを使いたい場合には、やはり海外の...

モバイルインターネットの到来、企業のマーケティングは挑戦に備える必要がある

最近、インターネットでは「お父さん、どこへ行くの?」のようにモバイルインターネットが話題になっており...

クラウド構成エラーは、主に企業のIT管理者のせいにされる

クラウド セキュリティに関しては、特に組織が最小権限アクセス モデルから逸脱した場合、従業員は組織に...

携帯電話カードの悪意あるカードのすり替えと詐欺の脆弱性

中国移動は2か月連続で私にテキストメッセージを送り、「携帯電話番号を実名で登録する」ことと、電話料金...

中小企業がイベントマーケティングをうまく行う方法を共有する

現在、ますます多くの企業がオンライン マーケティングに注目していますが、オンライン マーケティングに...

教育ウェブサイトの発展機会をつかむ4つの方向性

オンラインの世界で最も大きなグループは何かと問われれば、それは間違いなく若者であり、その若者の多くは...

ライブストリーミングが618の変化を牽引:DouyinとKuaishouは積極的、一方商人は冷静

要点ライブストリーミング販売ブームは引き続き熱を帯びており、今年の伝統的な電子商取引プロモーションフ...

競合他社を顧客として扱うことで、ランキングがより高いレベルに上がります

現在、SEO実践者はますます増えており、SEOはほぼすべての業界に存在します。一部の業界では、特定の...

マルチクラウド戦略を成功させるための 7 つのヒント

ますます多くの企業にとって、クラウドへの移行は、AWS、Microsoft Azure、またはその他...