网站首页 > 开源技术 正文
RabbitMQ是一个消息中间件,用来接收消息,转发消息。可以认为消息中间件就是一家邮政局,你把信投到邮箱,邮局的人就会帮你递送信件到接收者手上。从这个意义上讲,RabbitMQ兼具邮箱、邮局和邮差的功能。
几种角色
- 生产者:消息的生产者,负责产生消息。
- 队列:RabbitMQ中的消息的缓冲区,用来存放消息,队列仅受限于服务器的内存和磁盘空间。
- 消费者:从RabbitMQ中接收消息,消费消息。
搭建环境
Erlang语言是RabbitMQ的底层语言,安装RabbitMQ需要安装Erlang。在Windows环境下安装示例:
先安装erlang,全部配置默认就行
再安装RabbitMQ,全部配置默认就行
安装后RabbitMQ作为windows的服务自动运行。
写一个Hello World
RabbitMQ支持非常多的语言:
- Python
- Java
- Ruby
- PHP
- C#
- JavaScript
- Go
- Elixir
- Objective-C
- Swift
- Spring AMQP
这里以Java为例,IDE采用IntelliJ:
1.创建Java工程
pom.xml文件如下:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>rabbitmq-demo</groupId> <artifactId>rabbitmq-java</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.5.2</version> </dependency> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>rabbitmq-client</artifactId> </dependency> </dependencies> </project>
2.编写消息发送代码
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Send { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "'"); } } }
发送者、RabbitMQ服务器和接收者可以运行在不同的地方,只需要将localhost改成RabbitMQ的IP就行。
注意将IntelliJ中的language level设为8,否则有可能编译不通过
3.编写消息接收代码
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; public class Recv { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); } }
4.测试运行
运行Recv.java
运行Send.java
查看Recv的输出,已经接收到消息
用RabbitMQ做任务分发
此场景为创建工作队列去做耗时的任务,RabbitMQ分发任务到多个消费者。
目的是为了让资源密集型任务不阻塞主进程,将任务封装为一个消息并发送到队列中。
1.编写发送者代码
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class NewTask { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = String.join(" ", argv); channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "'"); } } }
通过参数传递要发送的消息
2.编写接收者代码
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; public class Worker { private final static String TASK_QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } catch (Exception e){ e.printStackTrace(); } finally { System.out.println(" [x] Done"); } }; boolean autoAck = true; // acknowledgment is covered below channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { }); } private static void doWork(String task) throws InterruptedException { for (char ch: task.toCharArray()) { if (ch == '.') Thread.sleep(1000); } } }
我们解析接收到的消息,消息中有多少个点就让进程等待几秒,模拟耗时任务。
3.测试运行
首先配置IntelliJ,让Worker.java可以并行运行
运行两次Worker
运行NewTask,我们也会运行多次NewTask,每次传不一样的参数。但是和Worker不同的是NewTask发送完消息就会退出,所以不需要并行运行
首先配置参数
运行多次,一次传递参数:
First message. Second message.. Third message... Fourth message.... Fifth message.....
分别查看两个Worker的输出
可以看到RabbitMQ采用循环的方式轮流给两个worker发送任务。
更复杂的场景请参考官方文档,包括消息是否正确处理,发布订阅默认,RPC等。
猜你喜欢
- 2024-09-08 vue2组件系列第四十二节:NavBar 导航栏
- 2024-09-08 从零开始学Python——使用Selenium抓取动态网页数据
- 2024-09-08 黑客突破macOS的安全防御,新型恶意软件正在偷偷的窃取你的文件
- 2024-09-08 vue2组件系列第三十六节:Lazyload 图片懒加载
- 2024-09-08 Visa|实习面试|2022 暑假(visa issues)
- 2024-09-08 vue2组件系列第二十二节:SwitchCell 开关单元格
- 2024-09-08 vue2组件系列第二十节:按钮式单选组件
- 2024-09-08 vue2组件系列第四十节:NoticeBar 通告栏
- 2024-09-08 vue2组件系列第二十六节:PasswordInput 密码输入框
- 2024-09-08 vue2组件系列第二十四节:多选式按钮
你 发表评论:
欢迎- 07-10公司网站建站选择:人工建站和源码建站分析
- 07-10多用途游戏娱乐新闻网站HTML5模板
- 07-10站长教你搭建属于自己的网站(搭建网站的步骤)
- 07-10php宝塔搭建部署实战响应式塑料封条制品企业网站模板源码
- 07-10自适应响应式汽车配件类网站源码 html5高端大气汽车网站织梦模板
- 07-10网站标签怎么设置?(网站标签怎么设置比较好)
- 07-10PageAdmin企业网站制作中踩过的坑
- 07-10豆包给我输出的html在线象棋源码(有点简单)
- 最近发表
- 标签列表
-
- jdk (81)
- putty (66)
- rufus (78)
- 内网穿透 (89)
- okhttp (70)
- powertoys (74)
- windowsterminal (81)
- netcat (65)
- ghostscript (65)
- veracrypt (65)
- asp.netcore (70)
- wrk (67)
- aspose.words (80)
- itk (80)
- ajaxfileupload.js (66)
- sqlhelper (67)
- express.js (67)
- phpmailer (67)
- xjar (70)
- redisclient (78)
- wakeonlan (66)
- tinygo (85)
- startbbs (72)
- webftp (82)
- vsvim (79)
本文暂时没有评论,来添加一个吧(●'◡'●)