【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

分类: 365bet新地址 时间: 2025-10-06 09:57:51 作者: admin 阅读: 5980
【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

目录

分布式系统通信方式

MQ选型与应用场景

应用场景(优势)

RabbitMQ工作模型

RabbitMQ简介

RabbitMQ 工作模型(流程)​编辑

Docker安装配置RabbitMQ

RabbitMQ管理控制台

RabbitMQ 简单模式构建生产者

RabbitMQ 简单模式构建消费者

RabbitMQ工作模式 - WorkQueues

RabbitMQ工作模式 - 发布订阅

RabbitMQ工作模式 - 路由模式

RabbitMQ工作模式 - 通配符模式

RabbitMQ集成SpringBoot(上) - 异步解耦发送短信

RabbitMQ集成SpringBoot(下) - 监听消费短信发送

消息的可靠性投递Confirm机制

消息的可靠性投递Return机制

消费端可靠性ACK机制

RabbitMQ 消费者消息限流

RabbitMQ ttl特性控制短信队列超时

1. 对整个队列设置ttl:

2. 对单独一个消息进行ttl设置

RabbitMQ 死信队列的实现

代码实现

本章小结

作业

MQ:Message Queue(消息队列),是在消息传输的过程中,把消息保存到的一个容器。消息可以是我们产生的一些信息数据,一般都都是传字符串或者json字符串。消息队列主要用于一些分布式系统或者微服务系统之间的通信

分布式系统通信方式

远程调用:同步借助中间件进行通信:异步,中间件有mq,zookeeper,redis

MQ选型与应用场景

RabbitMQ:erlang高并发能力不错,springboot推荐集成,社区活跃资料多。并发能力和性能都不错。ActiveMQ:apache出品,目前使用量不多,不如以前,性能也不如其他。RocketMQ:阿里出品,吞吐量很高,是Rabbit的10倍,十万左右。功能很全,扩展性也很好。金融类项目优先推荐使用。Kafka:apache出品,侧重大数据领域用的很多,吞吐量10万级。只支持主要的一些MQ功能。ZeroMQ,MetaMQ,RedisMQ 简单来说,RabbitMQ的综合能力比较好。中小型公司用的也比较多,因为足够满足自身业务,所以后期一旦公司发展可以考虑转型使用RocketMQ,因为rocket是java开发的,可以自己去重构,而rabbit是erlang语言,极不容易修改。未来有机会可以再开Rocket来聊一聊

应用场景(优势)

异步任务

一些耗时很长的操作本身是同步执行的,可以借助mq通知消费者去处理,这个过程是异步的。从而减少了前端用户的等等的响应时间。比如云端操作重装系统,可能需要1分钟左右的时间,用户不可能在页面上等着吧。那么生产者可以向消费者发出一个重装的通知,让其进行重装操作。前端用户可以继续做别的操作,等重装完毕以后,再通知用户即可。因为这种操作不需要及时性,延后通知即可。这也是暂时性的不一致,MQ是最终一致性。提速

本来前端用户都要等着处理完毕的结果响应,现在异步可以直接返回接口,减少了等待的时间,如此一来大大提高了前端用户的等待时间,用户体验更高了。也就是说当前的请求接口的吞吐量就大大提高了。接口解耦

MQ就相当于是工厂创建的微信群,把批发商拉进群,让他们进行监听,起到了接口之间解耦的作用。同时也是一种广播机制。在我们的系统中,如果一个接口里要调用多个远程服务,比如下单的同时需要调用库存、支付、物流、日志、积分等多个服务,那么如果是同步进行,一旦一个环节出了问题,那么整个接口就崩了,如此整个系统的容错性太低了; 如果使用mq解耦,那么哪怕要坏也是只坏一个环节,大大降低了发生问题的风险。削峰填谷

