Golang 言語の Kafka クライアント ライブラリ Sarama

Golang 言語の Kafka クライアント ライブラリ Sarama

01. はじめに

Apache Kafka はオープンソースのメッセージング エンジン システムです。プロジェクトにおけるその主な役割は、ピーク負荷を軽減し、谷を埋めて分離することです。この記事では、Apache Kafka 用の Golang クライアント ライブラリである Sarama のみを紹介します。 Sarama は、Apache Kafka 0.8 以降用の MIT ライセンスの Golang クライアント ライブラリです。

Apache Kafka サーバーに慣れていない場合は、まず公式ドキュメントの「Getting Started」セクションを読むことをお勧めします。この記事では、Apache Kafka 2.8 バージョンを使用します。

[[397879]]

02. プロデューサー

Sarama ライブラリの AsyncProducer または SyncProducer を使用してメッセージを生成できます。ほとんどの場合、メッセージを生成するには AsyncProducer を使用することをお勧めします。チャネルを通じてメッセージを受信し、バックグラウンドで可能な限り効率的に非同期的にメッセージを生成します。

SyncProducer は、Kafka メッセージを送信した後、ACK 確認を受信するまでブロックします。 SyncProducer には 2 つの注意点があります。一般的に効率が低く、実際の耐久性の保証は Producer.RequiredAcks の構成値によって異なります。構成によっては、SyncProducer によって確認されたメッセージが失われる場合もありますが、使用方法はより簡単になります。

読者の理解を深めるために、この記事では SyncProducer をプロデューサーとして使用する方法を紹介します。 AsyncProducer をプロデューサーとして使用する方法を知りたい読者は、公式ドキュメントを参照してください。

SyncProducer をプロデューサーとして使用するサンプル コード:

  1. func sendMessage (brokerAddr []string, config *sarama.Config, topic string, value sarama.Encoder) {
  2. プロデューサー、エラー:= sarama.NewSyncProducer(brokerAddr, config)
  3. err != nil の場合 {
  4. fmt.Println(エラー)
  5. 戻る 
  6. }
  7. 遅延関数() {
  8. err = プロデューサーの場合。近い();エラー != ゼロ {
  9. fmt.Println(エラー)
  10. 戻る 
  11. }
  12. }()
  13. メッセージ:= &sarama.ProducerMessage{
  14. トピック: トピック、
  15. 値: 値、
  16. }
  17. パーティション、オフセット、エラー:=producer.SendMessage(msg)
  18. err != nil の場合 {
  19. fmt.Println(エラー)
  20. 戻る 
  21. }
  22. fmt.Printf( "パーティション:%d オフセット:%d\n" , パーティション, オフセット)
  23. }

上記のコードを読むと、ブローカーのアドレスと構成情報を指定して、NewSyncProducer() を呼び出して新しい SyncProducer を作成します。 SendMessage() を呼び出すと、指定されたメッセージが生成され、生成が成功したか失敗したかの場合にのみ戻ります。生成されたメッセージのパーティションとオフセットを返します。メッセージの生成が失敗した場合はエラーを返します。

リークを回避するには、プロデューサーで Close() を呼び出す必要があることに注意することが重要です。スコープ外になったときに自動的にガベージ コレクションされない可能性があるためです。

03. 消費者

Sarama ライブラリの Consumer または ConsumerGroup API を使用してメッセージを消費できます。読者の理解を容易にするために、この記事では Consumer を使用してメッセージを消費する方法を紹介します。

Consumer は、ブローカーからの Kafka メッセージを処理する PartitionConsumers を管理します。

メッセージを消費するコンシューマーのサンプル コード:

  1. func consumer (brokenAddr []string, topic string, partition int32, offset int64) {
  2. 消費者、エラー:= sarama.NewConsumer(brokenAddr, nil)
  3. err != nil の場合 {
  4. fmt.Println(エラー)
  5. 戻る 
  6. }
  7. 遅延関数() {
  8. err = コンシューマーの場合。近い();エラー != ゼロ {
  9. fmt.Println(エラー)
  10. 戻る 
  11. }
  12. }()
  13. パーティションコンシューマー、エラー:= consumer.ConsumePartition(トピック、パーティション、オフセット)
  14. err != nil の場合 {
  15. fmt.Println(エラー)
  16. 戻る 
  17. }
  18. 遅延関数() {
  19. err = partitionConsumer の場合。近い();エラー != ゼロ {
  20. fmt.Println(エラー)
  21. 戻る 
  22. }
  23. }()
  24. msg := range の場合、partitionConsumer.Messages() {
  25. fmt.Printf( "パーティション:%d オフセット:%d キー:%s 値:%s\n" , msg.Partition, msg.Offset, msg.Key , msg.Value)
  26. }
  27. }

