105-RabbitMQ

1 消息队列MQ概述

MQ全称为Message Queue,消息队列是应用程序和应用程序之间的通信方法。

RabbitMQ是一个Erlang开发的AMQP(Advanced Message Queuing Protocol )的开源实现。

1.1 为什么使用MQ

在项目中,可将一些无需即时返回且耗时的操作提取出来,进行异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。

开发中消息队列通常有如下应用场景:

  1. 异步提速:
    任务异步处理将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。提高了应用程序的响应时间。

  2. 应用解耦:
    应用程序解耦合,MQ充当中介,生产方通过MQ与消费方交互,它将应用程序进行解耦合

  3. 削峰填谷:
    在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见;如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用MQ能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。

  4. 可恢复性:
    系统的一部分组件失效时,不会影响到整个系统。MQ降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。

  5. 排序保证:
    消息队列可以控制数据处理的顺序,因为消息队列本身使用的是队列这个数据结构,FIFO(先进选出),在一些场景数据处理的顺序很重要,比如商品下单顺序等。

1.2. 消息队列产品

市场上常见的消息队列有如下:

  • ActiveMQ:基于JMS实现, 比较均衡, 不是最快的, 也不是最稳定的
  • ZeroMQ:基于C语言开发, 目前最好的队列系统
  • RabbitMQ:基于AMQP协议,erlang语言开发,稳定性好, 数据基本上不会丢失
  • RocketMQ:基于JMS,阿里巴巴产品, 目前已经捐献给apahce, 还在孵化器中孵化
  • Kafka:类似MQ的产品;分布式消息系统,高吞吐量, 目前最快的消息服务器, 不保证数据完整性

1.3. AMQP 和 JMS

Dubbo协议:Dubbo 缺省协议采用单一长连接和 NIO 异步通讯,适合于小数据量大并发的服务调用,以及服务消费者机器数远大于服务提供者机器数的情况。

HTTP协议(HyperText Transfer Protocol,超文本传输协议)是因特网上应用最为广泛的一种网络传输协议,所有的WWW文件都必须遵守这个标准。

AMQP协议:即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。

MQ是消息通信的模型;实现MQ的大致有两种主流方式:AMQP、JMS。

1.3.1. AMQP

AMQP,即Advanced Message Queuing Protocol(高级消息队列协议),一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。更准确的说是一种binary wire-level protocol(链接协议)。这是其和JMS的本质差别,AMQP不从API层进行限定,而是直接定义网络交换的数据格式。

1.3.2. JMS

