FantasticMao 技术笔记
BlogGitHub
  • README
  • C & Unix
    • C
      • 《C 程序设计语言》笔记
      • C 语言中的陷阱
      • CMake 示例
      • GNU make
      • LLVM Clang
      • Nginx 常用模块
      • Vim 常用命令
    • Unix-like
      • 《深入理解计算机系统》笔记
      • 《UNIX 环境高级编程》笔记 - UNIX 基础知识
      • 《UNIX 环境高级编程》笔记 - 文件 IO
      • 《UNIX 环境高级编程》笔记 - 标准 IO 库
      • 《鳥哥的 Linux 私房菜》笔记 - 目录配置
      • 《鳥哥的 Linux 私房菜》笔记 - 认识与学习 bash
      • 《鳥哥的 Linux 私房菜》笔记 - 任务管理
      • OpenWrt 中的陷阱
      • iptables 工作机制
  • Go
    • 《A Tour of Go》笔记
    • Go vs C vsJava
    • Go 常用命令
    • Go 语言中的陷阱
  • Java
    • JDK
      • 《Java 并发编程实战》笔记 - 线程池的使用
      • 设计模式概览
      • 集合概览
      • HashMap 内部算法
      • ThreadLocal 工作机制
      • Java Agent
    • JVM
      • 《深入理解 Java 虚拟机》笔记 - Java 内存模型与线程
      • JVM 运行时数据区
      • 类加载机制
      • 垃圾回收算法
      • 引用类型
      • 垃圾收集算法
      • 垃圾收集器
    • Spring
      • Spring IoC 容器扩展点
      • Spring Transaction 声明式事务管理
      • Spring Web MVC DispatcherServlet 工作机制
      • Spring Security Servlet 实现原理
    • 其它
      • 《Netty - One Framework to rule them all》演讲笔记
      • Hystrix 设计与实现
  • JavaScript
    • 《写给大家看的设计书》笔记 - 设计原则
    • 《JavaScript 权威指南》笔记 - jQuery 类库
  • 数据库
    • ElasticSearch
      • ElasticSearch 概览
    • HBase
      • HBase 数据模型
    • Prometheus
      • Prometheus 概览
      • Prometheus 数据模型和指标类型
      • Prometheus 查询语法
      • Prometheus 存储原理
      • Prometheus vs InfluxDB
    • Redis
      • 《Redis 设计与实现》笔记 - 简单动态字符串
      • 《Redis 设计与实现》笔记 - 链表
      • 《Redis 设计与实现》笔记 - 字典
      • 《Redis 设计与实现》笔记 - 跳跃表
      • 《Redis 设计与实现》笔记 - 整数集合
      • 《Redis 设计与实现》笔记 - 压缩列表
      • 《Redis 设计与实现》笔记 - 对象
      • Redis 内存回收策略
      • Redis 实现分布式锁
      • Redis 持久化机制
      • Redis 数据分片方案
      • 使用缓存的常见问题
    • MySQL
      • 《高性能 MySQL》笔记 - Schema 与数据类型优化
      • 《高性能 MySQL》笔记 - 创建高性能的索引
      • 《MySQL Reference Manual》笔记 - InnoDB 和 ACID 模型
      • 《MySQL Reference Manual》笔记 - InnoDB 多版本
      • 《MySQL Reference Manual》笔记 - InnoDB 锁
      • 《MySQL Reference Manual》笔记 - InnoDB 事务模型
      • B-Tree 简述
      • 理解查询执行计划
  • 中间件
    • gRPC
      • gRPC 负载均衡
    • ZooKeeper
      • ZooKeeper 数据模型
    • 消息队列
      • 消息积压解决策略
      • RocketMQ 架构设计
      • RocketMQ 功能特性
      • RocketMQ 消息存储
  • 分布式系统
    • 《凤凰架构》笔记
    • 系统设计思路
    • 系统优化思路
    • 分布式事务协议:二阶段提交和三阶段提交
    • 分布式系统的技术栈
    • 分布式系统的弹性设计
    • 单点登录解决方案
    • 容错,高可用和灾备
  • 数据结构和算法
    • 一致性哈希
    • 布隆过滤器
    • 散列表
  • 网络协议
    • 诊断工具
    • TCP 协议
      • TCP 报文结构
      • TCP 连接管理
由 GitBook 提供支持
在本页
  • 文件结构
  • 数据结构
  • 存储架构
  • 缓存页和内存映射
  1. 中间件
  2. 消息队列

RocketMQ 消息存储

消息存储模块是 RocketMQ 中最为复杂和重要的模块。

文件结构

/{user.home}/store/.
                   +--commitlog
                   |  +--00000000000000000000 (1GB)
                   |  +--00000000001073741824 (1GB)
                   +--consumequeue
                   |  +--topic_1 (topic name)
                   |  |  +--0 (queue id)
                   |  |  |  +--00000000000000000000 (5.72MB=300000*20/1024/1024)
                   |  |  |  +--00000000000006000000 (5.72MB)
                   |  |  +--1 (queue id)
                   |  |     +--00000000000000000000 (5.72MB)
                   |  +--topic_2 (topic name)
                   |     +--0 (queue id)
                   |        +--00000000000000000000 (5.72MB)
                   +--index
                      +--19950815000000000 (1995-08-15 00:00:00.000, 400MB)
                      +--20201112080000000 (2020-11-12 08:00:00.000, 400MB)

