初识rabbitMQ
RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。
AMQP :Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。
RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。具体特点包括:
RabbitMQ 特点
可靠性(Reliability)
RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。
灵活的路由(Flexible Routing)
在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。
消息集群(Clustering)
多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。
高可用(Highly Available Queues)
队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。
多种协议(Multi-protocol)
RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。
多语言客户端(Many Clients)
RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。
管理界面(Management UI)
RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。
跟踪机制(Tracing)
如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。
插件机制(Plugin System)
RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。
Rabbitmq使用流程
rabbitmq作为消息队列,一条消息从发布到订阅消费的完整流程为:
消息 --> 交换机exchange ---> 队列queue ---> 消费者
rabbitmq的核心就在交换机和队列
发布者(推送消息的一端):
- 创建一个tcp长连接connection,连接rabbitmq的监听端口5672;
- 在TCP长连接下创建一个信道channel,信道可以理解为connection的一个分支;
- 通过信道向rabbitmq声明一个交换机exchange,设置交换机的类型,名称,是否持久化等属性;
- 通过信道向rabbitmq声明一个队列queue,设置队列的名称,是否持久化等参数;
- 通过信道向rabbitmq声明一个绑定binding,设置绑定的交换机名称,队列名称,绑定的路由键;
- 通过信道向rabbitmq推送一条消息,指定交换机和路由;
消费者(接收消息的一端):
- 从第一步到第四步和发布者做的事情是一模一样的;
- 通过信道向rabbitmq声明一个订阅,订阅特定的queue,并且设置回调函数及是否确认等;
- 通过信道监听rabbitmq推送过来的消息;
- 消费的时候可以设置是否自动确认,如果是手动的话需要手动ack来确认消费完成才最终结束消费流程
深入理解
创建连接connection
- rabbitmq实现的是AMQP通信协议,其所允许的最大AMQP连接数,本质上是TCP连接数;
- 采用TCP长连接,自带SSL认证机制,类似https,保证数据的传输安全;
- 其TCP连接的上限可以通过调整操作系统的限制来变更,详情linux修改TCP最大连接数
创建信道channel
- 信道channel可以理解成是connection下的分支,也就是说多个channel共享一个connection,为什么要这么做呢?
- 创建TCP连接是一个非常耗时的操作,如果一个应用需要多个connection的话,每次的创建和关闭会性能降低,而创建多个channel共享一个connection提高性能,节约系统资源;
- 多个channel是相互独立隔离的,这导致了一个问题就是,由于主机只能识别不同的TCP连接,但不能识别不同的信道发送过来的消息,因此rabbitmq为每个创建的信道分配了一个信道ID用来识别不同信道发送的消息;
- 每个信道都是为独立的进程或线程准备的,因此多线程或多进程共享信道是不被允许的;
- 在一个TCP下可创建的chnnel的数量理论上是没有上限的,只取决于系统资源,但可以通过配置channel_max参数设置上限;
声明交换机exchange
作用:交换机使用来管理分发消息的,一边接受发布者提交的消息,根据消息提供的参数选择相应的处理办法,交换机是不会存储消息的,只有转发或丢弃功能;
如果消息携带的路由键没有对应的路由队列,交换机会将消息丢弃;
重要属性:名称,是否持久化;
类型:
直连交换机direct
- 顾名思义直连交换机根据消息携带的路由键将该消息投递到绑定的队列;
- 如果有多个队列使用相同的路由键和直连交换机绑定,那么直连交换机会将消息同时转发到多个队列;
- rabbitmq本身存在默认交换机,默认交换机的本质就是名称为空的直连交换机,当任何一个新的队列被声明的时候,rabbitmq会使用这个队列的名字作为路由键自动绑定默认交换机;
- 通过rabbitmqctl list_exchanges命令查看所有的交换机,可以看到有一个名称为空的direct;
扇形交换机funout
- 作用:扇形交换机会将消息推送到绑定的所有的队列中,不会理会路由键是什么;
- 在声明funout和队列绑定的时候,也有一个路由键参数,但是写什么都无所谓,funout会将其忽略的;
主题交换机topic
- 作用:主题交换机使用比较复杂的路由键匹配规则实现一路或多路路由;
- 路由键规则:
- 使用.分割的词语列表,也可以是单个词语,长度不能超过255个字节;
- *号用来匹配一个单词,这个单词不能为空,#号用来匹配0个或多个单词;
头交换机headers
- 通过判断消息中携带的额外的属性来分发消息给对应的队列,性质和直连交换机很相似;
- 其区别在于使用消息的属性替换路由键作为路由的规则;
注意
每个交换机的名字是唯一的,如果重复声明交换机并且声明的参数完全一样,那么mq会忽略该声明;
如果声明交换机已经存在,但修改了它的属性,比如类型或持久化等,那么会报错;所以修改的方法是先删除原来的交换机再创建一个新的交换机;
队列queue
- queue用来缓存消息,并向消费者推送消息;
- 声明队列的名称最多255个字节,可以是任意字符串;
- 如果声明时名字为空,mq会随机生成一个名字;
- 声明队列不可以用amq.开头,否则报错;
- 如果多个消费者订阅同一个队列的消息,那么队列会用轮询的方式推送消息,这种方式可以实现负载均衡;
注意:
- 每个队列的名字是唯一的,如果重复声明交换机并且声明的参数完全一样,那么mq会忽略该声明;
- 如果声明的该队列已经存在,但修改了它的属性,比如类型或持久化等,那么会报错;所以修改的方法是先删除原来的队列再创建一个新的队列;
消息订阅
- 消费者可以向队列订阅消息,每个消费者都会被分配一个标志符;
- 订阅时可以设置消费回执,告诉mq什么时候可以将消息删除了,默认是自动删除的;
- 消费者也可以向mq发送 拒绝消息 ,这时消息会被放回队列等待投递到其他的消费者;
- 消费者订阅的时候可以设置预取计数,当存在多个消费者共同订阅一个队列的时候,由于是轮询机制,但每个消费者的消费能力可能是不一样的,这可能造成有的消费者闲的要死,有的忙的要死,设置预取数量,在没有收到回执确认前,该队列推送的消息是有限的,可以提高整体的吞吐量;
消息,交换机,队列的持久化
- 消息持久化是在消息投递前定义的,设置了持久化后,消息会被保存在磁盘,当被消费后会从磁盘中删除,但是也不能保证100%持久化成功,因为消息是先放到内存中的,如果此时主机崩溃,还是会有一部分来不及写入磁盘;
- 交换机持久化是在声明该交换机的时候设置的,当主机崩溃或重启后,mq一重新上线会自动重新声明一个和原来完全一样的交换机;
- 队列持久化是在队列被声明的时候设置的,mq重新上线会自动重新声明一个和原来完全一样的queue,但是队列里的消息会全部丢失;