生产环境:

RabbitMQ是一个消息代理,一个消息系统的媒介,提供了一个通用的消息发送及接收平台,并且能够保障消息传输过程中的安全。使用erlang语言开发,开源,在易用性、扩展性、高可用性等方面表现不俗

RabbitMQ是流行的开源消息队列系统,用erlang语言开发。RabbitMQ是AMQP的标准实现。

CentOS 6.3 x86_64

技术亮点

  • 可靠性——RabbitMQ提供了多种技术可以让你在性能和可靠性之间进行权衡。这些技术包括持久性机制、投递确认、发布者证实和高可用性机制。
  • 灵活的路由——消息在到达队列前是通过交换机进行路由的。RabbitMQ为典型的路由逻辑提供了多种内置交换机类型。如果你有更复杂的路由需求,可以将这些交换机组合起来使用,你甚至可以实现自己的交换机类型,并且当做RabbitMQ的插件来使用。
  • 集群——在相同局域网中的多个RabbitMQ服务器可以聚合在一起,作为一个独立的逻辑代理来使用。
  • 联合——对于服务器来说,它比集群需要更多的松散和非可靠链接。为此RabbitMQ提供了联合模型。
  • 高可用的队列——在同一个集群里,队列可以被镜像到多个机器中,以确保当其中某些硬件出现故障后,你的消息仍然安全。
  • 多协议——RabbitMQ
    支持多种消息协议的消息传递。

  • 广泛的客户端——只要是你能想到的编程语言几乎都有与其相适配的RabbitMQ客户端。

  • 可视化管理工具——RabbitMQ附带了一个易于使用的可视化管理工具,它可以帮助你监控消息代理的每一个环节。
  • 追踪——如果你的消息系统有异常行为,RabbitMQ还提供了追踪的支持,让你能够发现问题所在。
  • 插件系统——RabbitMQ附带了各种各样的插件来对自己进行扩展。你甚至也可以写自己的插件来使用。

图片 1

服务器主机名与IP列表:

专业术语

Broker:简单来说就是消息队列服务器实体。

Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。

Queue:消息队列载体,每个消息都会被投入到一个或多个队列。

Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。

Routing Key:路由关键字,exchange根据这个关键字进行消息投递。

就根据消息的key和已经设置的binding,一个消息系统的媒介。vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。

producer:消息生产者,就是投递消息的程序。

consumer:消息消费者,就是接受消息的程序。

connection:连接,就是一个位于客户端和Broker之间的TCP连接

channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。

Message:由Header和Body组成,Header是由生产者添加的各种属性的集合,包括Message是否被持久化、由哪个Message
Queue接受、优先级是多少等。而Body是真正需要传输的消息内容。

Broker:简单来说就是消息队列服务器实体。Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。Queue:消息队列载体,每个消息都会被投入到一个
或多个队列。Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。Routing
Key:路由关键字,exchange根据这个关键字进行消息投递。vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。producer:消息生产者,就是投递消息的程序。consumer:消息消费者,就是接受消息的程序。channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。

客户端连接到消息队列服务器,打开一个channel。客户端声明一个exchange,并设置相关属性。客户端声明一个queue,并设置相关属性。客户端使用routing
key,在exchange和queue之间建立好绑定关系。客户端投递消息到exchange。

mq136      172.28.2.136
mq137      172.28.2.137
mq164      172.28.2.164
mq165      172.28.2.165

消息队列的使用过程大概如下:

(1)客户端连接到消息队列服务器,打开一个channel。

(2)客户端声明一个exchange,并设置相关属性。

(3)客户端声明一个queue,并设置相关属性。

(4)客户端使用routing key,在exchange和queue之间建立好绑定关系。

(5)客户端投递消息到exchange。

exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。

