js连接RabbitMQ达到实时消息推送

发布时间:2019-01-28 00:15:32
点击:43546次

最近在自己捯饬一个网站,有一个功能是需要后端处理完数据把数据发布到MQ中,前端再从MQ中接收数据。但是前端连接MQ又成了一个问题,在网上搜了下资料,点进去一篇IBM DW后发现竟然是超哥写的,真是巧哈~因为超哥写的很好所以很多我就直接摘抄过来了,他应该不会介意的(逃。

参考

实现服务器端推送的几种方式

Web 应用都是基于 HTTP 协议的请求/响应模式,无法像 TCP 协议那样保持长连接,因此 Web 应用就很难像手机那样实现实时的消息推送。就目前来看,Web 应用的消息推送方式主要有以下几种:

1.Ajax 短轮询
Ajax 轮询主要通过页面端的 JS 定时异步刷新任务来实现数据的加载,但这种方式实时效果较差,而且对服务端的压力也较大。

2.长轮询
长轮询主要也是通过 Ajax 机制,但区别于传统的 Ajax 应用,长轮询的服务器端会在没有数据时阻塞请求直到有新的数据产生或者请求超时才返回,之后客户端再重新建立连接获取数据。但长轮询服务端会长时间地占用资源,如果消息频繁发送的话会给服务端带来较大的压力。

3.WebSocket 双向通信
WebSocket 是 HTML5 中一种新的通信协议,能够实现浏览器与服务器之间全双工通信。如果浏览器和服务端都支持 WebSocket 协议的话,该方式实现的消息推送无疑是最高效、简洁的。并且最新版本的 IE、Firefox、Chrome 等浏览器都已经支持 WebSocket 协议,Apache Tomcat 7.0.27 以后的版本也开始支持 WebSocket。

安装RabbitMQ

在macOS上安装rabbitmq(提前已经安装了brew):

brew install rabbitmq

启动stomp有关的一系列插件

rabbitmq-plugins enable rabbitmq_management rabbitmq_web_stomp rabbitmq_stomp rabbitmq_web_stomp_examples

可以通过下面这条命令查看已启动了哪些RabbitMQ插件:

rabbitmq-plugins list

重启rabbitmq:

brew services restart rabbitmq

我们可以在15670端口访问web-stomp-examples,

http://localhost:15670/

其中RabbitMQ运行在15672端口,stomp服务运行在15674端口

前端通过websocket连接RabbitMQ

RabbitMQ 有很多第三方插件,可以在 AMQP 协议基础上做出许多扩展的应用。Web STOMP 插件就是基于 AMQP 之上的 STOMP 文本协议插件,利用 WebSocket 能够轻松实现浏览器和服务器之间的实时消息传递,具体实现方式如下图所示:

w

RabbitMQ Web STOMP 插件可以理解为 HTML5 WebSocket 与 STOMP 协议间的桥接,目的也是为了让浏览器能够使用 RabbitMQ。当 RabbitMQ 消息服务器开启了 STOMP 和 Web STOMP 插件后,浏览器端就可以轻松地使用 WebSocket 或者 SockerJS 客户端实现与 RabbitMQ 服务器进行通信。

前端通过stomp连接RabbitMQ的代码如下:

// 初始化 ws 对象 if (location.search == '?ws') { var ws = new WebSocket('ws://localhost:15674/ws');
} else { var ws = new SockJS('http://localhost:15674/stomp');
} // 获得Stomp client对象 var client = Stomp.over(ws); // SockJS does not support heart-beat: disable heart-beats client.heartbeat.outgoing = 0;
client.heartbeat.incoming = 0;
client.debug = pipe('#second'); // 定义连接成功回调函数 var on_connect = function(x) { //data.body是接收到的数据 client.subscribe("/queue/default", function(data) { var msg = data.body;
        alert("收到数据:" + msg);
    });
}; // 定义错误时回调函数 var on_error = function() { console.log('error');
}; // 连接RabbitMQ client.connect('guest', 'guest', on_connect, on_error, '/'); console.log(">>>连接上http://localhost:15674");

我们可以看到代码主要包括以下几个部分:

  • 初始化websocket对象
  • 构造stomp client
  • 定义连接成功的回调函数
  • 定义连接错误时的回调函数
  • 连接RabbitMQ

因此我们要做的主要是定义这个连接成功的回调函数,其中:

client.subscribe("/queue/default", function(data) { var msg = data.body;
    alert("收到数据:" + msg);
});

这个函数的功能是订阅了一个名为"default"的queue,当有生产者向该队列发送数据时,该函数会作为消费者接收到数据,并触发回调函数。我们可以在回调函数中对接收到的数据进行展示,更新到界面上,从而可以达到和轮询一样的效果。

当我们把这段js放进html中run起来后,通过Chrome F12工具也可以看到这样的提示:

Opening Web Socket...
stomp.js:114 Web Socket Opened...
stomp.js:114 >>> CONNECT
accept-version:1.1,1.0 heart-beat:0,0 host:/ login:guest passcode:guest

stomp.js:114 <<< CONNECTED server:RabbitMQ/3.6.6 session:***
heart-beat:0,0 version:1.1 stomp.js:114 connected to server RabbitMQ/3.6.6 stomp.js:114 >>> SUBSCRIBE id:sub-0 destination:/queue/default

当我们向/queue/default发送一条测试数据“aaa”,前端js就会接收到数据,再看F12工具中的显示:

<<< MESSAGE subscription:sub-0 destination:/queue/default message-id:*** redelivered:false persistent:1 content-length:3 aaa

可以看到已经收到了"aaa"这条消息,并且可以注意到Stomp协议是以文本格式进行数据传输的,而RabbitMQ是以二进制传输的。通过data.body就可以获取到Stomp中的消息的主体了。

思考

关于websocket连接MQ进行实时消息推送,固然是实现消息推送的一个很好的方式,但是不得不思考在大量并发情况下,MQ端是否能承受的了数量庞大的websocket连接。MQ将前后端进行了解耦和异步化,但是也可能成为系统的性能瓶颈。对于这种实现方式,目前只能说是且学且用,性能方面还尚待考究。