Kafka データバックログとデータ重複処理のケース

Kafka データバックログとデータ重複処理のケース

データのバックログとデータの重複は、Kafka をメッセージング システムとして使用する場合によく発生する問題です。これらの問題を扱った例をいくつか示します。

データバックログ処理:

  • コンシューマーの数を増やす: データのバックログが深刻な場合は、コンシューマー インスタンスの数を増やして消費速度を上げることができます。
 Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "consumer-group"); // 增加消费者数量props.put("max.poll.records", 500); // 每次拉取的最大记录数props.put("max.partition.fetch.bytes", 1048576); // 每次拉取的最大字节数KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("topic")); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { // 处理消息} }
  • コンシューマー グループのパーティション割り当て戦略を調整します。Kafka は、トピックのパーティションをコンシューマー グループ内のコンシューマー インスタンスに割り当てます。パーティション割り当て戦略を調整することで、各コンシューマー インスタンスがバランスの取れた数のパーティションを処理するようになり、全体的な消費容量が向上します。
 consumer.subscribe(Collections.singletonList("topic"), new ConsumerRebalanceListener() { @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) { // 在重新分配分区之前,进行一些清理工作} @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) { // 在分配新的分区之后,进行一些初始化工作} });
  • コンシューマー処理機能の向上: メッセージのバッチ処理、マルチスレッド、非同期処理などのコンシューマー ロジックを最適化して、コンシューマー処理速度を向上させます。
 while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); List<SomeRecord> batch = new ArrayList<>(); for (ConsumerRecord<String, String> record : records) { SomeRecord processedRecord = processRecord(record); batch.add(processedRecord); if (batch.size() >= 100) { // 批量处理消息saveBatchToDatabase(batch); batch.clear(); } } if (!batch.isEmpty()) { // 处理剩余的消息saveBatchToDatabase(batch); } }
  • Kafka クラスターを拡張する: Kafka ブローカー ノードとパーティションを追加して、全体的なメッセージ処理能力を向上させます。

データ重複処理:

  • メッセージの一意の識別子を使用する: プロデューサー側で各メッセージに一意の識別子を設定し、コンシューマーはメッセージを処理するときにその識別子に基づいてメッセージを重複排除できます。メッセージ内のフィールドを使用したり、メッセージの識別子としてグローバル一意識別子 (GUID) を生成したりできます。
 while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { String messageId = record.key(); if (!isMessageProcessed(messageId)) { // 处理消息processRecord(record); // 标记消息为已处理markMessageAsProcessed(messageId); } } }
  • トランザクションを使用する: メッセージの処理にデータ変更操作が含まれる場合は、Kafka のトランザクション機能を使用して、メッセージの冪等性と一貫性を確保できます。
 Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "consumer-group"); // 设置事务ID props.put("transactional.id", "kafka-transactional-id"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("topic")); consumer.beginTransaction(); try { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { // 处理消息processRecord(record); } consumer.commitTransaction(); } catch (Exception e) { consumer.abortTransaction(); }
  • コンシューマー側での重複排除: データベースやキャッシュなどを使用して、コンシューマー側で処理されたメッセージの記録を保持します。メッセージを受信するたびに、まずレコードを照会します。レコードがすでに存在する場合は、メッセージを無視します。
 Set<String> processedMessages = new HashSet<>(); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { String messageId = record.key(); if (!processedMessages.contains(messageId)) { // 处理消息processRecord(record); // 添加到已处理消息集合processedMessages.add(messageId); } } }
  • コンシューマー側でのべき等性処理: 重複したメッセージを受信して​​も最終的な処理結果が一貫していることを確認するために、コンシューマー側のビジネス ロジックにべき等性を実装します。

データ バックログとデータ重複の問題に対するソリューションは、特定のビジネス ニーズとシステム条件に基づいて調整および最適化する必要があります。さらに、監視および測定システムも非常に重要であり、データのバックログや重複の問題を迅速に検出して解決するのに役立ちます。

<<:  安徽国際ビジネス専門学校のクラウド変革が明らかに:PC島からクラウド大陸への変革

>>:  Byte インタビュー: MQ メッセージ バックログ問題を解決するには?

推薦する

ハイブリッドクラウドを構成する方法

IT リーダーは、ハイブリッド クラウド環境に向けてネットワーク チームとインフラストラクチャ チー...

「自動運転でハンドルをなくそう」——イーステクノロジーの実用化への道

「無人運転車ではハンドルが不要になるのか?」この文は少し奇妙に思えます。自動運転車とハンドルは矛盾し...

ガートナーは、世界のパブリッククラウドのエンドユーザーの支出が2021年に23%増加すると予測している。

情報技術調査・コンサルティング会社であるガートナーは、2020年のパブリッククラウドIaaSの市場デ...

zji: 台湾 cn2 (8 コア) 専用サーバー - 595 元/月、香港 Alibaba 専用サーバー - 700 元/月、高速 CN2 直接アクセス、登録不要

zji.net は現在、台湾の cn2+bgp 回線に接続された台湾サーバー (物理マシン) と、香...

クラウドを超えた持続可能なコンピューティング

翻訳者 |トゥ・チェンイエ工業情報化部が「2022年中国コンピューティングパワー大会」で明らかにした...

年齢の異なるウェブサイトはどのようなプロモーションプランを採用すべきでしょうか?

ウェブサイトのプロモーションは長期的な作業です。通常の方法で継続的にプロモーションすることによっての...

鉄道省:12306以外のウェブサイトでは列車の切符の購入は認められていない

テンセントテクノロジーニュース4月6日:JD.comが鉄道チケット事業に参入した後、鉄道部運輸局は最...

ブログマーケティングはソフト記事マーケティングの道をたどるべきではない

Baidu 百科事典では、ブログ マーケティングを次のように定義しています。ブログ マーケティングと...

オンラインプロモーションチャネルはたくさんありますが、どれが私たちに適しているのでしょうか?

2014年に初めてこの業界に入り、CPAに初めて触れたときのことを今でも覚えています。当時のトラフィ...

速報:新浪微博が自社メディア広告の利益分配計画を開始

Lieyun.comが3月28日に報じた。 Lieyun.comは、Weiboの上級管理職に近い人物...

「百度地震」から青大根アルゴリズム2.0まで、百度のアルゴリズムの今後のアップデート方向を分析

昨年6月の「百度地震」から今年の「青大根アルゴリズム2.0」まで、百度はいくつかの大きなアップデート...

インターネット後半では、ARM クラウドが新たな活路となるでしょうか?

2018年を振り返ると、寒くて忘れられない年でした。過去 20 年間で、インターネット従事者をこれほ...