消息队列

最后更新:2020-04-16

分布式系统不同模块之间的通信,除了远程服务调用以外,消息中间件是另外一个重要的手段,在各种互联网系统设计中,消息队列有着广泛的应用。

1. 什么是消息队列

消息队列,顾名思义,就是传递消息的队列,学习操作系统中进程通信的时候我们知道,消息队列是进程之间的一种很重要的通信机制。随着分布式系统的发展,消息队列在系统设计中又有了更多的应用。

参与消息传递的双方称为生产者和消费者,生产者和消费者可以只有一个实例,也可以集群部署,典型架构如下图所示:

消息生产者

消息的生产者是客户端应用程序代码的一部分,用来初始化异步调用处理流程。在消息队列的处理中,生产者的职责非常少,它要做的就是创建一个合法的消息,并把这个消息发送到消息队列中,由应用开发者决定生产者的代码在哪里执行,什么时候发送消息。

消息队列

消息体是参与生产和消费两方传递的数据,消息格式既可以是简单的字符串,也可以是序列化后的复杂文档信息。队列是消息的载体,用于传输和保存消息,它和数据结构中的队列一样,可以支持先进先出、优先级队列等不同的特性。

消息消费者

业务架构的第三个重要角色就是消息的消费者。消息的消费者从消息队列中接收并处理消息,也是由应用开发者实现的,但是一个异步处理的组件。消息的消费者不需要知道生产者存在,它只依赖消息队列中的消息。消息的消费者通常部署在独立的服务器上,和消息的生产者完全隔离,并且可以通过添加硬件的方式进行伸缩。

消息队列特性

  • 业务无关,只做消息分发。
  • FIFO,先投递先到达。
  • 容灾:节点动态增删和消息持久化。
  • 性能:吞吐量提升,系统内部通信效率提高。

3. 消息队列的作用

  • 削峰填谷:削峰填谷是消息队列最主要的作用,但是会造成请求处理的延迟。
  • 异步:异步处理是提升系统性能的神器,但是你需要分清同步流程和异步流程的边界,同时消息存在着丢失的风险,我们需要考虑如何确保消息一定到达。
  • 解耦:解耦合可以提升你的整体系统的鲁棒性。

3.1. 异步

消息的生产者将消息发送到消息队列以后,由消息的消费者从消息队列中获取消息,然后进行业务逻辑的处理,消息的生产者和消费者是异步处理的,彼此不会等待阻塞,所以叫做异步架构。

假设这样一个场景,用户下单成功需要给用户发短信,如果没有消息队列,我们会选择同步调用发短信的接口并等待短信发送成功。

我们忽略中间的网络通信时间消耗,假如订单系统处理需要 150ms,短信系统处理需要 200ms,那么整个处理流程的时间消耗就是 150ms + 200ms = 350ms。

假如再加一个发送邮件,这样整个系统的调用链又变长了,整个时间就变成了550ms。

如果使用 MQ,那么订单系统连续发送一条消息到消息队列中,假如耗时 5ms,短息系统和邮件系统从消息队列中异步读取消息,那么对于用户来说,用户的响应时间就变成了155ms

对一些比较耗时的操作,我们可以把处理过程通过消息队列进行异步处理。这样做一个显而易见的好处就是,可以推迟耗时操作的处理,使耗时操作异步化,而不必阻塞客户端的程序,客户端的程序在得到处理结果之前就可以继续执行,从而提高客户端程序的处理性能。

3.2. 解耦

依然以上面的用户下单的例子为例,最初的伪代码如下

void order() {
    // 创建订单
    createOrder();
    // RPC发送短信
    sendSms();
}

那么我们又添加了一个发送邮件,就得重新去修改代码

void order() {
    // 创建订单
    createOrder();
    // RPC发送短信
    sendSms();
    // RPC发送邮件
    sendMail();
}

如果在后面再增加增加积分、送券等业务呢?

void order() {
    // 创建订单
    createOrder();
    // RPC发送短信
    sendSms();
    // RPC发送邮件
    sendMail();
    // RPC增加积分 
    addPoints();
    // RPC增加券
    addCoupons();
}