这个是属于高并发的场景,举个例子,工厂有10万件衣服,需要快速清仓,现在所有的批发商的清货能力只能在5万件左右,而且也没有那么多钱进货。所以通过MQ这个中介,工厂把衣服都放入中介,批发商慢慢的把衣服卖出去以后再把后续的5万件衣服进货不就行了?这就是瞬时高并发所遇到的情况,就比如秒杀,服务器里redis啊数据库等处理能力不高,流量太大,那我们把请求放入到mq,如此一来,后续的数据慢慢处理就行了。这也就和餐厅吃饭在外面排队等位是一个道理。处理不来,就慢慢排队等着。使用MQ之前,高并发的时候,瞬时请求量很大,会达到5000以上,这个时候的当前时间就是高峰,服务器压力不行势必会打死。使用MQ之后,限制速度为1000个并发,哪怕5000甚至上万的请求进来,也会被限速,那么这就是削峰,消息的消费平均速度会维持在1000左右,哪怕高峰期过来,速度也是1000左右,那么这就是填谷。举个例子,饭店吃饭高峰期,人流量很多,这个时候不可能直接进去吃饭把,餐桌就那么点,客户太多了,没办法,只能取号排队把,深有体会,排队的过程就是慢慢的消费,削峰填谷

RabbitMQ工作模型

RabbitMQ简介

RabbitMQ基于AMQP协议,Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用层协议的一个开放标准,为面向消息中间件设计的。基于这个协议的客户端和消息中间件可以传递消息,并且不会因为客户端或中间件产品的不同所首先,也不会受到不同开发语言的限制。比如用java、php、.net都可以使用消息队列,并且只要支持AMQP这个协议,不论什么客户端,都可以相互传输消息。而且甚至与你只要遵循amqp协议,自己也能开发一套消息队列产品也没问题。

RabbitMQ就是基于amqp所开发的一套产品,只是他是基于erlang语言开发的。它是一种应用于应用之间的通信方法,MQ在分布式系统中应用非常广泛。

官网地址:RabbitMQ: One broker to queue them all | RabbitMQ

RabbitMQ 工作模型(流程)

RabbitMQ:消息队列服务中间件生产者produce:创建消息,发送到mq消费者Consumer:监听消息,处理业务通知Exchange:MQ的交换机,他会按照一定的规则来转发到某个队列,类似controller路由机制。比如 /passport/sms。也就是分发网络请求的功能,和我们现实生活中的交换机是一个意思。Queue:队列,存储消息。相当于controller。被消费者监听,一旦有消息,会被消费者接收到。Binding-Routes:交换机和队列的绑定关系,通过路由结合在一起,消息如何通过交换机发送到队列里,是通过路由机制,类似于@RequestMapping,路由规则一旦匹配,那么就可以存储对应的消息。Channel:生产者和消费者与MQ建立的通道,相当于httpSession会话,建立请求就会有channel。可以理解为一个桥梁作用,消息经过桥梁到达mq的queue。Channel的目的是为了管理每次请求到RabbitMQ-server的连接connection,如此才能更好的节约资源的开支,提高通信效率。

Docker安装配置RabbitMQ

docker pull rabbitmq:management

运行mq

docker run --name rabbitmq \

-p 5671:5671 \

-p 5672:5672 \

-p 4369:4369 \

-p 15671:15671 \

-p 15672:15672 \

-p 25672:25672 \

--restart always \

-d rabbitmq:management

如果忘记加上自动重启,可以运行如下脚本:

docker update rabbitmq --restart=always

5671: AMQP端口 5672: AMQP端口 15672: 管理平台 4369,25672: erlang发现与集群端口 616131,61614: stomp协议端口 1883, 8883: MQTT协议端口 …… 更多参照:Networking and RabbitMQ | RabbitMQ

运行成功

查看具体版本:

docker image inspect rabbitmq:management|grep -i version

访问管理界面:

​​​​​​http://192.168.1.112:15672/

默认用户名密码为:guest guest 进入后即可使用:

​​​​​​http://192.168.1.112:15672/

RabbitMQ管理控制台

RabbitMQ的管理控制台界面相当友好,可视化程度很不错。

概览信息:

connections:客户端的连接channels:没有connections就没有channelsexchanges:交换机,内部有默认的定义好的名字queues:队列,可以通过交换机点击绑定的进入admin:管理设置,可以创建账号以及分区virtul host 可以理解为分区,不同项目使用

RabbitMQ 简单模式构建生产者

本节课开始我们将会使用java代码来构建rabbitmq的各种通信,首先我们来学习的是构建消息的生产者。

RabbitMQ Tutorials | RabbitMQ

RabbitMQ tutorial - "Hello World!" | RabbitMQ