exchange也有几个类型,完全根据key进行投递的叫做Direct交换机,例如,绑定时设置了routing
key为”abc”,那么客户端提交的消息,只有设置了key为”abc”的才会投递到队列。对key进行模式匹配后进行投递的叫做Topic交换机,符号”#”匹配一个或多个词,符号”*”匹配正好一个词。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”。还有一种不需要key的,叫做Fanout交换机,它采取广播模式,一个消息进来时,投递到与该交换机绑定的所有队列。

RabbitMQ支持消息的持久化,也就是数据写在磁盘上,为了数据安全考虑,我想大多数用户都会选择持久化。消息队列持久化包括3个部分:

(1)exchange持久化,在声明时指定durable => 1

(2)queue持久化,在声明时指定durable => 1

(3)消息持久化,在投递时指定delivery_mode => 2(1是非持久化)

如果exchange和queue都是持久化的,那么它们之间的binding也是持久化的。如果exchange和queue两者之间有一个持久化,一个非持久化,就不允许建立绑定。

exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。exchange也有几个类型,完全根据key进行投递的叫做Direct交换机,例如,绑定时设置了routing
key为”abc”,那么客户端提交的消息,只有设置了key为”abc”的才会投递到队列。对key进行模式匹配后进行投递的叫做Topic交换机,符号”#”匹配一个或多个词,符号””匹配正好一个词。例如”abc.#”匹配”abc.def.ghi”,”abc.”只匹配”abc.def”。还有一种不需要key的,叫做Fanout交换机,它采取广播模式,一个消息进来时,投递到与该交换机绑定的所有队列。

在各节点服务器上作好hosts解析

RabbitMQ下载地址

进入RabbitMQ官网
: http://www.rabbitmq.com/ 
点击右侧最新版本列表下载最新版本并安装。

输入以下命令可以修改RabbitMQ的存储目录

D:Program FilesRabbitMQ Serverrabbitmq_server-3.6.6sbin

rabbitmq-service.bat remove

set RABBITMQ_BASE=d:/RabbitMQ

rabbitmq-service.bat install

安装RabbitMQWeb管理插件

rabbitmq-plugins enable rabbitmq_management

浏览器中打开http://127.0.0.1:15672测试是否安装成功,用户名密码默认都是guest

进入后可以看到RabbitMQ一些配置信息

图片 2

RabbitMQ配置文件介绍

Key

Documentation

tcp_listeners

用于监听 AMQP连接的端口列表(无SSL). 可以包含整数 (即"监听所有接口")或者元组如 {"127.0.0.1", 5672} 用于监听一个或多个接口.

Default: [5672]

num_tcp_acceptors

接受TCP侦听器连接的Erlang进程数。

Default: 10

handshake_timeout

AMQP 0-8/0-9/0-9-1 handshake (在 socket 连接和SSL 握手之后)的最大时间, 毫秒为单位.

Default: 10000

ssl_listeners

如上所述,用于SSL连接。

Default: []

num_ssl_acceptors

接受SSL侦听器连接的Erlang进程数。

Default: 1

ssl_options

SSL配置.参考SSL documentation.

Default: []

ssl_handshake_timeout

SSL handshake超时时间,毫秒为单位.

Default: 5000

vm_memory_high_watermark

流程控制触发的内存阀值.相看memory-based flow control 文档.

Default: 0.4

vm_memory_high_watermark_paging_ratio

高水位限制的分数,当达到阀值时,队列中消息消息会转移到磁盘上以释放内存. 参考memory-based flow control 文档.

Default: 0.5

disk_free_limit

RabbitMQ存储数据分区的可用磁盘空间限制.当可用空间值低于阀值时,流程控制将被触发. 此值可根据RAM的总大小来相对设置 (如.{mem_relative, 1.0}). 此值也可以设为整数(单位为bytes)或者使用数字单位(如."50MB"). 默认情况下,可用磁盘空间必须超过50MB. 参考 Disk Alarms 文档.

Default: 50000000

log_levels

控制日志的粒度.其值是日志事件类别(category)和日志级别(level)成对的列表.

