分散WebSocketソリューションについてお話しましょう

分散WebSocketソリューションについてお話しましょう

序文

最近、自分でプロジェクトを構築しました。プロジェクト自体は非常にシンプルですが、メッセージリマインダーに WebSocket を使用する機能が含まれています。大まかな状況は次のようになります。

発行者はシステム内でメッセージを送信し、対応する部門のすべての人にメッセージをリアルタイムでプッシュします。

これがスタンドアロン アプリケーションである場合、部門 ID とユーザー ID を使用して一意のキーを形成し、アプリケーション サーバーとの WebSocket 永続接続を確立し、パブリッシャーから送信されたメッセージを受信できます。

しかし、実際にプロジェクトを本番環境に適用する場合、スタンドアロンのアプリケーションをデプロイすることはできず、クラスターをデプロイする必要があります。

[[343393]]

そこで、テスト用に Nginx と 2 つの Tomcat を使用したシンプルな負荷分散クラスターを構築しました。

しかし、問題があります。クライアント ブラウザは、1 つのサーバーとの WebSocket の長時間接続のみを確立します。したがって、発行者がメッセージを送信した場合、対象部門のすべてのユーザーがメッセージを受信できることを保証する方法はありません (これらのユーザーが同じサーバーに接続されていない可能性があるため)。

この記事では、このような問題について説明し、解決策を提案します。もちろん、解決策は複数あるので、始めましょう。

WebSocketモノリシックアプリケーションの紹介

分散クラスターを紹介する前に、Prince の WebSocket コード実装を見てみましょう。まず、次の Java バックエンド コードを見てみましょう。

  1. javax.websocket.* をインポートします。
  2. javax.websocket.server.PathParam をインポートします。
  3. javax.websocket.server.ServerEndpoint をインポートします。
  4. com.alibaba.fastjson.JSON をインポートします。
  5. com.alibaba.fastjson.JSONObject をインポートします。java.io.IOException をインポートします。
  6. java.util.Map をインポートします。
  7. java.util.concurrent.ConcurrentHashMap をインポートします。
  8. @ServerEndpoint( "/webSocket/{キー}" )
  9. パブリッククラスWebSocket{
  10. プライベートスタティック  intオンラインカウント = 0;
  11. /**
  12. * 接続を保存するクライアント
  13. */
  14. プライベート静的Map<String, WebSocket> クライアント = 新しい ConcurrentHashMap<String, WebSocket>();
  15. プライベートセッションセッション。
  16. /**
  17. * 送信対象部門コード
  18. */
  19. 秘密の文字列キー;
  20. オープン中
  21. public void onOpen(@PathParam( "key" ) String key , Session session) はIOExceptionをスローします {
  22. this.key =キー;
  23. this.session = セッション;
  24. if (!clients.containsKey(キー)) {
  25. オンラインカウントを追加します。 } クライアント.put(キー、これ);
  26. Log.info( key + "メッセージ サービスに接続されました!" );
  27. } @閉じる時
  28. パブリックvoid onClose()はIOExceptionをスローします{
  29. クライアントを削除します(キー);サブオンラインカウント(); } @メッセージ時
  30. パブリックvoid onMessage(String message) は IOException をスローします {
  31. if (message.equals( "ping" )) {
  32. 戻る;
  33. } JSONObject jsonTo = JSON.parseObject(メッセージ);文字列 mes = (文字列) jsonTo.get( "message" );
  34. if (!jsonTo.get( "to" ).equals( "すべて" )){
  35. メッセージ送信先(mes、jsonTo.get( "to" ).toString());
  36. }それ以外{
  37. メッセージをすべて送信します(mes); } } @エラー時
  38. パブリックvoid onError(セッション セッション、Throwable error) {
  39. エラー.printStackTrace(); } プライベート void sendMessageTo(String message, String To ) は IOException をスローします {
  40. for (WebSocket アイテム: クライアント.values ()) {
  41. if (item. key . contains ( To ))の場合
  42. item.session.getAsyncRemote().sendText(メッセージ); } } プライベート void sendMessageAll(String message) は IOException をスローします {
  43. for (WebSocket アイテム: クライアント.values ()) {
  44. item.session.getAsyncRemote().sendText(メッセージ); } }公共 静的同期int getOnlineCount() {
  45. onlineCountを返します
  46. }公共 静的同期void addOnlineCount() {
  47. WebSocket.onlineCount++; }公共 静的同期void subOnlineCount() {
  48. WebSocket.onlineCount --; } パブリック静的同期 Map<String, WebSocket> getClients() {  
  49. リピーター顧客
  50. }}