简单模式:通过生产者发消息给队列,消费者监听收到消息

为了更加清晰的看到生产者与消费者,我们可以创建两个子模块工程:

分别添加依赖坐标:

com.rabbitmq

amqp-client

5.16.0

创建生产者:​​​​​​​

/**

* 构建生产者,发送消息

*/

public class FooProducer {

public static void main(String[] args) throws Exception {

// 1. 创建连接工厂以及参数配置

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("192.168.1.122");

factory.setPort(5672);

factory.setVirtualHost("/");

factory.setUsername("imooc");

factory.setPassword("imooc");

// 2. 创建连接Connection

Connection connection = factory.newConnection();

// 3. 创建管道Channel

Channel channel = connection.createChannel();

// 4. 创建队列Queue(简单模式不需要交换机Exchange)

/**

* queue: 队列名

* durable: 是否持久化,true:重启后,队列依然存在,否则不存在

* exclusive: 是否独占,true:只能有一个消费者监听这个队列,一般设置为false

* autoDelete: 是否自动删除,true:当没有消费者的时候,自动删除队列

* arguments: map类型其他参数

*/

channel.queueDeclare("hello", true, false, false, null);

// 5. 向队列发送消息

/**

* exchange: 交换机名称,简单模式没有,直接设置为 ""

* routingKey: 路由key,映射路径,如果交换机是默认"",则路由key和队列名一致

* props: 配置信息

* body: 消息数据

*/

String msg = "Hello ~";

channel.basicPublish("", "hello", null, msg.getBytes());

// 7. 释放

channel.close();

connection.close();

}

}

运行后查看控制台:

RabbitMQ 简单模式构建消费者

/**

* 构建消费者,监听消息

*/

public class FooConsumer {

public static void main(String[] args) throws Exception {

// 1. 创建连接工厂以及参数配置

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("192.168.1.122");

factory.setPort(5672);

factory.setVirtualHost("/");

factory.setUsername("imooc");

factory.setPassword("imooc");

// 2. 创建连接Connection

Connection connection = factory.newConnection();

// 3. 创建管道Channel

Channel channel = connection.createChannel();

// 4. 创建队列Queue(简单模式不需要交换机Exchange)

/**

* queue: 队列名

* durable: 是否持久化,true:重启后,队列依然存在,否则不存在

* exclusive: 是否独占,true:只能有一个消费者监听这个队列,一般设置为false

* autoDelete: 是否自动删除,true:当没有消费者的时候,自动删除队列

* arguments: map类型其他参数

*/

channel.queueDeclare("hello", true, false, false, null);

// 5. 监听并消费消息

/**

* queue: 监听的队列名

* autoAck: 是否自动确认,true:告知mq消费者已经消费的确认通知

* callback: 回调函数,处理监听的消息

*/

Consumer consumer = new DefaultConsumer(channel) {

/**

* 重写消息配送方法

* @param consumerTag: 消息标签(标识)

* @param envelope: 信封(一些信息,比如交换机路由等信息)

* @param properties: 配置信息,和生产者的一致

* @param body: 消息数据

* @throws IOException

*/

@Override

public void handleDelivery(String consumerTag,

Envelope envelope,

AMQP.BasicProperties properties,

byte[] body) throws IOException {

System.out.println("consumerTag = " + consumerTag);

System.out.println("envelope = " + envelope.toString());

System.out.println("properties = " + properties.toString());

System.out.println("body = " + new String(body));

super.handleDelivery(consumerTag, envelope, properties, body);

}

};

channel.basicConsume("hello", true, consumer);

// 不需要关闭连接,则持续监听

}

}

监听结果:

RabbitMQ工作模式 - WorkQueues

工作队列没有交换机,生产者发送消息给队列,队列有两个或者多个消费者监听进行消费,但是所有的消息是会被消费者以竞争的关系进行消费,所以队列里的消息也称之为工作的任务,任务由一个人完成了就不需要被第二个人完成。所以这个队列里的所有消息只会被某一个消费者进行消费读取。

使用场景:如果任务量很大很多,而一个消费者处理不过来,则此时可以使用工作队列,比如短信发送在一个系统里会有很多场景进行发送给用户,所以量很大的时候,也可以分配给多个消费者去进行消费发短信即可。这就像上班工作量很大,就需要招人共同完成一些任务是一个道理。

