rocketmq基本概念
消息队列

rocketmq基本概念

JACIN··22 分钟阅读

目录#

[[toc]]

整体架构#

  • NameServer

    • 轻量级的路由注册中心,负责管理集群中所有 Broker 的路由信息(Topic ↔ Broker)。
    • 无状态、可水平扩展;客户端和 Broker 都通过 NameServer 做通信路由发现。
    • RocketMQ 的 NameServer 地址(协议+域名+端口),负责路由发现。客户端所有请求都会先打到 NameServer,获取可用 Broker 列表。
    • 就像「注册中心」,不存消息,只存路由(哪个 Topic 在哪些 Broker 上)。
  • Broker

    • 真正存储与转发消息的节点,把 Producer 发来的消息写入磁盘,并将消息推送或拉取给 Consumer。
    • 支持高可用(主从同步/异步复制)、横向扩展多个 Broker 实例。
    • 客户端在拿到 NameServer 返回的 Broker 列表后,直接跟 Broker 通信完成生产或消费。
  • Producer(生产者)

    • 发送消息到 Broker。可以同步/异步/单向发送。
    • 在发送时可指定 Topic、Tag、Key、延迟级别、事务消息等。
  • Consumer(消费者)

    • 从 Broker 拉取(或被推送)消息并进行业务处理。
    • 支持两种消费模式:
      • 集群模式(CLUSTERING):同一个 consumer group 下的实例共同消费,消息只会被其中一个实例处理。
      • 广播模式(BROADCASTING):同一个 consumer group 下的每个实例都能接收到全部消息。
  • 消息(Message)

    一条可传输的数据结构,通常包含:

    • 消息体(payload),即实际业务数据
    • 属性(properties),如消息 ID、时间戳、标签(Tag)、Key 等

Broker 组件(中间件)#

Broker 就是“消息中转与存储”的核心服务进程/组件。

go
┌───────────────────────┐
│      Broker进程       │
│  ┌─────────────────┐  │
│  │  CommitLog      │◀─┐│   追加写入消息
│  └─────────────────┘  ││
│  ┌─────────────────┐  ││
│  │  ConsumeQueue   │──┘│   按 Topic-Queue 索引
│  └─────────────────┘   │
│  ┌─────────────────┐   │
│  │  IndexFile      │   │   根据 Key 做二级索引
│  └─────────────────┘   │
│  ┌─────────────────┐   │
│  │  StoreCheckpoint│   │   记录刷盘进度
│  └─────────────────┘   │
│  ┌─────────────────┐   │
│  │  HAService      │   │   主从同步服务
│  └─────────────────┘   │
│  ┌─────────────────┐   │
│  │  NettyServer    │   │   处理网络请求(生产/消费)
│  └─────────────────┘   │
│  ┌─────────────────┐   │
│  │  BrokerController│  │   启动、调度与路由管理
│  └─────────────────┘   │
└───────────────────────┘

1. 核心职责

  1. 接收消息
    • Producer(生产者)把消息通过网络发给 Broker。
  2. 存储消息
    • Broker 会把消息追加写入磁盘(在 RocketMQ 里叫 CommitLog),保证即使重启也不会丢失。
  3. 转发消息
    • Consumer(消费者)向 Broker 请求拉取消息,Broker 根据消费位点(Offset)和路由表,把消息推送或返回给它。

2. 为什么要有 Broker?

  • 解耦生产者和消费者:生产者只要把消息投递给 Broker,就不用关心后端谁来消费,也不必等待业务处理完成。
  • 缓冲削峰:如果某个时刻消费端处理慢,Broker 会把消息存下来,等消费者空闲时再消费;反之亦然。
  • 高可用与扩展:可以部署多个 Broker 联合工作,既能横向扩展吞吐,也能做主从复制容灾。

想象一个快递分发中心:

  • 生产者:像快递员,它把包裹(消息)送到分发中心(Broker)。
  • Broker:就像这个分发中心,负责 接收存储分发 包裹给不同的派送员(消费者)。
  • 消费者:像末端派件员,它到分发中心去领包裹,然后送到最终收件人手里。

