ElasticSearch && kafka

  1. Producer Producer是Kafka中的消息生产者,主要用于生产带有特定Topic的消息,生产者生产的消息通过Topic进行归类,保存在Kafka 集群的Broker上,具体的是保存在指定的partition 的目录下,以Segment的方式(.log文件和.index文件)进行存储。

  2. Consumer Consumer是Kafka中的消费者,主要用于消费指定Topic的消息,Consumer是通过主动拉取的方式从Kafka集群中消费消息,消费者一定属于某一个特定的消费组。

  3. Topic Kafka中的消息是根据Topic进行分类的,Topic是支持多订阅的,一个Topic可以有多个不同的订阅消息的消费者。Kafka集群Topic的数量没有限制,同一个Topic的数据会被划分在同一个目录下,一个Topic可以包含1至多个分区,所有分区的消息加在一起就是一个Topic的所有消息。

  4. Partition 在Kafka中,为了提升消息的消费速度,可以为每个Topic分配多个Partition,这也是就之前我们说到的,Kafka是支持多分区的。默认情况下,一个Topic的消息只存放在一个分区中。Topic的所有分区的消息合并起来,就是一个Topic下的所有消息。每个分区都有一个从0开始的编号,每个分区内的数据都是有序的,但是不同分区直接的数据是不能保证有序的,因为不同的分区需要不同的Consumer去消费,每个Partition只能分配一个Consumer,但是一个Consumer可以同时一个Topic的多个Partition。

一、消息不丢失

  1. 生产者层面

acks=all配置:设置生产者的acks参数为all,确保消息在发送后需要等待所有ISR副本的确认才能认为消息发送成功。这样,即使Leader副本宕机,消息也能在其他副本中保存。

启用重试机制:配置生产者的重试机制(retries),当发送消息失败时,生产者会自动重试,避免因临时网络故障或其他问题导致消息丢失。

消息持久化:在发送消息前,将消息持久化到本地或外部存储系统(如数据库或文件系统)中,在确认消息被成功发送到Kafka之前,可以在重启时重新发送这些消息。

  1. Broker层面

ISR副本机制:配置Kafka集群的副本机制,确保每条消息至少有两个以上的副本存储(常用配置为3个副本),即使一个Broker出现故障,消息也不会丢失。

日志刷盘策略:配置Kafka的日志刷盘策略,调整log.flush.interval.messages和log.flush.interval.ms参数,确保消息及时从内存刷写到磁盘,防止由于系统崩溃导致的消息丢失。

数据恢复机制:启用Kafka的unclean.leader.election=false配置,确保在Leader故障后不会选举一个未完全同步的副本为新的Leader,防止丢失未同步的消息。

  1. 消费者层面

手动提交offset:消费者在消费消息后,手动提交offset,确保消息成功处理后才提交offset,这样即使消费者在处理过程中失败,也能在恢复后重新消费未提交的消息。

事务性消费:使用Kafka的事务性消费功能,确保消息处理与offset提交在一个事务中完成,防止未处理的消息被丢弃。

二、避免重复消费

  1. 生产者层面

幂等性配置 (enable.idempotence=true):开启Kafka生产者的幂等性功能,确保即使生产者在发送消息时出现网络抖动或重试,Kafka也能识别并过滤重复消息,防止重复写入。

去重标识:在每条消息中添加唯一标识符(如UUID),以便消费者能够识别并避免重复处理相同的消息。

  1. Broker层面

精确一次语义:通过Kafka的exactly-once语义配置(需要配合Kafka Streams或事务性生产者/消费者),确保在整个消息流转过程中,每条消息只被处理一次,防止因为系统故障或重启导致的重复消费。

  1. 消费者层面

幂等性处理:在消费消息时确保业务逻辑是幂等的。例如,在插入数据库之前,先检查记录是否存在,或者使用UPSERT(插入或更新)语句来处理重复数据,防止重复消费导致的数据不一致。

消息记录表:使用一张专门的表记录每条消息的处理状态,消费者在处理消息之前,先检查该消息是否已被处理过,如果已处理则跳过,避免重复消费。

手动管理offset:消费者可以通过手动管理offset来控制消息的消费进度。这样即使由于某种原因导致重复消费,也可以通过业务逻辑的幂等性检查来确保不会影响数据的一致性。

综合策略

监控与报警:建立完善的监控系统,监控Kafka集群的健康状况、消息处理进度、offset提交情况,及时发现消息丢失或重复消费的问题。

自动化恢复机制:配置自动化的故障恢复机制,例如在Broker或消费者异常时,自动触发重新消费未提交的消息。

日志与审计:记录所有关键操作的日志,以便在出现问题时进行审计和回溯,进一步确保系统的可靠性。

三、消息堆积