构建生产者: 代码可以从简单模式复制过来进行修改

......

channel.queueDeclare("work_queue", true, false, false, null);

Integer tasks[] = {};

for (int i = 0 ; i < 10 ; i ++) {

String msg = "开始上班,任务[" + i + "]";

channel.basicPublish("", "work_queue", null, msg.getBytes());

}

......

构建两个消费者: 只需要复制简单模式再修改队列名即可

...

channel.queueDeclare("work_queue", true, false, false, null);

...

channel.basicConsume("work_queue", true, consumer);

...

从运行结果可以开得出来,这也是类似于负载均衡的轮询效果。

RabbitMQ工作模式 - 发布订阅

之前的工作队列是一个消息只能被一个消费者处理消费,发布订阅则不是,只要监听队列,所有的消费者都能消费同一个消息,这类似与公众号的订阅,比如我和你都订阅了慕课网的公众号,慕课网只要发布一个新的文章,那么我们都可以同时收到,这就是发布订阅模式。 需要注意,这里使用了交换机,因为不同的用户组可以订阅不同的队列,所以通过交换机来绑定管理并且把消息路由到不同的队列即可。 交换机的类型:

Fanout:广播模式,把消息发送给所有绑定的队列Direct:定向投递,把消息发送给指定的routing key的队列Topic:通配符模式,把消息交给符合routing pattern的队列 需要注意,交换机只负责转发消息,不会存储消息,存储消息的职责是队列的,不要搞混噢。

生产者代码:

......

// 创建交换机

/**

* exchange: 交换机名称

* type: 交换机类型

* FANOUT("fanout"): 广播模式,把消息发送给所有绑定的队列

* DIRECT("direct"): 定向投递,把消息发送给指定的`routing key`的队列

* TOPIC("topic"): 通配符模式,把消息交给符合`routing pattern`的队列

* HEADERS("headers"): 使用率不多,参数匹配

* durable: 是否持久化

* autoDelete: 是否自动删除

* internal: 内部意思,true:表示当前Exchange是RabbitMQ内部使用,用户创建的队列不会消费该类型交换机下的消息,所以我们自己使用设置为false即可

* arguments: 参数

*/

String exchange = "fanout_exchange";

channel.exchangeDeclare(exchange,

BuiltinExchangeType.FANOUT, true, false, false, null);

// 创建两个队列

String fanout_queue_a = "fanout_queue_a";

String fanout_queue_b = "fanout_queue_b";

channel.queueDeclare("fanout_queue_a", true, false, false, null);

channel.queueDeclare("fanout_queue_b", true, false, false, null);

// 绑定交换机和队列

/**

* queue

* exchange

* routingKey: 路由key,绑定规则,这里暂不使用(fanout本身广播给所有订阅者,所以没有路由规则,使用空字符串即可)

*/

channel.queueBind(fanout_queue_a, exchange, "");

channel.queueBind(fanout_queue_b, exchange, "");

for (int i = 0 ; i < 10 ; i ++) {

String msg = "开始上班,任务[" + i + "]";

channel.basicPublish(exchange, "", null, msg.getBytes());

}

channel.close();

connection.close();

运行查看:

消费者: 复制工作模式的消费者代码,只需要修改定义的队列名即可

......

String fanout_queue_a = "fanout_queue_a";

channel.queueDeclare(fanout_queue_a, true, false, false, null);

......

channel.basicConsume(fanout_queue_a, true, consumer);

......

运行结果:

RabbitMQ工作模式 - 路由模式

路由模式routing可以针对不同的类别进行路由,比如图中,可以控制不同的日志级别进行路由,这就相当于控制器的@RequestMapping,请求的url地址根据不同的映射路径进行controller接口的调用,原理一致。

生产者代码: 可以直接拷贝发布订阅模式进行修改

...

String exchange = "routing_exchange";

channel.exchangeDeclare(exchange,

BuiltinExchangeType.DIRECT, true, false, false, null);

// 创建两个队列

String routing_queue_order = "routing_queue_order";

String routing_queue_pay = "routing_queue_pay";

channel.queueDeclare(routing_queue_order, true, false, false, null);

