kafka数据同步4:基于Flink 2PC的Kafka2Hive

Kafka2Hive是基于Flink实现的kafka to Hive数据同步器。其语意为 exactly-once,保证kafka中的每个消息都恰好只同步一份。

本篇通过Kafka2Hive这个案例,来看一下如何实现Kafka exactly-once 语义的数据同步。

1. exactly-once

首先回忆一下:《Kafka数据同步1: Flink容错机制 》

When we say “exactly-once semantics”, what we mean is that each incoming event affects the final results exactly once. Even in case of a machine or software failure, there’s no duplicate data and no data that goes unprocessed.

Flink对内部的状态通过checkpionting机制来保证 exactly-once:如果机器或软件发生故障并在重新启动时,Flink 应用程序会从最近成功完成的checkpoint恢复处理;Flink 恢复应用程序state并从ckeckpoint回滚到输入流中的相应开始位置,然后再次开始处理,而在此checkpoint之后事件生成的state不可见。这意味着 Flink 计算结果时就好像失败从未发生过一样。

但是 Flink 应用程序与各种data sinks一起运行,为了提供端到端的Exactly-once语义,仅仅保证Flink应用程序的状态符合Exactly-once语义是不够的,这些语义也适用于Flink写入的外部系统。这些外部系统必须提供一种提交或回滚写入的方法与 Flink 的检查点相协调

在分布式系统中协调提交和回滚的一种常见方法是两阶段提交协议。Flink 通过 TwoPhaseCommitSinkFunction 利用两阶段提交协议来提供端到端的精确一次语义。

设计时需要考虑的问题

  • Q1 Pre-commit阶段:如果在pre-commit阶段发生故障,任务重启后,我们将从最近的检查点重新初始化应用程序,Flink state会恢复自动到上一次检查点时的状态。但是如何rollback外部数据库的pre-commit事务, 并且何时rollback?
  • Q2 commit阶段:如果在pre-commit和commit之间发生故障,任务重启后如何保证最终提交成功?
  • Q3 Coordinator故障:如果整个Flink集群挂掉或者Coordinator(Job Manager)故障,在新的集群中重启Job,state丢失,无法使用checkpoint来恢复,这时候如何进行恢复?

2. 数据格式

2.1 record同步到hive

在配置文件中指定好 kafka topicName 到 hive TableName的映射,然后Kafka2Hive会实现将 Partation Message中的数据 shuffle 到相应时间的Bucket中。


2.2 offset保存到hdfs

为了防止Coordinator故障,每次checkpointing快照不仅持久化数据,还会持久化offset到hive。