一个大规模系统,往往会拆分为几十个甚至上百个子系统,每个子系统又对应N多个服务,这些系统与系统之间有着错综复杂的关系网络。如果某个系统产出一份核心数据,可能下游无数的其他系统都需要这份数据来实现各种业务逻辑。

此时如果你要是采取上面那种模式来设计订单系统,负责订单系统的人估计会被烦死。那么此时我们就可以用一个消息队列在中间进行解耦。订单系统只需要发送一个下单成功的消息到消息队列,下游哪个系统感兴趣自己去消费即可

使用分布式消息队列,可以使生产者和消费者的代码实现解耦合,也就是说可以多个生产者发布消息,多个消费者处理消息,共同完成完整的业务处理逻辑,但是它们却不需要直接进行交互调用,没有代码的依赖耦合。在传统的同步调用中,调用者代码必须要依赖被调用者的代码,也就是生产者代码必须要依赖消费者的处理逻辑代码,代码需要直接的耦合,而使用消息队列,这两部分的代码不需要进行任何的耦合。耦合程度越低的代码越容易维护,也越容易进行扩展。

解耦的特点对于团队的工作分工也很有帮助。从消息生产者的视角看,它只需要构建消息,将消息放入消息队列中,开发就完成了;而从消费者的开发视角看,它只需要从消息队列中获取消息,然后进行逻辑处理。它们彼此之间不进行任何耦合。消息的生产者不关心放入消息队列中下一步会发生什么,而消费者也不需要知道消息从哪里来。这两部分程序的开发者也可以不关心彼此的工作进展,他们开发的代码也不需要集成在一起,只要约定好消息格式,就可以各自开发了。

3.3. 削峰填谷

上下游对于事情的处理能力是不同的,例如Web前端通过LVS和nginx每秒可以承受上千万的请求。但数据库的处理能力却十分有限,即使使用SSD加分库分表,单机的处理能力仍然在万级,我们不能奢求数据库的机器数量追上前端。

这种问题同样存在于系统和系统之间,如短信系统的速度可能卡在第三方短信平台(每秒几百次请求)。因为订单系统是我们的核心业务,所以它的配置可能会好一些,假设订单系统能承受这一万的用户请求,那么也就意味着我们同时也会出现一万调用发短信服务的请求。而短信系统并不是我们的主要业务,所以我们配备的硬件资源并不会太高,在加上第三方短信平台的限制,所有短信平台可能并不能承受一万次请求。

因为短信业务又不是我们的主业务,而且用户晚上个半分钟左右收到短信,一般是不会有太大问题的。我们可以把下单完成的信息发送到消息队列中,而短信系统尽自己所能地去消息队列中取消息和消费消息,即使处理速度慢一点也无所谓,只要我们的系统没有崩溃就行了。

这样的结果可能就是在高峰期消息队列中挤压了大量的消息,但等高峰期过了之后,短信系统就会快速将积压的消息给解决掉,这就是填谷。

在访问高峰,用户的并发访问数可能超过了系统的处理能力,所以在高峰期就可能会导致系统负载过大,响应速度变慢,更严重的可能会导致系统崩溃。这种情况下,通过消息队列将用户请求的消息纳入到消息队列中,通过消息队列缓冲消费者处理消息的速度。

如图中所示,消息的生产者负载有高峰有低谷,但是到了消费者这里,只会按照自己的最佳处理能力去消费消息。高峰期它会把消息缓冲在消息队列中,而在低谷期它也还是使用自己最大的处理能力去获取消息,将前面缓冲起来、来不及及时处理的消息处理掉。那么,通过这种手段可以实现系统负载削峰填谷,也就是说将访问的高峰削掉,而将访问的低谷填平,使系统处在一个最佳的处理状态之下,不会对系统的负载产生太大的冲击。

4. 消息队列的消费模型

先来看一下消息队列的两种基础模型,也就是点对点和发布订阅方式

4.1. 点到点模型

在点对点模型下,生产者向一个特定的队列发布消息,消费者从该队列中读取消息,每条消息只会被一个消费者处理。

4.2. 发布/订阅模型