サンプル コードは Spring を使用しておらず、ネイティブ Java Web で記述されています。その中の手法を簡単に紹介します。

  • onOpen: クライアントがWebSocketサービスに接続したときにメソッドの実行をトリガーします
  • onClose: クライアントがWebSocket接続を切断したときにトリガーされます
  • onMessage: クライアントから送信されたメッセージを受信したときにトリガーされます
  • onError: エラーが発生したときにトリガーされます

ご覧のとおり、onMessage メソッドでは、クライアントから送信されたメッセージに基づいてメッセージを直接転送しますが、これは単一メッセージ サービスでは問題になりません。

もう一度jsコードを見てみましょう

  1. var ホスト = ドキュメント.場所.ホスト;
  2. // 現在ログインしている部門を取得します。 var deptCodes= '${sessionScope.$UserContext.departmentID}' ;
  3. deptコード=deptコード。置換(/[\[|\]|\s]+/g, "" );
  4. varキー= '${sessionScope.$UserContext.userID}' +deptCodes;
  5. var lockReconnect = false ; //wsの繰り返し接続を避ける
  6. var ws = null ; // 現在のブラウザが WebSocket をサポートしているかどうかを判断します var wsUrl = 'ws://' + host + '/webSocket/' + key ;
  7. WebSocket を作成します (wsUrl); //wsに接続function createWebSocket(url) {
  8. 試してください{ if( 'WebSocket'  ウィンドウ){
  9. ws = 新しい WebSocket(url); }そうでない場合は( 'MozWebSocket'  ウィンドウ){
  10. ws = 新しい MozWebSocket(url); }それ以外{
  11. layer.alert( "お使いのブラウザはWebSocketプロトコルをサポートしていません。Google、Firefoxなどの新しいバージョンを使用することをお勧めします。IE10より前のブラウザは使用しないでください。360ブラウザの場合は、互換モードではなく超高速モードを使用してください!" );
  12. } イベントハンドルを初期化します。 }catch(e){ 再接続(url);コンソールログ(e);
  13. } }関数initEventHandle() {
  14. ws.onclose =関数(){
  15. 再接続(wsUrl); console.log( "llws 接続が閉じられました!" + new Date ().toUTCString());
  16. }; ws.onerror =関数(){
  17. 再接続(wsUrl); console.log( "llws 接続エラー!" );
  18. }; ws.onopen =関数(){
  19. heartCheck.reset().start(); //ハートビート検出リセット console.log( "llws 接続が成功しました!" +new Date ().toUTCString());
  20. }; ws.onmessage = function (event) { //メッセージが受信されると、ハートビート検出がリセットされます
  21. heartCheck.reset().start(); // メッセージを受信した場合、現在の接続は正常です // メッセージを受信した後の実際の業務処理... }; } // ウィンドウの閉じるイベントをリッスンします。ウィンドウが閉じられるときは、接続が切断される前にウィンドウが閉じられないように、Websocket 接続を積極的に閉じます。そうしないと、サーバーは例外をスローします。 window.onbeforeunload =関数() {
  22. ws.close ();
  23. }関数reconnect(url) {
  24. if(lockReconnect)戻り値;
  25. ロック再接続 = true ;
  26. setTimeout( function () { //接続されていない場合は再接続し続けます。リクエストが多すぎるのを避けるために遅延を設定します。
  27. WebSocket を作成します (url);ロック再接続 = false ;
  28. }, 2000);
  29. } //ハートビート検出 var heartCheck = { timeout: 300000, //5分ごとにハートビートを送信
  30. timeoutObj: null 、serverTimeoutObj: null 、リセット: function (){
  31. タイムアウトをクリアします(this.timeoutObj);タイムアウトをクリアします(this.serverTimeoutObj);これを返します
  32. }, 開始:関数(){
  33. var self = this; this.timeoutObj = setTimeout(関数(){
  34. //ここでハートビート メッセージを送信します。バックエンドはそれを受信するとハートビート メッセージを返します。 //Onmessage は返されたハートビート メッセージを取得します。これは接続が正常であることを意味します。 ws.send( "ping" );
  35. console.log( "ping!" )
  36. self.serverTimeoutObj = setTimeout( function (){ //一定時間後にリセットされない場合は、バックエンドがアクティブに切断されていることを意味します
  37. ws.close (); // onclose が reconnect を実行する場合は、ws.close ( )を実行するだけです。 reconnect が直接実行されると、onclose がトリガーされ、再接続が 2 回実行されます。
  38. }, self.timeout) }, this.timeout) } }