JMS即Java消息服务(JavaMessage Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。

1.3.3. AMQP 与 JMS 区别

  • JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式。
  • JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。
  • JMS规定了两种消息模式;而AMQP的消息模式更加丰富。
JMS AMQP
定义 Java api Wire-protocol
跨语言
跨平台

1.4. RabbitMQ

RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。

RabbitMQ官方地址:http://www.rabbitmq.com/

RabbitMQ提供了6种模式:Hello Word简单模式,work工作模式,Publish/Subscribe发布与订阅模式,Routing路由模式,Topics主题模式(通配符模式),RPC远程调用模式(远程调用,不太算MQ;不作介绍)

官网对应模式介绍:https://www.rabbitmq.com/getstarted.html

应用场景:

  1. 双十一商品秒杀/抢票功能实现
    我们在双11的时候,当我们凌晨大量的秒杀和抢购商品,然后去结算的时候,就会发现,界面会提醒我们,让我们稍等,以及一些友好的图片文字提醒。而不是像前几年的时代,动不动就页面卡死,报错等来呈现给用户。

  2. 积分兑换(积分可用于多平台)
    积分兑换模块,有一个公司多个部门都要用到这个模块,这时候就可以通过消息队列解耦这个特性来实现。 各部门系统做各部门的事,但是他们都可以用这个积分系统进行商品的兑换等。其他模块与积分模块完全解耦。

  3. 大平台用户注册
    发送邮件、用户大数据分析操作等 基于同步变异步功能实现

    用户注册真实操作步骤:

    1. 用户注册选择的兴趣标签,根据用户的属性,行为进行用户分析,计算出推荐内容
    2. 注册后可能需要发送邮件给用户
    3. 发送短信给用户
    4. 发送给用户指南的系统通知
    5. …等等

    正常情况注册,不出现高并发,假如有大量的用户注册,发生了高并发,就会出现如下情况

    邮件接口承受不住,或是分析信息时的大量计算使 cpu 满载,这将会出现虽然用户数据记录很快的添加到数据库中了,但是却卡在发邮件或分析信息时的情况,导致请求的响应时间大幅增长,甚至出现超时,这就有点不划算了。面对这种情况一般也是将这些操作放入消息队列(生产者消费者模型),消息队列慢慢的进行处理,同时可以很快的完成注册请求,不会影响用户使用其他功能。

1.5 相关定义:

  • Connection: publisher/consumer 和 broker 之间的 TCP 连接
  • Channel: 消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务
  • Exchange: 消息交换机,它指定消息按什么规则,路由到哪个队列
  • Queue: 消息队列载体,每个消息都会被投入到一个或多个队列
  • VHost: 虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。

image-20191226053108367

==由Exchange、Queue、RoutingKey三个才能决定一个消息从Exchange到Queue的唯一的线路。==

2 安装及配置RabbitMQ

2.1 安装

2.1.1 docker安装RabbitMq

  • 下载镜像

    1
    docker pull rabbitmq:management
  • 创建容器

    1
    2
    3
    4
    5
    6
    7
    8
    docker run -di --name=changgou_rabbitmq -p 5671:5617 -p 5672:5672 -p4369:4369 -p 15671:15671 -p 15672:15672 -p 25672:25672 rabbitmq:management

    # 解释如下:
    15672 (if management plugin is enabled.管理界面 )
    15671 management监听端口
    5672, 5671 (AMQP 0-9-1 without and with TLS 消息队列协议是一个消息协议)
    4369 (epmd) epmd 代表 Erlang 端口映射守护进程
    25672 (Erlang distribution)
  • 访问后台
    浏览器中输入地址

    1
    http://192.168.200.128:15672/
  • 设置容器开机自动启动

    1
    docker update --restart=always 容器ID

image-20191224135410717

2.1.2 windows 安装RabbitMq

RabbitMQ由Erlang语言开发,Erlang语言用于并发及分布式系统的开发,在电信领域应用广泛,OTP(Open Telecom Platform)作为Erlang语言的一部分,包含了很多基于Erlang开发的中间件及工具库,安装RabbitMQ需要安装Erlang/OTP,并保持版本匹配。

安装 Erlang

  1. 下载 Erlang。下载地址:http://erlang.org/download/otp_win64_20.3.exe

  2. 以管理员身份运行此文件进行安装,一直下一步直至完成。

  3. Erlang安装完成需要配置系统环境变量:ERLANG_HOME=C:\Program Files\erl9.3 在path中添加%ERLANG_HOME%\bin;

安装 RabbitMQ

  1. 下载RabbitMQ。下载地址:https://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.7.14

  2. 以管理员身份运行此文件进行安装,一直下一步直至完成。安装完成后可以在系统服务中查看到RabbitMQ服务。

为了更加方便的管理RabbitMQ服务,可以安装RabbitMQ提供的一个浏览器端管理插件,可以通过浏览器页面方便的进行服务管理。安装方式:

  1. 以管理员身份打开 cmd (不是PowerShell);然后进入在RabbitMQ的安装目录下sbin目录
  2. 在上述窗口执行命令: rabbitmq-plugins.bat enable rabbitmq_management
  3. 打开浏览器访问网站http://localhost:15672进入登录页面,默认账号和密码都为guest

当卸载重新安装时会出现RabbitMQ服务注册失败,此时需要进入注册表清理erlang,方法为搜索 RabbitMQ、ErlSrv,将对应的项全部删除。

2.2. 用户以及Virtual Hosts配置

2.2.1. 用户角色

RabbitMQ在安装好后,可以访问http://localhost:15672;其自带了guest/guest的用户名和密码;如果需要创建自定义用户;那么也可以登录管理界面后,如下操作:

image-20191224130727629

角色说明:

  1. 超级管理员(administrator)可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。
  2. 监控者(monitoring)可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
  3. 策略制定者(policymaker)可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。
  4. 普通管理者(management)仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。
  5. 其他无法登陆管理控制台,通常就是普通的生产者和消费者。

2.2.2. Virtual Hosts配置

RabbitMQ的权限管理;在RabbitMQ中可以虚拟消息服务器Virtual Host,每个Virtual Hosts相当于一个相对独立的RabbitMQ服务器,每个VirtualHost之间是相互隔离的。exchange、queue、message不能互通。相当于mysql的db。Virtual Name一般以/开头。

  1. 创建Virtual Hosts
    image-20191224130914131

  2. 设置Virtual Hosts权限
    image-20191224131458585
    image-20191224131421263

2.2.3 添加队列

image-20191224131854869

持久化:如果选durable,则队列消息自动持久化到磁盘上,如果选transient,则不会持久化;

自动删除:默认值no,如果yes,则在消息队列没有使用的情况下,队列自行删除。

2.2.4 添加交换机

image-20191224132241282

自动删除:默认值no,如果是yes,则在将所有队列与交换机取消绑定之后,交换机将自动删除。

交换机类型:

  • fanout:广播类型
  • direct:路由类型
  • topic:通配符类型,基于消息的路由键路由
  • headers:通配符类型,基于消息的header路由

内部交换器:默认值no,如果是yes,消息无法直接发送到该交换机,必须通过交换机的转发才能到达次交换机。本交换机只能与交换机绑定。

2.2.5 队列与交换机进行绑定

image-20191224132352207

image-20191224132603345

3 Spring Boot整合RabbitMQ

3.1. 简介

在spring boot项目中,只需要引入start-amqp起步依赖,即可整合RabbitMQ成功;我们基于SpringBoot封装的RabbitTemplate模板对象,可以非常方便的发送消息,接收消息(使用注解)。

amqp的官方GitHub地址:https://github.com/spring-projects/spring-amqp

一般在开发过程中,我们有两个角色:

image-20191224123350539

3.2. 搭建步骤:

1、创建父工程:

2、生产者工程:

  1. 创建SpringBoot工程:rabbitmq-producer
  2. 勾选起步依赖坐标:spring for RabbitMQ
  3. 配置RabbitMQ:服务host地址及端口、虚拟主机、服务账户密码

3、消费者工程:

  1. 创建SpringBoot工程:
  2. 勾选起步依赖坐标:spring for RabbitMQ
  3. 配置RabbitMQ:服务host地址及端口、虚拟主机、服务账户密码

3.3. 搭建过程:

3.3.1 创建父工程:

创建父maven空的工程:springboot-rabbitmq-parent

image-20191224124426479

3.3.2 搭建生产者工程

1、创建工程

创建SpringBoot的生产者工程:rabbitmq-producer

image-20191224124514886

2、起步依赖坐标

image-20191224124625375

pom.xml文件内容为如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.9.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
<!--amqp协议的起步依赖坐标-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--rabbit测试依赖坐标-->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
<!--SpringBoot测试依赖坐标-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

3、启动类

1
2
3
4
5
6
@SpringBootApplication
public class SpringbootRabbitmqProducerApplication {
public static void main(String[] args) {
SpringApplication.run(SpringbootRabbitmqProducerApplication.class, args);
}
}

4、配置RabbitMQ

配置文件application.properties,内容如下:

1
2
3
4
5
6
7
8
9
10
# RabbitMQ 服务host地址
spring.rabbitmq.host=192.168.200.128
# 端口
spring.rabbitmq.port=5672
# 虚拟主机地址
spring.rabbitmq.virtual-host=/itheima
# rabbit服务的用户名
spring.rabbitmq.username=heima
# rabbit服务的密码
spring.rabbitmq.password=heima

3.3.3 搭建消费者工程

1、创建工程

创建SpringBoot的消费者工程:rabbitmq-consumer

image-20191224125050553

2、勾选起步依赖坐标

image-20191224125207084

pom.xml文件内容为如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.9.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
<!--amqp的起步依赖坐标-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>

3、启动类

1
2
3
4
5
6
7
@SpringBootApplication
public class SpringbootRabbitmqConsumerApplication {

public static void main(String[] args) {
SpringApplication.run(SpringbootRabbitmqConsumerApplication.class, args);
}
}

4、配置RabbitMQ

application.properties,内容如下:

1
2
3
4
5
6
7
8
9
10
# RabbitMQ 服务host地址
spring.rabbitmq.host=192.168.200.128
# 端口
spring.rabbitmq.port=5672
# 虚拟主机地址
spring.rabbitmq.virtual-host=/itheima
# rabbit服务的用户名
spring.rabbitmq.username=heima
# rabbit服务的密码
spring.rabbitmq.password=heima

四、RabbitMQ五种工作模式【重要】

4.1 Hello World简单模式【最简单消息队列模式】

4.1.1 什么是简单模式

1575274339325

在上图的模型中,有以下概念:

  • P生产者: 也就是要发送消息的程序
  • C消费者: 消息的接受者,会一直等待消息到来。
  • queue消息队列: 图中红色部分。可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。

4.1.2 RabbitMQ管理界面操作

  • 创建simple_queue队列用于演示Hello World简单模式
    07

  • 点击 simple_queue 可以进入到这个queue的管理界面
    08

  • 点击 Get Message 按钮可以获取查看队列中的消息
    09

4.1.3 生产者代码

rabbitmq-producer项目测试代码不使用rabbitTemplate如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package com.itheima.rabbitmq.simple;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
static final String QUEUE_NAME = "simple_queue";
public static void main(String[] args) throws Exception {
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
在执行上述的消息发送之后;可以登录rabbitMQ的管理控制台,可以发现队列和其消息:
//主机地址;默认为 localhost
connectionFactory.setHost("localhost");
//连接端口;默认为 5672
connectionFactory.setPort(5672);
//虚拟主机名称;默认为 /
connectionFactory.setVirtualHost("/itcast");
//连接用户名;默认为guest
connectionFactory.setUsername("heima");
//连接密码;默认为guest
connectionFactory.setPassword("heima");
//创建连接
Connection connection = connectionFactory.newConnection();
// 创建频道
Channel channel = connection.createChannel();
// 声明(创建)队列
/**
* 参数1:队列名称
* 参数2:是否定义持久化队列
* 参数3:是否独占本次连接
* 参数4:是否在不使用的时候自动删除队列
* 参数5:队列其它参数
*/
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 要发送的信息
String message = "你好;小兔子!";
/**
* 参数1:交换机名称,如果没有指定则使用默认Default Exchage
* 参数2:路由key,简单模式可以传递队列名称
* 参数3:消息其它属性
* 参数4:消息内容
*/
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("已发送消息:" + message);
// 关闭资源
channel.close();
connection.close();
}
}