channel.queueDeclare(routing_queue_pay, true, false, false, null);

...

// 订单的创建/更新/删除都走订单队列进行消费;订单的支付走独立的队列进行消费

channel.queueBind(routing_queue_order, exchange, "order_create");

channel.queueBind(routing_queue_order, exchange, "order_delete");

channel.queueBind(routing_queue_order, exchange, "order_update");

channel.queueBind(routing_queue_pay, exchange, "order_pay");

// 根据不同的路由key进行消息的发送

String msg1 = "创建订单A";

String msg2 = "创建订单B";

String msg3 = "删除订单C";

String msg4 = "修改订单D";

String msg5 = "支付订单E";

String msg6 = "支付订单F";

channel.basicPublish(exchange, "order_create", null, msg1.getBytes());

channel.basicPublish(exchange, "order_create", null, msg2.getBytes());

channel.basicPublish(exchange, "order_delete", null, msg3.getBytes());

channel.basicPublish(exchange, "order_update", null, msg4.getBytes());

channel.basicPublish(exchange, "order_pay", null, msg5.getBytes());

channel.basicPublish(exchange, "order_pay", null, msg6.getBytes());

...

运行生产者,查看:

消费者代码: 拷贝发布定义模式的代码,修改队列名称即可

​​​​​​​

...

String routing_queue_order = "routing_queue_order";

channel.queueDeclare(routing_queue_order, true, false, false, null);

...

channel.basicConsume(routing_queue_order, true, consumer);

...

运行结果:

RabbitMQ工作模式 - 通配符模式

topics通配符模式是功能最强大也是用的最多的一个mq工作模式。 通配符模式其实是路由模式的进阶版,这个时候的routing key可以写成通配符,只要符合规则即可进行消息的路由。

通配符可以有两种形式:

*:匹配一个字段

比如:order.*.* 可以匹配 order.do.delete,order.create.finish#:匹配0个或者多个字段

比如 #.order.# 可以匹配 order.do.delete,check.order.if.finish

生产者代码:

...

String exchange = "topics_exchange";

channel.exchangeDeclare(exchange,

BuiltinExchangeType.TOPIC, true, false, false, null);

// 创建两个队列

String topics_queue_order = "topics_queue_order";

String topics_queue_pay = "topics_queue_pay";

channel.queueDeclare(topics_queue_order, true, false, false, null);

channel.queueDeclare(topics_queue_pay, true, false, false, null);

...

channel.queueBind(topics_queue_order, exchange, "order.*");

channel.queueBind(topics_queue_pay, exchange, "#.pay.#");

// 根据不同的路由key进行消息的发送

String msg1 = "创建订单A";

String msg2 = "创建订单B";

String msg3 = "删除订单C";

String msg4 = "修改订单D";

String msg5 = "支付订单E";

String msg6 = "支付订单F";

channel.basicPublish(exchange, "order.create", null, msg1.getBytes());

channel.basicPublish(exchange, "order.create", null, msg2.getBytes());

channel.basicPublish(exchange, "order.delete", null, msg3.getBytes());

channel.basicPublish(exchange, "order.update", null, msg4.getBytes());

channel.basicPublish(exchange, "order.pay", null, msg5.getBytes());

channel.basicPublish(exchange, "imooc.order.pay.display", null, "慕课网订单支付".getBytes());

channel.basicPublish(exchange, "supermarket.pay", null, "超市支付".getBytes());

...

运行后结果

消费者代码: 只需要修改队列名即可

...

String topics_queue_pay = "topics_queue_pay";

channel.queueDeclare(topics_queue_pay, true, false, false, null);

...

channel.basicConsume(topics_queue_pay, true, consumer);

...

RabbitMQ集成SpringBoot(上) - 异步解耦发送短信

引入mq依赖:

org.springframework.boot

spring-boot-starter-amqp

在api子工程中创建短信发送的短信配置类:

/**

* RabbitMQ 的配置类

*/

@Configuration