level 可以是 ‘none’ (不记录日志事件), ‘error’ (只记录错误), ‘warning’ (只记录错误和警告), ‘info’ (记录错误,警告和信息), or ‘debug’ (记录错误,警告,信息以及调试信息).

目前定义了4种日志类别. 它们是:

  • channel -针对所有与AMQP channels相关的事件
  • connection – 针对所有与网络连接相关的事件
  • federation – 针对所有与federation相关的事件
  • mirroring -针对所有与 mirrored queues相关的事件

Default: [{connection, info}]

frame_max

与客户端协商的允许最大frame大小. 设置为0表示无限制,但在某些QPid客户端会引发bug. 设置较大的值可以提高吞吐量;设置一个较小的值可能会提高延迟.

Default: 131072

channel_max

与客户端协商的允许最大chanel大小. 设置为0表示无限制.该数值越大,则broker使用的内存就越高.

Default: 0

channel_operation_timeout

Channel 操作超时时间(毫秒为单位) (内部使用,因为消息协议的区别和限制,不暴露给客户端).

Default: 5000

heartbeat

表示心跳延迟(单位为秒) ,服务器将在connection.tune frame中发送.如果设置为 0, 心跳将被禁用. 客户端可以不用遵循服务器的建议, 查看 AMQP reference 来了解详情. 禁用心跳可以在有大量连接的场景中提高性能,但可能会造成关闭了非活动连接的网络设备上的连接落下.

Default: 60 (3.5.5之前的版本是580)

default_vhost

当RabbitMQ从头开始创建数据库时创建的虚拟主机. amq.rabbitmq.log交换器会存在于这个虚拟主机中.

Default: <<"/">>

default_user

RabbitMQ从头开始创建数据库时,创建的用户名.

Default: <<"guest">>

default_pass

默认用户的密码.

Default: <<"guest">>

default_user_tags

默认用户的Tags.

Default: [administrator]

default_permissions

创建用户时分配给它的默认Permissions .

Default: [<<".*">>, <<".*">>, <<".*">>]

loopback_users

只能通过环回接口(即localhost)连接broker的用户列表

如果你希望默认的guest用户能远程连接,你必须将其修改为[].

Default: [<<"guest">>]

cluster_nodes

当节点第一次启动的时候,设置此选项会导致集群动作自动发生. 元组的第一个元素是其它节点想与其建立集群的节点. 第二个元素是节点的类型,要么是disc,要么是ram

Default: {[], disc}

server_properties

连接时向客户端声明的键值对列表

Default: []

collect_statistics

统计收集模式。主要与管理插件相关。选项:

  • none (不发出统计事件)
  • coarse (发出每个队列 /每个通道 /每个连接的统计事件)
  • fine (也发出每个消息统计事件)

你自已可不用修改此选项.

Default: none

collect_statistics_interval

统计收集时间间隔(毫秒为单位). 主要针对于 management plugin.

Default: 5000

auth_mechanisms

提供给客户端的SASL authentication mechanisms.

Default: [‘PLAIN’, ‘AMQPLAIN’]

auth_backends

用于 authentication / authorisation backends 的列表. 此列表可包含模块的名称(在模块相同的情况下,将同时用于认证来授权)或像{ModN, ModZ}这样的元组,在这里ModN将用于认证,ModZ将用于授权.

在2元组的情况中, ModZ可由列表代替,列表中的所有元素必须通过每个授权的确认,如{ModN, [ModZ1, ModZ2]}. 这就允许授权插件进行组合提供额外的安全约束.

除rabbit_auth_backend_internal外,其它数据库可以通常 plugins来使用.

Default: [rabbit_auth_backend_internal]

reverse_dns_lookups

设置为true,可让客户端在连接时让RabbitMQ 执行一个反向DNS查找, 然后通过 rabbitmqctl 和 管理插件来展现信息.

Default: false