大部分人在浏览资讯网站时会订阅喜欢的频道,比如人文社科,或者娱乐新闻,消息队列的发布订阅也是这种机制。在发布订阅模型中,消费者通过一个 Topic 来订阅消息,生产者将消息发布到指定的队列中。如果存在多个消费者,那么一条消息就会被多个消费者都消费一次。

点对点模型和发布订阅模型,主要区别是消息能否被多次消费,发布订阅模型实现的是广播机制。如果只有一个消费者,则可以认为是点对点模型的一个特例。

5. 消息队列的挑战

  • 消息无序 因为生产者和消费者是异步处理的。虽然消息队列本身会保证先创建的消息在前面,但是消费者却并不能保证先创建的消息先消费掉。

  • 重复消费 因为网络传输等等故障,确认信息没有传送到消息队列,导致消息队列不知道自己已经消费过该消息了,再次将该消息分发给其他的消费者。

  • 系统可用性降低 加入消息队列后,就会增加了一个风险因素:如果消息队列挂了,系统的运行就不对了,可用性降低

  • 系统复杂性增加 消息队列使系统的架构和处理流程更加复杂,带来了更多的复杂性问题,如重复消息、消息丢失、消费顺序性等问题,为了解决这些问题,又需要引入很多复杂的机制,从而也对架构师的系统架构设计能力和架构把控能力提出了更高的挑战和要求。

  • 数据一致性问题 A 系统发送完消息后直接返回成功了,此时C系统读取消息写数据库的时候失败了,就会产生数据不一致问题

6. 顺序消费

消息传输和消费的有序性,是消息队列应用中一个非常重要的问题。在分布式系统中,很多业务场景都需要考虑消息投递的时序。例如,电商中的订单状态流转、数据库的 binlog 分发,都会对业务的有序性有要求。

消息队列中的队列是一个有序的数据结构,消息传递是顺序的,但在实际开发中,特别是在分布式场景下,消息的有序性是很难保证的。

分布式的时钟问题

有序性可以分为业务上的有序和时间上的有序,先看一下时钟上的有序。在分布式环境下,消息的生产者、消费者和队列存储,可能分布在不同的机器上,不同的机器使用各自的本地时钟,由于服务器存在时钟偏斜等问题,本地时间会出现不一致,所以不能用消息发送和到达的时间戳作为时序判断标准。另一方面,分布式系统下缺乏全局时钟,这就使得绝对的时间顺序实现起来更加困难。

消息发送端和消费端的集群

在目前大多数消息队列的应用中,生产者和消费者都是集群部署,通过 ProducerGroup 和 ConsumerGroup 的方式来运行。

生产者如果存在多个发送实例,那么各个发送方的时间戳无法同步,所以消息发送端发送时的时序不能用来作为消息发送的有序判断。

同样的,消费端可能存在多个实例,即使队列内部是有序的,由于存在消息的分发过程,不同消费实例的顺序难以全局统一,也无法实现绝对的有序消费。

消息重传等的影响

我们知道,消息队列在传输消息时,可能会出现网络抖动导致的消息发送失败等,对这种场景的兼容,一般是通过进行合理地重传。消息的重传发生在什么时候是不可预知的,这也会导致消息传输出现乱序。

网络及内部并发

消息生产者集群或者消费端集群的方式,无法保证消息的绝对时序,如果只有一个消费端或者只有一个生产端呢?可以考虑这样一个场景,如果单纯地依靠消息队列本身来保证,那么在跨实例的情况下,因为网络传输的不稳定会有先后顺序,以及内部消费的并发等,仍然无法实现绝对有序。

通过上面的分析可以看到,保证消息绝对的有序,实现起来非常困难,除非在服务器内部,并且一个生产者对应一个消费者。但是这种情况的消息队列肯定是无法在实际业务中应用的。

从消息队列自身的角度,可以分为全局有序和局部有序。当前大部分消息队列的应用场景都是集群部署,在全局有序的情况下,无法使用多分区进行性能的优化。在实际开发中,一般是应用局部有序,把业务消息分发到一个固定的分区,也就是单个队列内传输的方式,实现业务上对有序的要求。

例如Kafka 保证消息在 Partition 内的顺序,对于需要确保顺序的消息,发送到同一个 Partition 中就可以。单分区的情况下可以天然满足消息有序性,如果是多分区,则可以通过制定的分发策略,将同一类消息分发到同一个 Partition 中。