public class RabbitMQSMSConfig {

// 定义交换机的名字

public static final String SMS_EXCHANGE = "sms_exchange";

// 定义队列的名字

public static final String SMS_QUEUE = "sms_queue";

// 创建交换机

@Bean(SMS_EXCHANGE)

public Exchange exchange() {

return ExchangeBuilder.topicExchange(SMS_EXCHANGE).durable(true).build();

}

// 创建队列

@Bean(SMS_QUEUE)

public Queue queue() {

// return new Queue(SMS_QUEUE);

return QueueBuilder.durable(SMS_QUEUE).build();

}

// 定义队列绑定到交换机的关系

@Bean

public Binding smsBinding(

@Qualifier(SMS_EXCHANGE) Exchange exchange,

@Qualifier(SMS_QUEUE) Queue queue

) {

return BindingBuilder.bind(queue).to(exchange).with("imooc.sms.#").noargs(); // 执行绑定

}

}

异步解耦短信发送:

spring:

rabbitmq:

host: 192.168.1.122

port: 5672

username: rabbitmq

password: rabbitmq

virtual-host: imooc-space

@Autowired

private RabbitTemplate rabbitTemplate;

...

// 把短信内容和手机号构建为一个bean并且转换为json作为消息发送给mq

rabbitTemplate.convertAndSend(

RabbitMQSMSConfig.SMS_EXCHANGE,

"imooc.sms.login.send",

bodyJson);

...

RabbitMQ集成SpringBoot(下) - 监听消费短信发送

创建一个资源服务service-resource-4001,目前用于消费者监听

@Component

@Slf4j

public class RabbitMQSMSConsumer {

/**

* 监听队列,并且处理消息

*/

@RabbitListener(queues = {RabbitMQSMSConfig.SMS_QUEUE})

public void watchQueue(String payload, Message message) {

log.info("payload = " + payload);

String routingKey = message.getMessageProperties().getReceivedRoutingKey();

log.info("routingKey = " + routingKey);

String msg = payload;

SMSContentQO smsContent = GsonUtils.stringToBean(msg, SMSContentQO.class);

// 发送短信 TODO

}

}

最终运行测试即可,实现短信发送的异步解耦。

消息的可靠性投递Confirm机制

那么现在我们在进行短信发送的时候,直接发送就可以了。假设,现在我们要确保消息不被丢失,是可靠的发送到mq服务,并且也能够确保被消费了。这个时候应该怎么处理呢?这个其实就是消息的可靠性。 因为生产者发消息到交换机再到队列,这之间是有链路的,每个链路的过程都有可能出错,导致消息没有正常发送。

RabbitMQ有两种方式来控制消息可靠性。

confirm 确认模式

消息一旦从生产者发给MQ server,也就是被交换机接收到,这个时候会有一个回调 confirmCallback,代表mq服务接收到了。但是这个消息不管是否成功,这个回调函数一定会执行return 回退模式

交换机把消息路由到队列的时候如果出现问题,则触发回调 returnCallback,只有失败才会执行这个函数。

上代码:

配置confirm类型:

NONE: 禁用发布确认模式,是默认的CORRELATED: 发布消息成功到交换器后会触发回调方法SIMPLE: 使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法

定义confirmCallback回调函数:

运行测试,观察结果。

消息的可靠性投递Return机制

配置return开启:

定义returnCallback回调函数:

(路由key写错,则不会路由到队列,用于测试) 运行后观察结果:

message: 消息数据replyCode: 错误codereplyText: 错误信息exchange: 交换机routingKey: 发送的路由地址

这就相当于发邮件的时候,如果你发的地址不存在有问题,对方接收不到,则你会收到一个退回邮件,机制是差不多的。

如此一来,我们就可以通过confirm和return两种机制来记录这个过程的日志,如果错误日志出现很频繁说明咱们的mq可能就不太稳定或者有什么其他因素需要排查了。

提问:RabbitMQ有没有事务机制? 也有,但是性能很差,所以平时不用。

消费端可靠性ACK机制

前面两节所用的confirm和return都是生产端机制,此外还有ack模式,是基于消费端的可靠性。

什么是ack呢? ack的意思是Acknowledge,确认的意思,表示消费者收到消息后的确认,可以理解为这是一个信息的回执,以前我们寄信的时候,收件人收到信后会有一个回执,也就是签收单,这个交给邮局的,邮局就是broker,也是一种签收确认的机制吧。mq同理。

ack机制有三种确认方式:

自动确认: Acknowledge=none(默认)

