网站首页 > 开源技术 正文
在前面已经为大家介绍了RocketMQ的安装与启动,但实际工作中RocketMQ是作为我们项目的集成件存在的,而Java最流行的框架是Spring了,那么如何与SpringBoot脚手架做个集成呢?
首先要引入其他依赖,找到对应的Pom配置上就好了,那么我们从RocketMq官网找到依赖包。官方地址:(官方文档 喜欢看文档的同学可以跳转看下
结合通过SpringBoot快速构建的项目,最精简的pom依赖清单
整合RocketMQ依赖包
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.3</version>
</dependency>
</dependencies>
配置RocketMQ服务地址
因为rocketMQ作为中间件,所以我们要配置链接RocketMQ的服务器地址。从(RocketMQ构图整理概览)这篇文章可以了解到RocketMQ的核心组件是NameServer、Broker、Producer、Consumer。
那么我们可以看到作为我们的应用项目肯定是扮演Producer和Consumer的 角色的,所以RocketMQ的服务器就落到了NameServer和Broker身上。再来分析一下,Broker是Producer和Consumer的中间者,接收生产者产生的消息和推消息给消费者,那我们需要配置Broker的地址吗?可以考虑下,所有的Broker会在NameServer中注册信息,Broker是NameServer的小弟。所以我们只需要配置NameServer即可,因为从概览图上也可以看到Producer要把消息发送给哪个Broker其实是从NameServer这里取的。所以我们在application.properties中配置
# namesrv的服务地址
rocketmq.namesrv.address=192.168.137.171:9876
封装消息生产者
把RocketMQ集群地址配置好后就开始封装消息生产者,我们从官网拷贝生产者DEMO代码稍微做些修改
@Service
public class Producer {
@Value("${rocketmq.namesrv.address}")
private String nameSrvAddress;
public void sendMessage(String name, String topic) throws MQClientException, UnsupportedEncodingException, MQBrokerException, RemotingException, InterruptedException {
// 根据group实例化一个生产者
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name_4");
// 配置rocketMQ集群地址
producer.setNamesrvAddr(nameSrvAddress);
//Launch the instance.
producer.start();
// 构建消息体
Message msg = new Message(topic, "TagA", ("Hello RocketMQ " + name).getBytes(RemotingHelper.DEFAULT_CHARSET));
// 发送消息
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
producer.shutdown();
}
}
这里的Group和Tag我没有做抽取,有需要的可以将Tag作为消息的小分类标记。举个例子订单支付失败消息,那么Topic可以统一定义成order_pay_fail但实际上支付失败的订单有多种分类,比如超时支付失败、用户取消支付、系统异常等等。那么业务有一种场景想针对用户手动取消支付的订单发送短信,发送优惠券等,就可以根据Tag进行筛选了。而group是一种分组的概念,同一种 业务类型可以 定义为一个分组。后续专门 针对group作分享。到这里可以根据topic发送消息的生产者代码就完成了。
封装消息消费者
封装好生产者之后就开始集成下消费者了,同样从官方文档上把DEMO代码拷贝过来,但是消费者可就没有生产者那么直接了当了。先看官方代码
// 根据groupName构建一个消费者实例
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
// 指定nameSrv
consumer.setNamesrvAddr("localhost:9876");
// 消费哪个topic
consumer.subscribe("TopicTest", "*");
// 执行回调
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消费者
consumer.start();
System.out.printf("Consumer Started.%n");
作为消费者,我肯定希望他有点个性,不是所有的topic的消息都要消费,正常的业务场景是比如取消订单这个场景的消费者就只处理取消订单这个Topic的消息。从上面的代码可以看到我们要定义一个可以使用的消费者要指定nameSrv的地址、要消费topic的名称。如果我有几十上百个消费者,而且nameSrv的地址是一样的,是不是重复写很多这样的定义呢?聪明的你肯定想到了要抽象,那我们就把这个消费者抽象出来。
定义要消费的Topic
要消费哪个Topic,消费者肯定是知道的,那我设想下在消费者头上添加个注解来指定Topic,类似这样
@RocketMqTopic(topic = "TopicTest")
public class CancelOrder{
// 按照Tag来消费
@RocketMqTag(tag = "TagA")
public Boolean consumerA(MessageExt msg) throws UnsupportedEncodingException {
System.out.println("哈哈:"+msg.getTopic()+":"+msg.getTags()+new String(msg.getBody(), "UTF-8"));
return true;
}
@RocketMqTag(tag = "TagB")
public Boolean consumerB(MessageExt msg) throws UnsupportedEncodingException {
System.out.println("哈哈:"+msg.getTopic()+":"+msg.getTags()+new String(msg.getBody(), "UTF-8"));
return true;
}
}
可以看到我这里定义了两个注解来帮我定义消费者要消费的Topic和Tag
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface RocketMqTag {
String tag() default "";
}
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
public @interface RocketMqTopic {
String topic() default "";
}
注解定义好了,那我如何处理抽象出来的消息分发呢?Spring给我们提供了AnnotationConfigApplicationContext或者也可以实现ApplicationContextAware接口来获取ApplicationContext对象,有了Spring的上下文我们就可以获取哪些Bean定义了注解,也就能收集到项目需要消费哪些Topic
ApplicationContext context = new AnnotationConfigApplicationContext(ScanRockerMqConsumerConfig.class);
Map<String, Object> beansWithAnnotation = context.getBeansWithAnnotation(RocketMqTopic.class);
Set<String> strings = beansWithAnnotation.keySet();
同样我们也可以得到定义了Tag的方法,使用Map给收集起来,代码如下
/** topic 队列 **/
private Map<String, Object> consumerMap = new HashMap<>();
/** tag 队列 **/
private Map<Object, Map<String, Object>> consumerMethodMap = new HashMap<>();
@PostConstruct
public void scan() {
ApplicationContext context = new AnnotationConfigApplicationContext(ScanRockerMqConsumerConfig.class);
Map<String, Object> beansWithAnnotation = context.getBeansWithAnnotation(RocketMqTopic.class);
Set<String> strings = beansWithAnnotation.keySet();
for (String benName : strings) {
Object bean = beansWithAnnotation.get(benName);
Class<?> aClass = bean.getClass();
RocketMqTopic annotation = aClass.getAnnotation(RocketMqTopic.class);
String topic = annotation.topic();
Method[] methods = aClass.getMethods();
if(methods.length < 1) {
continue;
}
Map<String, Object> consumerMethodItemMap = new HashMap<>();
for (Method method : methods) {
RocketMqTag methodConsumer = method.getAnnotation(RocketMqTag.class);
if(null == methodConsumer) {
continue;
}
String tag = methodConsumer.tag();
consumerMethodItemMap.put(tag, method);
}
if(consumerMethodItemMap.keySet().size() < 1){
continue;
}
consumerMap.put(topic, beansWithAnnotation.get(benName));
consumerMethodMap.put(bean, consumerMethodItemMap);
}
}
然后有了这个集合,我们就可以根据Topic和Tag来分发消息数据了
@Bean
public AsynMessageConsumerHoler getRocketMqConsumer() {
AsynMessageConsumerHoler consumer = new AsynMessageConsumerHoler(rocketMqProperties.getConsumerGroupName());
try {
consumer.setNamesrvAddr(rocketMqProperties.getNamesrvAddr());
// 消费顺序,指定从队列头到尾消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
Set<String> topics = consumerMap.keySet();
if(CollectionUtils.isEmpty(topics)) {
LOGGER.error("please set a topic at last, the system startup failed");
System.exit(0);
}
for (String topic : topics) {
consumer.subscribe(topic.trim(), "*");
}
//注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
MessageExt msg = msgs.get(0);
LOGGER.info("receive a message, msgId=" + msg.getMsgId());
try {
Object bean = consumerMap.get(msg.getTopic());
Map<String, Object> stringObjectMap = consumerMethodMap.get(bean);
Set<String> strings = stringObjectMap.keySet();
for (String tag : strings)
if(tag.equals(msg.getTags())) {
Method method = (Method)stringObjectMap.get(tag);
method.setAccessible(true);
try {
Object result = method.invoke(bean, msg);
if(result instanceof Boolean) {
return (Boolean) result ? ConsumeConcurrentlyStatus.CONSUME_SUCCESS : ConsumeConcurrentlyStatus.RECONSUME_LATER;
}else {
// 这里是指返回值不符合格式,实际上是已经消费了的
LOGGER.error("禁止返回非布尔类型的值,如果出现此提示应该杀个程序员祭天");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}catch (Exception e) {
LOGGER.error("***消费点已报错,自己去处理逻辑,消息会重发***");
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
}
}catch (Exception e) {
LOGGER.info("failed to consum this message, msgId=" + msg.getMsgId());
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
consumer.start()
LOGGER.info("the consumer system startup success!");
}catch (Exception e) {
LOGGER.info("the system startup failed!");
}
return consumer;
}
}
到了这里消费者就封装好了,能够按照Topic和Tag自定义消费者,也基本能够满足业务需求了,有更加定制的拿去改改也是可以的。地址我已经打包放到gitee上了,有需要的小伙伴关注:Java极客帮 获取
猜你喜欢
- 2024-12-12 深入理解ViewPager2原理及其实践(上篇)
- 2024-12-12 译|Python幕后(3):漫步CPython源码
- 2024-12-12 心房颤动伴长RR间期及散点图
- 2024-12-12 心房颤动及长间歇
你 发表评论:
欢迎- 05-16东契奇:DFS训练时喷了我很多垃圾话 我不懂他为什么比赛不这么干
- 05-16这两球很伤!詹姆斯空篮拉杆不中 DFS接里夫斯传球空接也没放进
- 05-16湖人自媒体调查:89%球迷希望DFS回归79%希望詹姆斯回归
- 05-16Shams:湖人得到全能球员DFS 节省了1500万奢侈税&薪金空间更灵活
- 05-16G5湖人胜率更高!詹姆斯不满判罚,DFS谈5人打满下半场:这很艰难
- 05-16DFS:当东契奇进入状态 所有防守者在他面前都像个圆锥桶
- 05-16上一场9中6!DFS:不能让纳兹-里德这样的球员那么轻松地投三分
- 05-16WIDER FACE评测结果出炉:滴滴人脸检测DFS算法获世界第一
- 最近发表
-
- 东契奇:DFS训练时喷了我很多垃圾话 我不懂他为什么比赛不这么干
- 这两球很伤!詹姆斯空篮拉杆不中 DFS接里夫斯传球空接也没放进
- 湖人自媒体调查:89%球迷希望DFS回归79%希望詹姆斯回归
- Shams:湖人得到全能球员DFS 节省了1500万奢侈税&薪金空间更灵活
- G5湖人胜率更高!詹姆斯不满判罚,DFS谈5人打满下半场:这很艰难
- DFS:当东契奇进入状态 所有防守者在他面前都像个圆锥桶
- 上一场9中6!DFS:不能让纳兹-里德这样的球员那么轻松地投三分
- WIDER FACE评测结果出炉:滴滴人脸检测DFS算法获世界第一
- 湖人自媒体调查:89%球迷希望DFS回归 79%希望詹姆斯回归
- 一觉醒来湖人苦盼的纯3D终于到位 DFS能带给紫金军多少帮助
- 标签列表
-
- jdk (81)
- putty (66)
- rufus (78)
- 内网穿透 (89)
- okhttp (70)
- powertoys (74)
- windowsterminal (81)
- netcat (65)
- ghostscript (65)
- veracrypt (65)
- asp.netcore (70)
- wrk (67)
- aspose.words (80)
- itk (80)
- ajaxfileupload.js (66)
- sqlhelper (67)
- express.js (67)
- phpmailer (67)
- xjar (70)
- redisclient (78)
- wakeonlan (66)
- tinygo (85)
- startbbs (72)
- webftp (82)
- vsvim (79)
本文暂时没有评论,来添加一个吧(●'◡'●)