1. 首页
  2. 开发

node.js rabbitmq(amqplib) 断线重连

rabbitmq

我用的是amqplib 这个包,github上的项目名是amqp.node

这个包貌似是没有实现断线自动重连的,看了看issues,也和我自己实现的版本差不多。

思路

思路主要是两个地方:

  1. connection实例的error事件.
  2. 初始化的promise链报错.

针对这两种情况都去尝试重新连接,因为可能不能立即恢复,所以得隔一段事件重连一次,直到恢复为止。

封装实现

/**
 * Mq Factory
 * @authors yanjixiong
 * @date    2016-07-22 09:56:19
 */

var Connection = require('./Connection');
var Channel = require('./Channel');
var Exchange = require('./Exchange');
var Queue = require('./Queue');
var Consume = require('./Consume');
var ExchangeTypes = require('../constant').ExchangeTypes;
var RouteKey = require('../constant').RouteKey;

var config = require('../../config');
var log = require('../../common/logger').getLogger('Core:mq:index');

var ROUTE_KEY = RouteKey.RECEIVE_PA;
var QUEUE_NAME = RouteKey.RECEIVE_PA; // 队列名称

function MQ() {
	this.connection = null;
	this.init();
}

/**
 * 初始化消息队列
 * @return {[type]} [description]
 */
MQ.prototype.init = function init() {
	var self = this;

	// 创建连接
	Connection
		.createConnection(config.rabbitMQ_url)
		.then(function (conn) {

			// 实例中存储当前连接
			self.connection = conn;

			// 监听连接错误
			conn.on('error', function(err) {
				log.error('[mq] connection error ', err);
				self.reconnect();
			});

			log.info('[mq] create connection success');

			// 创建通道
			return Channel
				.createChannel(conn);
		})
		.then(function (ch) {

			// 进程被杀死关闭连接
			process.once('SIGINT', function() { 
				log.info('kill by signal SIGINT');
				ch.close();
				self.connection.close(); 
				self.connection = null;
				process.exit(0);
			});

			ch.on('error', function(error) {
				// ch.close();
				log.error('[mq] channel error: ', error);
			});

			log.info('[mq] create channel success');

			// 创建交换机
			return Exchange
				.assertExchange(ch, config.exchange_name, ExchangeTypes.DIRECT, {durable: false})
				.then(function () {
					log.info('[mq] assert exchange [%s] [%s]', config.exchange_name, ExchangeTypes.DIRECT);

					// 创建队列
					return Queue
						.assertQueue(ch, QUEUE_NAME, {exclusive: false, durable: false}); // exclusive 是否排它 durable :是否持久化
				})
				.then(function (queue) {
					log.info('[mq] assert queue [%s] success', QUEUE_NAME);

					log.debug(queue);

					// 绑定队列到交换机
					return Queue.
						bindQueue(ch, QUEUE_NAME, config.exchange_name, ROUTE_KEY);
				})
				.then(function() {
					log.info('[mq] bind queue [%s] to exchange [%s]', QUEUE_NAME, config.exchange_name);

					// 消费
					return Consume
						.consume(self.connection, ch, QUEUE_NAME);
				})
		})
		.catch(function (err) {
			log.error('[mq] Init failed , error: ', err);
			self.reconnect();
		});
};

/**
 * 重新连接
 * @return {[type]} [description]
 */
MQ.prototype.reconnect = function() {
	var self = this;

	log.info('[mq] try reconnect 3 seconds later');

	setTimeout(function () {
		self.init();
		self.reconnectCount++;
	}, 3000);	
}

/**
 * 获取连接
 * @return {[type]} [description]
 */
MQ.prototype.getConnection = function getConnection() {
	var self = this;

	if (this.connection) {
		return Promise.resolve(self.connection);
	} else {
		return Connection
			.createConnection(config.rabbitMQ_url)
			.then(function (conn) {
				// 实例中存储当前连接
				self.connection = conn;

				// 进程被杀死关闭连接
				process.once('SIGINT', function() { 
					log.info('kill by signal SIGINT');
					conn.close(); 
					self.connection = null;
					process.exit(0);
				});

				log.info('[mq] create connection success');

				return Promise.resolve(conn);
			});
	}
}

module.exports = MQ;
收藏

暂无评论

登录后可以进行评论。没有账号?马上注册