消费者收到消息,则会自动给broker一个回执,表示消息收到了,但是消费者里的监听方法内部是我们自己的业务,业务是否成功,他不管的,如果业务出现问题出现异常,那么也就相当于这条消息是失败的。这就相当于我寄了一个快递,对方收到后快递公司就自动确认了快递的签收确认,这是很常见的手段吧。一旦快递内部是否破损我们就不知道了,对吧。手动确认:Acknowledge=manual

消费端收到消息后,不会通知broker回执,需要等待我们自己的业务处理完毕OK了没问题,然后手动写一条代码去确认,当然如果出现错误异常,我们可以有兜底的处理方法,比如记录日志或者重新发新的消息都行。根据异常类型确定:Acknowledge=auto

消费端处理业务出现不同的异常,根据异常的类型来做出不同的响应处理,这种方式比较麻烦,需要提前预判很多异常类型。这里了解一下有这个类型即可。

代码中设置手动签收

创建新的方法,加入参数channel

运行后发送消息查看结果:

发现有一条消息是未被确认的,因为我们没有手动确认,如果是默认的确认方式,这个时候是不会有unacked的。

添加除零错误

手动确认和消息重发

@RabbitListener(queues = {RabbitMQSMSConfig.SMS_QUEUE})

public void watchQueue(Message message, Channel channel) throws Exception {

try {

String routingKey = message.getMessageProperties().getReceivedRoutingKey();

log.info("routingKey = " + routingKey);

int a = 100 / 0;

String msg = new String(message.getBody());

log.info("msg = " + msg);

/**

* deliveryTag: 消息投递标签

* multiple: 批量确认所有消费者获得的消息

*/

channel.basicAck(message.getMessageProperties().getDeliveryTag(),

true);

} catch (Exception e) {

e.printStackTrace();

/**

* requeue: true:重新回到队列, false:丢弃消息

*/

channel.basicNack(message.getMessageProperties().getDeliveryTag(),

true,

true);

// 不支持requeue

// channel.basicReject(message.getMessageProperties().getDeliveryTag(),

true);

}

}

RabbitMQ 消费者消息限流

我们在一开始介绍MQ的时候,就提到了削峰填谷,本质上就是限流,所以我们需要对限流做一个落地的实现。那么现在提出一个需求,假设用户发短信一下子太多了,那么消费者在进行消费处理业务的时候,也是需要进行限流的。

限流主要在消费者这一块,基于消费者做代码实现。并且基于手动ack的开启。

prefetch的设置,其实也是类似于负载均衡,为了更有效的利用服务器资源。也可以提高消费者的吞吐量。

生产者加一个for循环模拟多条消息的发送。

消费者手动ack之前打一个断点,

运行后观察:

此时我们第一个批次拉取了2条消息,但是没有确认,剩余8条还存在于队列中。 断点继续:

每次都是未确认2个,消息会以此从mq中拉取。

把prefetch注释掉再测试:

断点再一次进来,可以看到,消费者是一次性拉取了10条消息,并没有做到限流。

所以,通过prefetch可以达到消费端控制消费速度的效果。

RabbitMQ ttl特性控制短信队列超时

假设现在短信发送量很多,消息太多太多了,可能处理不过来,那么假设短信验证码本身就是5分钟内失效的,但是5分钟过后还没有发出去,mq消息还是在队列中没有被消费者消费,那么这条消息其实是作废的,没有用的。而且本身用户已经等了那么久了都没收到,所以我们能不能索性设定一个时间为60s,60秒还没有消费消息,那么就不消费了呗,让用户再次发送一个请求完事。

此时我们可以使用ttl这个特性。

TTL:time to live 存活时间,和redis的ttl是一个道理如果消息到到ttl的时候,还没有被消费,则自动清除RabbitMQ可以对消息或者整个队列设置ttl

代码实现,生产端配置超时ttl时间(单位:毫秒)

1. 对整个队列设置ttl:

参数可以从控制台复制:

把消费者的监听去除,不要进行消费:

运行并且观察: 报错,因为现在队列是没有ttl的,我们需要删除后重新创建。

