MQ;Message Queue;消息队列;是基础数据结构中“先进先出”的一种数据结构。一般用来解决应用解耦;异步消息;流量削峰等问题;实现高性能;高可用;可伸缩和最终一致性架构。
RocketMQ是阿里巴巴旗下一款开源的MQ框架;2016年底捐赠给Apache开源基金会成为孵化项目;2017年正式成为了Apache顶级项目;作为一款纯java、分布式、队列模型的开源消息中间件;支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。
常见的MQ主要有;ActiveMQ、RabbitMQ、Kafka、RocketMQ
RocketMQ主要有四大核心组成部分;NameServer、Broker、Producer以及Consumer四部分。
与其余组件说明如下;
Producer; 消息生产者;负责产生消息;一般由业务系统负责产生消息Producer Group;消息生产者组;简单来说就是多个发送同一类消息的生产者称之为一个生产者Consumer;消息消费者;负责消费消息;一般是后台系统负责异步消费Consumer Group;消费者组;和生产者类似;消费同一类消息的多个 Consumer 实例组成一个消费者组Topic;主题;用于将消息按主题做划分;Producer将消息发往指定的Topic;Consumer订阅该Topic就可以收到这条消息Message;消息;每个message必须指定一个topic;Message 还有一个可选的 Tag 设置;以便消费端可以基于 Tag 进行过滤消息Tag;标签;子主题;二级分类;对topic的进一步细化,用于区分同一个主题下的不同业务的消息Broker;Broker是RocketMQ的核心模块;负责接收并存储消息;同时提供Push/Pull接口来将消息发送给Consumer。Broker同时提供消息查询的功能;可以通过MessageID和MessageKey来查询消息。Borker会将自己的Topic配置信息实时同步到NameServer push;推模式; 消息到达消息服务器之后;主动推送给消费者pull;拉模式; 是消费端发起请求;主动向消息服务器(Broker)拉取消息Queue;Topic和Queue是1对多的关系;一个Topic下可以包含多个Queue;主要用于负载均衡;Queue数量设置建议不要比消费者数少。发送消息时;用户只指定Topic;Producer会根据Topic的路由信息选择具体发到哪个Queue上。Consumer订阅消息时;会根据负载均衡策略决定订阅哪些Queue的消息Offset;RocketMQ在存储消息时会为每个Topic下的每个Queue生成一个消息的索引文件;每个Queue都对应一个Offset记录当前Queue中消息条数NameServer;NameServer可以看作是RocketMQ的注册中心;它管理两部分数据;集群的Topic-Queue的路由配置;Broker的实时配置信息。其它模块通过Nameserv提供的接口获取最新的Topic配置和路由信息;各 NameServer 之间不会互相通信; 各 NameServer 都有完整的路由信息;即无状态。
一条消息被多个Consumer消费;即使这些Consumer属于同一个Consumer Group;消息也会被Consumer Group中的每一个Consumer都消费一次。
一个Consumer Group中的所有Consumer平均分摊消费消息(一个消息只会被一个消费者消费)
RocketMq消息类型分为三种;普通消息、顺序消息、事务消息
普通消息;有三种发送方式 单向发送;单向发送是指发送方只负责发送消息;不等待服务器回应;且没有回调函数触发。即只发送请求而不管响应。同步发送;同步发送是指消息发送方发出数据后;会在收到接收方发回响应之后才会发送下一个数据包的通讯方式。异步发送;异步发送是指发送方发出数据后;不等接收方发回响应;接着发送下一个数据包的通讯方式。发送方通过回调接口接收服务器响应;并对响应结果进行处理。顺序消息
一般情况下;每个主题;topic;都会有多个消息队列;message queue;;假设投递了同一个主题的十条消息;那么这十条消息会分散在不同的队列中。对于消费者而言;每个消息队列是等价的;就不能确保消息总体的顺序。而顺序消息的方案就是把这十条消息都投递到同一个消息队列中。顺序消息与普通消息同样有三种发送方式。
事务消息
RocketMQ提供了事务消息;通过事务消息就能达到分布式事务的最终一致;从而实现了可靠消息服务。
事务消息发送步骤;
发送方将半事务消息发送至RocketMQ服务端。RocketMQ服务端将消息持久化之后;向发送方返回Ack确认消息已经发送成功。由于消息为半事务消息;在未收到生产者对该消息的二次确认前;此消息被标记成“暂不能投递”状态。发送方开始执行本地事务逻辑。发送方根据本地事务执行结果向服务端提交二次确认;Commit 或是 Rollback;;服务端收到Commit 状态则将半事务消息标记为可投递;订阅方最终将收到该消息;服务端收到 Rollback 状态则删除半事务消息;订阅方将不会接受该消息。
事务消息回查机制;
在断网或者是应用重启的特殊情况下;上述步骤4提交的二次确认最终未到达服务端;经过固定时间后服务端将对该消息发起消息回查。发送方收到消息回查后;需要检查对应消息的本地事务执行的最终结果。发送方根据检查得到的本地事务的最终状态再次提交二次确认;服务端仍按照步骤4对半事务消息进行操作。
单个Master
这种方式风险较大;一旦Broker重启或者宕机时;会导致整个服务不可用;不建议线上环境使用。
多Master模式
一个集群无Slave;全是Master;例如2个Master或者3个Master
单台机器宕机期间;这台机器上未被消费的消息在机器恢复之前不可订阅;消息实时性会受到影响。
多Master多Slave模式;异步复制
每个Master配置一个Slave;有多对Master-Slave;HA采用异步复制方式;主备有短暂消息延迟;毫秒级。
优点;即使磁盘损坏;消息丢失的非常少;且消息实时性不会受影响;因为Master宕机后;消费者仍然可以从Slave消费;此过程对应用透明;不需要人工干预。性能同多Master模式几乎一样。
缺点;Master宕机;磁盘损坏情况;会丢失少量消息。
多Master多Slave模式;同步双写
每个Master配置一个Slave;有多对Master-Slave;HA采用同步双写方式;主备都写成功;向应用返回成功。
优点;数据与服务都无单点;Master宕机情况下;消息无延迟;服务可用性与数据可用性都非常高。
缺点;性能比异步复制模式略低;大约低10%左右。
**此次我们选择双主双从集群搭建~
服务器环境
1192.168.116.128nameserver、brokerserverMaster1、Slave22192.168.116.132nameserver、brokerserverMaster2、Slave1
两台服务器各启动一个nameserver;各启动两个broker~
上传rocketmq压缩包并解压
解压后移动至/usr/local/rocketmq/下;没有则创建上级目录
环境变量配置;加入如下;
vim /etc/profile
#set rocketmq ROCKETMQ_HOME=/usr/local/rocketmq/rocketmq-all-4.5.1-bin-release PATH=$PATH:$ROCKETMQ_HOME/bin export ROCKETMQ_HOME PATH
source /etc/profile
① master1
vim /usr/local/rocketmq/rocketmq-all-4.5.1-bin-release/conf/2m-2s-sync/broker-a.properties
#所属集群名字 brokerClusterName=rocketmq-cluster #broker名字;注意此处不同的配置文件填写的不一样 brokerName=broker-a #0 表示 Master;>0 表示 Slave brokerId=0 #nameServer地址;分号分割 namesrvAddr=192.168.116.128:9876;192.168.116.132:9876 #在发送消息时;自动创建服务器不存在的topic;默认创建的队列数 defaultTopicQueueNums=4 #是否允许 Broker 自动创建Topic;建议线下开启;线上关闭 autoCreateTopicEnable=true #是否允许 Broker 自动创建订阅组;建议线下开启;线上关闭 autoCreateSubscriptionGroup=true #Broker 对外服务的监听端口 listenPort=10911 #删除文件时间点;默认凌晨 4点 deleteWhen=04 #文件保留时间;默认 48 小时 fileReservedTime=120 #commitLog每个文件的大小默认1G mapedFileSizeCommitLog=1073741824 #ConsumeQueue每个文件默认存30W条;根据业务情况调整 mapedFileSizeConsumeQueue=300000 #destroyMapedFileIntervalForcibly=120000 #redeleteHangedFileInterval=120000 #检测物理文件磁盘空间 diskMaxUsedSpaceRatio=88 #存储路径 storePathRootDir=/usr/local/rocketmq/master #commitLog 存储路径 storePathCommitLog=/usr/local/rocketmq/master/commitlog #消费队列存储路径存储路径 storePathConsumeQueue=/usr/local/rocketmq/master/consumequeue #消息索引存储路径 storePathIndex=/usr/local/rocketmq/master/index #checkpoint 文件存储路径 storeCheckpoint=/usr/local/rocketmq/master/checkpoint #abort 文件存储路径 abortFile=/usr/local/rocketmq/master/abort #限制的消息大小 maxMessageSize=65536 #flushCommitLogLeastPages=4 #flushConsumeQueueLeastPages=2 #flushCommitLogThoroughInterval=10000 #flushConsumeQueueThoroughInterval=60000 #Broker 的角色 #- ASYNC_MASTER 异步复制Master #- SYNC_MASTER 同步双写Master #- SLAVE brokerRole=SYNC_MASTER #刷盘方式 #- ASYNC_FLUSH 异步刷盘 #- SYNC_FLUSH 同步刷盘 flushDiskType=SYNC_FLUSH #checkTransactionMessageEnable=false #发消息线程池数量 #sendMessageThreadPoolNums=128 #拉消息线程池数量 #pullMessageThreadPoolNums=128
② slave2
vim /usr/local/rocketmq/rocketmq-all-4.5.1-bin-release/conf/2m-2s-sync/broker-b-s.properties
#所属集群名字 brokerClusterName=rocketmq-cluster #broker名字;注意此处不同的配置文件填写的不一样 brokerName=broker-b #0 表示 Master;>0 表示 Slave brokerId=1 #nameServer地址;分号分割 namesrvAddr=192.168.116.128:9876;192.168.116.132:9876 #在发送消息时;自动创建服务器不存在的topic;默认创建的队列数 defaultTopicQueueNums=4 #是否允许 Broker 自动创建Topic;建议线下开启;线上关闭 autoCreateTopicEnable=true #是否允许 Broker 自动创建订阅组;建议线下开启;线上关闭 autoCreateSubscriptionGroup=true #Broker 对外服务的监听端口 listenPort=11011 #删除文件时间点;默认凌晨 4点 deleteWhen=04 #文件保留时间;默认 48 小时 fileReservedTime=120 #commitLog每个文件的大小默认1G mapedFileSizeCommitLog=1073741824 #ConsumeQueue每个文件默认存30W条;根据业务情况调整 mapedFileSizeConsumeQueue=300000 #destroyMapedFileIntervalForcibly=120000 #redeleteHangedFileInterval=120000 #检测物理文件磁盘空间 diskMaxUsedSpaceRatio=88 #存储路径 storePathRootDir=/usr/local/rocketmq/slave #commitLog 存储路径 storePathCommitLog=/usr/local/rocketmq/slave/commitlog #消费队列存储路径存储路径 storePathConsumeQueue=/usr/local/rocketmq/slave/consumequeue #消息索引存储路径 storePathIndex=/usr/local/rocketmq/slave/index #checkpoint 文件存储路径 storeCheckpoint=/usr/local/rocketmq/slave/checkpoint #abort 文件存储路径 abortFile=/usr/local/rocketmq/slave/abort #限制的消息大小 maxMessageSize=65536 #flushCommitLogLeastPages=4 #flushConsumeQueueLeastPages=2 #flushCommitLogThoroughInterval=10000 #flushConsumeQueueThoroughInterval=60000 #Broker 的角色 #- ASYNC_MASTER 异步复制Master #- SYNC_MASTER 同步双写Master #- SLAVE brokerRole=SLAVE #刷盘方式 #- ASYNC_FLUSH 异步刷盘 #- SYNC_FLUSH 同步刷盘 flushDiskType=ASYNC_FLUSH #checkTransactionMessageEnable=false #发消息线程池数量 #sendMessageThreadPoolNums=128 #拉消息线程池数量 #pullMessageThreadPoolNums=128
① master2
vim /usr/local/rocketmq/rocketmq-all-4.5.1-bin-release/conf/2m-2s-sync/broker-b.properties
#所属集群名字 brokerClusterName=rocketmq-cluster #broker名字;注意此处不同的配置文件填写的不一样 brokerName=broker-b #0 表示 Master;>0 表示 Slave brokerId=0 #nameServer地址;分号分割 namesrvAddr=192.168.116.128:9876;192.168.116.132:9876 #在发送消息时;自动创建服务器不存在的topic;默认创建的队列数 defaultTopicQueueNums=4 #是否允许 Broker 自动创建Topic;建议线下开启;线上关闭 autoCreateTopicEnable=true #是否允许 Broker 自动创建订阅组;建议线下开启;线上关闭 autoCreateSubscriptionGroup=true #Broker 对外服务的监听端口 listenPort=10911 #删除文件时间点;默认凌晨 4点 deleteWhen=04 #文件保留时间;默认 48 小时 fileReservedTime=120 #commitLog每个文件的大小默认1G mapedFileSizeCommitLog=1073741824 #ConsumeQueue每个文件默认存30W条;根据业务情况调整 mapedFileSizeConsumeQueue=300000 #destroyMapedFileIntervalForcibly=120000 #redeleteHangedFileInterval=120000 #检测物理文件磁盘空间 diskMaxUsedSpaceRatio=88 #存储路径 storePathRootDir=/usr/local/rocketmq/master #commitLog 存储路径 storePathCommitLog=/usr/local/rocketmq/master/commitlog #消费队列存储路径存储路径 storePathConsumeQueue=/usr/local/rocketmq/master/consumequeue #消息索引存储路径 storePathIndex=/usr/local/rocketmq/master/index #checkpoint 文件存储路径 storeCheckpoint=/usr/local/rocketmq/master/checkpoint #abort 文件存储路径 abortFile=/usr/local/rocketmq/master/abort #限制的消息大小 maxMessageSize=65536 #flushCommitLogLeastPages=4 #flushConsumeQueueLeastPages=2 #flushCommitLogThoroughInterval=10000 #flushConsumeQueueThoroughInterval=60000 #Broker 的角色 #- ASYNC_MASTER 异步复制Master #- SYNC_MASTER 同步双写Master #- SLAVE brokerRole=SYNC_MASTER #刷盘方式 #- ASYNC_FLUSH 异步刷盘 #- SYNC_FLUSH 同步刷盘 flushDiskType=SYNC_FLUSH #checkTransactionMessageEnable=false #发消息线程池数量 #sendMessageThreadPoolNums=128 #拉消息线程池数量 #pullMessageThreadPoolNums=128
② slave1
vim /usr/local/rocketmq/rocketmq-all-4.5.1-bin-release/conf/2m-2s-sync/broker-a-s.properties
#所属集群名字 brokerClusterName=rocketmq-cluster #broker名字;注意此处不同的配置文件填写的不一样 brokerName=broker-a #0 表示 Master;>0 表示 Slave brokerId=1 #nameServer地址;分号分割 namesrvAddr=192.168.116.128:9876;192.168.116.132:9876 #在发送消息时;自动创建服务器不存在的topic;默认创建的队列数 defaultTopicQueueNums=4 #是否允许 Broker 自动创建Topic;建议线下开启;线上关闭 autoCreateTopicEnable=true #是否允许 Broker 自动创建订阅组;建议线下开启;线上关闭 autoCreateSubscriptionGroup=true #Broker 对外服务的监听端口 listenPort=11011 #删除文件时间点;默认凌晨 4点 deleteWhen=04 #文件保留时间;默认 48 小时 fileReservedTime=120 #commitLog每个文件的大小默认1G mapedFileSizeCommitLog=1073741824 #ConsumeQueue每个文件默认存30W条;根据业务情况调整 mapedFileSizeConsumeQueue=300000 #destroyMapedFileIntervalForcibly=120000 #redeleteHangedFileInterval=120000 #检测物理文件磁盘空间 diskMaxUsedSpaceRatio=88 #存储路径 storePathRootDir=/usr/local/rocketmq/slave #commitLog 存储路径 storePathCommitLog=/usr/local/rocketmq/slave/commitlog #消费队列存储路径存储路径 storePathConsumeQueue=/usr/local/rocketmq/slave/consumequeue #消息索引存储路径 storePathIndex=/usr/local/rocketmq/slave/index #checkpoint 文件存储路径 storeCheckpoint=/usr/local/rocketmq/slave/checkpoint #abort 文件存储路径 abortFile=/usr/local/rocketmq/slave/abort #限制的消息大小 maxMessageSize=65536 #flushCommitLogLeastPages=4 #flushConsumeQueueLeastPages=2 #flushCommitLogThoroughInterval=10000 #flushConsumeQueueThoroughInterval=60000 #Broker 的角色 #- ASYNC_MASTER 异步复制Master #- SYNC_MASTER 同步双写Master #- SLAVE brokerRole=SLAVE #刷盘方式 #- ASYNC_FLUSH 异步刷盘 #- SYNC_FLUSH 同步刷盘 flushDiskType=ASYNC_FLUSH #checkTransactionMessageEnable=false #发消息线程池数量 #sendMessageThreadPoolNums=128 #拉消息线程池数量 #pullMessageThreadPoolNums=128
vi /usr/local/rocketmq/rocketmq-all-4.5.1-bin-release/bin/runbroker.sh
找到如下修改;
JAVA_OPT=;${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m;
vim /usr/local/rocketmq/rocketmq-all-4.5.1-bin-release/bin/runserver.sh
找到如下修改;
JAVA_OPT=;${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m;
切换到rocketmq目录bin目录下执行;
nohup sh mqnamesrv &
查看日志;
tail -f ~/logs/rocketmqlogs/namesrv.log
① 128服务器
nohup sh bin/mqbroker -c conf/2m-2s-async/broker-a.properties & tail -f ~/logs/rocketmqlogs/broker.log
nohup sh bin/mqbroker -c conf/2m-2s-async/broker-b-s.properties & tail -f ~/logs/rocketmqlogs/broker.log
② 132服务器
nohup sh bin/mqbroker -c conf/2m-2s-async/broker-b.properties & tail -f ~/logs/rocketmqlogs/broker.log
nohup sh bin/mqbroker -c conf/2m-2s-async/broker-a-s.properties & tail -f ~/logs/rocketmqlogs/broker.log
启动完成;通过jps查看java进程
关闭命令;
关闭namesrv服务;sh bin/mqshutdown namesrv 关闭broker服务 ;sh bin/mqshutdown broker
--------------------------待续
序号 | IP | 角色 | 架构模式 |
---|