消息消费的有序性,归根到底是一个业务场景的设计问题,可以在业务中进行规避,或者通过合理的设计方案来解决。

解决一个问题,如果从正面没有很好的解决方案,那么我们就可以考虑是否绕过它。

比如在一个订单状态消息流转的业务场景中,订单会有创建成功、待付款、已支付、已发货的状态,这几个状态之间是单调流动的,也就是说,订单状态的更新需要保证有序性。考虑一下,如果我们要实现的功能是根据发货的状态,进行物流通知用户的功能,实际上因为这个状态是单调不可逆向的,我们可以忽略订单状态的顺序,只关注最后是否已发货的状态。

也就是说,在这个场景下,订单状态流转虽然是要考虑顺序,但是在具体的这个功能下,实际上不需要关注订单状态消息消费的时序。

业务中如何实现有序消费

除了消息队列自身的顺序消费机制,我们可以合理地对消息进行改造,从业务上实现有序的目的。具体的方式有以下几种。

  • 根据不同的业务场景,以发送端或者消费端时间戳为准(选一个标尺来衡量时序的先后顺序):比如在电商大促的秒杀场景中,如果要对秒杀的请求进行排队,就可以使用秒杀提交时服务端的时间戳,虽然服务端不一定保证时钟一致,但是在这个场景下,我们不需要保证绝对的有序。

  • 每次消息发送时生成唯一递增的 ID:在每次写入消息时,可以考虑添加一个单调递增的序列 ID,在消费端进行消费时,缓存最大的序列 ID,只消费超过当前最大的序列 ID 的消息。这个方案和分布式算法中的 Paxos 很像,虽然无法实现绝对的有序,但是可以保证每次只处理最新的数据,避免一些业务上的不一致问题。

  • 通过缓存时间戳的方式:这种方式的机制和递增 ID 是一致的,即当生产者在发送消息时,添加一个时间戳,消费端在处理消息时,通过缓存时间戳的方式,判断消息产生的时间是否最新,如果不是则丢弃,否则执行下一步。

7. 重复消费

其实无论是那种消息队列,造成重复消费原因其实都是类似的。正常情况下,消费者在消费消息时候,消费完毕后,会发送一个确认信息给消息队列,消息队列就知道该消息被消费了,就会将该消息从消息队列中删除。

只是不同的消息队列发送的确认信息形式不同。因为网络传输等等故障,确认信息没有传送到消息队列,导致消息队列不知道自己已经消费过该消息了,再次将该消息分发给其他的消费者。

解决这个问题的主要手段是将消息处理设计成幂等性,也就是说消费者可以对同一条消息进行多次处理计算,而不会影响最终的结果。

天然幂等不需要额外设计

有部分业务是天然幂等的,这部分业务,允许重复调用,即允许重试,在配置消息队列时,还可以通过合理的重试,来提高请求的成功率。

利用数据库进行去重

业务上的幂等操作可以添加一个过滤的数据库,比如设置一个去重表,也可以在数据库中通过唯一索引来去重。

举一个例子,现在要根据订单流转的消息在数据库中写一张订单 Log 表,我们可以把订单 ID 和修改时间戳做一个唯一索引进行约束。

当消费端消费消息出现重复投递时,会多次去订单 Log 表中进行写入,由于我们添加了唯一索引,除了第一条之外,后面的都会失败,这就从业务上保证了幂等,即使消费多次,也不会影响最终的数据结果。

设置全局唯一消息 ID 或者任务 ID

我们在消息投递时,给每条业务消息附加一个唯一的消息 ID,然后就可以在消费端利用类似分布式锁的机制,实现唯一性的消费。

还是用上面记录订单状态流转消息的例子,我们在每条消息中添加一个唯一 ID,消息被消费后,在缓存中设置一个 Key 为对应的唯一 ID,代表数据已经被消费,当其他的消费端去消费时,就可以根据这条记录,来判断是否已经处理过。

8. 参考资料

《架构师的 36 项修炼》

《分布式技术原理与实战45讲》

Edgar

Edgar
一个略懂Java的小菜比