js 部分はネイティブ H5 で記述されています。ブラウザの互換性を高めたい場合は、SockJS を使用することもできます。興味があればBaiduで検索してみてください。

次に、WebSocket の分散アーキテクチャのサポートを実装するために、コードを手動で最適化します。

解決策を考える

モノリシック アプリケーションのコード構造と、分散環境で WebSocket が直面する問題を理解したので、次はこの問題を解決する方法について考えてみましょう。

まずこの問題の根本的な原因を見てみましょう。

簡単に考えれば、1 つのアプリケーションにはサーバーが 1 つだけ存在し、すべてのクライアントがこのメッセージ サーバーに接続されていることがわかります。したがって、パブリッシャーがメッセージを送信すると、すべてのクライアントが実際にこのサーバーとの接続を確立し、グループ メッセージを直接送信できるようになります。

分散システムに切り替えた後、メッセージ サーバーが 2 つある場合、クライアントが Nginx によって負荷分散された後、一部のサーバーは 1 つのサーバーに接続し、他のサーバーは他のサーバーに接続します。したがって、パブリッシャーがメッセージを送信すると、そのメッセージはサーバーの 1 つにのみ送信され、このメッセージ サーバーは大量送信操作を実行できますが、問題は、他のサーバーがこれを認識しておらず、メッセージを送信できないことです。

これで、根本的な原因は、メッセージが生成されたときに 1 つのメッセージ サーバーのみがそれを認識できることであることがわかりました。そのため、他のメッセージ サーバーでもそれを認識できるようにする必要があります。それを感知すると、接続しているクライアントにグループ メッセージを送信できます。

では、この機能を実現するにはどのような方法を使用すればよいのでしょうか?王子はすぐにメッセージ ミドルウェアを導入し、そのパブリッシュ/サブスクライブ モデルを使用してすべてのメッセージ サーバーに通知することを思いつきました。

分散 WebSocket 問題を解決するために RabbitMQ を導入する

メッセージ ミドルウェアの選択に関しては、Prince はセットアップが比較的簡単で、強力な機能があり、グループ メッセージング機能のみを使用するため、RabbitMQ を選択しました。

RabbitMQ にはブロードキャスト モード (ファンアウト) があり、これを使用します。

まず、RabbitMQ 接続クラスを記述します。

  1. com.rabbitmq.client.Connectionをインポートします
  2. com.rabbitmq.client.ConnectionFactory をインポートします。
  3. java.io.IOException をインポートします。
  4. java.util.concurrent.TimeoutException をインポートします。
  5. パブリッククラスRabbitMQUtil {
  6. プライベートスタティック 繋がり 繋がり;
  7. /**
  8. * rabbitmqとの接続を確立する
  9. * @戻る 
  10. */
  11. 公共 静的 接続getConnection() {
  12. if (接続!= null &&接続.isOpen()) {
  13. 戻る 繋がり;
  14. } ConnectionFactory ファクトリー = 新しい ConnectionFactory();
  15. ファクトリー.setVirtualHost( "/" );
  16. ファクトリー.setHost( "192.168.220.110" ); // 仮想IPアドレスを使用する
  17. ファクトリーポートの設定(5672);
  18. factory.setUsername( "guest" );
  19. factory.setPassword( "ゲスト" );
  20. 試す {
  21. 接続= factory.newConnection();
  22. } キャッチ (IOException e) {
  23. e.printStackTrace();
  24. } キャッチ (TimeoutException e) {
  25. e.printStackTrace();
  26. }
  27. 戻る 繋がり;
  28. }
  29. }

このクラスについては特に言うことはありません。これは、MQ 接続を取得するための単なるファクトリ クラスです。

