网站首页 > 开源技术 正文
背景
Sia-rabbitmq-plus是公司2013年开发的一个项目。最早只是一个简单的封装好的rabbitmq客户端,随着接入的业务越来越多,业务方提出了很多新的需求。于是我们在rabbitmq的基础上新增了消息队列监控预警提醒,历史消息消费情况查看,详细日志查看,客户端心跳监测等功能,这些功能极大方便了我们查看发现问题,解决问题的效率。
项目简介
Sia-rabbitmq-plus工作流程是客户端(生产者或消费者)通过rabbitmq接口发送或者消费消息,同时将客户端元信息(ip,队列名称,项目名称,预警值等)发送给心跳监测队列。 定时任务定时收集rabbitmq相关的队列数据,同时监测预警值发送预警信息。主要包含以下几个模块
- 客户端(sia-rabitmq-plus-client)
- 主要提供了消费和发送消息的api接口
- MQ信息收集模块(sia-rabbitmq-plus-gather)
- http方式调用rabbitmq的接口采集rabbitmq相关信息
- 心跳监测模块 (sia-rabbitmq-plus-heartbeat)
- 收集客户端生产者和消费者的心跳数据
- 监控前端展示模块(sia-rabbitmq-plus-display)
- 将收集到的mq信息和心跳数据展示在前台页面
RabbitMQ的特性
消息队列中间件(Message Queue Middleware ,简称为MQ) 是指利用高效可靠的消息传递,机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传,递和消息排队模型,它可以在分布式环境下扩展进程间的通信。作为众多消息中间件之一的。RabbitMQ发展到今天被越来越多的人认可,这和它在易用性、扩展性、可靠性和高可用性等方面的卓著表现是分不开的。
RabbitMQ 的具体特点可以概括为以下几点。
- 可靠性: RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认及发布确认等。 令灵活的路由: 在消息进入队列之前,通过交换器来路由消息。对于典型的路由功能, RabbitMQ己经提供了一些内置的交换器来实现。针对更复杂的路由功能,可以将多个 交换器绑定在一起,也可以通过插件机制来实现自己的交换器。
- 扩展性: 多个RabbitMQ节点可以组成一个集群,也可以根据实际业务情况动态地扩展 集群中节点。
- 高可用性: 队列可以在集群中的机器上设置镜像,使得在部分节点出现问题的情况下队 列仍然可用。
- 多种协议: RabbitMQ除了原生支持AMQP 协议,还支持STOMP,MQTT 等多种消息 中间件协议。
- 多语言客户端:RabbitMQ几乎支持所有常用语言,比如Java、Python、Ruby、PHP、 C#、JavaScript 等。
- 管理界面: RabbitMQ提供了一个易用的用户界面,使得用户可以监控和管理消息、集 群中的节点等。
- 插件机制: RabbitMQ提供了许多插件,以实现从多方面进行扩展,当然也可以编写自 己的插件。
Sia-rabbitmq-plus特性
- 高可用性:
RabbitMQ队列服务集群化,支持动态增删集群节点。 增加镜像队列,预防单点故障。
- 可收缩性:
支持动态增删消费线程以适应消息的处理。每个监听队列都有一个线程池提供服务,用户可自定义消费处理线程池。
- 易用性:
去中心化管理,相关配置信息本地化,简单方便。
- 消息追踪
所有收发的消息在本地用单独的日志文件存储,方便数据恢复与排错。
- 邮件预警
增加对线上队列的实时监控,触发预警时及时通知到个人。
- 支持三种发送方式
1 . 点对点模式:Provider生产消息发送到队列中,Consumer绑定队列获取消息。
2 . 发布订阅模式: Provider将消息发送给Exchange,然后Consumer创建Queue绑定到Exchange接收消息。
3 . 同步链条模式:模拟系统间同步请求调用,实现机制与之前版本完全保持一致,但不作推荐使用。
监控页面预览
监控页面可以直观的看到每个队列的消费情况、预警邮箱、接收的ip、发送的ip通过颜色区分队列的健康情况,以及最多7天的历史消息信息。具体查看使用指南
Sia-rabbitmq-plus重要指标
- 吞吐量(每秒最大请求量)
- 大概每秒20MB的数据(并发请求也是依次进入队列服务器)一天的最大吞吐量为1728GB。
- 每秒最大的并发量
- 假设每条消息平均10KB,则每秒可以流通的消息数为2000条,一天可以流通的消息为172,800,000 (1亿7千2百8十万)条
- 消息的最大堆积
- sia-rabbitMQ-plus的堆积量跟硬盘空间有关,目前SIA搭建的RabbitMQ集群所在的宿主机为8核16G内存256G磁盘。 据最新的统计,目前单队列最高一天流通1百万条消息
关键术语
- exchange:接收发布应用程序发送的消息,并根据一定的规则将这些消息路由到“消息队列”。
- queue:存储消息,直到这些消息被消费者安全处理完为止。
- businessCode: businessCode是两个系统间通讯的的唯一的标示符,对应MQ服务器的queue
- groupCode:发布订阅模式下的交换机,对应MQ服务器的exchange
使用指南:
rabbitmqPlus监控后台使用说明
1.1 监控后台页面预览
启动sia-rabbitmq-plus-display项目后打开本地地址:http://127.0.0.1:17003
即可看到监控页面,监控页面未做权限控制可看到所有队列的信息。点击具体的队列名称可以进入查看队列详情。
2.1 术语解释
- 队列名称:RabbitMQ服务器上的队列名
- 项目名称:用户配置的PROJECT_NAME
- 累积条数:当前队列未消费的消息条数
- 增长次数:累积条数持续增长的次数
- 发送条数:之前一分钟内发往队列的消息条数
- 接收条数:之前一分钟内从队列消费的消息条数
- 消费者数:从队列接收消息的消费者个数
- 报警Email:接收预警邮件的邮箱名,SIA5.0系列版本中,用户配置的EMAIL_RECEVIERS
- 接收IP及时间:从队列消费消息的机器IP及其最近的消费时间
- 发送IP及时间:向队列发送消息的机器IP及其最近的发送时间
- 队列总数:RabbitMQ服务器上的队列名总数
- 发送总数:之前一分钟发往各队列的消息条数总和
- 接收总数:之前一分钟从各队列消费的消息条数总和
2.2 队列状态解释
队列健康状态按颜色分为五类:
- 红色:预警状态,累积条数或增长次数超过用户设定的阈值(累积条数默认阈值为100条,增长次数默认阈值为5次)。此时已发送预警邮件。
- 黄色:警告状态,消费者数为零,累积条数大于零。说明有未消费的消息。
- 灰色:弃用状态,消费者数为零,累积条数也为零。说明队列没有消息流通,可能已废弃。
- 白色:空闲状态,消费者数大于零,发送条数与接收条数都为零。说明队列暂无消息流通。
- 绿色:运行状态,消费者数大于零,发送条数与接收条数也都大于零。说明队列有消息流通,收发两端正常通信。
2.3 实时数据查看
(1)检索功能
在应用的最上方,有一个检索框:
以队列名称做模糊检索,就能查看相应的队列:
(2)排序功能
对队列名称、累积条数、增长次数、发送条数、接收条数、消费者数等列提供排序功能。
- 以发送条数为例,点击发送条数,出现发送条数▲,表示按照升序排序:
- 再次点击发送条数,出现发送条数▼,表示按照降序排序:
(3)检索功能与排序功能可以搭配使用
(4)点击队列名称这一列下的任意队列,即进入相应队列的历史数据查询界面
2.4 历史数据查询
(1)发送条数与接收条数展示图
按照时间单位,可以展示最近7天的历史数据。选择时间单位,设置起始时间与结束时间,点击查看:
- 分,查看区间为30分钟
- 时,查看区间为30小时
(发送条数与接收条数重合了)
- 日,查看区间为7天
(发送条数与接收条数重合了)
如果只想看发送条数(或接收条数),则点击发送条数(或接收条数)进行开启或关闭。
(2)累积条数展示图
按照时间单位,可以展示最近7天的历史数据。查看区间为30分钟。
累积条数不为零说明当时发送条数总数大于接收条数总数。
点击返回图标,即可回到上一层实时数据查看界面
3. 队列预警的解释
队列预警邮件示例:
剩余消息数:188 报警阈值:1000 增长次数:12 报警增长次数阈值:11
解释如下:
if(剩余消息数>报警阈值){ 发送预警邮件,说明消费者接收速率慢,需要关注! } if(增长次数>报警增长次数阈值){ 发送预警邮件,大概率消费者已经停机,需要立即处理! }
开发指南
skytrain-client消息发送配置
1、准备工作
- 发送端需要配置siaparameters.properties文件(文件名不可更改)
RABBITMQ_HOST=10.100.66.81 RABBITMQ_PORT=5672 SKYTRAIN_LOG_ROOT=D:/logs/send SKYTRAIN_LOG_FILESIZE=10MB SKYTRAIN_LOG_FILENUMS=5 PROJECT_NAME=skytrain_client_test PROJECT_DESCRIPTION=skytrain_project_team EMAIL_RECEVIERS=xinliang@creditease.cn,pengfeili23@creditease.cn
- RABBITMQ_HOST,MQ服务器的IP地址(必须)
- RABBITMQ_PORT,MQ服务器对外暴露的服务端口(必须)
- SKYTRAIN_LOG_ROOT,自定义日志输出路径(可选,默认系统当前路径)。发送接收消息产生的日志按照队列名在指定路径生成
- SKYTRAIN_LOG_FILESIZE,自定义日志大小(可选,默认20MB,单位:【KB、MB、GB】)SKYTRAIN_LOG_FILENUMS,自定义日志个数(可选,默认10个)
注意:默认消息日志大小为:20MB*10,循环日志,项目组可根据实际需求设置大小及个数
- PROJECT_NAME,项目组名称(必须),注意接收的队列名必须以此开头。如果是发送者,请让接收者先启动(因为发送者不建立队列)
- PROJECT_DESCRIPTION,项目组描述,对项目组的(中文)描述(可选)
- EMAIL_RECEVIERS,预警邮件接收者,邮箱为公司邮箱,多个按逗号隔开(必须)
2、siaparameters.properties 配置文件的读取
- 从class文件所在的路径查找
- 若上一步没有找到,则从项目lib包所在的路径查找
- 自定义目录下,通过代码 PropertyHelper.setProfilePath("文件所在目录"); 设置,就能正确加载。配置文件的读取优先级从1.->2.->3.,如果最终没有找到,会有出错信息,按照提示解决即可。一般把该文件与其他配置文件放在一起即可。
- 启动参数添加 在启动时,添加JVM参数 -DSKYTRAIN_FILE_PATH=文件所在目录
- 这种方式与3.二选一即可
skytrain-client消息接收配置
1、接收端需要配置 siaparameters.properties 文件
参见siaparameters.properties 文件的配置
2、接收端需要配置 receivequeue.properties 文件
注意:文件名不可更改!
点对点模式接收配置:
skytrain_client_test_send_p2p={"unConsumeMessageAlarmNum":200,"unConsumeMessageAlarmGrowthTimes":10,"className":"skytrainDemo.RecevieP2P","methodName":"execRun","autoAck":"false","threadPoolSize":"4"}
发布订阅模式接收配置:
skytrain_client_test_send_pubsub@skytrain_client_test_receive_message={"unConsumeMessageAlarmNum":200,"unConsumeMessageAlarmGrowthTimes":10,"className":"skytrainDemo.ReceviePubSub","methodName":"execRun","autoAck":"true","threadPoolSize":"4"}
- skytrain_client_test_send_p2p,MQ服务器的接收队列名,对应发送端的businessCode(点对点模式,必须),项目组自行设置,需要以PROJECT_NAME 开头
- skytrain_client_test_send_pubsub,MQ服务器接收交换机名,对应发送端的groupCode(发布订阅模式,必须),项目组自行设置。
- skytrain_client_test_receive_message,MQ服务器的与交换机绑定的队列名(发布订阅模式,必须),项目组自行设置。
- unConsumeMessageAlarmNum,自定义队列累积条数预警阈值,超过则发预警邮件(可选,默认为100条)
- unConsumeMessageAlarmGrowthTimes,自定义累积消息持续递增的次数阈值,超过则发预警邮件(可选,默认为5次)
- className,接收端进行消息处理的(包名+)类名(必须)
- methodName,接收端进行消息处理的方法名(必须),接收参数只有一个,要么是SIAMessage要么是String,例如:
public void execRun(SIAMessage message) {} public void execRun(String message) {}
- autoAck,是否开启自动ACK机制(可选),默认手动确认(true,接收方接收到消息后,自动向服务器发回确认,false,消息接收方处理完消息后,再向服务器发送消息确认,服务器再删除该消息的副本)。建议设为false(手动确认),这样能保证未消费的消息不丢失,但线上可能累积消息。若设为true,则将消息全部拉回本地内存(如果宕机,则消息丢失)。本质上消息的消费速度与该设置无关,与消息的处理线程池大小有关。这个设置的意义是选择将消息缓存在本地内存还是服务器。请项目组自己考量。
- threadPoolSize,接收端进行消息处理使用线程池的大小(可选),默认为1,这个值与并行处理消息数相关,建议设置为处理器的核数X2。
3、receivequeue.properties配置文件的读取
略,参见 siaparameters.properties 配置文件的读取
4、消费者的启动
文件 receivequeue.properties 正确配置后,只需在程序的启动(初始化)块中加入代码:
Consumer.start();
就能按照配置文件的设置启动所有的消费者(receivequeue.properties 里可以配置多个队列消费者)
如果使用Spring的bean相关配置:
例:
<bean id="helloWorld" class="skytrainDemo.testBean" />
上面的对应设置为:
点对点接收配置:
skytrain_client_test_send_p2p={"unConsumeMessageAlarmNum":200,"unConsumeMessageAlarmGrowthTimes":10,"beanName":"helloWorld","beanMethodName":"getMessage","autoAck":"false","threadPoolSize":"4"}
发布订阅接收配置:
skytrain_client_test_send_pubsub@skytrain_client_test_receive_message={"unConsumeMessageAlarmNum":200,"unConsumeMessageAlarmGrowthTimes":10,"beanName":"helloWorld","beanMethodName":"getMessage","autoAck":"false","threadPoolSize":"4"}
- beanName,bean的名字,需要在 Spring 的 applicationContext.xml 中配置,如下所示
<bean id="helloWorld" class="com.sia.testBean" />
- beanMethodName,接收的方法名,bean所在的类中对应的接收方法,与methodName一样,只能包含一个(String或SIAMessage)参数
注意:
如果使用 Spring 而不是 Web 项目,只需在程序的启动(初始化)块中加入代码:
Consumer.start(ApplicationContext applicationContext)
传递一个应用的上下文,就能按照配置文件的设置启动所有的消费者(receivequeue.properties 里可以配置多个队列消费者)
如果是 Web 项目,通过 listener 启动就可以了
接收端web.xml配置
使用如下配置:
<listener> <listener-class>com.sia.rabbitmqplus.start.SIAInitialListener</listener-class> </listener>
注意:
配置过程中,最好将 sia 的 SIAInitialListener 配置在 web.xml 的最后,便于sia的正常加载。
SIA对象说明
类名: SIAMessage
所在包:com.creditease.sia.pojo
sia-rabbitmq-plus部署指南
一. mysql初始化
DROP TABLE IF EXISTS `skytrain_queue_message_info_history`; CREATE TABLE `skytrain_queue_message_info_history` ( `id` int(11) unsigned NOT NULL AUTO_INCREMENT, `queue_name` varchar(100) NOT NULL, #队列名称 `un_consume_message_num` int(11) DEFAULT NULL,#残留消息数量 `publish_message_num` int(11) DEFAULT NULL,#发送消息数量 `deliver_message_num` int(11) DEFAULT NULL,# 消费消息数量 `worktime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (`id`), UNIQUE KEY `id_UNIQUE` (`id`), KEY `index3` (`queue_name`), KEY `index4` (`worktime`) ) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8;
二. redis安装
reids的安装和配置详见官方文档,单机或集群模式都行
三. rabbitMQ 安装
rabbitMQ的安装和配置详见官方文档,单机或集群都行
##项目部署
1.环境要求 系统:64bit OS,Linux/Mac/Windows IDE:推荐使用IntelliJ IDEA 或 Eclipse JDK:JDK1.6+
2.从SIA-RABBITMQ-PLUS工程下获取源代码打包,执行rabbitmq-plus-build-component目录的mvn命令即可。
- 在~/sia-task/sia-task-build-component目录下,执行如下命令打包:mvn clean install 。
- 打包成功后,会在~/sia-rabbitmq-plus/rabbitmq-plus-build-component 目录下出现target文件,target文件中的.zip文件即为项目安装包。
- 打开安装包所在文件夹,将安装包解压,得到task目录,其中包括四个子目录:
3.配置文件修改
将config文件夹下的sia-task-config工程的配置文件task_config_open.yml,以及sia-task-scheduler工程下的配置文件task_scheduler_open.yml中的zookeeper和Mysql的链接修改为自己的地址。
4.启动sia-rabbitmq-plus-heartbeat工程
sh sia-rabbitmq-plus-heartbeat.sh
5.启动sia-rabbitmq-plus-gather工程 sh sia-rabbitmq-plus-gather.sh
6.启动sia-rabbitmq-plus-display工程 sh sia-rabbitmq-plus-display.sh
7.启动sia-rabbitmq-plus-dome工程 sh sia-rabbitmq-plus-dome.sh
私信回复"sia-rabbitmq-plus"获取链接地址,喜欢的点个关注,一起学习探讨新技术。
猜你喜欢
- 2024-09-08 vue2组件系列第四十二节:NavBar 导航栏
- 2024-09-08 从零开始学Python——使用Selenium抓取动态网页数据
- 2024-09-08 黑客突破macOS的安全防御,新型恶意软件正在偷偷的窃取你的文件
- 2024-09-08 vue2组件系列第三十六节:Lazyload 图片懒加载
- 2024-09-08 Visa|实习面试|2022 暑假(visa issues)
- 2024-09-08 消息中间件RabbitMQ入门详解(消息中间件mq作用)
- 2024-09-08 vue2组件系列第二十二节:SwitchCell 开关单元格
- 2024-09-08 vue2组件系列第二十节:按钮式单选组件
- 2024-09-08 vue2组件系列第四十节:NoticeBar 通告栏
- 2024-09-08 vue2组件系列第二十六节:PasswordInput 密码输入框
你 发表评论:
欢迎- 07-10公司网站建站选择:人工建站和源码建站分析
- 07-10多用途游戏娱乐新闻网站HTML5模板
- 07-10站长教你搭建属于自己的网站(搭建网站的步骤)
- 07-10php宝塔搭建部署实战响应式塑料封条制品企业网站模板源码
- 07-10自适应响应式汽车配件类网站源码 html5高端大气汽车网站织梦模板
- 07-10网站标签怎么设置?(网站标签怎么设置比较好)
- 07-10PageAdmin企业网站制作中踩过的坑
- 07-10豆包给我输出的html在线象棋源码(有点简单)
- 最近发表
- 标签列表
-
- jdk (81)
- putty (66)
- rufus (78)
- 内网穿透 (89)
- okhttp (70)
- powertoys (74)
- windowsterminal (81)
- netcat (65)
- ghostscript (65)
- veracrypt (65)
- asp.netcore (70)
- wrk (67)
- aspose.words (80)
- itk (80)
- ajaxfileupload.js (66)
- sqlhelper (67)
- express.js (67)
- phpmailer (67)
- xjar (70)
- redisclient (78)
- wakeonlan (66)
- tinygo (85)
- startbbs (72)
- webftp (82)
- vsvim (79)
本文暂时没有评论,来添加一个吧(●'◡'●)