Kafka Metrics机制
Kafka Metrics机制
Kafka uses Yammer Metrics for metrics reporting in the server. The Java clients use Kafka Metrics, a built-in metrics registry that minimizes transitive dependencies pulled into client applications. Both expose metrics via JMX and can be configured to report stats using pluggable stats reporters to hook up to your monitoring system.
—— https://kafka.apache.org/documentation/#monitoring
1. Metrics收集原理
Metrics类用来管理kafka运行产生的各种埋点数据,内部管理两个关键类:Metric和Sensor.
1.1 Metric
程序运行时会产生各种数据,Metric封装了获取这些数据的细节,提供给外界使用(Facade Pattern)。每一个Metric代表一种类型的数据,一系列Metic组成所有维度的统计数据,通过Measurable方法获取具体的数据。如下图所示:
图1 Metric收集原理
- 程序运行的过程中不断产生各种数据,这些数据可以是当前的某个状态,比如访问次数总和,也可以是历史记录,比如每分钟的统计次数。这些指标数据可以是内存中的一个变量,也可以保存到数据库中。
- 需要统计的指标为一个Metric,用来供外界查看。
- 我们需要建立一个从Metric到数据的映射关系,这就通过Measurable来建立。
比如:
// 数据
int count = 0;
// metrics “api”
addMetric(metricName("count", "kafka-metrics-count", "total number of registered metrics"),
//metrics到数据的映射方式
new Measurable() {
@Override
public double measure(MetricConfig config, long now) {
return count;
}
});
Map<Long, Integer> counts1 = new HashMap<Long, Integer>();
Map<Long, Integer> counts2 = new HashMap<Long, Integer>();
addMetric(metricName("count", "kafka-metrics-count", "total number of registered metrics"),
new Measurable() {
@Override
public double measure(MetricConfig config, long now) {
return count1.get(now) + count2.get(now);
}
});
1.2 Sensor
A sensor applies a continuous sequence of numerical values to a set of associated metrics. For example a sensor on message size would record a sequence of message sizes using the {@link #record(double)} api and would maintain a set of metrics about request sizes such as the average or max.
我们可以通过 生成数据 + Metrics+Measurable 来统计任何维度的数据。Sensor其实就是一种特殊实现,帮我们实现了这样一种常见场景:采集数据,然后将之前采集的所有数据映射成各种聚合的结果,再通过Metrics来提供给外界。如下图所示:
图2 Sensor采集原理
- 首先在Sensor中注册Metric和对应的Stat。
- 在程序运行的过程中,通过 Sensor.record 采集数据。
- Sensor会将采集的数据分发给所有的Stat,同时检查其对应的所有的Metric是否超过配置中的限额。
- Stat对数据进行集成操作(如count、avg、max)
- Metric通过调用Stat的measure方法获取数据
其中的关键就是:SampledStat,可以看到继承了Measurable。即既保存数据,又提供数据到Metic的映射关系。
1.3 State
目前已有的MeasurableState有:
- Rate: 采样频率(内部为Count)
- Total: 统计总数,无时间窗口,从启动开始一直累加。
- Value: 当前值
- SimpleStat:对时间窗口内的数据进行采集聚合。创建Metrics时,可以根据MetricConfig设置窗口数、窗口单位时间、窗口采集上限。默认采用2个窗口,窗口的时间单位为30s,共一分钟,采集上限为Long.MAX_VALUE。
- Max
- Avg
- Count
- Min
重点看一下Count和Rate:
Count
Count每次采集信息时,如果发现当前窗口已经满了(超时或者达到值上限),会把第一个窗口1给清空,作为新窗口。
窗口切换的过程中可能会导致数据还未上报就已经丢失 .
Rate
Rate内部也通过Count保存数据,在返回时 value/(当前时间-窗口最早时间).窗口切换的过程中也可能会导致数据还未上报就已经丢失。但是如果分布均匀,时间同样也小了,Rate值可以降低影响。
1.4 总体结构
通过上面的内容,已经大致了解采集的具体结构。再来看一下Metrics的总体结构:
总体结构
2. 外界获取Metrics数据
可以通过多种方式将数据暴露给外部使用。如JMX、yamml等。
2.1 JMX
JmxReporter实现了MetricsReporter方法,将Metrics提供给JMX。MetricsReporter定义了一些钩子函数,会在注册Repoter、更新Metric、移除Metric时触发。
具体的原理也很简单,就是将Metric的值保存在MBean中,通过JMX Agent暴露出去:
public class JmxReporter implements MetricsReporter {
private final Map<String, KafkaMbean> mbeans = new HashMap<String, KafkaMbean>();
//创建时将metric封装进KafkaMbean,然后注册KafkaMbean到JMX Agent(更新、移除时同理)
@Override
public void init(List<KafkaMetric> metrics) {
synchronized (LOCK) {
for (KafkaMetric metric : metrics)
addAttribute(metric);
for (KafkaMbean mbean : mbeans.values())
reregister(mbean);
}
}
// 注册KafkaMbean到JMX Agent
private void reregister(KafkaMbean mbean) {
unregister(mbean);
try {
ManagementFactory.getPlatformMBeanServer().registerMBean(mbean, mbean.name());
} catch (JMException e) {
throw new KafkaException("Error registering mbean " + mbean.name(), e);
}
}
}
4. 采样数据丢失思考
我在offset commit消息上报的时候,明明已经触发了故障,但是却出现如下结果:一分钟内,offet commit的频率(Rate)为0.2, 但是offet commit success出现的次数为0,offet commit fail出现的次数(Count)可能为12,8,9等各种值, 显然
Count有数据丢失了。
Kafka Client Metric的大多数数据采集都是随机的,所以尽管可能会有少量数据丢失,但是不影响整体。但是像offet commit这种周期性采集的消息,可能就会出现很多问题,这里讨论一下。
为何Rate值相对更可靠,Count值不可靠?先说结论,采集和上报直接存在盲区。
由上图可以看到,由于采集和上报是异步的,且周期都是1分钟,所以会出现红色区域所示的盲区。如果极端情况下,采集刚切换窗口后立即开始上报,会有1/2的数据丢失!
所以,得出如下结论:
- Kafka Metric收集的值仅可作为出现错误或者异常的依据,不可用来当作确切的值。
- 如果要采集的周期性数据出现的频率远大于一分钟的时候,上报频率尽量不要是60s的倍数。否则可能运气不好,一直无法收集到采集数据。比如offet commit的频率一般是flink checkpoint时间,这时候尽量不要设置60的倍数。
- 正如我们前面说的,生成数据 + Metrics+Measurable 来统计任何维度的数据。如果非要收集确切值,我们也可以通过Measurable方法使采集和上报同步,即每次在上报的同时才清空数据。但是这样要注意,如果存在多个Reporter,可能收集直接会彼此干扰。