上記のコードを読むと、ブローカーのアドレスと構成情報を指定して、NewConsumer() を呼び出して新しいコンシューマーを作成します。トピック、パーティション、オフセットを指定して、ConsumePartition() を呼び出して PartitionConsumer を作成します。 PartitionConsumer は、指定されたトピックとパーティションからの Kafka メッセージを処理します。

リークを防ぐために、consumer とpartitionConsumer で Close() を呼び出す必要があることに注意することが重要です。スコープ外になったときに、自動的にガベージ コレクションされない可能性があるためです。

04. 結論

この記事では主に、Apache Kafka の Golang 言語クライアント ライブラリ Sarama を使用して Kafka メッセージを生成および消費する方法を紹介します。生産者と消費者の両方に対して簡単な例を示します。さらに、Sarama ライブラリは他の多くの API も提供します。興味のある読者は、公式ドキュメントを読んで詳細を確認してください。

<<:  クラウド移行を実施する前にコストを計算する方法

>>:  Tencent Qianfanと提携し、SalesEasy PaaSプラットフォームが企業のアプリケーションのカスタマイズを支援

推薦する

XFTP チュートリアル

XFTP を使用すると、Windows ユーザーは UNIX/Linux と Windows PC ...

インターネットパーソナルブランディングネットワーク名声メソッド

段階的かつプロセス指向の方法でインターネット パーソナル ブランドを構築したい場合は、次の 9 つの...

ウェブサイトのキーワードの配置方法の簡単な分析

キーワードの配置は、SEO のサイクルや SEO の成功または失敗を直接決定する場合があります。この...

3大通信事業者の4Gアプリケーションの比較

3Gはまだ普及しておらず、4Gが本格的に普及しつつある。TD-LTEライセンスが発行されてから半年以...

予算ノード - $15/年/カスタム ISO/KVM/256MB メモリ/10GB ハードディスク/500GB トラフィック

budgetnode.com は、OVZ ベースの仮想 VPS の提供だけでは満足できなくなりました...

#干货# hosteons: VPS無料アップグレード構成、無制限のトラフィック、20G防御

hosteons VPS の所有者が新しい割引コードを導入しました。この割引コードを使用すると、ho...

DockerからPodmanへ: オープンソース、効率的、信頼性

今日の急速に進化するクラウド コンピューティングとコンテナ化された環境では、強力で信頼性の高いコンテ...

mokvm: マカオ VPS、500M 帯域幅、ネイティブ マカオ IP、Netflix を視聴可能

概要: mokvm は、主にマカオ VPS、マカオ テレコム データ センター (CTM) に従事し...

魏無慧:ビッグデータ時代の構造と抵抗

デジタル世界の発展はハッカーと密接に関係していることを多くの人が知っています。たとえば、マイクロソフ...

raksmart: 8 つの C セグメント、米国クラスタ サーバー/香港クラスタ サーバー/日本クラスタ サーバー、最低 189 ドル

米国の老舗データセンターであるRaksmartは、アメリカのクラスタサーバー、香港のクラスタサーバー...

Oracle NetSuiteは企業が期待以上の成長を達成できるよう支援します

[[261393]] Oracle NetSuite は本日、さまざまな業界の組織が成長を加速するた...

注: Baidu Webmaster Platformの「ハッキングアラートと不正行為アラート」機能が本日正式にリリースされました

A5ウェブマスターネットワークは6月5日に報道した。百度ウェブマスタープラットフォームの公式ニュース...

WeChat電子商取引で眠れないのは誰ですか? WeChatはタオバオストアをターゲットにしているのではなく、Tmallを模倣しようとしている

昨日(5月29日)午後、WeChatは公式サイトで新しいWeChatストア機能を発表しました。認証に...

商人は必ず読むべき:ダイヤモンドブース制作の秘密

店舗運営の鍵はトラフィックであることは誰もが知っています。日常生活に関わるトラフィックの大部分は、自...