rabbitmq-producer项目测试代码使用rabbitTemplate如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package com.itheima.test;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class Demo01TestSimpleQueue {

@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void contextLoads() {
//向消息队列发送一条简单消息
/**
* 参数1:消息队列名称
* 参数2:消息内容
*/
rabbitTemplate.convertAndSend("simple_queue","hello 小兔子!");
}
}

4.1.4 消费者代码

抽取创建connection的工具类com.itheima.rabbitmq.util.ConnectionUtil;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package com.itheima.rabbitmq.util;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class ConnectionUtil {
public static Connection getConnection() throws Exception {
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//主机地址;默认为 localhost
connectionFactory.setHost("localhost");
//连接端口;默认为 5672
connectionFactory.setPort(5672);
//虚拟主机名称;默认为 /
connectionFactory.setVirtualHost("/itcast");
//连接用户名;默认为guest
connectionFactory.setUsername("heima");
//连接密码;默认为guest
connectionFactory.setPassword("heima");
//创建连接
return connectionFactory.newConnection();
}
}

rabbitmq-consumer 不使用 RabbitHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package com.itheima.rabbitmq.simple;
import com.itheima.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
// 创建频道
Channel channel = connection.createChannel();
// 声明(创建)队列
/**
* 参数1:队列名称
* 参数2:是否定义持久化队列
* 参数3:是否独占本次连接
* 参数4:是否在不使用的时候自动删除队列
* 参数5:队列其它参数
*/
channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);
//创建消费者;并设置消息处理
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
/**
* consumerTag 消息者标签,在channel.basicConsume时候可以指定
* envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
(收到消息失败后是否需要重新发送)
* properties 属性信息
* body 消息
*/
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
//路由key
System.out.println("路由key为:" + envelope.getRoutingKey());
//交换机
System.out.println("交换机为:" + envelope.getExchange());
//消息id
System.out.println("消息id为:" + envelope.getDeliveryTag());
//收到的消息
System.out.println("接收到的消息为:" + new String(body, "utf-8"));
//确认消息
//channel.basicAck(envelope.getDeliveryTag(), false);
}
};
//监听消息
/**
* 参数1:队列名称
* 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认
* 参数3:消息接收到后回调
*/
channel.basicConsume(Producer.QUEUE_NAME, true, consumer);
//不关闭资源,应该一直监听消息
//channel.close();
//connection.close();
}
}