数据结构

                    +---------------------------+
         +-----+    |+-+ +-+ +-+ +-+ +-+ +-+ +-+|
         |+---+|--->|| | | | | | | | | | | | | ||<-----+
         ||   ||    |+-+ +-+ +-+ +-+ +-+ +-+ +-+|      |
         |+---+|    +---------------------------+      | offset
         |     |            ConsumeQueue               +-------- Consumer
         |+---+|    +---------------------------+      |
         ||   ||    |+-+ +-+ +-+ +-+ +-+ +-+ +-+|      |
         |+---+|--->|| | | | | | | | | | | | | ||<-----+
         |     |    |+-+ +-+ +-+ +-+ +-+ +-+ +-+|
         |+---+|    +---------------------------+
         ||   ||            ConsumeQueue
         |+---+|
         |     |
         |+---+|    +---------------------------+
         ||   ||    |+-+ +-+ +-+ +-+ +-+ +-+ +-+|  queyr by key
         |+---+|--->|| | | | | | | | | | | | | ||<-------------- User
         +-----+    |+-+ +-+ +-+ +-+ +-+ +-+ +-+|
        CommitLog   +---------------------------+
            ^               IndexFile
            |
Producer ---+

存储架构

RocketMQ 消息存储模块的设计主要与上面三类文件相关:

  • CommitLog 用于存储消息的主体和元数据。Producer 端写入的消息主体的内容长度是不固定的。CommitLog 文件大小默认 1GB,文件长度为 20 位,左边补零,剩余为起始偏移量。例如 00000000000000000000 代表了第一个文件,起始偏移量为 0;00000000001073741824 代表了第二个文件,起始偏移量为 1GB=1024*1024*1024KB=1073741824KB。CommitLog 文件的内容是顺序写入的。

  • ConsumeQueue 是用于提高 Consumer 消费消息性能的逻辑消费队列。由于 RocketMQ 是基于 topic 订阅机制来发送和消费消息的,因此如果 Consumer 直接从 CommitLog 中依据 topic 来查找待消费的消息,则需要遍历完整的 CommitLog,这将会是一个非常低效的操作。ConsumeQueue 作为 CommitLog 中消费消息的索引,保存了指定 topic 下的 MessageQueue 在 CommitLog 中的物理偏移量、消息大小、和消息 tag 的哈希值。因此 ConsumeQueue 文件的存储路径分为三层:consumequeue/{topic}/{queue}/{file},并且 ConsumeQueue 文件同样采取定长设计,其中的每一个条目为 20 字节:物理偏移量 8 字节 + 消息大小 4 字节 + 消息 tag 哈希值 8 字节。每个 ConsumeQueue 由 30w 个条目组成,共计 5.72 MB,并且可以像访问数组一样随机访问 ConsumeQueue 内部的条目。

  • IndexFile 和 ConsumeQueue 类似,是用于提高依据 key 来查询消息的效率的索引文件。IndexFile 底层的存储方式设计为在文件系统中实现的 HashMap 结构,因此 IndexFile 的索引类型是 hash 类型的。IndexFile 存储路径为 index/{fileName},其中文件名称为 YYYYMMddHHmmsssS 格式的日期时间,并且 IndexFile 文件同样采取定长设计,单个文件大小约 400MB。

缓存页和内存映射

缓存页(Page Cache)是操作系统对文件的缓存,用于加速对文件的读写速度。一般来说,应用程序对文件的顺序读取速度会接近于对内存的读写速度,这是因为操作系统使用了缓存页机制来对读写操作进行了优化。

  • 对于数据的写入操作,操作系统会先将数据写入至缓存页,然后通过异步的方式由 pdflush 内核线程将缓存页中的数据刷新至磁盘;

  • 对于数据的读取操作,如果在一次读取文件时出现未命中缓存页的情况,则操作系统在磁盘上访问文件的同时,会顺序对其他相邻的数据文件进行预读。

在 RocketMQ 中,ConsumeQueue 存储的数据比较少,并且是被操作系统顺序读取的。因此在缓存页的机制下,操作系统对 ConsumeQueue 的读取性能接近于对内存的读取,并且消息堆积也不会对 ConsumeQueue 的读取性能造成影响。但是 CommitLog 存储的数据比较大,并且会被操作系统随机读取,因此对 CommitLog 读取的性能会相对低效。此时,如果选择合适的系统 I/O 调用算法(例如在存储采用 SSD 时设置调度算法为 Deadline),随机读取的性能会有所提升。

在 RocketMQ 中,主要通过 MappedByteBuffer 内存映射机制对文件进行读写操作。利用 NIO 中的 FileChannel 将磁盘上的物理文件直接映射到用户态的内核空间中,将应用对文件的操作转换为直接对内存地址进行操作,从而极大地提高了文件的读写性能。

最后更新于1年前

RocketMQ 存储 CommitLog 的代码请见 ,序列化 CommitLog 的代码请见 。

RocketMQ 通过 DefaultMessageStore 的 doDispatch 方法,将消息的索引内容存储至 ConsumeQueue 和 IndexFile 中,具体代码请见 。

GitHub
GitHub
GitHub