次に、私たちのアイデアによれば、サーバーが起動するたびに、MQ メッセージをリッスンするための MQ コンシューマーが作成されます。ここでの Prince のテストでは、次のように Servlet リスナーを使用します。

  1. javax.servlet.ServletContextEvent をインポートします。
  2. javax.servlet.ServletContextListener をインポートします。
  3. パブリッククラスInitListenerはServletContextListenerを実装します{
  4. @オーバーライド
  5. パブリックvoid contextInitialized(ServletContextEvent servletContextEvent) {
  6. WebSocket を初期化します。 } @オーバーライド
  7. パブリックvoid contextDestroyed(ServletContextEvent servletContextEvent) {
  8. }}

Web.xmlでリスナー情報を設定することを忘れないでください

  1. <?xml バージョン = "1.0"エンコーディング = "UTF-8" ?>
  2. <web-app xmlns= "http://xmlns.jcp.org/xml/ns/javaee"  
  3. xmlns:xsi= "http://www.w3.org/2001/XMLSchema-instance"  
  4. xsi:schemaLocation= "http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_4_0.xsd"  
  5. バージョン = "4.0" >
  6. <リスナー>
  7. <リスナークラス>InitListener</リスナークラス>
  8. </リスナー>
  9. </ウェブアプリ>

MQ コンシューマー部分として WebSocket に init メソッドを追加する

  1. 公共  静的void init() {
  2. try {接続 接続= RabbitMQUtil.getConnection();チャネル channel = connection .createChannel(); //スイッチ宣言(パラメータ:スイッチ名、スイッチタイプ)
  3. channel.exchangeDeclare( "fanoutLogs" 、BuiltinExchangeType.FANOUT);
  4. //一時キューを取得する
  5. 文字列 queueName = channel.queueDeclare().getQueue(); //キューをスイッチにバインドします (パラメータ: キュー名、スイッチ名、routingKey は無視されます)
  6. チャネル.queueBind(キュー名、 "fanoutLogs" "" );
  7. // ここでは、DefaultConsumer の handleDelivery メソッドをオーバーライドします。送信時にメッセージを getByte() し、それを文字列に再構成する必要があるためです。
  8. コンシューマー consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, encoding, properties, body);
  9. 文字列メッセージ = 新しい文字列(本文、 "UTF-8" );
  10. システム。出力.println(メッセージ); //ここではWebSocketを使用して、メッセージコンテンツを通じて対応するクライアントにメッセージを送信できます。
  11. } }; // キューで消費されるメッセージを宣言します (パラメータ: キュー名、メッセージが自動的に確認されるかどうか、コンシューマー本体)
  12. チャネル.basicConsume(キュー名、 true 、コンシューマー);
  13. //ここで接続を閉じることはできません。消費メソッドを呼び出した後、消費者は常にrabbitMQに接続され、消費を待機します。
  14. } IOException e をキャッチします { e.printStackTrace(); } }

同時に、メッセージが受信されると、WebSocket を介して対応するクライアントに直接送信されるのではなく、MQ に送信されます。このように、複数のメッセージ サーバーが存在する場合、それらのサーバーはすべて MQ からメッセージを取得し、WebSocket を使用して取得したメッセージ コンテンツを対応するクライアントにプッシュします。

WebSocket の onMessage メソッドに次のコンテンツが追加されます。

  1. 試す {
  2. //接続を試みる
  3. 繋がり 接続= RabbitMQUtil.getConnection(); //チャンネルを作成してみる
  4. チャネル channel = connection .createChannel(); //スイッチを宣言します(パラメータ:スイッチ名、スイッチタイプ、ブロードキャストモード)
  5. チャネル.exchangeDeclare( "fanoutLogs" 、 BuiltinExchangeType.FANOUT);
  6. // メッセージの公開 (パラメータ: スイッチ名、ルーティングキー、無視。ブロードキャスト モードでは、プロデューサーはスイッチの名前とタイプを宣言するだけで済みます)
  7. channel.basicPublish( "fanoutLogs" , "" , null ,msg.getBytes( "UTF-8" ));
  8. System.out.println ( "メッセージを公開" ) ;
  9. チャネル。近い(); } キャッチ (IOException |TimeoutException e) {
  10. e.printStackTrace();
  11. }

追加したら、元の Websocket プッシュ コードを削除します。

これでソリューション全体が完成します。

要約する

この時点で、分散 WebSocket プッシュ メッセージの問題は解決しました。