使用 RabbitHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package com.itheima.rabbitmq.simple;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
* 消费者,接收消息队列消息监听器
* 必须将当前监听器对象注入Spring的容器中
*/
@Component
@RabbitListener(queues = "simple_queue")
public class SimpleListener {

@RabbitHandler
public void simpleHandler(String msg){
System.out.println("=====接收消息====>"+msg);
}
}

然后启动SpringbootRabbitmqConsumerApplication, 就可以接收到RabbitMQ服务器发送来的消息

4.2 Work queues工作队列模式

4.2.1 什么是工作队列模式

image-20191205102457994

Work Queues与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。

在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系。同一个消息只能由一个消费者消费

【多个节点分片任务处理,提升任务处理的效率】

4.2.2 RabbitMQ管理界面操作

创建 work_queue 队列用于演示work工作队列模式

12

4.2.3 生产者代码

Work Queues 与入门程序的 简单模式 的代码是几乎一样的;可以完全复制,并复制多一个消费者进行多个消费者同时消费消息的测试。

rabbitmq-producer项目测试代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.itheima.test;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class Demo02TestWorkQueue {

@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void contextLoads() {
//向工作队列发送1千条消息
for (int i = 0; i < 1000; i++) {
/**
* 参数1:消息队列名称
* 参数2:消息内容
*/
rabbitTemplate.convertAndSend("work_queue","hello 我是小兔子【"+i+"】!");
}
}
}

