跳转至

Kafka优化

如何防止消息丢失?

Producer 端

ack 机制

Producer 在发送消息时,可以配置 ack 机制来保证消息的发送成功。ack 机制有三种级别:

  • 0:不等待任何 ack,消息发送后立即返回。这种模式下,消息丢失的可能性最高。
  • 1:等待 Leader 分区副本收到消息后返回。这种模式下,只有 Leader 分区副本丢失,才会导致消息丢失。
  • all:等待所有 ISR 分区副本收到消息后返回。这种模式下,只有所有 ISR 分区副本都丢失,才会导致消息丢失。
重试机制

Producer 在发送消息失败时,可以配置重试机制来再次发送消息。重试机制可以有效减少消息丢失的可能性。

  • Producer 端发送消息的重试次
  • 两次重试之间的间隔时间

Broker 端

  • 消息持久化:Kafka 将消息持久化到磁盘上,即使 Broker 宕机,消息也不会丢失。
  • 副本机制:Kafka 将每个分区的消息复制到多个副本上,即使一个副本丢失,其他副本仍然可以提供服务。

Consumer 端

offset 机制:Consumer 消费消息时,会记录消费的位置,称为 offset。

offset 机制保证了 Consumer 不会重复消费消息。

为了不丢数据,正确的做法是:拉取数据、 业务逻辑处理、 提交消费 Offset 位移信息。

设置参数 enable.auto.commit = false, 采用手动提交位移的方式。另外对于消费消息重复的情况,业务自己保证幂等性, 保证只成功消费一次即可。

业务实现幂等性

以处理订单消息为例。

幂等Key由订单编号+订单状态所组成,在处理之前,首先会去Redis查询是否存在当前Key。

  • 如果存在,说明已经处理过,直接丢弃;
  • 如果不存在,继续往下处理,最终的逻辑是将处理过的数据插入到业务 DB 中,最终把幂等 Key 插入到 Redis。

Redis 其实只是一个「前置」处理,最终的幂等性是依赖数据库的唯一 Key 来保证的(唯一 Key 实际上也是订单编号+状态)

如何防止重复消费?

Kafka Broker 上存储的消息,都有一个 Offset 标记。然后 kafka 的消费者是通过 offSet 标记来维护当前已经消费的数据,每消费一批数据,Kafka Broker 就会更新 OffSet 的值,避免重复消费。

出现重复消费的情况

  • 在 Consumer 消费的过程中,应用程序被强制 kill 掉或者宕机,可能会导致 Offset 没提交,从而产生重复提交的问题。
  • Consumer 端会从分配的 Partition 里面去消费消息,如果 Consumer 在默认的 5 分钟内没办法处理完这一批消息。此时就会触发 Kafka 的 Rebalance 机制,从而导致 Offset 自动提交失败。

解决办法

  • 提高消费端的处理性能避免触发 Balance,比如可以用异步的方式来处理消息,缩短单个消息消费的市场。或者还可以调整消息处理的超时时间。还可以减少一次性从 Broker 上拉取数据的条数。
  • 可以针对消息生成 md5 然后保存到 mysql 或者 redis 里面,在处理消息之前先去 mysql 或者 redis 里面判断是否已经消费过。这个方案其实就是利用幂等性的思想。

如何做到消息的顺序消费?

  • 生产者:通过分区的 leader 副本负责数据以先进先出的顺序写入,来保证消息顺序性。
  • 消费者:同一个分区内的消息只能被一个 group 里的一个消费者消费,保证分区内消费有序。

kafka 每个 partition 中的消息在写入时都是有序的,消费时, 每个 partition 只能被每一个消费者组中的一个消费者消费,保证了消费时也是有序的。

整个 kafka 不保证有序。如果为了保证 kafka 全局有序,那么设置一个生产者,一个分区,一个消费者。

顺序消费的问题

保证最终一致性

  • 宽表:如订单状态,使用一个或多个独立字段。消息来了,只更新对应的字段就好,消息只会暂时不一样,最终是一致的。
  • 消息补偿:另一个进行消费相同topic的数据,消息落盘,延迟处理。将消息与DB进行对比,如果发现数据不一致,再重新发送消息至主进程处理
  • 根据唯一标识路由到同一个partition:如根据 user_id 或者 order_id 发送到相同的 partition (一个partition由一个consumer消费)

如何解决消息积压问题?

消费者的消费速度远赶不上生产者的生产消息的速度,导致 kafka 中有大量的数据没有被消费。

随着没有被消费的数据堆积越多,消费者寻址的性能会越来越差,最后导致整个 kafka 对外提供的服务的性能很差,从而造成其他服务也访问速度变慢,造成服务雪崩。

解决方案

  • 在这个消费者中,使用多线程,充分利用机器的性能进行消费消息。
  • 创建多个消费组,多个消费者,部署到其他机器上,一起消费,提高消费者的消费速度
  • 优化业务的架构设计,提升业务层面消费的性能。
  • 消息转发至新的主题:创建一个消费者,该消费者在 kafka 另建一个主题,配上多个分区,多个分区再配上多个消费者。该消费者将 poll 下来的消息,不进行消费,直接转发到新建的主题上。此时,新的主题的多个分区的多个消费者就开始一起消费了。——不常用