delegate_count

内部集群通信中,委派进程的数目. 在一个有非常多核的机器(集群的一部分)上,你可以增加此值.

Default: 16

trace_vhosts

 tracer内部使用. 你不应该修改.

Default: []

tcp_listen_options

默认socket选项. 你可能不想修改这个选项.

Default:

[{backlog, 128}, {nodelay, true}, {exit_on_close, false}]

hipe_compile

将此选项设置为true,将会使用HiPE预编译部分RabbitMQ,Erlang的即时编译器.
这可以增加服务器吞吐量,但会增加服务器的启动时间.
你可以看到花费几分钟延迟启动的成本,就可以带来20-50% 更好性能.这些数字与高度依赖于工作负载和硬件.

HiPE 支持可能没有编译进你的Erlang安装中.如果没有的话,启用这个选项,并启动RabbitMQ时,会看到警告消息. 例如, Debian / Ubuntu 用户需要安装erlang-base-hipe 包.

HiPE并非在所有平台上都可用, 尤其是Windows.

在 Erlang/OTP 17.5版本之前,HiPE有明显的问题 . 对于HiPE,使用最新的OTP版本是高度推荐的.

Default: false

cluster_partition_handling

如何处理网络分区.可用模式有:

  • ignore
  • pause_minority
  • {pause_if_all_down, [nodes], ignore | autoheal}where [nodes] is a list of node names
    (ex: [‘rabbit@node1’, ‘rabbit@node2’])
  • autoheal

参考documentation on partitions 来了解更多信息

Default: ignore

cluster_keepalive_interval

节点向其它节点发送存活消息和频率(毫秒). 注意,这与 net_ticktime是不同的; 丢失存活消息不会引起节点掉线

Default: 10000

queue_index_embed_msgs_below

消息大小在此之下的会直接内嵌在队列索引中. 在修改此值时,建议你先阅读  persister tuning 文档.

Default: 4096

msg_store_index_module

队列索引的实现模块. 在修改此值时,建议你先阅读  persister tuning 文档.

Default: rabbit_msg_store_ets_index

backing_queue_module

队列内容的实现模块. 你可能不想修改此值.

Default: rabbit_variable_queue

msg_store_file_size_limit

Tunable value for the persister. 你几乎肯定不应该改变此值。

Default: 16777216

mnesia_table_loading_timeout

在集群中等待使用Mnesia表可用的超时时间。

Default: 30000

queue_index_max_ journal_entries

Tunable value for the persister. 你几乎肯定不应该改变此值。

Default: 65536

queue_master_locator

Queue master 位置策略. 可用策略有:

  • <<"min-masters">>
  • <<"client-local">>
  • <<"random">>

查看documentation on queue master location 来了解更多信息.

Default: <<"client-local">>

Demo:

[{rabbit,[{tcp_listen_options,[{backlog,128},{nodelay,true},{linger,{true,0}}]}]}]

更多配置相关信息请参考:

http://www.blogjava.net/qbna350816/archive/2016/08/02/431415.html

http://www.rabbitmq.com/configure.html

RabbitMQ支持消息的持久化,也就是数据写在磁盘上,为了数据安全考虑,我想大多数用户都会选择持久化。消息队列持久化包括3个部分:exchange持久化,在声明时指定durable
=> 1queue持久化,在声明时指定durable =>
1消息持久化,在投递时指定delivery_mode => 2

  1. cat >>/etc/hosts/<<EOF 
  2. mq136      172.28.2.136 
  3. mq137      172.28.2.137 
  4. mq164      172.28.2.164 
  5. mq165      172.28.2.165 
  6. EOF 

如果exchange和queue都是持久化的,那么它们之间的binding也是持久化的。如果exchange和queue两者之间有一个持久化,一个非持久化,就不允许建立绑定。