4.2.4 消费者代码

  • rabbitmq-consumer项目创建监听器1:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package com.itheima.rabbitmq.work;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
* 工作队列:消费者接收监听器1,接收来自消息队列中的消息
*/
@Component
@RabbitListener(queues = "work_queue")
public class WorkListener1 {

@RabbitHandler
public void workHandler(String msg){
System.out.println("=====工作队列接收消息端1====>"+msg);
}
}
  • rabbitmq-consumer项目创建监听器2:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package com.itheima.rabbitmq.work;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
* 工作队列:消费者接收监听器2,接收来自消息队列中的消息
*/
@Component
@RabbitListener(queues = "work_queue")
public class WorkListener2 {

@RabbitHandler
public void workHandler(String msg){
System.out.println("=====工作队列消息接收端2====>"+msg);
}
}

4.3 三种模式概览

image-20191205102917088

前面2个案例中,只有3个角色:

  • P:生产者,也就是要发送消息的程序
  • C:消费者:消息的接受者,会一直等待消息到来。
  • queue:消息队列,图中红色部分

而在订阅模型中,多了一个exchange角色,而且过程略有变化:

  • P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
  • C:消费者,消息的接受者,会一直等待消息到来。
  • Queue:消息队列,接收消息、缓存消息。
  • Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。

