• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

rabbitmq-pool-go: golang-rabbitmq连接池及channel复用

原作者: [db:作者] 来自: 网络 收藏 邀请

开源软件名称:

rabbitmq-pool-go

开源软件地址:

https://gitee.com/tym_hmm/rabbitmq-pool-go

开源软件介绍:

rabbitmq 连接池channel复用

开发语言 golang依赖库

go get -u gitee.com/tym_hmm/rabbitmq-pool-go

go get -u gitee.com/tym_hmm/rabbitmq-pool-go

已在线上生产环镜运行, 5200W请求 qbs 3000 时, 连接池显示无压力
rabbitmq部署为线上集群

功能说明

  1. 自定义连接池大小及最大处理channel数
  2. 消费者底层断线自动重连
  3. 底层使用轮循方式复用tcp
  4. 生产者每个tcp对应一个channel,防止channel写入阻塞造成内存使用过量
  5. 支持rabbitmq exchangeType
  6. 默认交换机、队列、消息都会持久化磁盘
  7. 默认值
名称说明
tcp最大连接数5
生产者消费发送失败最大重试次数5
消费者最大channel信道数(每个连接自动平分)100(每个tcp10个)

使用

  1. 初始化
var oncePool sync.Oncevar instanceRPool *kelleyRabbimqPool.RabbitPoolfunc initrabbitmq() *kelleyRabbimqPool.RabbitPool {	oncePool.Do(func() {        //初始化生产者		instanceRPool = kelleyRabbimqPool.NewProductPool()        //初始化消费者	    instanceConsumePool = kelleyRabbimqPool.NewConsumePool()        //使用默认虚拟host "/"		err := instanceRPool.Connect("192.168.1.202", 5672, "guest", "guest")        //使用自定义虚        //err:=instanceConsumePool.ConnectVirtualHost("192.168.1.202", 5672, "guest", "guest", "/testHost")		if err != nil {			fmt.Println(err)		}	})	return instanceRPool}
  1. 生产者
var wg sync.WaitGroup	for i:=0;i<100000; i++ {		wg.Add(1)		go func(num int) {			defer wg.Done()			data:=kelleyRabbimqPool.GetRabbitMqDataFormat("testChange5", kelleyRabbimqPool.EXCHANGE_TYPE_TOPIC, "textQueue5", "/", fmt.Sprintf("这里是数据%d", num))			_=instanceRPool.Push(data)		}(i)	}	wg.Wait()
  1. 消费者

可定义多个消息者事件, 不通交换机, 队列, 路由

每个事件独立

nomrl := &rabbitmq.ConsumeReceive{        #定义消费者事件        ExchangeName: "testChange31",//队列名称        ExchangeType: kelleyRabbimqPool.EXCHANGE_TYPE_DIRECT,        Route:        "",        QueueName:    "testQueue31",        IsTry:true,//是否重试        IsAutoAck: false, //是否自动确认消息        MaxReTry: 5,//最大重试次数        EventFail: func(code int, e error, data []byte) {        	fmt.Printf("error:%s", e)        },        /***         * 参数说明         * @param data []byte 接收的rabbitmq数据         * @param header map[string]interface{} 原rabbitmq header         * @param retryClient RabbitmqPool.RetryClientInterface 自定义重试数据接口,重试需return true 防止数据重复提交         ***/        EventSuccess: func(data []byte, header map[string]interface{},retryClient kelleyRabbimqPool.RetryClientInterface)bool {//如果返回true 则无需重试            _ = retryClient.Ack()//确认消息    	            fmt.Printf("data:%s\n", string(data))        	return true        },	}	instanceConsumePool.RegisterConsumeReceive(nomrl)	err := instanceConsumePool.RunConsume()	if err != nil {		fmt.Println(err)	}
  • 参数说明
名称类型说明
ExchangeNamestring交换机名称
ExchangeTypestring交换机类型:
EXCHANGE_TYPE_FANOUT
EXCHANGE_TYPE_DIRECT
EXCHANGE_TYPE_TOPIC
Routestring路由键
QueueNamestring队列名称
IsTrybool是否重试
如果开启重试后, 在成功回调用返回true会对消息进行重试, 重试时间为 5000~15000 MS
IsAutoAckbool是否自动确认消息, true: 组件底层会自动对消息进行确认
false: 手动进行消息确认,在成功会调中需进行手动确认 _ = retryClient.Ack()
MaxReTryint重试最大次数s, 需isTry为true
EventFailfunc失败回调
EventSuccessfunc成功回调
  1. 错误码说明

错误码为

  1. 生产者push时返回的 *RabbitMqError
  2. 消费者事件监听回返的 code
错误码说明
501生产者发送超过最大重试次数
502获取信道失败, 一般为认道队列数用尽
503交换机/队列/绑定失败
504连接失败
506信道创建失败
507超过最大重试次数

鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap