Elasticsearch内存详解

空心菜 发表了文章 3 个评论 12598 次浏览 2016-01-12 01:06 来自相关话题

“该给ES分配多少内存?”  “JVM参数如何优化?“ “为何我的Heap占用这么高?” “为何经常有某个field的数据量超出内存限制的异常?“ “为何感觉上没多少数据,也会经常Out Of Memory?” ...查看全部
“该给ES分配多少内存?” 
“JVM参数如何优化?“
“为何我的Heap占用这么高?”
“为何经常有某个field的数据量超出内存限制的异常?“
“为何感觉上没多少数据,也会经常Out Of Memory?”

以上问题,显然没有一个统一的数学公式能够给出答案。 和数据库类似,ES对于内存的消耗,和很多因素相关,诸如数据总量、mapping设置、查询方式、查询频度等等。默认的设置虽开箱即用,但不能适用每一种使用场景。作为ES的开发、运维人员,如果不了解ES对内存使用的一些基本原理,就很难针对特有的应用场景,有效的测试、规划和管理集群,从而踩到各种坑,被各种问题挫败。

要理解ES如何使用内存,先要尊重下面两个基本事实:
  1.  ES是JAVA应用
  2.  底层存储引擎是基于Lucene的

看似很普通是吗?但其实没多少人真正理解这意味着什么。 

首先,作为一个JAVA应用,就脱离不开JVM和GC。很多人上手ES的时候,对GC一点概念都没有就去网上抄各种JVM“优化”参数,却仍然被heap不够用,内存溢出这样的问题搞得焦头烂额。了解JVM GC的概念和基本工作机制是很有必要的,本文不在此做过多探讨,读者可以自行Google相关资料进行学习。如何知道ES heap是否真的有压力了? 推荐阅读这篇博客:Understanding Memory Pressure Indicator。 即使对于JVM GC机制不够熟悉,头脑里还是需要有这么一个基本概念: 应用层面生成大量长生命周期的对象,是给heap造成压力的主要原因,例如读取一大片数据在内存中进行排序,或者在heap内部建cache缓存大量数据。如果GC释放的空间有限,而应用层面持续大量申请新对象,GC频度就开始上升,同时会消耗掉很多CPU时间。严重时可能恶性循环,导致整个集群停工。因此在使用ES的过程中,要知道哪些设置和操作容易造成以上问题,有针对性的予以规避。

其次,Lucene的倒排索引(Inverted Index)是先在内存里生成,然后定期以段文件(segment file)的形式刷到磁盘的。每个段实际就是一个完整的倒排索引,并且一旦写到磁盘上就不会做修改。 API层面的文档更新和删除实际上是增量写入的一种特殊文档,会保存在新的段里。不变的段文件易于被操作系统cache,热数据几乎等效于内存访问。 
 
基于以上2个基本事实,我们不难理解,为何官方建议的heap size不要超过系统可用内存的一半。heap以外的内存并不会被浪费,操作系统会很开心的利用他们来cache被用读取过的段文件。

Heap分配多少合适?遵从官方建议就没错。 不要超过系统可用内存的一半,并且不要超过32GB。JVM参数呢?
对于初级用户来说,并不需要做特别调整,仍然遵从官方的建议,将xms和xmx设置成和heap一样大小,避免动态分配heap size就好了。虽然有针对性的调整JVM参数可以带来些许GC效率的提升,当有一些“坏”用例的时候,这些调整并不会有什么魔法效果帮你减轻heap压力,甚至可能让问题更糟糕。

那么,ES的heap是如何被瓜分掉的? 说几个我知道的内存消耗大户并分别做解读:
    [] segment memory[/][]  filter cache[/][]  field data cache[/][]  bulk queue[/][]  indexing buffer[/][]  state buffer[/][]  超大搜索聚合结果集的fetch[/]


Segment Memory