Exchange有常见以下3种类型:

  • Fanout:广播 将消息交给所有绑定到交换机的队列, 不处理路由键。只需要简单的将队列绑定到交换机上。fanout 类型交换机转发消息是最快的。
  • Direct:定向 把消息交给符合指定routing key 的队列. 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为 “dog” 的消息才被转发,不会转发 dog.puppy,也不会转发 dog.guard,只会转发dog。
    其中,路由模式使用的是 direct 类型的交换机。
  • Topic:主题通配符) 把消息交给符合routing pattern(路由模式)的队列. 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号 “#” 匹配一个或多个词,符号”*“匹配不多不少一个词。因此“audit.#” 能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到 “audit.irs”。
    其中,主题模式(通配符模式)使用的是 topic 类型的交换机。

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失

4.4 Publish/Subscribe发布与订阅模式

4.4.1 什么是发布订阅模式

image-20191205102917088

发布订阅模式:

  1. 每个消费者监听自己的队列。
  2. 生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息

【广播消息:一次性将消息发送给所有消费者,每个消费者收到消息均一致】

4.4.2 RabbitMQ管理界面操作

  • 创建两个队列 fanout_queue1和 fanout_queue2
    image-20191226053743533

  • 创建Exchange交换器 fanout_exchange
    image-20191226053920763

  • 将创建的fanout_exchange交换器和 fanout_queue1, fanout_queue2队列绑定
    image-20191226054038545

4.4.3 生产者代码

rabbitmq-producer项目测试代码如下:

不使用 springboot 注入代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package com.itheima.rabbitmq.ps;
import com.itheima.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* 发布与订阅使用的交换机类型为:fanout
*/
public class Producer {
//交换机名称
static final String FANOUT_EXCHAGE = "fanout_exchange";
//队列名称
static final String FANOUT_QUEUE_1 = "fanout_queue_1";
//队列名称
static final String FANOUT_QUEUE_2 = "fanout_queue_2";
public static void main(String[] args) throws Exception {
//创建连接
Connection connection = ConnectionUtil.getConnection();
// 创建频道
Channel channel = connection.createChannel();
/**
* 声明交换机
* 参数1:交换机名称
* 参数2:交换机类型,fanout、topic、direct、headers
*/
channel.exchangeDeclare(FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);
// 声明(创建)队列
/**
* 参数1:队列名称
* 参数2:是否定义持久化队列
* 参数3:是否独占本次连接
* 参数4:是否在不使用的时候自动删除队列
* 参数5:队列其它参数
*/
channel.queueDeclare(FANOUT_QUEUE_1, true, false, false, null);
channel.queueDeclare(FANOUT_QUEUE_2, true, false, false, null);
//队列绑定交换机
channel.queueBind(FANOUT_QUEUE_1, FANOUT_EXCHAGE, "");
channel.queueBind(FANOUT_QUEUE_2, FANOUT_EXCHAGE, "");
for (int i = 1; i <= 10; i++) {
// 发送信息
String message = "你好;小兔子!发布订阅模式--" + i;
/**
* 参数1:交换机名称,如果没有指定则使用默认Default Exchage
* 参数2:路由key,简单模式可以传递队列名称
* 参数3:消息其它属性
* 参数4:消息内容
*/
channel.basicPublish(FANOUT_EXCHAGE, "", null, message.getBytes());
System.out.println("已发送消息:" + message);
}
// 关闭资源
channel.close();
connection.close();
}
}

使用 springboot 注入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package com.itheima.test;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

/**
* 目标:将消息发送给交换机,通过交换机广播给消息队列,路由键为空字符串
*/
@RunWith(SpringRunner.class)
@SpringBootTest
public class Demo03TestPublishAndSubscribe {

@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void contextLoads() {
//向广播交换机发送1千条消息
for (int i = 0; i < 1000; i++) {
/**
* 参数1:交换机名称
* 参数2:路由键
* 参数3:消息内容
*/
rabbitTemplate.convertAndSend("fanout_exchange","","hello 我是小兔子【"+i+"】!");
}
}
}

4.4.4 消费者代码

rabbitmq-consumer项目创建监听器:

