kafka的监控和告警

其实对于大多数用kafka的人来说,一般都会选择两个开源的工具:KafkaOffsetMonitorkafka-web-console,这两款我都有用过,而且各有优缺点。   KafkaOffsetMonitor:最大的好处就是配置简单,只需要配个zookeeper的地址就能用了,坑爹的地方就是不能自动刷新,手动刷新时耗时较长,而且有时候都刷不出来,另外就是图像用了一段时间就完全显示不了了,不知道大家是不是这样。   kafka-web-console:相比与前者,数据是落地的,因此刷新较快,而且支持在前端自定义zookeeper的地址,还能列出实时的topic里的具体内容。但是搭建比较复杂,而且github上的默认数据库是H2的,像我们一般用mysql的,还得自己转化。另外在用的过程中,我遇到一个问题,在连接kafka的leader失败的时候,会一直重试,其结果就是导致我kafka的那台机子连接数过高,都到2w了,不知道是不是它的一个bug。   而且我们还得关心其他指标吧,http://kafka.apache.org/documentation.html  里的momitor部分不是列出了那么多监控项么,迫不得已,我得靠自己去另辟新法,我现在的做法是用ganglia来做监控。 哈哈,github上一搜,有戏,https://github.com/criteo/kafka-ganglia,只需要在server.properties里添加几个配置项就解决问题了,结果反复试验都没有成功,一看都一年没更新了,估计是版本问题吧,也懒得管他了。   当然还有个比较傻一些的办法,用CSVMetricsReporter,在配置文件中开启之后,就会把相应的指标分别写入到csv文件中,然后再用脚本去采集即可,这个的确是可行的,但是对资源的消耗比较大, 等等,不是还有这个嘛https://github.com/adambarthelson/kafka-ganglia  ,用JMXTrans来做,修改kafka配置,将其jmx端口暴露出来,然后用JMXTrans把数据发到ganglia,你的JMXTrans的配置文件可以是这样:  
{
  "servers": [
    {
      "port": "9999",
      "host": "xxxxxx",
      "queries": [
        {
          "outputWriters": [
            {
              "@class": "com.googlecode.jmxtrans.model.output.GangliaWriter",
              "settings": {
                "groupName": "jvmheapmemory",
                "port": 8649,
                "host": "xxxxx"
              }
            }
          ],
          "obj": "java.lang:type=Memory",
          "resultAlias": "heap",
          "attr": [
            "HeapMemoryUsage",
            "NonHeapMemoryUsage"
          ]
        },
................#[size=16]#其他配置[/size]
]
}
这里可以把http://kafka.apache.org/documentation.html  里列出来的mbean都加入进来,而且jmxtrans还支持GraphiteWriter,这样数据就落地了,你再想怎么处理就很easy啦。 若是你仅仅是想监控lag和logsize这些指标,亦如KafkaOffsetMonitor中展示的那样,这里提供两个方法:

用bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect localhost:2181 --group test就能列出你想要的,你可以写个脚本去定时获取lag,再制定出大于多少就发告警邮件啊什么的。  

用命令行的方式总感觉不像脚本该做的,有没有client可以去获取呢,答案是有,我就把我用KafkaClient和KazooClient获取lag的脚本贡献给大家吧:
#!/usr/local/bin/python
from kafka.client import KafkaClient
from kafka.consumer import SimpleConsumer
from kazoo.client import KazooClient

# Zookeepers - no need to add ports
zookeepers="localhost"

# Kafka broker
kafka="localhost:9092"

#consumer group
group="test"

if __name__ == '__main__':
    broker = KafkaClient(kafka)  
    lags = {}
    zk = KazooClient(hosts=zookeepers, read_only=True)  #zookeeper客户端,read_only确保不会对zookeeper更改
    zk.start()
    logsize=0
    topics = zk.get_children("/consumers/%s/owners" %(group))
    for topic in topics:
        logsize =0 
        consumer = SimpleConsumer(broker, group, str(topic))
        latest_offset = consumer.pending()
        partitions = zk.get_children("/consumers/%s/offsets/%s" %(group, topic))
        for partition in partitions:
            log = "/consumers/%s/offsets/%s/%s" % (group, topic, partition)
            if zk.exists(log):
                data, stat = zk.get(log)
                logsize += int(data)
        lag = latest_offset - logsize
        lags[topic] = lag
    zk.stop()
上面的lags就是一个当前topic的lag的字典咯,其实大体的逻辑就是通过SimpleConsumer获取到当前的offset,再由KazooClient对zookeeper层层剥皮,获取topic和partition的信息,得到每个partition的logsize后累加与offset比较,就能有lag信息了,之后你想干嘛干嘛了,比如发报警邮件等等。当然也可以像KafkaOffsetMonitor那样做自己的展示了。

分享阅读原文:http://www.opscoder.info/kafka_monitor.html

0 个评论

要回复文章请先登录注册