Segment不是file吗?segment memory又是什么?前面提到过,一个segment是一个完备的lucene倒排索引,而倒排索引是通过词典 (Term Dictionary)到文档列表(Postings List)的映射关系,快速做查询的。 由于词典的size会很大,全部装载到heap里不现实,因此Lucene为词典做了一层前缀索引(Term Index),这个索引在Lucene4.0以后采用的数据结构是FST (Finite State Transducer)。 这种数据结构占用空间很小,Lucene打开索引的时候将其全量装载到内存中,加快磁盘上词典查询速度的同时减少随机磁盘访问次数。
 
下面是词典索引和词典主存储之间的一个对应关系图:
esmem.png

Lucene  file的完整数据结构参见Apache Lucene - Index File Formats
 
说了这么多,要传达的一个意思就是,ES的data node存储数据并非只是耗费磁盘空间的,为了加速数据的访问,每个segment都有会一些索引数据驻留在heap里。因此segment越多,瓜分掉的heap也越多,并且这部分heap是无法被GC掉的! 理解这点对于监控和管理集群容量很重要,当一个node的segment memory占用过多的时候,就需要考虑删除、归档数据,或者扩容了。

怎么知道segment memory占用情况呢?  CAT API可以给出答案。
  1.  查看一个索引所有segment的memory占用情况:
esin1.png

 
  1.  查看一个node上所有segment占用的memory总和:
esin2.png

那么有哪些途径减少data node上的segment memory占用呢? 总结起来有三种方法:
  1.  删除不用的索引
  2.  关闭索引 (文件仍然存在于磁盘,只是释放掉内存)。需要的时候可以重新打开。
  3.  定期对不再更新的索引做optimize (ES2.0以后更改为force merge api)。这Optimze的实质是对segment file强制做合并,可以节省大量的segment memory。


Filter Cache


Filter cache是用来缓存使用过的filter的结果集的,需要注意的是这个缓存也是常驻heap,无法GC的。我的经验是默认的10% heap设置工作得够好了,如果实际使用中heap没什么压力的情况下,才考虑加大这个设置。


Field Data cache


在有大量排序、数据聚合的应用场景,可以说field data cache是性能和稳定性的杀手。 对搜索结果做排序或者聚合操作,需要将倒排索引里的数据进行解析,然后进行一次倒排。 这个过程非常耗费时间,因此ES 2.0以前的版本主要依赖这个cache缓存已经计算过的数据,提升性能。但是由于heap空间有限,当遇到用户对海量数据做计算的时候,就很容易导致heap吃紧,集群频繁GC,根本无法完成计算过程。 ES2.0以后,正式默认启用Doc Values特性(1.x需要手动更改mapping开启),将field data在indexing time构建在磁盘上,经过一系列优化,可以达到比之前采用field data cache机制更好的性能。因此需要限制对field data cache的使用,最好是完全不用,可以极大释放heap压力。 需要注意的是,很多同学已经升级到ES2.0,或者1.0里已经设置mapping启用了doc values,在kibana里仍然会遇到问题。 这里一个陷阱就在于kibana的table panel可以对所有字段排序。 设想如果有一个字段是analyzed过的,而用户去点击对应字段的排序表头是什么后果? 一来排序的结果并不是用户想要的,排序的对象实际是词典; 二来analyzed过的字段无法利用doc values,需要装载到field data cache,数据量很大的情况下可能集群就在忙着GC或者根本出不来结果。


Bulk Queue


一般来说,Bulk queue不会消耗很多的heap,但是见过一些用户为了提高bulk的速度,客户端设置了很大的并发量,并且将bulk Queue设置到不可思议的大,比如好几千。 Bulk Queue是做什么用的?当所有的bulk thread都在忙,无法响应新的bulk request的时候,将request在内存里排列起来,然后慢慢清掉。 这在应对短暂的请求爆发的时候有用,但是如果集群本身索引速度一直跟不上,设置的好几千的queue都满了会是什么状况呢? 取决于一个bulk的数据量大小,乘上queue的大小,heap很有可能就不够用,内存溢出了。一般来说官方默认的thread pool设置已经能很好的工作了,建议不要随意去“调优”相关的设置,很多时候都是适得其反的效果。


Indexing Buffer


Indexing Buffer是用来缓存新数据,当其满了或者refresh/flush interval到了,就会以segment file的形式写入到磁盘。 这个参数的默认值是10% heap size。根据经验,这个默认值也能够很好的工作,应对很大的索引吞吐量。 但有些用户认为这个buffer越大吞吐量越高,因此见过有用户将其设置为40%的。到了极端的情况,写入速度很高的时候,40%都被占用,导致OOM。


Cluster State Buffer


ES被设计成每个node都可以响应用户的api请求,因此每个node的内存里都包含有一份集群状态的拷贝。这个cluster state包含诸如集群有多少个node,多少个index,每个index的mapping是什么?有少shard,每个shard的分配情况等等 (ES有各类stats api获取这类数据)。 在一个规模很大的集群,这个状态信息可能会非常大的,耗用的内存空间就不可忽视了。并且在ES2.0之前的版本,state的更新是由master node做完以后全量散播到其他结点的。 频繁的状态更新都有可能给heap带来压力。 在超大规模集群的情况下,可以考虑分集群并通过tribe node连接做到对用户api的透明,这样可以保证每个集群里的state信息不会膨胀得过大。


超大搜索聚合结果集的fetch


ES是分布式搜索引擎,搜索和聚合计算除了在各个data node并行计算以外,还需要将结果返回给汇总节点进行汇总和排序后再返回。无论是搜索,还是聚合,如果返回结果的size设置过大,都会给heap造成很大的压力,特别是数据汇聚节点。超大的size多数情况下都是用户用例不对,比如本来是想计算cardinality,却用了terms aggregation + size:0这样的方式; 对大结果集做深度分页;一次性拉取全量数据等等。


小结


  1.  倒排词典的索引需要常驻内存,无法GC,需要监控data node上segment memory增长趋势。
  2.  各类缓存,field cache, filter cache, indexing cache, bulk queue等等,要设置合理的大小,并且要应该根据最坏的情况来看heap是否够用,也就是各类缓存全部占满的时候,还有heap空间可以分配给其他任务吗?避免采用clear cache等“自欺欺人”的方式来释放内存。
  3.  避免返回大量结果集的搜索与聚合。缺失需要大量拉取数据可以采用scan & scroll api来实现。
  4.  cluster stats驻留内存并无法水平扩展,超大规模集群可以考虑分拆成多个集群通过tribe node连接。
  5.  想知道heap够不够,必须结合实际应用场景,并对集群的heap使用情况做持续的监控。


分享阅读原文:http://elasticsearch.cn/article/32


Elasticsearch常见几个问题解决

空心菜 发表了文章 0 个评论 7291 次浏览 2016-01-03 03:13 来自相关话题

ElasticSearch常见的几个问题如下: []大量unassigned shards[/][]status: red[/][]fix unassigned shards[/][]"Too many open files"[/] 按 ...查看全部
ElasticSearch常见的几个问题如下:
    []大量unassigned shards[/][]status: red[/][]fix unassigned shards[/][]"Too many open files"[/]

按照惯例查看统计报表, 才发现es集群悲剧了......昨天下午到今天早上, 持续报错, 写了1G的错误日志>_<#(暂无监控....)
当前状态: 单台机器, 单节点(空集群), 200W 数据, 500+shrads, 约3G大小
以下几个不同问题的处理过程


大量unassigned shards


其实ES集群刚搭完运行时状态status: yellow(所有主分片可用,但存在不可用的从分片), 只有一个节点, 主分片启动并运行正常, 可以成功处理请求, 但是存在unassigned_shards, 即存在没有被分配到节点的从分片.(只有一个节点.....)
当时数据量小,就没有太在意. 然后, 随着时间推移, 出现了大量unassigned shards
curl -XGET http://localhost:9200/_cluster/health\?pretty
{
"cluster_name" : "elasticsearch",
"status" : "yellow",
"timed_out" : false,
"number_of_nodes" : 2,
"number_of_data_nodes" : 1,
"active_primary_shards" : 538,
"active_shards" : 538,
"relocating_shards" : 0,
"initializing_shards" : 0,
"unassigned_shards" : 558,
"number_of_pending_tasks" : 0
}
处理方式:
找了台内网机器, 部署另一个节点(保证cluster.name一致即可, 自动发现, 赞一个). 当然, 如果你资源有限只有一台机器, 使用相同命令再启动一个es实例(不过实例的端口得修改)也行. 再次检查集群健康, 发现unassigned_shards减少, active_shards增多. 