核心术语#

术语含义
Topic消息的“主题”,Producer 将消息发送到某个 Topic,Consumer 按 Topic 拉取。
MessageQueueTopic 在单个 Broker 上的一个物理队列(partition),由队列ID标识。Producer 发送时会路由到其中一个队列。
Queue ID消息队列编号(通常一个 Topic 会有多个 Queue,用于并行化写入/读取)。
Offset在某个 MessageQueue 中消息的序号,用于记录消费进度。
Tag消息的二级标签,用于分类过滤;Consumer 在订阅 Topic 时可做 Tag 过滤。
Key用户自定义的消息唯一标识(如订单号),便于快速检索和运维排障。
CommitLogBroker 的核心存储文件,将每条消息追加写入;后续通过 ConsumeQueue 做索引。
ConsumeQueue每个 Topic-Queue 对应的索引文件,存储消息在 CommitLog 的物理偏移,便于快速查找。
IndexFile根据消息 Key 做二级索引,支持按 Key 查询历史消息。

消息模型与模式#

  1. 推模式(Push) vs 拉模式(Pull)
    • RocketMQ 默认采用 Pull:Consumer 主动向 Broker 拉取消息,可灵活控制消费速率。
    • 也支持 Push 的“长轮询”变体,通过 Broker 触发 Consumer 请求。
  2. 消息顺序与并行
    • 顺序消息:Producer 按业务维度(如同一订单 ID)将消息路由到 同一个 MessageQueue,Consumer 串行消费。
    • 并行消费:不同队列并发消费,吞吐更高,但无法保证全局顺序。
  3. 事务消息
    • 支持半消息(half message)+ 本地事务回查:Producer 先发半消息,执行本地事务,再提交或回滚半消息,保证最终一致性。
  4. 延迟消息 / 定时消息
    • Broker 提供多个延迟级别(LEVEL),Producer 发送时指定 delayTimeLevel,达到延迟后再对外可见。

推模式(Push) vs 拉模式(Pull)#

拉模式(Pull)

  • 默认模式:RocketMQ 中,Consumer 主动向 Broker 发起拉取请求(pull),一次性可以拿一批消息(可配置批量大小)。
  • 优点
    • 消费速率可控:应用根据自身处理能力灵活调整拉取频率与批量大小,避免消息峰值时过载。
    • 简单重试:失败后直接重拉,无需依赖回调或 Broker 推送逻辑。
  • 缺点
    • 实时性稍弱:需要定时或长轮询才能实现近实时推送。

推模式(Push)

  • 变体:长轮询
    • Broker 在没有消息时不会立即返回空,而是保持连接等待新消息(长达几十秒),一旦有消息立刻响应。
    • 对应用而言更像“被动接收”,从而获得较好的实时性。
  • 优点
    • 更低延迟:接近实时地将消息推到应用,不必不断轮询。
    • 开发体验:业务代码只需注册回调,当有消息就触发。
  • 缺点
    • 实现复杂:Broker 需维护大量长连接;应用需处理异步回调并做好并发控制。

顺序消息#

  • 业务场景:比如同一笔订单的各阶段日志,必须按提交顺序消费。
  • 实现方式
    1. 同一 MessageQueue:Producer 将同一 Key(如订单ID)消息路由到单一队列上。
    2. 串行消费:Consumer 在该队列上按 Offset 从小到大顺序拉取,并保证同一时间只一个线程处理。
  • 注意事项
    • 限制了并行度:所有相同 Key 的消息只能串行,吞吐受限于单个队列和单线程。
    • 需要业务端提供 Hash 或自定义队列选择器。

并行消费#

  • 业务场景:日志上报、统计分析等无需全局顺序的高吞吐场景。
  • 实现方式
    1. 多队列:Topic 配置多个 MessageQueue(默认根据 Topic 中队列数)。
    2. 并发拉取:Consumer 同时对多个队列发起并行拉取,独立线程处理不同队列。
  • 注意事项
    • 无法保证整体顺序:只有队列内部顺序有保证,不同队列间消息处理是并发的。
    • 队列数最好与消费端实例数或线程数匹配,以获得最佳并行度。

