简介
简介
官网:https://rocketmq.apache.org
RocketMQ 是由阿里捐赠给 Apache 的一款低延迟、高并发、高可用、高可靠的分布式消息中间件。经历了淘宝双十一的洗礼。RocketMQ 既可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性
storePathCommitLog:C:\Users\your\store\commitlog
System.setProperty("rocketmq.client.log.loadconfig","false");
核心概念
Topic:消息主题,一级消息类型,生产者向其发送消息
Message:生产者向Topic发送并最终传送给消费者的数据消息的载体
消息属性:生产者可以为消息定义的属性,包含Message Key和Tag
Message Key:消息的业务标识,由消息生产者(Producer)设置,唯一标识某个业务逻辑
Message ID:消息的全局唯一标识,由消息队列RocketMQ系统自动生成,唯一标识某条消息
Tag:消息标签,二级消息类型,用来进一步区分某个Topic下的消息分类
Producer:也称为消息发布者,负责生产并发送消息至Topic
Consumer:也称为消息订阅者,负责从Topic接收并消费消息
分区:即Topic Partition,物理上的概念。每个Topic包含一个或多个分区
消费位点:每个Topic会有多个分区,每个分区会统计当前消息的总条数,这个称为最大位点MaxOffset;分区的起始位置对应的位置叫做起始位点MinOffset
Group:一类生产者或消费者,这类生产者或消费者通常生产或消费同一类消息,且消息发布或订阅的逻辑一致
Group ID:Group的标识
队列:个Topic下会由一到多个队列来存储消息
Exactly-Once投递语义:Exactly-Once投递语义是指发送到消息系统的消息只能被Consumer处理且仅处理一次,即使Producer重试消息发送导致某消息重复投递,该消息在Consumer也只被消费一次
集群消费:一个Group ID所标识的所有Consumer平均分摊消费消息。例如某个Topic有9条消息,一个Group ID有3个Consumer实例,那么在集群消费模式下每个实例平均分摊,只消费其中的3条消息
广播消费:一个Group ID所标识的所有Consumer都会各自消费某条消息一次。例如某个Topic有9条消息,一个Group ID有3个Consumer实例,那么在广播消费模式下每个实例都会各自消费9条消息
定时消息:Producer将消息发送到消息队列RocketMQ服务端,但并不期望这条消息立马投递,而是推迟到在当前时间点之后的某一个时间投递到Consumer进行消费,该消息即定时消息
延时消息:Producer将消息发送到消息队列RocketMQ服务端,但并不期望这条消息立马投递,而是延迟一定时间后才投递到Consumer进行消费,该消息即延时消息
事务消息:RocketMQ提供类似X/Open XA的分布事务功能,通过消息队列RocketMQ的事务消息能达到分布式事务的最终一致
顺序消息:RocketMQ提供的一种按照顺序进行发布和消费的消息类型,分为全局顺序消息和分区顺序消息
全局顺序消息:对于指定的一个Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费
分区顺序消息:对于指定的一个Topic,所有消息根据Sharding Key进行区块分区。同一个分区内的消息按照严格的FIFO顺序进行发布和消费。Sharding Key是顺序消息中用来区分不同分区的关键字段,和普通消息的Message Key是完全不同的概念
消息堆积:Producer已经将消息发送到消息队列RocketMQ的服务端,但由于Consumer消费能力有限,未能在短时间内将所有消息正确消费掉,此时在消息队列RocketMQ的服务端保存着未被消费的消息,该状态即消息堆积
消息过滤:Consumer可以根据消息标签(Tag)对消息进行过滤,确保Consumer最终只接收被过滤后的消息类型。消息过滤在消息队列RocketMQ的服务端完成
消息轨迹:在一条消息从Producer发出到Consumer消费处理过程中,由各个相关节点的时间、地点等数据汇聚而成的完整链路信息。通过消息轨迹,您能清晰定位消息从Producer发出,经由消息队列RocketMQ服务端,投递给Consumer的完整链路,方便定位排查问题
重置消费位点:以时间轴为坐标,在消息持久化存储的时间范围内(默认3天),重新设置Consumer对已订阅的Topic的消费进度,设置完成后Consumer将接收设定时间点之后由Producer发送到消息队列RocketMQ服务端的消息
死信队列:死信队列用于处理无法被正常消费的消息。当一条消息初次消费失败,消息队列RocketMQ会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明Consumer在正常情况下无法正确地消费该消息。此时,消息队列RocketMQ不会立刻将消息丢弃,而是将这条消息发送到该Consumer对应的特殊队列中
消息队列RocketMQ将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),将存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)
应用场景
削峰填谷:诸如秒杀、抢红包、企业开门红等大型活动时皆会带来较高的流量脉冲,或因没做相应的保护而导致系统超负荷甚至崩溃,或因限制太过导致请求大量失败而影响用户体验,消息队列RocketMQ可提供削峰填谷的服务来解决该问题
异步解耦:交易系统作为淘宝和天猫主站最核心的系统,每笔交易订单数据的产生会引起几百个下游业务系统的关注,包括物流、购物车、积分、流计算分析等等,整体业务系统庞大而且复杂,消息队列RocketMQ可实现异步通信和应用解耦,确保主站业务的连续性
顺序收发:细数日常中需要保证顺序的应用场景非常多,例如证券交易过程时间优先原则,交易系统中的订单创建、支付、退款等流程,航班中的旅客登机消息处理等等。与先进先出FIFO(First In First Out)原理类似,消息队列RocketMQ提供的顺序消息即保证消息FIFO
分布式事务一致性:交易系统、支付红包等场景需要确保数据的最终一致性,大量引入消息队列RocketMQ的分布式事务,既可以实现系统之间的解耦,又可以保证最终的数据一致性
大数据分析:数据在“流动”中产生价值,传统数据分析大多是基于批量计算模型,而无法做到实时的数据分析,利用阿里云消息队列RocketMQ与流式计算引擎相结合,可以很方便的实现业务数据的实时分析
分布式缓存同步:天猫双11大促,各个分会场琳琅满目的商品需要实时感知价格变化,大量并发访问数据库导致会场页面响应时间长,集中式缓存因带宽瓶颈,限制了商品变更的访问流量,通过消息队列RocketMQ构建分布式缓存,实时通知商品数据的变化
异步解耦
待更新
性能调优
JVM层面
监控暂停
rocketmq-console 这个是官方提供的一个 WEB 项目,可以查看 RocketMQ 数据和执行一些操作。但是这个监控界面又没有控制权限,并且还有一些消耗性能的查询操作,如果要提高性能,建议这个可以暂停。一般的公司在运维方面会有专门的监控组件,如 zabbix 会做统一处理。或者是简单的 shell 命令监控的方式有很多,比如我们可以写一个 shell 脚本,监控执行 RocketMQ Java 进程的存货状态,如果 RocketMQ crash 了,发送警告
消除偏向锁
大家了解,在 JDK 1.8 sync 有偏向锁,但是在 RocketMQ 都是多线程的执行,所以竞争比较激烈,建议把偏向锁取消,以免没有必要的开销
-XX:-UseBiasedLocking
:禁用偏向锁
垃圾回收参数
RocketMQ 推荐使用 G1 垃圾回收器
-Xms8g -Xmx8g -Xmn4g -XX:+UseG1GC
:这个就是很关键的一块参数了,也是重点需要调整的,就是默认的堆大小是 8g 内存,新生代是 4g 内存。如果是内存比较大,比如有 48g 的内存,所以这里完全可以给他们翻几倍,比如给堆内存 20g,其中新生代给 10g,甚至可以更多些,当然要留一些内存给操作系统来用
-XX:G1HeapRegionSize=16m
:这里把 G1 的 region 大小设置为 16m,这个因为机器内存比较多,所以 region 大小可以调大一些给到 16m,不然用 2m 的 region,会导致 region 数量过多
-XX:G1ReservePercent=25
:这个参数是说在 G1 管理的老年代里预留 25% 的空闲内存,保证新生代对象晋升到老年代的时候有足够的空间,避免老年代内存都满了,新生代有对象要进入老年代没有充足内存了。默认值是 10%,略微偏少,这里 RocketMQ 给调大了一些
-XX:initiatingHeapOccupancyPercent=30
:这个参数是说,当堆内存的使用率达到 30% 之后就会自动启动 G1 的并发垃圾回收,开始尝试回收一些垃圾对象。默认值是 45%,这里调低了一些,也就是提高了 GC 的频率,但是避免了垃圾对象过多,一次垃圾回收耗时过长的问题
操作系统层面
基本参数
vm.overcommit_memory=1
:是否允许内存的过量分配。当值为 0 的时候,用户申请内存时,内核会去检查是否有这么大的内存空间;当值为 1 的时候,内核始终认为有足够大的内存空间,直到它用完为止;当值为 2 的时候,内核禁止任何形式的过量分配内存
vm.swappiness=0
:仅在内存不足的情况下,当剩余空闲内存低于 vm.min_free_kbytes limit
时,使用交换空间
vm.swappiness=1
:内核版本 3.5 及以上、Red Hat 内核版本 2.6.32-303 及以上,进行最少量的交换,而不禁用交换
vm.swappiness=10
:当系统存在足够内存时,推荐设置为该值以提高性能
vm.swappiness=60
:默认值
vm.swappiness=100
内核将积极的使用交换空间
vm.max_max_count=655360
:定义了一个进程能拥有的最多的内存区域,默认为 65536
ulimit=1000000
:文件打开数
limit.conf
:设置用户能打开的最大文件数
网卡
网络接口控制器(network interface controller NIC)因 Ring Buffer 写满导致丢包的情况很多。当业务流量过大且出现网卡丢包的时候,建议调整 Ring Buffer 的大小,这个大小的设置一定程度上可以缓解丢包的状况
关闭IRQBalance
IRQBalance 主要功能是可以合理的调配使用各个 CPU 核心,特别是对于目前主流多核心的 CPU,简单的说就是能够把压力均匀的分配到各个 CPU 核心上,对提升性能有很大帮助。但实际中往往影响 CPU 的使用均衡,建议服务器环境中关闭
TCP NODEALY
Nagle 算法用于对缓冲区内的一定数量的消息进行自动连接。该处理过程(称为 Nagling)通过减少必须发送的封包的数量,提高了网络应用程序系统的效率。(Nagle 虽然解决了小封包的问题,但也导致了较高的不可预测的延迟,同时降低了吞吐量)