一、简介
RabbitMQ是流行的开源消息队列系统,用erlang语言开发。Erlang的分布式通讯安全策略,可以归结为
All or
None。。RabbitMQ是AMQP(高级消息队列协议)的标准实现。RabbitMQ的结构图如下:

以上内容来自百度百科rabbitmq

图片 3

在安装RabbitMQ前,首先需要安装 Erlang环境

几个概念说明:

官网::
使用yum安装

Broker:简单来说就是消息队列服务器实体。
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
producer:消息生产者,就是投递消息的程序。
consumer:消息消费者,就是接受消息的程序。
channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。

#wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rp#yum install rabbitmq-server-3.6.5-1.noarch.rpm

消息队列的使用过程大概如下:

具体安装细节和配置大家可以看这篇文章:
RabbitMQ安装教程(Windows/Linux都有)

(1)客户端连接到消息队列服务器,打开一个channel。
(2)客户端声明一个exchange,并设置相关属性。
(3)客户端声明一个queue,并设置相关属性。
(4)客户端使用routing key,在exchange和queue之间建立好绑定关系。
(5)客户端投递消息到exchange。

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>

exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。

创建两个Maven工程,一个作为发送端,一个作为接受端。

exchange也有几个类型,完全根据key进行投递的叫做Direct交换机,例如,绑定时设置了routing
key为”abc”,那么客户端提交的消息,只有设置了key为”abc”的才会投递到队列。对key进行模式匹配后进行投递的叫做Topic交换机,符号”#”匹配一个或多个词,符号”*”匹配正好一个词。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”。还有一种不需要key的,叫做Fanout交换机,它采取广播模式,一个消息进来时,投递到与该交换机绑定的所有队列。

图片 4

RabbitMQ支持消息的持久化,也就是数据写在磁盘上,为了数据安全考虑,我想大多数用户都会选择持久化。消息队列持久化包括3个部分:
(1)exchange持久化,在声明时指定durable => 1
(2)queue持久化,在声明时指定durable => 1
(3)消息持久化,在投递时指定delivery_mode => 2(1是非持久化)

Sender端:

server.port=8083spring.application.name=spirng-boot-rabbitmq-senderspring.rabbitmq.host=localhostspring.rabbitmq.port=5672spring.rabbitmq.username=guestspring.rabbitmq.password=guest

如果exchange和queue都是持久化的,那么它们之间的binding也是持久化的。如果exchange和queue两者之间有一个持久化,一个非持久化,就不允许建立绑定。

Receiver端:

server.port=8082spring.application.name=spirng-boot-rabbitmq-receiverspring.rabbitmq.host=localhostspring.rabbitmq.port=5672spring.rabbitmq.username=guestspring.rabbitmq.password=guest

rabbitMq五种消息模式:1.直接模式Direct2.工作队列模式Work
Queue3.发布/订阅模式Publish/Subscribe4.路由模式Routing5.通配符模式Topics

今天给大家实现最简单的直接模式注入名字为”redirect”的对列,发送端和接受端要保证队列的名字完全一致才能确保通信的成功。

@Configurationpublic class SenderConfig { @Bean public Queue queue() { return new Queue("redirect"); }}

发送消息的方法,为了便于观察,这里使用了定时任务,每5秒钟发送一条数据,有关定时任务的使用方法,大家可以看我之前写的博客,特别简单。