主にRabbitMQを紹介しました。 RabbitMQ のパブリッシュ/サブスクライブ モードを通じて、各メッセージ サーバーは起動時にメッセージをサブスクライブします。どのメッセージ サーバーがメッセージを送信しても、そのメッセージは MQ に送信されます。このようにして、各メッセージ サーバーはメッセージの送信時刻を感知し、Websocket を介してクライアントに送信します。

これが一般的なプロセスです。これについて考えたことはありますか: RabbitMQ が数分間ダウンしてから再起動した場合、コンシューマーは RabbitMQ に再接続できますか?メッセージを正常に受信できますか?

実稼働環境では、この問題を考慮する必要があります。

ここでは、コンシューマーが自動再接続をサポートしていることがテストされているため、このアーキテクチャを安全に使用してこの問題を解決できます。

この記事はここで終わります。ぜひメッセージを残して、一緒に議論し、学び、一緒に進歩してください。

<<:  あまり知られていないが、非常に実用的な Docker 使用のヒント 10 選

>>:  Amazon SageMaker は、Xingzhe AI がゲーム コンテンツのフィルタリングで 96% の精度を達成するのを支援します

推薦する

#BlackFriday# gcore: 最初の 1 か月は 25% オフ、独立したサーバー、香港/日本/韓国/シンガポール/米国の 28 のデータセンターが利用可能

gcore は毎年恒例のブラックフライデー特別プロモーションを開始しました。28 のコンピュータ ル...

A5トピック:インターネットはMo Yanで熱く、さまざまなウェブサイトがトラフィックを競い合っている

莫言氏が中国人として初めてノーベル文学賞を受賞したことで、インターネットとはあまり関係のないこの作家...

#11.11# 加速クラウド:1週間900元、国内独立サーバー、50M専用BGP帯域幅、100G DDoS高防御+ CC攻撃無視、このサイトと同じ

加速クラウドは、棗荘の伝説的な最大のコンピュータルームでもある山東省魯南ビッグデータセンターのコンピ...

SEO担当者:ホームページを頻繁に更新すると罰せられますか?

月給5,000~50,000のこれらのプロジェクトはあなたの将来ですSEO は細かい点が多い仕事です...

「タオバオ村」調査:売上高はピークに達し、利益は100%から20%未満に減少

「タオバオ村」調査:売上高はピークに達し、利益は100%から20%未満に減少李克強首相は、第18期中...

Kステーション100日:百度は私により良い道を歩ませた

Baidu の 6.28 地震により、私の通常のウェブサイトの 1 つが破壊されました。私の独創性が...

「シニアヘルプシステム」からイベントマーケティングの活力を体感

今日グループでスクリーンショットを見て、とても興味深いと思ったので、下の写真のように、それを記事の冒...

ウェブページの説明 メタディスクリプションの書き方と最適化のヒント

昨日、Web ページのタイトルの最適化テクニックについて記事を書きました。今日は、Web ページの高...

過去2年間で60万ドルを稼いだ方法(詳細データ付き)(パート2)

前回の記事では主に私のデータの一部を共有しました「過去2年間で60万を稼いだ方法を共有(詳細データ付...

建国記念日の休暇中の「行列、混雑、人混み」からオンラインプロモーションに対する姿勢についてお話ししましょう

国慶節の連休から戻り、厳しい帰国の試練を乗り越えた後、チケットを買うための行列、高速道路の交通渋滞、...

動画サイトのSEOプロモーション手法を探る

映画を検索すると、無数の動画ウェブサイトが見つかります。特に動画サイトがもたらすトラフィックは相当な...

百度の試用期間について、百度の試用期間から早く抜け出す方法

私の知る限り、SEO業界に入ったばかりの友人の多くは、自分のウェブサイトがBaiduに登録されてから...

alphavps-server/33€/2xL5630/72g メモリ/5IP

ちょうど一昨日、「alphavps - 年間支払い 12 ユーロ / メモリ 1g / CPU 2 ...

Cloudcone: 3 周年、強力で安価な VPS 2 つ、3.99 USD/4G メモリ/2 コア/100g SSD/3T トラフィック

Cloudcone は、親会社が長い歴史を持つものの、新しいブランドとしては設立されてまだ 3 年し...

どのような友達リンクが高品質の外部リンクであるかについて簡単に説明します

高品質のフレンドリーリンクをどのように見分けるのでしょうか?最近、鄭州SEOは主要プラットフォームで...