(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-message-ttl' for queue 'sms_queue' in vhost '/': received the value '10000' of type 'signedint' but current is none,

再次运行测试: 发现这个队列额外多了一个ttl的标签

此外当前有10条消息等待被消费

但是10秒过后,没有消费超时了,则自动丢弃

2. 对单独一个消息进行ttl设置

运行后观察: 目前有1条消息

10秒过后,消息过期了抛弃,如下:

注意:如果同时设置了两种消息过期方式和的话,时间短的会优先触发。

RabbitMQ 死信队列的实现

前面讲了ttl,那么在这里就必须来提一嘴死信队列

死信队列:DLX,dead letter exchange。当一个消息死了以后,就会被发送到死信交换机DLX

前面我们使用了ttl超时机制,如果我们队列绑定了DLX死信交换机,那么超时后,消息不会被抛弃,而是会进入到死信交换机,死信交换机绑定了其他队列(称之为死信队列),那么这个时候我们就可以处理那些被抛弃的消息了。

此外除了ttl超时进入死信队列,还有两种情况也会进入到死信队列:

队列消息长度已经到达限制,我一个队列设置最多1000条消息的量,后续进入的消息就抛弃了,就像我只能是10碗饭,你再盛饭给我吃就吃不下了,只能扔了。消费者手动驳回消息,nack或者reject,并且requeue为false,就像我吃饭的时候,一不小心里面进了一只苍蝇,我不会把这碗饭重新放入饭盆里吧,只能扔了。

代码实现

配置死信交换机和队列:(步骤和之前创建队列交换机以及绑定关系是一致的)

/**

* RabbitMQ 的配置类 死信队列

*/

@Configuration

public class RabbitMQSMSConfig_Dead {

// 定义交换机的名称

public static final String SMS_EXCHANGE_DEAD = "sms_exchange_dead";

// 定义队列的名称

public static final String SMS_QUEUE_DEAD = "sms_queue_dead";

// 统一定义路由key

public static final String ROUTING_KEY_SMS_DEAD = "dead.sms.display";

// 创建交换机

@Bean(SMS_EXCHANGE_DEAD)

public Exchange exchange() {

return ExchangeBuilder

.topicExchange(SMS_EXCHANGE_DEAD)

.durable(true)

.build();

}

// 创建队列

@Bean(SMS_QUEUE_DEAD)

public Queue queue() {

// return new Queue(SMS_QUEUE);

return QueueBuilder

.durable(SMS_QUEUE_DEAD)

.build();

}

// 创建绑定关系

@Bean

public Binding smsDeadBinding(@Qualifier(SMS_EXCHANGE_DEAD) Exchange exchange,

@Qualifier(SMS_QUEUE_DEAD) Queue queue) {

return BindingBuilder

.bind(queue)

.to(exchange)

.with("dead.sms.*")

.noargs();

}

}

为消费者绑定死信队列,配置死信队列的参数: 需要设置死信交换机和死信路由key,队列长度也可以限制:

死信队列监听的消费者:

运行结果: 超过长度的已经丢弃并且进入死信队里

10秒种后剩余6条消息没有被消费则进入死信队列

ttl+死信队列可以实现延迟队列,但是后期我们会使用延迟插件安装来实现延迟队列,更加方便。

手动nack以及reject课后去尝试一下。

本章小结

我们为什么会选择使用mq呢,因为在一个大型项目里,涉及发短信的地方太多了,如果每个地方都这么用异步任务来一下,会显得有些乱,而且每个项目里都有,使用mq以后,可以在一个单独的项目中,专门负责用于进行短信发送的消费,这样就更加解耦了,更加体现单一职责了。如此一来,mq可以更好的管理消费者,或者说他有更细的细粒度。甚至说消费者服务可以有很多,我们还能创建一个子嵌套聚合工程进行消费者管理。

异步任务 放入到api里,两点不好,一个是大量业务在api共用工程,不太好,第二个,本质上项目没有做到解耦,打包还是在一起的,虽然业务异步,但是代码并未解耦。而mq可以用一个专有的短信监听服务,专门异步发送所有短信场景,这才是比较 ok的。

作业

手动ack重回队列发生死循环的时候,如何设定参数进行手动上报预警。

需要使用到Nacos分布式配置,redis计数,短信或邮箱发送

相关推荐

辭典檢視
365皇冠体育网址

辭典檢視