@Componentpublic class Sender { @Autowired private AmqpTemplate template; @Scheduled(cron="0/5 * * * * ? ") //每5秒执行一次 public void send() { DateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); template.convertAndSend("redirect",sdf.format(new Date+"消息来啦!"); }}

启动方法,开启定时任务的注解。

@SpringBootApplication@EnableSchedulingpublic class SpringBootRabbitMqSenderApplication { public static void main(String[] args) { SpringApplication.run(SpringBootRabbitMqSenderApplication.class, args); }}

Receiver端只需配置个Rabbit监听器,监听指定名字的队列,收到消息后,直接控制台打印出来。

@Componentpublic class Receive { @RabbitListener(queues="redirect") //监听器监听指定的redirect public void processC(Object obj) { System.out.println("receiver:"+obj.toString; }}

然后分别启动Sender端和Receiver端,发现Receiver端控制台不断输出内容。

图片 5消息内容

上面测试的是一个生产者对应一个消费者,下面我们看一下一个生产者对应多个消费者,,接收端新增一个方法,发送端不变,队列名字不变。

@Componentpublic class Receive2 { //消费者1 @RabbitListener(queues="direct") public void processC(String obj) { System.out.println("Receiver1:"+obj); } //消费者2 @RabbitListener(queues="direct") public void processC2(String obj) { System.out.println("Receiver2:"+obj); }}

一个生产者,多个消费者,消息会平均分配到每个消费者中!

图片 6

再来看一下,多个生产者对应多个消费者,会产生怎样的情况?发送端复制一分发送的方法。

@Componentpublic class Sender { @Autowired private AmqpTemplate template; @Scheduled(cron="0/5 * * * * ? ") //每5秒执行一次 public void send() { DateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); //第一个参数 交换机名称 第二个参数是 队列的key 第三个参数是发送的内容 template.convertAndSend("direct",sdf.format(new Date; } @Scheduled(cron="0/5 * * * * ? ") //每5秒执行一次 public void send2() { DateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); //第一个参数 交换机名称 第二个参数是 队列的key 第三个参数是发送的内容 template.convertAndSend("direct",sdf.format(new Date; } }

多个生产者,多个消费者,消息同样会平均分配到每个消费者中!

图片 7

需要注意的是实体类要实现序列化。

public class User implements Serializable{ private static final long serialVersionUID = 1L; private int id; private String name; private String gender; public User(int id, String name, String gender) { super(); this.id = id; this.name = name; this.gender = gender; } public int getId() { return id; } public void setId { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getGender() { return gender; } public void setGender(String gender) { this.gender = gender; } @Override public String toString() { return "User [, " + (name != null ? "name=" + name + ", " : "") + (gender != null ? "gender=" + gender : "") + "]"; }}

发送内容改为对象

@Componentpublic class Sender { @Autowired private AmqpTemplate template; //@Scheduled(cron="0/5 * * * * ? ") //每5秒执行一次 public void send(User user) { template.convertAndSend("direct",user); } }

接收内容变为对象

@Componentpublic class Receive2 { //消费者 @RabbitListener(queues="direct") public void processC(User user) { System.out.println("Receive:"+user); } }

开始测试

@RunWith(SpringRunner.class)@SpringBootTestpublic class SpringBootRabbitMqSenderApplicationTests { @Autowired private Sender helloSender; //Direct模式 @Test public void send() { helloSender.send(new User(1, "张三", "男")); }}

测试结果如下:

图片 8

具体的数据传输信息可以浏览器访问

图片 9

到这里,rabbitMqDirect模式已经整合完成,日后我会继续将其他几种消息模式整合完毕,敬请期待!

下面我们再了解下消息队列RabbitMQ集群,由于RabbitMQ是用erlang开发的,RabbitMQ
完全依赖 Erlang 的
Cluster,而Erlang集群非常方便,因此配置RabbitMQ集群变得非常简单。

RabbitMQ的集群节点包括内存节点、磁盘节点。顾名思义内存节点就是将所有数据放在内存,磁盘节点将数据放在磁盘。不过,如前文所述,如果在投递消息时,打开了消息的持久化,那么即使是内存节点,数据还是安全的放在磁盘。

良好的设计架构可以如下:在一个集群里,有3台以上机器,其中1台使用磁盘模式,其它使用内存模式。其它几台为内存模式的节点,无疑速度更快,因此客户端(consumer、producer)连接访问它们。而磁盘模式的节点,由于磁盘IO相对较慢,因此仅作数据备份使用。

图片 10

相关文章