Elasticsearch聚合限制内存使用

push 发表了文章 0 个评论 5541 次浏览 2016-09-14 15:05 来自相关话题

限制内存使用 通常为了让聚合(或者任何需要访问字段值的请求)能够快点,访问fielddata一定会快点, 这就是为什么加载到内存的原因。但是加载太多的数据到内存会导致垃圾回收(gc)缓慢, 因为JVM试着发现堆里面的额外空间,甚至导致 ...查看全部


限制内存使用


通常为了让聚合(或者任何需要访问字段值的请求)能够快点,访问fielddata一定会快点, 这就是为什么加载到内存的原因。但是加载太多的数据到内存会导致垃圾回收(gc)缓慢, 因为JVM试着发现堆里面的额外空间,甚至导致OutOfMemory异常。
 
最让你吃惊的是,你会发现Elaticsearch不是只把符合你的查询的值加载到fielddata. 而是把index里的所document都加载到内存,甚至是不同的 _type 的document。
 
逻辑是这样的,如果你在这个查询需要访问documents X,Y和Z, 你可能在下一次查询就需要访问别documents。而一次把所有的值都加载并保存在内存 , 比每次查询都去扫描倒排索引要更方便。
 
JVM堆是一个有限制的资源需要聪明的使用。有许多现成的机制去限制fielddata对堆内存使用的影响。这些限制非常重要,因为滥用堆将会导致节点的不稳定(多亏缓慢的垃圾回收)或者甚至节点死亡(因为OutOfMemory异常);但是垃圾回收时间过长,在垃圾回收期间,ES节点的性能就会大打折扣,查询就会非常缓慢,直到最后超时。
 
如何设置堆大小
对于环境变量 $ES_HEAP_SIZE 在设置Elasticsearch堆大小的时候有2个法则可以运用:
  • 不超过RAM的50%
Lucene很好的利用了文件系统cache,文件系统cache是由内核管理的。如果没有足够的文件系统cache空间,性能就会变差。
  • 不超过32G
如果堆小于32GB,JVM能够使用压缩的指针,这会节省许多内存:每个指针就会使用4字节而不是8字节。 把对内存从32GB增加到34GB将意味着你将有更少的内存可用,因为所有的指针占用了双倍的空间。同样,更大的堆,垃圾回收变得代价更大并且可能导致节点不稳定;这个限制主要是大内存对fielddata影响比较大。

Fielddata大小

参数 indices.fielddata.cache.size 控制有多少堆内存是分配给fielddata。当你执行一个查询需要访问新的字段值的时候,将会把值加载到内存,然后试着把它们加入到fielddata。如果结果的fielddata大小超过指定的大小 ,为了腾出空间,别的值就会被驱逐出去。 默认情况下,这个参数设置的是无限制 — Elasticsearch将永远不会把数据从fielddata里替换出去。 这个默认值是故意选择的:fielddata不是临时的cache。它是一个在内存里为了快速执行必须能被访问的数据结构,而且构建它代价非常昂贵。如果你每个请求都要重新加载数据,性能就会很差。 一个有限的大小强迫数据结构去替换数据。我们将看看什么时候去设置下面的值,首先让我们看一个警告: 【warning】这个设置是一个保护措施,而不是一个内存不足的解决方案
break.png
 如果你没有足够的内存区保存你的fielddata到内存里,Elasticsearch将会经常性的从磁盘重新加载数据,并且驱逐别的数据区腾出空间。这种数据的驱逐会导致严重的磁盘I/O,并且在内存里产生大量的垃圾,这个会在后面被垃圾回收。 假设你在索引日志,每天使用给一个新的索引。通常情况下你只会对过去1天或者2天的数据感兴趣。即使你把老的索引数据保留着,你也很少查询它们。尽管如此,使用默认的设置, 来自老索引的fielddata也不会被清除出去!fielddata会一直增长直到它触发fielddata circuit breaker --参考断路器 --它将阻止你继续加载fielddata。 在那个时候你被卡住了。即使你仍然能够执行访问老的索引里的fielddata的查询, 你再也不能加载任何新的值了。相反,我们应该把老的值清除出去给新的值腾出空间。 为了防止这种情景,通过在 config/elasticsearch.yml 文件里加上如下的配置给fielddata 设置一个上限:
indices.fielddata.cache.size:  40%
当然可以设置成堆大小的百分比,也可以是一个具体的值,比如 8gb;通过适当的设置这个值,最近被访问的fielddata将被清除出去,给新加载的数据腾出空间。 在网上你可能会看到另外一个设置参数: indices.fielddata.cache.expire 。千万不要使用这个设置!这个设置高版本已经废弃。这个设置告诉Elasticsearch把比过期时间老的数据从fielddata里驱逐出去,而不管这个值是否被用到。这对性能是非常可怕的 。驱逐数据是有代价的,并且这个有目的的高效的安排驱逐数据并没有任何真正的收获。没有任何理由去使用这个设置;我们一点也不能从理论上制造一个假设的有用的情景。现阶段存 在只是为了向后兼容。我们在这个书里提到这个设置是因为这个设置曾经在网络上的各种文章里 被作为一个 ``性能小窍门'' 被推荐过。记住永远不要使用它,就ok!

监控fielddata

监控fielddata使用了多少内存以及是否有数据被驱逐是非常重要的。大量的数据被驱逐会导致严重的资源问题以及不好的性能。 Fielddata使用可以通过下面的方式来监控:
  • 对于单个索引使用 {ref}indices-stats.html[indices-stats API]:
GET /_stats/fielddata?fields=*
  • 对于单个节点使用 {ref}cluster-nodes-stats.html[nodes-stats API]:
GET /_nodes/stats/indices/fielddata?fields=*
  • 或者甚至单个节点单个索引

GET /_nodes/stats/indices/fielddata?level=indices&fields=*
通过设置 ?fields=* 内存使用按照每个字段分解了.


断路器(breaker)


聪明的读者可能已经注意到fielddata大小设置的一个问题。fielddata的大小是在数据被加载之后才校验的。如果一个查询尝试加载到fielddata的数据比可用的内存大会发生什么情况?答案是不客观的:你将会获得一个OutOfMemory异常。
 
Elasticsearch包含了一个 fielddata断路器 ,这个就是设计来处理这种情况的。断路器通过检查涉及的字段(它们的类型,基数,大小等等)来估计查询需要的内存。然后检查加 载需要的fielddata会不会导致总的fielddata大小超过设置的堆的百分比。
 
如果估计的查询大小超过限制,断路器就会触发并且查询会被抛弃返回一个异常。这个发生在数据被加载之前,这就意味着你不会遇到OutOfMemory异常。
 
Elasticsearch拥有一系列的断路器,所有的这些都是用来保证内存限制不会被突破
indices.breaker.fielddata.limit
这个 fielddata 断路器限制fielddata的大小为堆大小的60%,默认情况下。
indices.breaker.request.limit
这个 request 断路器估算完成查询的其他部分要求的结构的大小,比如创建一个聚集通, 以及限制它们到堆大小的40%,默认情况下。
indices.breaker.total.limit
这个total断路器封装了 request 和 fielddata 断路器去确保默认情况下这2个 使用的总内存不超过堆大小的70%。
 
断路器限制可以通过文件 config/elasticsearch.yml 指定,也可以在集群上动态更新:
PUT /_cluster/settings
{
"persistent" : {
"indices.breaker.fielddata.limit" : 40% (1)
}
}
这个限制设置的是堆的百分比。
 
最好把断路器设置成一个相对保守的值。记住fielddata需要和堆共享 request 断路器, 索引内存缓冲区,过滤器缓存,打开的索引的Lucene数据结构,以及各种各样别的临时数据 结构。所以默认为相对保守的60%。过分乐观的设置可能会导致潜在的OOM异常,从而导致整 个节点挂掉。

从另一方面来说,一个过分保守的值将会简单的返回一个查询异常,这个异常会被应用处理。 异常总比挂掉好。这些异常也会促使你重新评估你的查询:为什么单个的查询需要超过60%的 堆空间。


断路器和Fielddata大小


在 Fielddata大小部分我们谈到了要给fielddata大小增加一个限制去保证老的不使用 的fielddata被驱逐出去。indices.fielddata.cache.size 和 indices.breaker.fielddata.limit 的关系是非常重要的。如果断路器限制比缓冲区大小要小,就会没有数据会被驱逐。为了能够 让它正确的工作,断路器限制必须比缓冲区大小要大。

我们注意到断路器是和总共的堆大小对比查询大小,而不是和真正已经使用的堆内存区比较。 这样做是有一系列技术原因的(比如,堆可能看起来是满的,但是实际上可能正在等待垃圾 回收,这个很难准确的估算)。但是作为终端用户,这意味着设置必须是保守的,因为它是 和整个堆大小比较,而不是空闲的堆比较。


参考:Elasticsearch权威指南笔记 
官网:https://www.elastic.co/guide/en/elasticsearch/guide/current/_limiting_memory_usage.html


Hadoop fs命令学习小记

空心菜 发表了文章 0 个评论 4263 次浏览 2016-09-13 20:08 来自相关话题