延迟消息 / 定时消息#

业务场景

  • 定时任务触发:如发送提醒邮件、订单超时关闭等。
  • 延迟处理:如延迟N分钟后再发通知。

RocketMQ 的延迟实现

  • 预置延迟级别:Broker 在 broker.conf 中配置 messageDelayLevel,如 1s 5s 10s 30s 1m 2m 5m … 等多个级别。
  • Producer 指定级别:发送消息时设置 message.setDelayTimeLevel(level),Broker 根据级别计算实际可见时间。
  • 内部机制
    1. 延迟消息先写入专门的延迟队列(例如 SCHEDULE_TOPIC_XXXX),不会立即投递给订阅者;
    2. 延迟到期后,Broker 将消息重新投到正常 Topic 队列,并更新 Offset;
    3. 最终消费者按常规拉取处理。

优缺点

  • 优点:无需业务自己做定时任务调度,使用 Broker 内置功能即可。
  • 缺点:延迟粒度由级别决定;如果需要精确到秒或毫秒,需要自定义多级或结合外部调度。

重试次数 & 死信队列#

在 Broker 的配置里(broker.conf)或者在客户端启动参数中,你可以设置最大重试次数。超过次数后,消息就会被送入死信队列(DLQ),不再继续投递给正常消费者。

go
# broker.conf 中的配置示例
# 消费失败最大重试次数,默认是 16
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
defaultMaxReconsumeTimes=5
  • defaultMaxReconsumeTimes=5:消费者最多重试 5 次,第 6 次失败后,Broker 会把消息投递到死信队列,消费者就再也收不到它了。
  • 如果你想“30 分钟内没处理成功就抛弃”,可以基于延迟级别配合重试间隔做逼近,也可以在生产者端给消息加上业务过期时间(在消息属性里带一个 expireAt),在消费时检查过期时间并直接 CONSUME_SUCCESS 或丢弃。

消费者端的 ACK 机制#

RocketMQ 的消费是典型的“先取后 ack”模式:

  1. 拉取:ConsumerClient 向 Broker 拉取一批消息(默认一个队列一次最多 32 条)。
  2. 执行业务:应用接收到消息后,执行相应的业务逻辑(比如写数据库、调用接口等)。
  3. 回调 & 返回状态:你的 MessageListener 方法必须返回一个状态给客户端 SDK:
  • CONSUME_SUCCESS:表示一次消费成功,Broker 会更新该消息的消费进度,下次不再投递。
  • RECONSUME_LATER:表示消费失败,Broker 会根据重试策略(上面说的最大重试次数 + 延迟级别)重新投递这条消息。
  1. 取到消息 ≠ 消费成功

    只是把消息从 Broker 拉到客户端,并不代表业务已经处理完成。

  2. 必须通过回调 ACK

    你的消费逻辑(比如 MessageListener)必须返回一个状态:

    • CONSUME_SUCCESS —— 明确告诉 Broker「我处理完了,这条消息可以删掉、更新消费进度,下次不再投递」。
    • RECONSUME_LATER —— 告诉 Broker「我处理失败了,请稍后再重试」。
  3. 不回调或抛异常

    • 如果代码里没捕获异常并返回 CONSUME_SUCCESS,或者直接抛了、没返回任何状态,RocketMQ SDK 会默认当成失败(等同于 RECONSUME_LATER),继续重试直到超过最大重试次数才进死信队列。
    • 你不能“取了就算了”——不回调就意味着 Broker 认为消费失败,会一直重试。

为什么这样设计?#

  • 可靠性:确保消息不丢失,只有业务真正处理掉,Broker 才移除。
  • 灵活性:业务可以在回调里根据不同错误场景决定是重试还是直接 ACK 丢弃。
  • 可观测:每次重试和死信都可统计,方便监控与报警。

评论

还没有评论,来发第一个吧