轻松了解顺序消费-RabbitMQ-死信队列和延时队列-深化浅出 (轻松了解顺序怎么写)
1.引言
在当天的文章中,咱们来聊一聊RabbitMQ,这是小❤在上班中用的最早的信息两边件,关键用于少量数据的异步消费。
2.RabbitMQ
2.1外围组件
RabbitMQ是一个开源的信息两边件,它成功了初级信息队列协定(AMQP),同时提供了各种关键组件来支持信息的消费、传输和消费。
图片
2.2上班流程
RabbitMQ的上班方式是基于消费者、替换机和队列之间的单干。这是一个便捷的信息传递环节:
这种模型具备高度的灵敏性,可以轻松处置少量信息,同时确保信息的牢靠传递。
2.3特性
说到信息两边件,很多人首先想到的就是Kafka,但RabbitMQ也是许多金融或互联网公司构建牢靠、可伸缩和高性能系统的首选。
这是为什么呢?
关键得从RabbitMQ的特性说起,关键有二:一个是性能弱小,另一个是牢靠性!
RabbitMQ器重信息的牢靠性和灵敏性,适宜义务排队和信息传递。而Kafka是散布式流式平台,器重日志存储和数据散发。
顺序消费也是牢靠性的一种,RabbitMQ可以经常使用繁多队列或多个繁多队列来确保顺序消费。
除此之外,RabbitMQ还提供耐久性队列和信息,以确保信息在RabbitMQ主机宕机后不会失落。另外,消费者可以经常使用颁布确认机制来确认信息能否被接纳。
RabbitMQ相对kafka牢靠性更好,数据更不易失落,这关于一些数据敏感型的业务来说,显然更适宜用前者。
并且,RabbitMQ边疆生支持死信队列,可以更好地处置未成功的业务信息,以及成功延时队列等特性,接上去咱们逐一引见。
3.保障顺序消费
RabbitMQ提供了多个队列模型来保障信息的顺序消费。这关于某些运行程序十分关键,例如处置订单、支付和库存治理。
信息杂乱消费的场景
图片
如上图所示,有三条业务信息区分是删除、参与和修正操作,但是Consumer没有按顺序消费,最终存储的顺序是参与、修正和删除,就会出现数据杂乱。
针对信息有序性的疑问,RabbitMQ的处置方法是分三个阶段来保障。
信息发送时,须要业务来保障顺序性,就是保障消费者入队的顺序是有序的。
在散布式的场景下假设难以保障各个主机的入队顺序,则可以加散布式锁的方式来处置。或许在业务消费方的信息里带上信息递增ID,以及信息发生的期间戳。
在RabbitMQ的信息会保留在队列(Queue)中,在同一个队列里的信息是先进先出(FIFO)的,这个由RabbitMQ来帮咱们保障顺序。
而不同队列中的信息,RabbitMQ不可保障其顺序性,就像咱们在食堂打饭一样,站在不同的排队队列,咱们也不可保障会比其余队列的人先打上饭。
普通来说,出队后的顺序消费交给消费者去保障。咱们说的保障消费顺序,通常也是指消费者消费信息的顺序。
有多个消费者的状况下,通常是不可保障信息顺序的。
这就相当于咱们在排队打饭时,有多个打饭阿姨,但是每个阿姨打饭的速度不分歧,对应咱们消费者的消费才干也不同。
所以,为了保障信息的顺序性,咱们可以只经常使用一个消费者来接纳业务信息。
就好比只要一个阿姨在打饭,来得早就必定能早点打上饭。但很显著,这样效率不是很高,所以在经常使用时咱们须要掂量利害:看业务更须要顺序性,还是更须要消费效率。
优先级队列
在保障顺序消费时,另一个波折战略是可以经常使用优先级队列(PriorityQueue)。
在RabbitMQ3.5之后,当消费者数量较少,假设主机检测到消费者不能及时消费信息的状况下,优先级队列就会失效。
详细有两种优先级战略:
在申明队列时,咱们可以经过x-max-priority属性来设置队列的最大优先级,或经过Priority属性来设置信息的优先级,从1~10。
Golang实现代码如下:
//队列属性props:=make(map[string]interface{})//设置队列最大优先级props["x-max-priority"]=10ch.Publish("tizi365",//替换机"",//路由参数false,false,amqp.Publishing{Priority:5,//设置信息优先级DeliveryMode:2,//信息投递形式,1代表非耐久化,2代表耐久化,ContentType:"text/pln",Body:[]byte(body),})
当优先级队列消费失效时,会首先消费高优先级队列中的优先级高的信息,以此来成功顺序消费。
但须要留意的是,优先级队列触发的条件比拟厚道,在须要严厉保障业务信息顺序的状况下最好不要经常使用!
4.死信队列
RabbitMQ里,当信息在队列中变成死信(消费者不可反常处置的信息)之后,它会被从新投递到一个替换机上(即死信替换机),死信替换机上绑定的消费队列就是死信队列。
图片
死信的发生
死信发生须要满足如下条件:
死信的处置步骤
当死信发生时,假设咱们定义了一个死信替换机(其实就是一个普通的替换机,只是用于处置死信,所以叫死信替换机),而后在死信替换机上绑定了一个队列(称作死信队列)。
最后,假设死信队列有消费者监听时,死信信息的处置就会和反常业务信息一样,从替换机到队列,再由死信消费者(监听死信队列的消费者)反常消费。
5.延时队列
RabbitMQ自身不支持延时队列,但是咱们可以经过RabbitMQ的插件rabbitmq-delayed-message-exchange,或许经常使用死信队列+信息过时的方式来成功。
5.1运行场景
当咱们在电商里购物,或许在12306买票时,大略都会遇到这样一个场景:每次下订单后,到支付订单两边有一段商品锁定期间,超越时间后未支付订单就会封锁。
形态转换图如下:
图片
5.2插件成功
地址:
从github的release页面的assets,下载rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez文件,把文件放到rabbitmq插件目录(plugins目录)
揭示:版本号或许跟本教程不一样,假设你的rabbitmq就是最新版本,插件也选用最新版本就行。
rabbitmq-pluginsenablerabbitmq_delayed_message_exchange
经过x-delayed-type设置自定义替换机属性,支持发送提前信息:
props:=make(map[string]interface{})//关键参数,支持发送提前信息props["x-delayed-type"]="direct"//申明替换机err=ch.ExchangeDeclare("delay.queue",//替换机名字"fanout",//替换机类型true,//能否耐久化false,false,false,props,//设置属性)
经过信息头(x-delay),设置信息提后期间。
msgHeaders:=make(map[string]interface{})//经过信息头,设置信息提后期间,单位毫秒msgHeaders["x-delay"]=6000err=ch.Publish("delay.queue",//替换机名字"",//路由参数false,false,amqp.Publishing{Headers:msgHeaders,//设置信息头ContentType:"text/plain",Body:[]byte(body),})
5.3死信队列+信息过时打算
该打算的外围理想是,先创立死信替换机、队列和消费者,来监听死信信息。
而后创立定时过时的信息,比如订单支付的期间为30min,则将信息的TTL(最大存定期间)设置为30min,将信息放到一个没有消费者消费的队列中,当信息过时后就会成为死信。
死信信息被从新发送到死信替换机,而后咱们在死信队列中消费该信息,依据商品ID判别该商品能否被支付。
假设没有支付,就敞开订单,修正订单形态为待下单。假设曾经支付,就将商品形态修正为已成功,并丢掉这条死信信息。
5.小结
RabbitMQ是一特性能弱小的信息两边件,它在许多互联网运行中表演了关键角色,比如华为摄像机SDK的监控图像数据上报,大局部电商系统的异步消费等等。
厉害!一文了解消息中间件-RabbitMQ
RabbitMQ是2007年发布,是一个在AMQP(高级消息队列协议)基础上完成的,简称MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法,由Erlang(专门针对于大数据高并发的语言)语言开发,可复用的企业消息系统,是当前最主流的消息中间件之一,具有可靠性、灵活的路由、消息集群简单、队列高可用、多种协议的支持、管理界面、跟踪机制以及插件机制。
1.消息 就是数据,增删改查的数据。例如在员工管理系统中增删改查的数据
2.队列 指的是一端进数据一端出数据,例如C#中(Queue数据结构)
1.消息队列指:一端进消息,一端出消息
就是实现了消息队列概念的一个组件,以面向对象的思想去理解,消息队列就是类,而RabbitMQ就是实例,当然不仅仅只有RabbitMQ,例如ActiveMQ,RocketMQ,Kafka,包括Redis也可以实现消息队列。
1.在常见的单体架构中,主要流程是用户UI操作发起Http请求>服务器处理>然后由服务器直接和数据库交互,最后同步反馈用户结果
2.在微服务架构中,UI与微服务通信,主要是通过Http或者gRPC同步通信
问题分析
在上述2种情况下,我们发现在UI请求时都是同步操作 ,第2种架构虽然将整体服务按业务拆分成不同的微服务并且对应各自的数据库,但是在用户与微服务通信时,存在的问题依然没有解决,例如数据库的承载能力只能处理10w个请求,如果遇到高并发情况下,UI发起50w请求,那数据库是远远承载不了的,从而导致如下问题。
1.高并发请求导致系统性能下降响应慢,同时数据库承载风险加大
2.扩展性不强UI操作的交互对业务的依赖较大,导致用户体验下降
3.瞬时流量涌入巨大的话,服务器可能直接挂了
解决方案
RabbitMQ的优势
RabbitMQ的不足
为Connection的制造工厂。
是RabbitMQ的socket链接,它封装了socket协议相关部分逻辑。
是我们与RabbitMQ打交道的最重要的一个接口,我们大部分的业务操作是在Channel这个接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等。
(交换机) 我们通常认为生产者将消息投递到Queue中,实际上实际的情况是,生产者将消息发送到Exchange,由Exchange将消息路由到一个或多个Queue中(或者丢弃),而在RabbitMQ中的Exchange一共有4种策略,分别为:fanout(扇形)、direct(直连)、topic(主题)、headers(头部)
1.下载RabbitMQ
2.运行环境erlang
3.安装完成之后,加载RabbitMQ管理插件
4.安装成功访问RabbitMQ管理后台
1.分别创建考勤服务,请假服务,计算薪酬服务,邮件服务,短信服务消费者角色
2.创建员工管理网站用于模拟前端调用,主要充当生产者角色
3.在员工管理网站和每一个模拟微服务中通过nuget引入
4.在员工管理网站中创建模拟添加考勤的控制器并加入生产者代码
5.在考勤微服务中创建接口,并在接口中加入消费者代码
fanout类型的Exchange路由规则非常简单,工作方式类似于多播一对多,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中。
业务实例
当我们有员工需要请假,在员工管理系统提交请假,但是由于公司规定普通员工请假,需要发送短信到他的主管领导,针对此业务场景我们需要调用请假服务的同时去发送短信,这时需要两个消费者(请假服务,短信服务)来消费同一条消息,其实本质就是往RabbitMQ写入一个能被多个消费者接收的消息,所以可以使用 扇形交换机,一个生产者,多个消费者.
生产者模拟使用调用控制器来实现
消费者实现IHostedService 接口创建一个监听主机
直接交换器,工作方式类似于单播一对一,Exchange会将消息发送完全匹配ROUTING_KEY的Queue,缺陷是无法实现多生产者对一个消费者
当我们员工管理系统需要计算薪资并将结果以发送短信的方式告诉员工,这个时候我们就不太适合用“扇形交换机”了,因为换做是你,你也不想你的工资全公司都知道吧?这个时候就需要定制了一对一的场景了,那就在生产消息时使用直连交换机根据routingKey发送指定的消费者.
生产者模拟使用调用控制器来实现
消费者实现IHostedService 接口创建一个监听主机
Exchange绑定队列需要制定Key; Key 可以有自己的规则;Key可以有占位符;
或者# ,
匹配一个单词、#匹配多个单词,在Direct基础上加上模糊匹配;多生产者一个消费者,可以多对对,也可以多对1, 真实项目当中,使用主题交换机。可以满足所有场景
1.生产者定义Exchange,然后不同的routingKey绑定
3.消费者routingKey的模糊匹配,生产者发送消息时routingKey定义以sms.开头, * 号只能匹配的routingKey为一级,例如(sms.A)或(sms.B)的发送的消息,# 能够匹配的routingKey为一级及多级以上 ,例如 (sms.A)或者()
在月底的时候我们需要把员工存在异常考勤信息,薪资结算信息,请假信息分别以邮件的形式发送给我们的员工查阅,我们知道这是一个典型的多个生产者,一个消费者场景,异常考勤信息,薪资结算信息,请假信息分别需要生产消息发送到RabbitMQ,然后供我们员工消费
分别模拟3个生产者:异常考勤信息,薪资结算信息,请假信息
headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。在绑定Queue与Exchange时指定一组键值对以及x-match参数,x-match参数是字符串类型,可以设置为any或者all。如果设置为any,意思就是只要匹配到了headers表中的任何一对键值即可,all则代表需要全部匹配。
1.不需要依赖Key
2.更多的时候,像这种Key Value 的键值,可能会存储在数据库中,那么我们就可以定义一个动态规则来拼装这个Key value ,从而达到消息灵活转发到不同的队列中去
我们根据上面的业务和代码简单实现了由生产者到消费者的一个业务流程,我们可以总结出知道,整个消息的收发过程包含有三个角色,生产者(员工管理网站)、RabbitMQ(Broker)、消费者(微服务),在理想状态下,按照这样实现,整个流程以及系统的稳定性,可能不会发生太大的问题,但是真正在实际应用中我们要去思考可能存在的问题,主要从三个大的方面去分析,然后发散。
1.生产端
2.存储端
3.消费端
我们在给RabbitMQ发送消息时,如何去保证消息一定到达呢,我们可以使用RabbitMQ提供了2种生产端的消息确认机制
我们生产端给RabbitMQ发送消息成功后,如果RabbitMQ宕机了,会导致RabbitMQ中消息丢失,如何解决消息丢失问题,针对RabbitMQ消息丢失,我们可以在生产者中使用
1.持久化消息
2.集群
当生产者写入消息到RabbitMQ后,消费服务接收消息期间,服务器宕机,导致消息丢失了,这个时候我们就应该使用RabbitMQ的消费端消息确认机制
1.自动确认
2.手动确认
消费者收到消息。消费者发送确认消息给rabbitmq期间。执行业务逻辑失败了,但是消息已经确认被消费了,我们应该在我们的消费者接收消息回调执行业务逻辑后面,执行使用手动确认消息机制,保证消息不被丢失
原文链接:
RabbitMQ死信队列DLX应用
进入死信队列的场景:
运行 main 函数,推送消息给 TestDLXQueue 队列。可以先看到消息先在 TestDLXQueue 队列中。
等到30秒后没有被消费,则会把消息推送到 DLXQueue 死信队列中。
主要过程: 生产者 —> 原交换机 —> 原队列 TestDLXQueue (超过 TTL 之后) —> 死信交换机 DLXExchange—> 死信队列 DLXQueue—> 最终消费者。
创建死信队列 DLX_QUEUE 并和指定交换机 DLX_EXCHANGE 进行绑定(其实也是普通的队列、普通的交换机)。
创建另外一个正常的消息队列 TEST_DLX_QUEUE ,设置队列的 TTL 过期时间,同时通过 x-dead-letter-exchange 属性指定死信队列对应的交换机。
等到30秒后没有被消费,则会把消息推送到 DLX_QUEUE 死信队列中。
所以在 RabbitMQ 3.5.8版本之后,可以利用官方的 rabbitmq-delayed-message-exchange 插件来实现消息的延迟发送,可以避免上面所说的问题。RabbitMQ实现消息延迟推送
免责声明:本文转载或采集自网络,版权归原作者所有。本网站刊发此文旨在传递更多信息,并不代表本网赞同其观点和对其真实性负责。如涉及版权、内容等问题,请联系本网,我们将在第一时间删除。同时,本网站不对所刊发内容的准确性、真实性、完整性、及时性、原创性等进行保证,请读者仅作参考,并请自行核实相关内容。对于因使用或依赖本文内容所产生的任何直接或间接损失,本网站不承担任何责任。