编程开源技术交流,分享技术与知识

网站首页 > 开源技术 正文

消息中间件RabbitMQ入门详解(消息中间件mq作用)

wxchong 2024-09-08 10:46:03 开源技术 9 ℃ 0 评论
RabbitMQ是一个消息中间件,用来接收消息,转发消息。可以认为消息中间件就是一家邮政局,你把信投到邮箱,邮局的人就会帮你递送信件到接收者手上。从这个意义上讲,RabbitMQ兼具邮箱、邮局和邮差的功能。

几种角色

  • 生产者:消息的生产者,负责产生消息。
  • 队列:RabbitMQ中的消息的缓冲区,用来存放消息,队列仅受限于服务器的内存和磁盘空间。
  • 消费者:从RabbitMQ中接收消息,消费消息。

搭建环境

Erlang语言是RabbitMQ的底层语言,安装RabbitMQ需要安装Erlang。在Windows环境下安装示例:

先安装erlang,全部配置默认就行

再安装RabbitMQ,全部配置默认就行

安装后RabbitMQ作为windows的服务自动运行。

写一个Hello World

RabbitMQ支持非常多的语言:

  • Python
  • Java
  • Ruby
  • PHP
  • C#
  • JavaScript
  • Go
  • Elixir
  • Objective-C
  • Swift
  • Spring AMQP

这里以Java为例,IDE采用IntelliJ:

1.创建Java工程

pom.xml文件如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelVersion>4.0.0</modelVersion>
 <groupId>rabbitmq-demo</groupId>
 <artifactId>rabbitmq-java</artifactId>
 <version>1.0-SNAPSHOT</version>
<dependencies>
 <dependency>
 <groupId>com.rabbitmq</groupId>
 <artifactId>amqp-client</artifactId>
 <version>5.5.2</version>
 </dependency>
 <dependency>
 <groupId>com.rabbitmq</groupId>
 <artifactId>rabbitmq-client</artifactId>
 </dependency>
</dependencies>
</project>

2.编写消息发送代码

import com.rabbitmq.client.Channel; 
import com.rabbitmq.client.Connection; 
import com.rabbitmq.client.ConnectionFactory; 
 
public class Send { 
 
 private final static String QUEUE_NAME = "hello"; 
 
 public static void main(String[] argv) throws Exception { 
 ConnectionFactory factory = new ConnectionFactory(); 
 factory.setHost("localhost"); 
 try (Connection connection = factory.newConnection(); 
 Channel channel = connection.createChannel()) { 
 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 
 String message = "Hello World!"; 
 channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8")); 
 System.out.println(" [x] Sent '" + message + "'"); 
 } 
 } 
} 

发送者、RabbitMQ服务器和接收者可以运行在不同的地方,只需要将localhost改成RabbitMQ的IP就行。

注意将IntelliJ中的language level设为8,否则有可能编译不通过

3.编写消息接收代码

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class Recv {
 private final static String QUEUE_NAME = "hello";
 public static void main(String[] argv) throws Exception {
 ConnectionFactory factory = new ConnectionFactory();
 factory.setHost("localhost");
 Connection connection = factory.newConnection();
 Channel channel = connection.createChannel();
 channel.queueDeclare(QUEUE_NAME, false, false, false, null);
 System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
 DeliverCallback deliverCallback = (consumerTag, delivery) -> {
 String message = new String(delivery.getBody(), "UTF-8");
 System.out.println(" [x] Received '" + message + "'");
 };
 channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
 }
}

4.测试运行

运行Recv.java

运行Send.java

查看Recv的输出,已经接收到消息

用RabbitMQ做任务分发

此场景为创建工作队列去做耗时的任务,RabbitMQ分发任务到多个消费者。

目的是为了让资源密集型任务不阻塞主进程,将任务封装为一个消息并发送到队列中。

1.编写发送者代码

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class NewTask {
 private final static String QUEUE_NAME = "hello";
 public static void main(String[] argv) throws Exception {
 ConnectionFactory factory = new ConnectionFactory();
 factory.setHost("localhost");
 try (Connection connection = factory.newConnection();
 Channel channel = connection.createChannel()) {
 channel.queueDeclare(QUEUE_NAME, false, false, false, null);
 String message = String.join(" ", argv);
 channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
 System.out.println(" [x] Sent '" + message + "'");
 }
 }
}

通过参数传递要发送的消息

2.编写接收者代码

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class Worker {
 private final static String TASK_QUEUE_NAME = "hello";
 public static void main(String[] argv) throws Exception {
 ConnectionFactory factory = new ConnectionFactory();
 factory.setHost("localhost");
 Connection connection = factory.newConnection();
 Channel channel = connection.createChannel();
 channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
 System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
 DeliverCallback deliverCallback = (consumerTag, delivery) -> {
 String message = new String(delivery.getBody(), "UTF-8");
 System.out.println(" [x] Received '" + message + "'");
 try {
 doWork(message);
 } catch (Exception e){
 e.printStackTrace();
 }
 finally {
 System.out.println(" [x] Done");
 }
 };
 boolean autoAck = true; // acknowledgment is covered below
 channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
 }
 private static void doWork(String task) throws InterruptedException {
 for (char ch: task.toCharArray()) {
 if (ch == '.') Thread.sleep(1000);
 }
 }
}

我们解析接收到的消息,消息中有多少个点就让进程等待几秒,模拟耗时任务。

3.测试运行

首先配置IntelliJ,让Worker.java可以并行运行

运行两次Worker

运行NewTask,我们也会运行多次NewTask,每次传不一样的参数。但是和Worker不同的是NewTask发送完消息就会退出,所以不需要并行运行

首先配置参数

运行多次,一次传递参数:

First message.
Second message..
Third message...
Fourth message....
Fifth message.....

分别查看两个Worker的输出

可以看到RabbitMQ采用循环的方式轮流给两个worker发送任务。

更复杂的场景请参考官方文档,包括消息是否正确处理,发布订阅默认,RPC等。

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表