不使用 springboot 注入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package com.itheima.rabbitmq.ps;
import com.itheima.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer2 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
// 创建频道
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare(Producer.FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);
// 声明(创建)队列
/**
* 参数1:队列名称
* 参数2:是否定义持久化队列
* 参数3:是否独占本次连接
* 参数4:是否在不使用的时候自动删除队列
* 参数5:队列其它参数
*/
channel.queueDeclare(Producer.FANOUT_QUEUE_2, true, false, false, null);
//队列绑定交换机
channel.queueBind(Producer.FANOUT_QUEUE_2, Producer.FANOUT_EXCHAGE, "");
//创建消费者;并设置消息处理
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
/**
* consumerTag 消息者标签,在channel.basicConsume时候可以指定
* envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
(收到消息失败后是否需要重新发送)
* properties 属性信息
* body 消息
*/
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
//路由key
System.out.println("路由key为:" + envelope.getRoutingKey());
//交换机
System.out.println("交换机为:" + envelope.getExchange());
//消息id
System.out.println("消息id为:" + envelope.getDeliveryTag());
//收到的消息
System.out.println("消费者2-接收到的消息为:" + new String(body, "utf-
8"));
}
};
//监听消息
/**
* 参数1:队列名称
* 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消
息,设置为false则需要手动确认
* 参数3:消息接收到后回调
*/
channel.basicConsume(Producer.FANOUT_QUEUE_2, true, consumer);
}
}

使用 springboot 注入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package com.itheima.rabbitmq.pubandsub;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
* 发布订阅模式:消息监听1
*/
@Component
@RabbitListener(queues = "fanout_queue1")
public class PubAndSubListener1 {

@RabbitHandler
public void pubAndSubHandler(String msg){
System.out.println("=====发布订阅模式接收消息端【1】=====>"+msg);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package com.itheima.rabbitmq.pubandsub;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
* 发布订阅模式:消息监听2
*/
@Component
@RabbitListener(queues = "fanout_queue2")
public class PubAndSubListener2 {

@RabbitHandler
public void pubAndSubHandler(String msg){
System.out.println("=====发布订阅模式接收消息端【2】=====>"+msg);
}
}

4.4.5 小结

交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。

发布订阅模式与工作队列模式的区别:

  1. 工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机。
  2. 发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用默认交换机)。
  3. 发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑 定到默认的交换机 。

4.5 Routing路由模式

4.5.1 什么是路由模式

路由模式特点:队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)消息的发送方在向 Exchange发送消息时,也必须指定消息的RoutingKey。Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的Routing key完全一致,才会接收到消息.

image-20191205103846484

图解:

  • P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
  • X:Exchange(交换机),接收生产者的消息,然后把消息递交给与routing key完全匹配的队列
  • C1:消费者,其所在队列指定了需要routing key 为 error 的消息
  • C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息

【有选择性的接收消息】

4.5.2 RabbitMQ管理界面操作

  1. 创建两个队列分别叫做 routing_queue1routing_queue2 用户演示
    image-20191226055442517

  2. 创建交换器 routing_exchange , 类型为 direct , 用于演示路由模式
    image-20191226055720887

  3. 设置绑定: 将创建的交换器 routing_exchangerouting_queue1 , routing_queue2 绑定在一起, 路由键Routing Key分别为 infoerror
    image-20191226055838905

4.5.3 生产者代码

rabbitmq-producer项目测试代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package com.itheima.test;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

/**
* 目标:将消息发送给交换机,通过交换机路由给指定的消息队列,通过路由键类指定
*/
@RunWith(SpringRunner.class)
@SpringBootTest
public class Demo04TestRoutingModel {

@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void contextLoads() {
//向路由交换机发送1千条消息
for (int i = 0; i < 1000; i++) {
/**
* 参数1:交换机名称
* 参数2:路由键:error , info ,指定要投递的消息队列
* 参数3:消息内容
*/
if(i%2 == 0){
rabbitTemplate.convertAndSend("routing_exchange","info","hello 我是小兔子【"+i+"】!");
} else {
rabbitTemplate.convertAndSend("routing_exchange","error","hello 我是小兔子【"+i+"】!");
}
}
}
}