1,hadoop fs –fs [local | ]:声明hadoop使用的文件系统,如果不声明的话,使用当前配置文件配置的,按如下顺序查找:hadoop jar里的hadoop-default.xml->[Math Proc ...查看全部
HDFS.png

1,hadoop fs –fs [local | ]:声明hadoop使用的文件系统,如果不声明的话,使用当前配置文件配置的,按如下顺序查找:hadoop jar里的hadoop-default.xml->[Math Processing Error]HADOOPCONFDIR下的hadoop−default.xml−>HADOOP_CONF_DIR下的hadoop-site.xml。使用local代表将本地文件系统作为hadoop的DFS。如果传递uri做参数,那么就是特定的文件系统作为DFS。

2,hadoop fs –ls :等同于本地系统的ls,列出在指定目录下的文件内容,支持pattern匹配。输出格式如filename(full path)     size.其中n代表replica的个数,size代表大小(单位bytes)。

3,hadoop fs –lsr :递归列出匹配pattern的文件信息,类似ls,只不过递归列出所有子目录信息。

4,hadoop fs –du :列出匹配pattern的指定的文件系统空间总量(单位bytes),等价于unix下的针对目录的du –sb /*和针对文件的du –b ,输出格式如name(full path)  size(in bytes)。

5,hadoop fs –dus :等价于-du,输出格式也相同,只不过等价于unix的du -sb。

6,hadoop fs –mv :将制定格式的文件 move到指定的目标位置。当src为多个文件时,dst必须是个目录。

7,hadoop fs –cp :拷贝文件到目标位置,当src为多个文件时,dst必须是个目录。

8,hadoop fs –rm [-skipTrash] :删除匹配pattern的指定文件,等价于unix下的rm

9,hadoop fs –rmr [skipTrash] :递归删掉所有的文件和目录,等价于unix下的rm –rf

10,hadoop fs –rmi [skipTrash] :等价于unix的rm –rfi

11,hadoop fs –put :从本地系统拷贝文件到DFS。

12,hadoop fs –copyFromLocal :等价于-put。

13,hadoop fs –moveFromLocal :等同于-put,只不过源文件在拷贝后被删除。

14,hadoop fs –get [-ignoreCrc] [-crc] :从DFS拷贝文件到本地文件系统,文件匹配pattern,若是多个文件,则dst必须是目录。

15,hadoop fs –getmerge :顾名思义,从DFS拷贝多个文件、合并排序为一个文件到本地文件系统。

16,hadoop fs –cat :展示文件内容。

17,hadoop fs –copyToLocal [-ignoreCrc] [-crc] :等价于-get。

18,hadoop fs –mkdir :在指定位置创建目录。

19,hadoop fs –setrep [-R] [-w] :设置文件的备份级别,-R标志控制是否递归设置子目录及文件。

20,hadoop fs –chmod [-R] PATH…:修改文件的权限,-R标记递归修改。MODE为a+r,g-w,+rwx等,OCTALMODE为755这样。

21,hadoop fs -chown [-R] [OWNER][:[GROUP]] PATH…:修改文件的所有者和组。-R表示递归。

22,hadoop fs -chgrp [-R] GROUP PATH…:等价于-chown … :GROUP …。

23,hadoop fs –count[-q] :计数文件个数及所占空间的详情,输出表格的列的含义依次为:DIR_COUNT,FILE_COUNT,CONTENT_SIZE,FILE_NAME或者如果加了-q的话,还会列出QUOTA,REMAINING_QUOTA,SPACE_QUOTA,REMAINING_SPACE_QUOTA。

最后就是万能的hadoop fs –help

Hbase shell常用命令小记

空心菜 发表了文章 0 个评论 3235 次浏览 2016-09-13 00:19 来自相关话题

1、进入hbase shell console $HBASE_HOME/bin/hbase shell 如果有kerberos认证,需要事先使用相应的keytab进行一下认证(使用kinit命令),认证成功之后再使用hbase shel ...查看全部
hbase.png
1、进入hbase shell console
$HBASE_HOME/bin/hbase shell
如果有kerberos认证,需要事先使用相应的keytab进行一下认证(使用kinit命令),认证成功之后再使用hbase shell进入可以使用whoami命令可查看当前用户
hbase(main):002:0> whoami
2016-09-12 13:09:42,440 WARN [main] util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
root (auth:SIMPLE)
groups: root

 2、表的管理
1)查看有哪些表
hbase(main):001:0> list
TABLE
pythonTrace
1 row(s) in 0.1320 seconds
2)创建表
语法:create , {NAME => , VERSIONS => }
例如:创建表t1,有两个family name:f1,f2,且版本数均为2
hbase(main):001:0> create 't1',{NAME => 'f1', VERSIONS => 2},{NAME => 'f2', VERSIONS => 2}
0 row(s) in 0.4400 seconds

=> Hbase::Table - t1
3)删除表
分两步:首先disable,然后drop ;  例如:删除表t1
hbase(main):002:0> disable 't1'
0 row(s) in 1.2030 seconds

hbase(main):003:0> drop 't1'
0 row(s) in 0.1870 seconds
4)查看表的结构
语法:describe
,  例如:查看表t1的结构
hbase(main):005:0> describe 't1'
Table t1 is ENABLED
t1
COLUMN FAMILIES DESCRIPTION
{NAME => 'f1', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'ROW', REPLICATION_SCOPE => '0', VERSIONS => '2', COMP
RESSION => 'NONE', MIN_VERSIONS => '0', TTL => 'FOREVER', KEEP_DELETED_CELLS => 'FALSE', BLOCKSIZE => '65536', IN_M
EMORY => 'false', BLOCKCACHE => 'true'}
{NAME => 'f2', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'ROW', REPLICATION_SCOPE => '0', VERSIONS => '2', COMP
RESSION => 'NONE', MIN_VERSIONS => '0', TTL => 'FOREVER', KEEP_DELETED_CELLS => 'FALSE', BLOCKSIZE => '65536', IN_M
EMORY => 'false', BLOCKCACHE => 'true'}
2 row(s) in 0.0240 seconds
5)修改表结构
修改表结构必须先disable
语法:alter 't1', {NAME => 'f1'}, {NAME => 'f2', METHOD => 'delete'}; 例如:修改表t1的cf的TTL为180天
hbase(main):006:0> disable 't1'
0 row(s) in 1.1950 seconds

hbase(main):007:0> alter 't1',{NAME=>'body',TTL=>'15552000'},{NAME=>'meta', TTL=>'15552000'}
Updating all regions with the new schema...
1/1 regions updated.
Done.
Updating all regions with the new schema...
1/1 regions updated.
Done.
0 row(s) in 2.1910 seconds

hbase(main):008:0> enable 't1'
0 row(s) in 0.3930 seconds

3、权限管理
1)分配权限
语法 : grant
参数后面用逗号分隔
权限用五个字母表示: "RWXCA"; READ('R'), WRITE('W'), EXEC('X'), CREATE('C'), ADMIN('A')
例如,给用户‘test'分配对表t1有读写的权限
hbase(main)> grant 'test','RW','t1'
2)查看权限
语法:user_permission
; 例如,查看表t1的权限列表
hbase(main)> user_permission 't1'
3)收回权限
与分配权限类似,语法:revoke

例如,收回test用户在表t1上的权限
hbase(main)> revoke 'test','t1'

4、表数据的增删改查
1)添加数据
语法:put
,,,,
例如:给表t1的添加一行记录:rowkey是rowkey001,family name:f1,column name:col1,value:value01,timestamp:系统默认
hbase(main)> put 't1','rowkey001','f1:col1','value01'
用法比较单一
2)查询数据
a)查询某行记录
语法:get
,,[,....]   ;例如:查询表t1,rowkey001中的f1下的col1的值
hbase(main)> get 't1','rowkey001', 'f1:col1'
# 或者:
hbase(main)> get 't1','rowkey001', {COLUMN=>'f1:col1'}
查询表t1,rowke002中的f1下的所有列值
hbase(main)> get 't1','rowkey001'
b)扫描表
语法:scan
, {COLUMNS => [ ,.... ], LIMIT => num}
另外,还可以添加STARTROW、TIMERANGE和FITLER等高级功能; 例如:扫描表t1的前5条数据
hbase(main)> scan 't1',{LIMIT=>5}
c)查询表中的数据行数
语法:count
, {INTERVAL => intervalNum, CACHE => cacheNum}
INTERVAL设置多少行显示一次及对应的rowkey,默认1000;CACHE每次去取的缓存区大小,默认是10,调整该参数可提高查询速度
例如,查询表t1中的行数,每100条显示一次,缓存区为500
hbase(main)> count 't1', {INTERVAL => 100, CACHE => 500}
3)删除数据
a )删除行中的某个列值
语法:delete
, , ,必须指定列名
例如:删除表t1,rowkey001中的f1:col1的数据
hbase(main)> delete 't1','rowkey001','f1:col1'
注:将删除改行f1:col1列所有版本的数据
 
b )删除行
语法:deleteall
, , ,可以不指定列名,删除整行数据
例如:删除表t1,rowk001的数据
hbase(main)> deleteall 't1','rowkey001'
c)删除表中的所有数据
语法: truncate

其具体过程是:disable table -> drop table -> create table ;例如:删除表t1的所有数据
hbase(main)> truncate 't1'

5、Region管理
1)移动region
语法:move 'encodeRegionName', 'ServerName'
encodeRegionName指的regioName后面的编码,ServerName指的是master-status的Region Servers列表
示例如下:
hbase(main)>move '4343995a58be8e5bbc739af1e91cd72d', 'db-41.xxx.xxx.org,60020,1390274516739'
2)开启/关闭region
语法:balance_switch true|false
hbase(main)> balance_switch
3)手动split
语法:split 'regionName', 'splitKey'
4)手动触发major compaction
#语法:
#Compact all regions in a table:
hbase> major_compact 't1'
#Compact an entire region:
hbase> major_compact 'r1'
#Compact a single column family within a region:
hbase> major_compact 'r1', 'c1'
#Compact a single column family within a table:
hbase> major_compact 't1', 'c1'

6、配置管理及节点重启
1)修改hdfs配置
hdfs配置位置:/etc/hadoop/conf
# 同步hdfs配置
cat /home/hadoop/slaves|xargs -i -t scp /etc/hadoop/conf/hdfs-site.xml hadoop@{}:/etc/hadoop/conf/hdfs-site.xml
# 关闭:
cat /home/hadoop/slaves|xargs -i -t ssh hadoop@{} "sudo /home/hadoop/cdh4/hadoop-2.0.0-cdh4.2.1/sbin/hadoop-daemon.sh --config /etc/hadoop/conf stop datanode"
# 启动:
cat /home/hadoop/slaves|xargs -i -t ssh hadoop@{} "sudo /home/hadoop/cdh4/hadoop-2.0.0-cdh4.2.1/sbin/hadoop-daemon.sh --config /etc/hadoop/conf start datanode"
2)修改hbase配置
hbase配置位置:
# 同步hbase配置
cat /home/hadoop/hbase/conf/regionservers|xargs -i -t scp /home/hadoop/hbase/conf/hbase-site.xml hadoop@{}:/home/hadoop/hbase/conf/hbase-site.xml
# graceful重启
cd ~/hbase
bin/graceful_stop.sh --restart --reload --debug inspurXXX.xxx.xxx.org

如何增加或减少kafka的Partition数量?

空心菜 回复了问题 2 人关注 1 个回复 10505 次浏览 2016-09-12 17:02 来自相关话题

Kafka集群中如何平衡Topics

空心菜 发表了文章 3 个评论 11662 次浏览 2016-09-12 14:55 来自相关话题

为什么我们需要平衡Topic 当一个kafka集群中一个节点关闭或者宕机,该服务器的负载任务将分配到集群中的其他节点,这种分配是不均匀的;即负载不是均匀的分布在集群中所有节点,我们需要做一些措施来实现这一平衡(也称为再平衡)。 ...查看全部


为什么我们需要平衡Topic


当一个kafka集群中一个节点关闭或者宕机,该服务器的负载任务将分配到集群中的其他节点,这种分配是不均匀的;即负载不是均匀的分布在集群中所有节点,我们需要做一些措施来实现这一平衡(也称为再平衡)。
 
再平衡我们需要注意两件事情。一是leader连任,或首选副本选举,另一种是分区的再平衡。多数情况下,一个节点宕机,然后过段时间又重新加入集群,第二种情况就是,当我们想要么减少或增加集群中节点的数目。 让我们来看看如何处理这些不同的场景平衡Topic的情况。
 
我下面的案例以5个broker的kafka集群为例。
 


案例1:当broker因为维护或由于服务器故障而关闭,并在一定的时间恢复 


有两种方法来处理这​​种情况。 一种是添加以下行到代理配置“auto.leader.rebalance.enable”自动重新平衡,但这个报告是个问题 。 另一种方法是手动使用“kafka-preferred-replica-election.sh”的工具。
 
编辑配置文件server.properties配置文件,添加auto.leader.rebalance.enable = false
kafkap1.png

 
我们让broker4宕机,然后看看topic负载如何分布
KafkaPrition.png

 
在这里我们可以看到负载不均匀分布,接下来我们在恢复broker4看看。
KafkaEvenly.png

 
现在,我们可以看到,即使broker4已经恢复到集群中服务了,但是它不是作为一个领导者作用于任何分区。那让我们使用kafka-preferred-replica-election.sh工具来平衡负载试试。
KafkaReelection.png

KafkaDescribe.png

现在我们可以看到topic分布是平衡的。


案例2:一个节点宕机且无法恢复


我们创建了一个新的broker,即使设置其borker.id为以前的broker的ID,那也是不能恢复的,只能手动运行kafka-preferred-replica-election.sh是topic平衡。


案例3:增加或者减少kafka集群中节点数量


增加或者删除节点,平衡topic步骤如下:
  1. 使用分区重新分配的工具(kafka-reassign-partition.sh),使用--generta参数生成一个推荐配置,这显示了当前和建议的副本分配情况;
  2. 复制推荐分配配置方案到jason文件中;
  3. 执行分区重新分配工具(kafka-reassign-partition.sh -execute)来更新元数据平衡。 确保运行的时候集群中节点负载问题,因为这个过程是在不同节点间发生数据移动;
  4. 等待一段时间(基于必须移动一定量的数据),并使用--verify选​​项验证平衡成功完成
  5. 一旦分区重新分配完成后,运行“kafka-preferred-replica-election.sh”工具来完成平衡。

 
在这里我们假设为减少kafka集群中的节点数(这里为介绍broker5),当节点终止后,我们使用使用分区重新分配工具,生成一份当前和建议的副本任务作为JSONs候选人分配的配置。
KafkaGenerate.png


把建议重新分配配置复制到一个JSON文件并执行分区重新分配的工具。 这将更新元数据,然后开始四处移动数据来平衡负载。
KafkaJson.png

KafkaExecute.png

 
接下来我们验证重新平衡/重新分配是否成功:
KafkaVerify.png

 
一旦重新分配所有分区是成功的,我们运行的首选副本竞选工具来平衡的主题,然后运行“描述主题”来检查主题平衡。
Kafkapre.png

KafkaView.png

现在我们可以看到topic(以及leaders和replicas)都是平衡的。
英文原文:https://blog.imaginea.com/how-to-rebalance-topics-in-kafka-cluster/
 

Elasticsearch Recovery详解

OpenSkill 发表了文章 0 个评论 5484 次浏览 2016-09-08 23:56 来自相关话题

基础知识点在Eleasticsearch中recovery指的就是一个索引的分片分配到另外一个节点的过程;一般在快照恢复、索引副本数变更、节点故障、节点重启时发生。由于master保存整个集群的状态信息,因此可以判断出哪些shard需要 ...查看全部

基础知识点

在Eleasticsearch中recovery指的就是一个索引的分片分配到另外一个节点的过程;一般在快照恢复、索引副本数变更、节点故障、节点重启时发生。由于master保存整个集群的状态信息,因此可以判断出哪些shard需要做再分配,以及分配到哪个结点,例如:


  1. 如果某个shard主分片在,副分片所在结点挂了,那么选择另外一个可用结点,将副分片分配(allocate)上去,然后进行主从分片的复制;
  2. 如果某个shard的主分片所在结点挂了,副分片还在,那么将副分片升级为主分片,然后做主从分片复制;
  3. 如果某个shard的主副分片所在结点都挂了,则暂时无法恢复,等待持有相关数据的结点重新加入集群后,从该结点上恢复主分片,再选择另外的结点复制副分片。

正常情况下,我们可以通过ES的health的API接口,查看整个集群的健康状态和整个集群数据的完整性:

状态及含义如下:

  • green: 所有的shard主副分片都是正常的;
  • yellow: 所有shard的主分片都完好,部分副分片没有或者不完整,数据完整性依然完好;
  • red: 某些shard的主副分片都没有了,对应的索引数据不完整。

ecovery过程要消耗额外的资源,CPU、内存、结点之间的网络带宽等等。 这些额外的资源消耗,有可能会导致集群的服务性能下降,或者一部分功能暂时不可用。了解一些recovery的过程和相关的配置参数,对于减小recovery带来的资源消耗,加快集群恢复过程都是很有帮助的。


减少集群Full Restart造成的数据来回拷贝

ES集群可能会有整体重启的情况,比如需要升级硬件、升级操作系统或者升级ES大版本。重启所有结点可能带来的一个问题: 某些结点可能先于其他结点加入集群, 先加入集群的结点可能已经可以选举好master,并立即启动了recovery的过程,由于这个时候整个集群数据还不完整,master会指示一些结点之间相互开始复制数据。


那些晚到的结点,一旦发现本地的数据已经被复制到其他结点,则直接删除掉本地“失效”的数据。 当整个集群恢复完毕后,数据分布不均衡,显然是不均衡的,master会触发rebalance过程,将数据在节点之间挪动。


整个过程无谓消耗了大量的网络流量;合理设置recovery相关参数则可以防范这种问题的发生。


gateway.expected_nodes
gateway.expected_master_nodes
gateway.expected_data_nodes

以上三个参数是说集群里一旦有多少个节点就立即开始recovery过程。 不同之处在于,第一个参数指的是master或者data节点都算在内,而后面两个参数则分指masterdata node


在期待的节点数条件满足之前, recovery过程会等待gateway.recover_after_time (默认5分钟) 这么长时间,一旦等待超时,则会根据以下条件判断是否启动:


gateway.recover_after_nodes
gateway.recover_after_master_nodes
gateway.recover_after_data_nodes

举例来说,对于一个有10个data node的集群,如果有以下的设置:


gateway.expected_data_nodes: 10
gateway.recover_after_time: 5m
gateway.recover_after_data_nodes: 8

那么集群5分钟以内10个data node都加入了,或者5分钟以后8个以上的data node加入了,都会立即启动recovery过程。


减少主副本之间的数据复制

如果不是full restart,而是重启单个data node,仍然会造成数据在不同结点之间来回复制。为避免这个问题,可以在重启之前,先关闭集群的shard allocation:

然后在节点重启完成加入集群后,再重新打开:

这样在节点重启完成后,尽量多的从本地直接恢复数据。

但是在ES1.6版本之前,即使做了以上措施,仍然会发现有大量主副本之间的数据拷贝。从表面去看,这点很让人不能理解。 主副本数据完全一致,ES应该直接从副本本地恢复数据就好了,为什么要重新从主片再复制一遍呢? 原因在于recovery是简单对比主副本的segment file来判断哪些数据一致可以本地恢复,哪些不一致需要远端拷贝的。而不同节点的segment merge是完全独立运行的,可能导致主副本merge的深度不完全一样,从而造成即使文档集完全一样,产生的segment file却不完全一样。


为了解决这个问题,ES1.6版本以后加入了synced flush的新特性。 对于5分钟没有更新过的shard,会自动synced flush一下,实质是为对应的shard加了一个synced flush ID。这样当重启节点的时候,先对比一下shard的synced flush ID,就可以知道两个shard是否完全相同,避免了不必要的segment file拷贝,极大加快了冷索引的恢复速度。


需要注意的是synced flush只对冷索引有效,对于热索引(5分钟内有更新的索引)没有作用。 如果重启的结点包含有热索引,那么还是免不了大量的文件拷贝。因此在重启一个结点之前,最好按照以下步骤执行,recovery几乎可以瞬间完成:


  1. 暂停数据写入程序
  2. 关闭集群shard allocation
  3. 手动执行POST /_flush/synced
  4. 重启节点
  5. 重新开启集群shard allocation
  6. 等待recovery完成,集群health status变成green
  7. 重新开启数据写入程序

特大热索引为何恢复慢

对于冷索引,由于数据不再更新,利用synced flush特性,可以快速直接从本地恢复数据。 而对于热索引,特别是shard很大的热索引,除了synced flush派不上用场需要大量跨节点拷贝segment file以外,translog recovery是导致慢的更重要的原因。


从主片恢复数据到副片需要经历3个阶段:


  1. 对主片上的segment file做一个快照,然后拷贝到复制片分配到的结点。数据拷贝期间,不会阻塞索引请求,新增索引操作记录到translog里;
  2. 对translog做一个快照,此快照包含第一阶段新增的索引请求,然后重放快照里的索引操作。此阶段仍然不阻塞索引请求,新增索引操作记录到translog里;
  3. 为了能达到主副片完全同步,阻塞掉新索引请求,然后重放阶段二新增的translog操作。

可见,在recovery完成之前,translog是不能够被清除掉的(禁用掉正常运作期间后台的flush操作)。如果shard比较大,第一阶段耗时很长,会导致此阶段产生的translog很大。重放translog比起简单的文件拷贝耗时要长得多,因此第二阶段的translog耗时也会显著增加。等到第三阶段,需要重放的translog可能会比第二阶段还要多。 而第三阶段是会阻塞新索引写入的,在对写入实时性要求很高的场合,就会非常影响用户体验。 因此,要加快大的热索引恢复速度,最好的方式是遵从上一节提到的方法: 暂停新数据写入,手动sync flush,等待数据恢复完成后,重新开启数据写入,这样可以将数据延迟影响可以降到最低。


万一遇到Recovery慢,想知道进度怎么办呢? CAT Recovery API可以显示详细的recovery各个阶段的状态。 这个API怎么用就不在这里赘述了,参考: CAT Recovery


其他Recovery相关的专家级设置

还有其他一些专家级的设置(参考: recovery)可以影响recovery的速度,但提升速度的代价是更多的资源消耗,因此在生产集群上调整这些参数需要结合实际情况谨慎调整,一旦影响应用要立即调整回来。 对于搜索并发量要求高,延迟要求低的场合,默认设置一般就不要去动了。 对于日志实时分析类对于搜索延迟要求不高,但对于数据写入延迟期望比较低的场合,可以适当调大indices.recovery.max_bytes_per_sec,提升recovery速度,减少数据写入被阻塞的时长。


最后要说的一点是ES的版本迭代很快,对于Recovery的机制也在不断的优化中。 其中有一些版本甚至引入了一些bug,比如在ES1.4.x有严重的translog recovery bug,导致大的索引trans log recovery几乎无法完成 (issue #9926) 。因此实际使用中如果遇到问题,最好在Github的issue list里搜索一下,看是否使用的版本有其他人反映同样的问题。


分享阅读参考: https://henduan.com/rCWPD

怎么查看Elasticsearch关闭的索引和索引的状态

空心菜 回复了问题 2 人关注 1 个回复 18149 次浏览 2016-09-02 16:31 来自相关话题

Kafka入门简介

chris 发表了文章 2 个评论 3622 次浏览 2016-09-02 15:01 来自相关话题

Kafka背景 Kafka它本质上是一个消息系统,由当时从LinkedIn出来创业的三人小组开发,他们开发出了Apache Kafka实时信息队列技术,该技术致力于为各行各业的公司提供实时数据处理服务解决方案。Kafka为Linked ...查看全部


Kafka背景


Kafka它本质上是一个消息系统,由当时从LinkedIn出来创业的三人小组开发,他们开发出了Apache Kafka实时信息队列技术,该技术致力于为各行各业的公司提供实时数据处理服务解决方案。Kafka为LinkedIn的中枢神经系统,管理从各个应用程序的汇聚,这些数据经过处理后再被分发到其他地方。Kafka不同于传统的企业信息队列系统,它是以近乎实时的方式处理流经一个公司的所有数据,目前已经服务于LinkedIn、Netflix、Uber以及Verizon,并为此建立了实时信息处理平台。

流水数据是所有站点对其网站使用情况做报表时都要用到的数据中最常用的一部分,流水数据包括PV,浏览内容信息以及搜索记录等。这些数据通常是先以日志文件的形式存在,然后有周期的去对这些日志文件进行统计分析处理,然后获得需要的KPI指标结果。
 


Kafka应用场景


我们在接触一门新技术或是新语言时,得明白这门技术(或是语言)的应用场景,也就说要明白它能做什么,服务的对象是谁,下面用一个图来说明,如下图所示:
KafkaArch.png

首先,Kafka可以应用于消息系统,比如,当下较为热门的消息推送,这些消息推送系统的消息源,可以使用Kafka作为系统的核心组建来完成消息的生产和消息的消费。然后是网站的行迹,我们可以将企业的Portal,用户的操作记录等信息发送到Kafka中,按照实际业务需求,可以进行实时监控,或者做离线处理等。最后,一个是日志收集,类似于Flume套件这样的日志收集系统,但Kafka的设计架构采用push/pull,适合异构集群,Kafka可以批量提交消息,对Producer来说,在性能方面基本上是无消耗的,而在Consumer端中,我们可以使用HDFS这类的分布式文件存储系统进行存储。
 


Kafka架构原理


Kafka的设计之初是希望做一个统一的信息收集平台,能够实时的收集反馈信息,并且具有良好的容错能力。Kafka中我们最直观的感受就是它的消费者与生产者,如下图所示:

Producer And Consumer:
KafkaConsumer.png

这里Kafka对消息的保存是根据Topic进行归类的,由消息生产者(Producer)和消息消费者(Consumer)组成,另外,每一个Server称为一个Broker。对于Kafka集群而言,Producer和Consumer都依赖于ZooKeeper来保证数据的一致性。

Topic:
在每条消息输送到Kafka集群后,消息都会由一个Type,这个Type被称为一个Topic,不同的Topic的消息是分开存储的。如下图所示:
KafkaTopic.png

一个Topic会被归类为一则消息,每个Topic可以被分割为多个Partition,在每条消息中,它在文件中的位置称为Offset,用于标记唯一一条消息。在Kafka中,消息被消费后,消息仍然会被保留一定时间后在删除,比如在配置信息中,文件信息保留7天,那么7天后,不管Kafka中的消息是否被消费,都会被删除;以此来释放磁盘空间,减少磁盘的IO消耗。

在Kafka中,一个Topic的多个分区,被分布在Kafka集群的多个Server上,每个Server负责分区中消息的读写操作。另外,Kafka还可以配置分区需要备份的个数,以便提高可用行。由于用到来ZK来协调,每个分区都有一个Server为Leader状态,服务对外响应(如读写操作),若该Leader宕机,会由其他的Follower来选举出新的Leader来保证集群的高可用性。
 


总结


kafka是一个非常强悍的消息队列系统,他支持持久化和数据偏移,从而实现多个消费者同时消费一份数据是没有问题的。简单的来说他就是一个生产者消费模型的架构,居于zookeeper来管理每个Topic的信息记录和偏移量,从而达到高效和多复用的概念,消息不会消失,除非过期。

Hadoop 2.2.0的HA环境下journalnode一般搭建几个?

空心菜 回复了问题 2 人关注 1 个回复 5109 次浏览 2016-08-12 15:11 来自相关话题

due to compression bugs - see issue #7210 for details

空心菜 回复了问题 2 人关注 1 个回复 3702 次浏览 2016-08-08 17:37 来自相关话题