消息堆积原因

消息堆积发生的常见原因有以下情况:

  • 生产者短期间生产大量消息到Broker, 消费者无法及时消费(消费者并发低);
  • 生产者无法感知消息堆积,持续生产消息,导致消息堆积进一步加剧
  • 消费者能力不足,消费时间长,消费者宕机、网络异常与Broker无法通信
  • 分区设置异常
  • 新上线消费者功能存在bug,无法消费消息

总结以上解决方案

从主要消息堆积原因来看,主要分为这几个类型,消费者端,生产者端,服务端;

消费者端

  • 增加消费者实例个数,并发消费线程数量
  • 提高消费者消费的速度,避免消费消息时间过长。如果消费者处理慢,可以提高每批次拉取的数量。批次拉取数量过少(拉取数据/处理时间 < 生产速度),就容易堆积。
  • 消费kafka消息时,应该尽量减少每次消费时间,可通过减少调用三方接口、读库等操作, 从而减少消息堆积的可能性。
  • 增加消费组服务数量,合理增加 topic 的 partition 的个数,消费数 >= 分区数 (二者缺一不可)
  • 消息者支持灰度发布
  • 配置消费者参数,任务启动从上次提交offset处开始消费处理(Offset 是消息在 Kafka 分区中的唯一标识符,表示消息在分区中的位置。每条消息都有一个偏移量,这个偏移量是消息在分区中的顺序位置)
  • 如果确实容易出现堆积,消息来不及消费,建议可以先存在数据库中,然后逐条消费,不仅方便重新触发生产消息,还可以保留消费记录)

生产者端

  • 支持熔断与隔离, 当broker消息堆积时,对生产者能进行熔断,或将生产的消息先发送到其他topic
  • 设计时,Kafka消息key设置,给key加随机后缀,使其更均衡

服务端

  • 合理设置parition很重要,Kafka parition数是Kafka并行度调优的最小单元,如果Kafka分区数设置的太少,会直接影响Kafka consumer消费的吞吐量
  • 生产环境kafka必须集群, 有条件的支持异地多活,应对极端情况
  • 需要注意kafka消息保留时间(修改kafka配置文件, 默认一周)
倒排索引

倒排索引也叫反向索引,我们通常理解的索引是通过key寻找value,与之相反,倒排索引是通过value寻找key,故而被称作反向索引。

为了创建索引,ES引擎通过分词器将每个文档的内容拆成单独的词(称之为词条,或term),再将这些词条创建成不含重复词条的排序列表,然后列出每个词条出现在哪个文档。

这种结构由文档中所有不重复的词的列表构成,对于其中每个词都有至少一个文档与与之关联。这种由属性值来确定记录的位置的结构就是倒排索引,带有倒排索引的文件被称为倒排文件。

底层 lucene 简单来说,lucene 就是一个 jar 包,里面包含了封装好的各种建立倒排索引的算法代码。我们用 Java 开发的时候,引入 lucene jar,然后基于 lucene 的 api 去开发就可以了。

通过 lucene,我们可以将已有的数据建立索引,lucene 会在本地磁盘上面,给我们组织索引的数据结构。

ES基本概念 Index:索引,相当于关系数据库中的database概念,是一类数据的集合,是一个逻辑概念。

Type:类型,相当于数据库中的table概念,在6.0版本之前,一个Index中可以有多个type,7.0版本后彻底废弃多type,每个索引只能有一个type,即“ _doc”。

Document:文档,存储在ES中的主要实体叫文档,可以理解为关系型数据库中表的一行数据记录。每个文档由多个字段(field)组成。区别于关系型数据库的是,ES是一个非结构化的数据库,每个文档可以有不同的字段,并且有一个唯一标识。

Field:字段,存在于文档中,字段是包含数据的键值对,可以理解为Mysql一行数据的其中一列。

Mapping:映射,是对索引库中的索引字段及其数据类型进行定义,类似于关系型数据库中的表结构。ES默认动态创建索引和索引类型的Mapping。

ES集群概念 cluster:集群,一个ES集群由多个节点(node)组成, 每个集群都有一个共同的集群名称最为标识。

node:节点,一个ES实例即为一个节点,一台机器可以有多个节点。

shard:分片,如果某个索引包含大量数据,以至于一台机器无法存储,ES可以将一个索引中的数据切分为多个shard,分布在多台服务器上存储。这样,ES就可以横向扩展,存储更多数据,让搜索和分析等操作分布到多台服务器上去执行,提升吞吐量和性能。每个shard都是一个最小工作单元,承载部分数据,具有一个lucene实例和完整的建立索引、处理请求的能力。

replica:副本,就是shard的冗余备份,它可以防止数据丢失以及shard异常时负责容错和负载均衡。