4.5.4 消费者代码

rabbitmq-consumer项目创建监听器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package com.itheima.rabbitmq.routing;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
* 路由模式:消息队列接收监听器1,接收来自路由模式发送的消息
*/
@Component
@RabbitListener(queues = "routing_queue1")
public class RoutingListener1 {

@RabbitHandler
public void routingHandler(String msg){
System.out.println("=====路由模式消息接收监听器【1】=====>"+msg);
}
}

rabbitmq-consumer项目创建监听器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package com.itheima.rabbitmq.routing;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
* 路由模式:消息队列接收监听器2,接收来自路由模式发送的消息
*/
@Component
@RabbitListener(queues = "routing_queue2")
public class RoutingListener2 {

@RabbitHandler
public void routingHandler(String msg){
System.out.println("=====路由模式消息接收监听器【2】=====>"+msg);
}
}

4.6 Topics通配符模式(主题模式)

4.6.1 什么是通配符(主题)模式

Topic类型与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key的时候使用通配符!

Routingkey: 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如:item.insert

通配符规则:

  • #:匹配一个或多个词,多个词用点号分隔
  • *:匹配不多不少恰好1个词

举例:

  • item.# 能够匹配item.insert.abc.bbc或者item.insert
  • item.* 只能匹配item.insert

image-20191205104428234

【基于通配符接收消息】

4.6.2 RabbitMQ管理界面操作

  1. 创建队列 topic_queue1topic_queue1
    image-20191226060931347

  2. 创建交换器 topic_exchange , type类型为 topic
    image-20191226061017346

  3. 设置绑定:
    topic_queue1绑定的Routing Key路由键为item.*
    topic_queue2绑定的Routing Key路由键为item.#
    image-20191226061135494

4.6.3 生产者代码

rabbitmq-producer项目测试代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package com.itheima.test;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

/**
* 目标:将消息发送给交换机,通过交换机路由给指定的消息队列,路由键使用通配符
*/
@RunWith(SpringRunner.class)
@SpringBootTest
public class Demo05TestTopicModel {

@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void contextLoads() {
//向通配符交换机发送消息
rabbitTemplate.convertAndSend("topic_exchange","item.insert","hello 我是小兔子,路由键item.insert");
rabbitTemplate.convertAndSend("topic_exchange","item.insert.abc","hello 我是小兔子,路由键:item.insert.abc");
}
}

4.6.4 消费者代码

  • rabbitmq-consumer项目创建监听器:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package com.itheima.rabbitmq.topic;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
* 通配符模式:消息队列接收监听器1,接收来自通配符模式发送的消息
*
*/
@Component
@RabbitListener(queues = "topic_queue1")
public class TopicListener1 {

@RabbitHandler
public void topicHandler(String msg){
System.out.println("=====通配符模式消息接收监听器【1】=====>"+msg);
}
}
  • rabbitmq-consumer项目创建监听器:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package com.itheima.rabbitmq.topic;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
* 通配符模式:消息队列接收监听器1,接收来自通配符模式发送的消息
*
*/
@Component
@RabbitListener(queues = "topic_queue2")
public class TopicListener2 {

@RabbitHandler
public void topicHandler(String msg){
System.out.println("=====通配符模式消息接收监听器【2】=====>"+msg);
}
}

4.7 模式总结RabbitMQ

  1. 简单模式 HelloWorld: 一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)

  2. 工作队列模式 Work Queue: 一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)

  3. 发布订阅模式 Publish/subscribe: 需要设置类型为 ==fanout== 的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息广播发送到绑定的队列

  4. 路由模式 Routing: 需要设置类型为 ==direct== 的交换机,交换机和队列进行绑定,并且指定routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列

  5. 通配符模式 Topic: 需要设置类型为 ==topic== 的交换机,交换机和队列进行绑定,并且指定通配符方式的routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列


105-RabbitMQ
https://flepeng.github.io/021-Java-01-course-105-RabbitMQ/
作者
Lepeng
发布于
2020年2月2日
许可协议