offset持久化保存到hdsf,格式如下:

  • kafka_partition_offsets为根目录,内部有一系列时间目录,文件名为Job的启动时间
  • 时间目录内部有一系列检查点目录,文件名代表检查点的版本
  • 检查点目录下面有一系列文件,这些文件就是不同任务的快照。文件中保存每个分区消费的offset。(第一行还会保存对应的hive文件地址,用于后续宕机重启恢复, 解决Q3

2.3 文件状态

不管是record、offset,其写入的文件都分三种状态

  • .in-progress: 表示正在写入的文件,相当于事务的执行阶段。
  • .pending: 预提交状态。
  • .final: 已提交状态。只有.final才表示数据的最终状态(一致性状态),可以被读取。

2.4 checkpoint State格式

processor.svg
每次CheckPoint保存的state快照不仅包括本次checkpointing的**_ .in-progress 文件路径 和 .pending 文件路径。还保存了本次checkpoint之前的所有历史**checkpoint的 **.pending_** 文件路径。
任务重启后,checkpoint的state中不包含之后的 **_.in-progress **文件路径 和 **.pending _**文件路径, 默默丢弃,实现rollback, 解决Q1。然后定期清理垃圾文件即可。
历史checkpoint的 .pending 文件路径是为了重启后保证pre-commit最终一定执行成功,解决Q2。
如果Flink集群运行正常,仅仅进行任务重启,Q1和Q2都可以通过Flink checkpoint机制来解决。但是Q3由于系统重启或者换机器集群处理,由于checkpoint和state丢失,无法通过checkpoint机制来解决,而且发生的阶段比较随机,包含了前两个时刻,需要格外讨论。

3. 流程

过一下流程,看kafka2Hive如何运行,如何保证一致性。其中consumer group的内容参考:《Kafka数据同步2: FlinkKafkaConsumerBase 》

a. 某一时刻

从input流中读取了一个消息X, 包含partition和offset信息。

b. 数据转化

读取消息X后,processor算子会先更新对应partition的offset,然后将数据写入Hive对应的临时文件,即_tmp/{bucketPath}_tast_{tastIndex}_${task}

c. checkpoint barrier


当读到 checkpoint barrier 边界时,会触发checkpointing操作产生系统快照,供后续出现故障重启时使用。两个checkpoint barrier之间的执行操作就像一段分布式事务,采用两阶段提交协议。

d. pre-commit: snapshotState

首先第一阶段,所有算子都保存快照到state backend。只有所有的算子的snapshotState方法都执行成功,checkpoint算成功,才能获取对应的state。

Data Source

kafaConsumer将当前的offset保存到state。

Data Sink

由两部分组成:

  • hive中的 in-progress 文件停止写入,然后转化为pending文件(pending文件代表 pre-commit)并且将文件路径snapshot到state。(不止保存checkpoint的信息,还保存所有历史checkpoint的信息
  • 将内存中的offset信息持久到hdfs

如果在此阶段发生故障

  1. hive文件转化为 .pending 状态,在Hdfs offset处于 **_.in-progress _**状态或者之前状态时,系统恰好故障。重启任务或者job,相当于没发生一样,没有副作用。

  2. hive record转化为**_ .pending **状态,在Hdfs offset处于 **.pending _状态时,故障导致任务重启**

    • hive得益于checkpointing机制,不感知上一次checkpoint之后的.pending文件,相当于实现了rollback。
    • 但是Hdfs offset rollback操作目前好像没有相应的保障,没有rollback, 后续需要做额外保障。(即第三种可能)
  3. hive record转化为 .pending 状态,在Hdfs offset处于**_ .pending _状态时,故障导致job重启**, 处理方式为:

    • 先保证 .pending 状态 offset文件对应的hive文件成功转化为.final状态
    • 再重启Flink进行2-pc 流处理

todo: 补一张图

当Job重启时,发现Hdfs offset处于 .pending 状态,其实对应3种可能:

  • pre-commit阶段还未来得及告诉JobManager就宕机,说明pre-commit执行失败,重启后consumer group 默认是从上一次checkpiont开始消费,需要对hdfs offset进行rollback, 即默默丢弃offset文件, 然后开始消费。
  • 通知了JobManeger,但是commit阶段才开始, 还未完成hive commit就宕机了, 重启后由于dataSouce已经提交成功了,默认从本次checkpoint开始消费 ,需要确保commit,将hive文件和offset变为.final
  • commit阶段刚把hive commit, 但是offset未commit, 其实已经视为成功,将offset设置为 .final 即可。

但是,JobManager也刚刚重启,无法判断当前处于什么阶段。尤其前两种情况,都有 .pending hive文件 和 .pending offset文件,但是重启后开始消费的offset不同。

采取的保证措施是也不管处于什么阶段了,直接将 hdfs offset和hive文件全部变为 .final, 然后从后面启动Flink作业,初始offset为hdfs offset + 1, 而非接着上次。

e. commit: notifyCheckpointComplete

如果所有算子的snapshot方法完成, 说明各算子都已经"pre-commit"。各算子调用notifyCheckpointComplete进行后续操作:

Data Source

kafka consumer会提交offset,进入下一轮消费。

Data Sink

将hive中的消息从pending文件转化为final文件。消息之后才能被消费。

将hdfs中的消息从pending文件转化为final。


如果在此阶段发生故障

  1. hive record转化为**_ .pending **状态或者之前状态,在Hdfs offset处于 **.pending _状态或者之前状态时,故障导致任务重启**,通过initializeState方法重新commit。
  2. hive record转化为**_ .pending_** 状态,在Hdfs offset处于 _ .pending_ 状态或者之前状态时,故障导致job重启,同前面pre-commit阶段讨论的一样, 先保证 .pending 状态 offset文件对应的hive文件成功转化为.final状态,再重启2-pc。
  3. hive record转化为 _.final _状态,在Hdfs offset处于 ._pending _状态或者之前状态时,故障导致任务重启,视为已经成功提交。
  4. hive record转化为 .final 状态,在Hdfs offset处于 _.pending _状态或者之前状态时,故障导致job重启,同前面pre-commit阶段讨论的一样, 先保证 .pending 状态 offset文件对应的hive文件成功转化为.final状态,再重启2-pc。
  5. hive record转化为 **_.final **状态,在Hdfs offset处于 **.final _**状态或者之前状态时,故障导致任务重启或者job重启,无影响。

f. initializeState重启

如果任务出错重启, 重启时会调用CheckpointedFunction的initializeState方法:

  • 从状态中恢复事务信息
  • 对pre-commit事务的提交(问题2
  • 对失败的事务进行rollback。(问题1,直接无视本次checkpoint之后的操作)

Data Source

pre-commit信息就是上一次checkpoint时保存在state中的offset。从该offset之后开始消费,就完成了事务的提交。

Data Sink

pre-commit信息就是路径保存在state中的历史checkpont的所有_ .pending _文件,保证 .pending 文件成功转换为_ .final_ 状态。
hive rollback操作就是不感知将当前checkpoint之后生成的 **_.in-progress _和 _.pending **_文件,使其成为垃圾文件,定期清理。

g. job重启(极少情况)

类似2PC成功执行的前提是Coordinator正常运行,Flink的2PC执行的前提也是Job Master正常运行。但是如果整个Flink应用突然宕机或者出现其他异常,我们被迫在新的Flink应用重启job。此时state消失了,将数据恢复到一致呢?
启动时,从保存在hdfs中的最近成功checkpoint的offset信息来恢复(即所有checkpoint目录下所有的后缀都不是 .in-progress)

  • 如果第一次启动或者没有checkpoint成功过,binlog从最旧消费(startFromOffset = -1) ,其他从最新消费(startFromOffset = 1)
  • 如果同步成功过,将 .pending状态的offset对应的hive文件变为 .final,然后从上次作业最后成功的 checkpoint 手动恢复offset和partition。

参考文章

设计数据密集型应用DDIA - 中文翻译

Apache Flink 官方文档翻译之 容错机制

Apache Flink官方文档之通过状态快照实现容错处理

[Apache Flink官方文档之An Overview of End-to-End Exactly-Once Processing in Apache Flink (with Apache Kafka, too!)](