操作完后, 集群健康从yellow恢复到green


status: red


集群健康恶化了......
这次检查发现是status: red(存在不可用的主要分片)
curl -XGET http://localhost:9200/_cluster/health\?pretty
{
"cluster_name" : "elasticsearch",
"status" : "red", // missing some primary shards
"timed_out" : false,
"number_of_nodes" : 4,
"number_of_data_nodes" : 2,
"active_primary_shards" : 538,
"active_shards" : 1076,
"relocating_shards" : 0,
"initializing_shards" : 0,
"unassigned_shards" : 20, // where your lost primary shards are.
"number_of_pending_tasks" : 0
}


fix unassigned shards


开始着手修复,查看所有分片状态
curl -XGET http://localhost:9200/_cat/shards
找出UNASSIGNED分片
curl -s "http://localhost:9200/_cat/shards" | grep UNASSIGNED
pv-2015.05.22 3 p UNASSIGNED
pv-2015.05.22 3 r UNASSIGNED
pv-2015.05.22 1 p UNASSIGNED
pv-2015.05.22 1 r UNASSIGNED
查询得到master节点的唯一标识
curl 'localhost:9200/_nodes/process?pretty'

{
"cluster_name" : "elasticsearch",
"nodes" : {
"AfUyuXmGTESHXpwi4OExxx" : {
"name" : "Master",
....
"attributes" : {
"master" : "true"
},
.....
执行reroute(分多次, 变更shard的值为UNASSIGNED查询结果中编号, 上一步查询结果是1和3)
curl -XPOST 'localhost:9200/_cluster/reroute' -d '{
"commands" : [ {
"allocate" : {
"index" : "pv-2015.05.22",
"shard" : 1,
"node" : "AfUyuXmGTESHXpwi4OExxx",
"allow_primary" : true
}
}
]
}'
批量处理的脚本(当数量很多的话, 注意替换node的名字)
#!/bin/bash

for index in $(curl -s 'http://localhost:9200/_cat/shards' | grep UNASSIGNED | awk '{print $1}' | sort | uniq); do
for shard in $(curl -s 'http://localhost:9200/_cat/shards' | grep UNASSIGNED | grep $index | awk '{print $2}' | sort | uniq); do
echo $index $shard

curl -XPOST 'localhost:9200/_cluster/reroute' -d "{
'commands' : [ {
'allocate' : {
'index' : $index,
'shard' : $shard,
'node' : 'Master',
'allow_primary' : true
}
}
]
}"

sleep 5
done
done


“Too many open files”


发现日志中大量出现这个错误,执行
curl http://localhost:9200/_nodes/process\?pretty
可以看到
"max_file_descriptors" : 4096,
官方文档中


Make sure to increase the number of open files descriptors on the machine (or for the user running elasticsearch). Setting it to 32k or even 64k is recommended.


而此时, 可以在系统级做修改, 然后全局生效,最简单的做法, 在bin/elasticsearch文件开始的位置加入
ulimit -n 64000
然后重启es, 再次查询看到
"max_file_descriptors" : 64000,
问题解决.
分享阅读总结原文

怎么查看kafka的版本号

OpenSkill 回复了问题 2 人关注 1 个回复 14515 次浏览 2016-10-26 21:37 来自相关话题

利用Python进行Elasticsearch数据索引

Ansible 发表了文章 0 个评论 5866 次浏览 2015-12-24 00:56 来自相关话题

Elasticsearch是一个分布式、Restful的搜索及分析服务器,和Apache Solr一样,它也是基于Lucence的索引服务器,而Elasticsearch对比Solr的优点在于: []轻量级:安装启动方便,下载文件之后一条命令就可以 ...查看全部
Elasticsearch是一个分布式、Restful的搜索及分析服务器,和Apache Solr一样,它也是基于Lucence的索引服务器,而Elasticsearch对比Solr的优点在于:
    []轻量级:安装启动方便,下载文件之后一条命令就可以启动;[/][]Schema free:可以向服务器提交任意结构的JSON对象,Solr中使用schema.xml指定了索引结构;[/][]多索引文件支持:使用不同的index参数就能创建另一个索引文件,Solr中需要另行配置;[/][]分布式:Solr Cloud的配置比较复杂。[/]
 

环境配置

启动Elasticsearch,访问端口在9200,通过浏览器可以查看到返回的JSON数据,Elasticsearch提交和返回的数据格式都是JSON.
>> bin/elasticsearch -f
 安装官方提供的Python API
>> pip install elasticsearch

索引操作

对于单条索引,可以调用create或index方法。
from datetime import datetimefrom elasticsearch import Elasticsearches = Elasticsearch() #create a localhost server connection, or Elasticsearch("ip")es.create(index="test-index", doc_type="test-type", id=1,    body={"any":"data", "timestamp": datetime.now()})
 Elasticsearch批量索引的命令是bulk,利用Python API提交
from datetime import datetimefrom elasticsearch import Elasticsearchfrom elasticsearch import helperses = Elasticsearch("10.18.13.3")j = 0count = int(df[0].count())actions = while (j < count):     action = {               "_index": "tickets-index",               "_type": "tickets",               "_id": j + 1,               "_source": {                           "crawaldate":df[0][j],                           "flight":df[1][j],                           "price":float(df[2][j]),                           "discount":float(df[3][j]),                           "date":df[4][j],                           "takeoff":df[5][j],                           "land":df[6][j],                           "source":df[7][j],                           "timestamp": datetime.now()}               }    actions.append(action)    j += 1    if (len(actions) == 500000):        helpers.bulk(es, actions)        del actions[0:len(actions)]if (len(actions) > 0):    helpers.bulk(es, actions)    del actions[0:len(actions)]
 实践过程中发现Python API序列化JSON时对数据类型支撑比较有限,原始数据使用的NumPy.Int32必须转换为int才能索引。此外,现在的bulk操作默认是每次提交500条数据,我修改为5000甚至50000进行测试,会有索引不成功的情况。
#helpers.py source codedef streaming_bulk(client, actions, chunk_size=500, raise_on_error=False,        expand_action_callback=expand_action, **kwargs):    actions = map(expand_action_callback, actions)    # if raise on error is set, we need to collect errors per chunk before raising them    errors =     while True:        chunk = islice(actions, chunk_size)        bulk_actions =         for action, data in chunk:            bulk_actions.append(action)            if data is not None:                bulk_actions.append(data)        if not bulk_actions:            returndef bulk(client, actions, stats_only=False, **kwargs):    success, failed = 0, 0    # list of errors to be collected is not stats_only    errors =     for ok, item in streaming_bulk(client, actions, **kwargs):        # go through request-reponse pairs and detect failures        if not ok:            if not stats_only:                errors.append(item)            failed += 1        else:            success += 1    return success, failed if stats_only else errors
 对于索引的批量删除和更新操作,对应的文档格式如下,更新文档中的doc节点是必须的。
{    '_op_type': 'delete',    '_index': 'index-name',    '_type': 'document',    '_id': 42,}{    '_op_type': 'update',    '_index': 'index-name',    '_type': 'document',    '_id': 42,    'doc': {'question': 'The life, universe and everything.'}}

常见错误

    []SerializationError:JSON数据序列化出错,通常是因为不支持某个节点值的数据类型[/][]RequestError:提交数据格式不正确[/][]ConflictError:索引ID冲突[/][]TransportError:连接无法建立[/]

 


性能 


perf.png
上面是使用MongoDB和Elasticsearch存储相同数据的对比,虽然服务器和操作方式都不完全相同,但还是可以看出批量写入数据,单纯的数据库还是比索引服务器更具备优势。
Elasticsearch的索引文件是自动分块,达到千万级数据对写入速度也没有影响。但在达到磁盘空间上限时,Elasticsearch出现了文件合并错误,并且大量丢失数据(共丢了100多万条),停止客户端写入后,服务器也无法自动恢复,必须手动停止。在生产环境中这点比较致命,尤其是使用非Java客户端,似乎无法在客户端获取到服务端的Java异常,这使得程序员必须很小心地处理服务端的返回信息。

Kafka“Failed to send messages after 3 tries”

回复

Rock 回复了问题 1 人关注 1 个回复 12534 次浏览 2015-12-23 01:08 来自相关话题

Hbase HRegionServer 启动失败

koyo 回复了问题 2 人关注 2 个回复 5676 次浏览 2015-12-15 14:33 来自相关话题

如何让docker以daemon方式永久运行/bin/bash

Rock 回复了问题 2 人关注 3 个回复 6627 次浏览 2015-12-03 21:41 来自相关话题

怎么使用docker命令下载image?

Rock 回复了问题 2 人关注 1 个回复 5109 次浏览 2015-12-03 20:58 来自相关话题

kafka删除topic分析

Ansible 发表了文章 0 个评论 4146 次浏览 2015-12-02 17:24 来自相关话题

官方kafka中提供了创建和查看topic的命令: 从0.8的官方文档提供了一个删除topic的命令: kafka-topics.sh --delete[size=13]  ...查看全部
官方kafka中提供了创建和查看topic的命令:
seekafka.png

从0.8的官方文档提供了一个删除topic的命令:
kafka-topics.sh --delete[size=13] [/size]

但是在运行时会报错找不到这个方法。
kafka-topics.sh最终是运行了kafka.admin.TopicCommand这个类,在0.8的源码中这个类中没有找到有delete topic相关的代码。
 
在kafka的admin包下,提供了一个DeleteTopicCommand的类,可以实现删除topic的功能。 
kafka.admin.DeleteTopicCommand 
其中删除topic的具体实现代码如下:
import org.I0Itec.zkclient.ZkClient
import kafka.utils.{Utils, ZKStringSerializer, ZkUtils}
.......
val topic = options.valueOf(topicOpt)
val zkConnect = options.valueOf(zkConnectOpt)
var zkClient: ZkClient = null
try {
zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
zkClient.deleteRecursive(ZkUtils.getTopicPath(topic)) //其实最终还是通过删除zk里面对应的路径来实现删除topic的功能
println("deletion succeeded!")
}
catch {
case e: Throwable =>
println("delection failed because of " + e.getMessage)
println(Utils.stackTrace(e))
}
finally {
if (zkClient != null)
zkClient.close()
}
因为这个命令只会删除zk里面的信息,真实的数据还是没有删除,所以需要登录各个broker,把对应的topic的分区数据目录删除,也可能正因为这一点,delete命令才没有集成到kafka.admin.TopicCommand这个类。
 
Kafka删除的操作,首先,在zookeeper的/admin/delete_topic中添加要删除的topic,此时并不真实删除broker中的数据。

如果Kafka的配置中配置了delete.topic.enable=true,broker检查到zookeeper中的delete_topic中有要删除的项时,才会真实的去删除topic数据。否则(delete.topic.enable=false),topic数据不会被删除。

kafka的监控和告警

Ansible 发表了文章 0 个评论 4787 次浏览 2015-12-01 23:09 来自相关话题

其实对于大多数用kafka的人来说,一般都会选择两个开源的工具:KafkaOffsetMonitor和kafka-web-console,这两款我都有用过,而且各有优缺点。   KafkaOffsetMonitor:最大的好处就是配置简 ...查看全部
其实对于大多数用kafka的人来说,一般都会选择两个开源的工具:KafkaOffsetMonitorkafka-web-console,这两款我都有用过,而且各有优缺点。
 
KafkaOffsetMonitor:最大的好处就是配置简单,只需要配个zookeeper的地址就能用了,坑爹的地方就是不能自动刷新,手动刷新时耗时较长,而且有时候都刷不出来,另外就是图像用了一段时间就完全显示不了了,不知道大家是不是这样。
 
kafka-web-console:相比与前者,数据是落地的,因此刷新较快,而且支持在前端自定义zookeeper的地址,还能列出实时的topic里的具体内容。但是搭建比较复杂,而且github上的默认数据库是H2的,像我们一般用mysql的,还得自己转化。另外在用的过程中,我遇到一个问题,在连接kafka的leader失败的时候,会一直重试,其结果就是导致我kafka的那台机子连接数过高,都到2w了,不知道是不是它的一个bug。
 
而且我们还得关心其他指标吧,http://kafka.apache.org/documentation.html  里的momitor部分不是列出了那么多监控项么,迫不得已,我得靠自己去另辟新法,我现在的做法是用ganglia来做监控。 哈哈,github上一搜,有戏,https://github.com/criteo/kafka-ganglia,只需要在server.properties里添加几个配置项就解决问题了,结果反复试验都没有成功,一看都一年没更新了,估计是版本问题吧,也懒得管他了。
 
当然还有个比较傻一些的办法,用CSVMetricsReporter,在配置文件中开启之后,就会把相应的指标分别写入到csv文件中,然后再用脚本去采集即可,这个的确是可行的,但是对资源的消耗比较大, 等等,不是还有这个嘛https://github.com/adambarthelson/kafka-ganglia  ,用JMXTrans来做,修改kafka配置,将其jmx端口暴露出来,然后用JMXTrans把数据发到ganglia,你的JMXTrans的配置文件可以是这样:
 
{
"servers": [
{
"port": "9999",
"host": "xxxxxx",
"queries": [
{
"outputWriters": [
{
"@class": "com.googlecode.jmxtrans.model.output.GangliaWriter",
"settings": {
"groupName": "jvmheapmemory",
"port": 8649,
"host": "xxxxx"
}
}
],
"obj": "java.lang:type=Memory",
"resultAlias": "heap",
"attr": [
"HeapMemoryUsage",
"NonHeapMemoryUsage"
]
},
................#[size=16]#其他配置[/size]
]
}
这里可以把http://kafka.apache.org/documentation.html  里列出来的mbean都加入进来,而且jmxtrans还支持GraphiteWriter,这样数据就落地了,你再想怎么处理就很easy啦。

若是你仅仅是想监控lag和logsize这些指标,亦如KafkaOffsetMonitor中展示的那样,这里提供两个方法:




用bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect localhost:2181 --group test就能列出你想要的,你可以写个脚本去定时获取lag,再制定出大于多少就发告警邮件啊什么的。
 




用命令行的方式总感觉不像脚本该做的,有没有client可以去获取呢,答案是有,我就把我用KafkaClient和KazooClient获取lag的脚本贡献给大家吧:
#!/usr/local/bin/python
from kafka.client import KafkaClient
from kafka.consumer import SimpleConsumer
from kazoo.client import KazooClient

# Zookeepers - no need to add ports
zookeepers="localhost"

# Kafka broker
kafka="localhost:9092"

#consumer group
group="test"

if __name__ == '__main__':
broker = KafkaClient(kafka)
lags = {}
zk = KazooClient(hosts=zookeepers, read_only=True) #zookeeper客户端,read_only确保不会对zookeeper更改
zk.start()
logsize=0
topics = zk.get_children("/consumers/%s/owners" %(group))
for topic in topics:
logsize =0
consumer = SimpleConsumer(broker, group, str(topic))
latest_offset = consumer.pending()
partitions = zk.get_children("/consumers/%s/offsets/%s" %(group, topic))
for partition in partitions:
log = "/consumers/%s/offsets/%s/%s" % (group, topic, partition)
if zk.exists(log):
data, stat = zk.get(log)
logsize += int(data)
lag = latest_offset - logsize
lags[topic] = lag
zk.stop()
上面的lags就是一个当前topic的lag的字典咯,其实大体的逻辑就是通过SimpleConsumer获取到当前的offset,再由KazooClient对zookeeper层层剥皮,获取topic和partition的信息,得到每个partition的logsize后累加与offset比较,就能有lag信息了,之后你想干嘛干嘛了,比如发报警邮件等等。当然也可以像KafkaOffsetMonitor那样做自己的展示了。


分享阅读原文:http://www.opscoder.info/kafka_monitor.html