如何增加或减少kafka的Partition数量?

采菊篱下 回复了问题 • 2 人关注 • 1 个回复 • 483 次浏览 • 2016-09-12 17:02 • 来自相关话题

Kafka集群中如何平衡Topics

采菊篱下 发表了文章 • 3 个评论 • 410 次浏览 • 2016-09-12 14:55 • 来自相关话题

为什么我们需要平衡Topic

当一个kafka集群中一个节点关闭或者宕机,该服务器的负载任务将分配到集群中的其他节点,这种分配是不均匀的;即负载不是均匀的分布在集群中所有节点,我们需要做一些措施来实现这一平衡(也称为再平衡)。
 
再平衡我们需要注意两件事情。一是leader连任,或首选副本选举,另一种是分区的再平衡。多数情况下,一个节点宕机,然后过段时间又重新加入集群,第二种情况就是,当我们想要么减少或增加集群中节点的数目。 让我们来看看如何处理这些不同的场景平衡Topic的情况。
 
我下面的案例以5个broker的kafka集群为例。
 

案例1:当broker因为维护或由于服务器故障而关闭,并在一定的时间恢复 

有两种方法来处理这​​种情况。 一种是添加以下行到代理配置“auto.leader.rebalance.enable”自动重新平衡,但这个报告是个问题 。 另一种方法是手动使用“kafka-preferred-replica-election.sh”的工具。
 
编辑配置文件server.properties配置文件,添加auto.leader.rebalance.enable = false




 
我们让broker4宕机,然后看看topic负载如何分布




 
在这里我们可以看到负载不均匀分布,接下来我们在恢复broker4看看。




 
现在,我们可以看到,即使broker4已经恢复到集群中服务了,但是它不是作为一个领导者作用于任何分区。那让我们使用kafka-preferred-replica-election.sh工具来平衡负载试试。








现在我们可以看到topic分布是平衡的。

案例2:一个节点宕机且无法恢复

我们创建了一个新的broker,即使设置其borker.id为以前的broker的ID,那也是不能恢复的,只能手动运行kafka-preferred-replica-election.sh是topic平衡。

案例3:增加或者减少kafka集群中节点数量

增加或者删除节点,平衡topic步骤如下:
使用分区重新分配的工具(kafka-reassign-partition.sh),使用--generta参数生成一个推荐配置,这显示了当前和建议的副本分配情况;复制推荐分配配置方案到jason文件中;执行分区重新分配工具(kafka-reassign-partition.sh -execute)来更新元数据平衡。 确保运行的时候集群中节点负载问题,因为这个过程是在不同节点间发生数据移动;等待一段时间(基于必须移动一定量的数据),并使用--verify选​​项验证平衡成功完成一旦分区重新分配完成后,运行“kafka-preferred-replica-election.sh”工具来完成平衡。
 
在这里我们假设为减少kafka集群中的节点数(这里为介绍broker5),当节点终止后,我们使用使用分区重新分配工具,生成一份当前和建议的副本任务作为JSONs候选人分配的配置。





把建议重新分配配置复制到一个JSON文件并执行分区重新分配的工具。 这将更新元数据,然后开始四处移动数据来平衡负载。








 
接下来我们验证重新平衡/重新分配是否成功:




 
一旦重新分配所有分区是成功的,我们运行的首选副本竞选工具来平衡的主题,然后运行“描述主题”来检查主题平衡。








现在我们可以看到topic(以及leaders和replicas)都是平衡的。
英文原文:https://blog.imaginea.com/how-to-rebalance-topics-in-kafka-cluster/
  查看全部


为什么我们需要平衡Topic


当一个kafka集群中一个节点关闭或者宕机,该服务器的负载任务将分配到集群中的其他节点,这种分配是不均匀的;即负载不是均匀的分布在集群中所有节点,我们需要做一些措施来实现这一平衡(也称为再平衡)。
 
再平衡我们需要注意两件事情。一是leader连任,或首选副本选举,另一种是分区的再平衡。多数情况下,一个节点宕机,然后过段时间又重新加入集群,第二种情况就是,当我们想要么减少或增加集群中节点的数目。 让我们来看看如何处理这些不同的场景平衡Topic的情况。
 
我下面的案例以5个broker的kafka集群为例。
 


案例1:当broker因为维护或由于服务器故障而关闭,并在一定的时间恢复 


有两种方法来处理这​​种情况。 一种是添加以下行到代理配置“auto.leader.rebalance.enable”自动重新平衡,但这个报告是个问题 。 另一种方法是手动使用“kafka-preferred-replica-election.sh”的工具。
 
编辑配置文件server.properties配置文件,添加auto.leader.rebalance.enable = false
kafkap1.png

 
我们让broker4宕机,然后看看topic负载如何分布
KafkaPrition.png

 
在这里我们可以看到负载不均匀分布,接下来我们在恢复broker4看看。
KafkaEvenly.png

 
现在,我们可以看到,即使broker4已经恢复到集群中服务了,但是它不是作为一个领导者作用于任何分区。那让我们使用kafka-preferred-replica-election.sh工具来平衡负载试试。
KafkaReelection.png

KafkaDescribe.png

现在我们可以看到topic分布是平衡的。


案例2:一个节点宕机且无法恢复


我们创建了一个新的broker,即使设置其borker.id为以前的broker的ID,那也是不能恢复的,只能手动运行kafka-preferred-replica-election.sh是topic平衡。


案例3:增加或者减少kafka集群中节点数量


增加或者删除节点,平衡topic步骤如下:
  1. 使用分区重新分配的工具(kafka-reassign-partition.sh),使用--generta参数生成一个推荐配置,这显示了当前和建议的副本分配情况;
  2. 复制推荐分配配置方案到jason文件中;
  3. 执行分区重新分配工具(kafka-reassign-partition.sh -execute)来更新元数据平衡。 确保运行的时候集群中节点负载问题,因为这个过程是在不同节点间发生数据移动;
  4. 等待一段时间(基于必须移动一定量的数据),并使用--verify选​​项验证平衡成功完成
  5. 一旦分区重新分配完成后,运行“kafka-preferred-replica-election.sh”工具来完成平衡。

 
在这里我们假设为减少kafka集群中的节点数(这里为介绍broker5),当节点终止后,我们使用使用分区重新分配工具,生成一份当前和建议的副本任务作为JSONs候选人分配的配置。
KafkaGenerate.png


把建议重新分配配置复制到一个JSON文件并执行分区重新分配的工具。 这将更新元数据,然后开始四处移动数据来平衡负载。
KafkaJson.png

KafkaExecute.png

 
接下来我们验证重新平衡/重新分配是否成功:
KafkaVerify.png

 
一旦重新分配所有分区是成功的,我们运行的首选副本竞选工具来平衡的主题,然后运行“描述主题”来检查主题平衡。
Kafkapre.png

KafkaView.png

现在我们可以看到topic(以及leaders和replicas)都是平衡的。
英文原文:https://blog.imaginea.com/how-to-rebalance-topics-in-kafka-cluster/
 

Elasticsearch Recovery详解

OpenSkill 发表了文章 • 0 个评论 • 416 次浏览 • 2016-09-08 23:56 • 来自相关话题

基础知识点

在Eleasticsearch中recovery指的就是一个索引的分片分配到另外一个节点的过程;一般在快照恢复、索引副本数变更、节点故障、节点重启时发生。由于master保存整个集群的状态信息,因此可以判断出哪些shard需要做再分配,以及分配到哪个结点,例如:
如果某个shard主分片在,副分片所在结点挂了,那么选择另外一个可用结点,将副分片分配(allocate)上去,然后进行主从分片的复制。如果某个shard的主分片所在结点挂了,副分片还在,那么将副分片升级为主分片,然后做主从分片复制。如果某个shard的主副分片所在结点都挂了,则暂时无法恢复,等待持有相关数据的结点重新加入集群后,从该结点上恢复主分片,再选择另外的结点复制副分片。
 
正常情况下,我们可以通过ES的health的API接口,查看整个集群的健康状态和整个集群数据的完整性:




状态及含义如下:
green: 所有的shard主副分片都是正常的;yellow: 所有shard的主分片都完好,部分副分片没有或者不完整,数据完整性依然完好;red: 某些shard的主副分片都没有了,对应的索引数据不完整。
 
recovery过程要消耗额外的资源,CPU、内存、结点之间的网络带宽等等。 这些额外的资源消耗,有可能会导致集群的服务性能下降,或者一部分功能暂时不可用。了解一些recovery的过程和相关的配置参数,对于减小recovery带来的资源消耗,加快集群恢复过程都是很有帮助的。
 

减少集群Full Restart造成的数据来回拷贝

ES集群可能会有整体重启的情况,比如需要升级硬件、升级操作系统或者升级ES大版本。重启所有结点可能带来的一个问题: 某些结点可能先于其他结点加入集群, 先加入集群的结点可能已经可以选举好master,并立即启动了recovery的过程,由于这个时候整个集群数据还不完整,master会指示一些结点之间相互开始复制数据。 那些晚到的结点,一旦发现本地的数据已经被复制到其他结点,则直接删除掉本地“失效”的数据。 当整个集群恢复完毕后,数据分布不均衡,显然是不均衡的,master会触发rebalance过程,将数据在节点之间挪动。整个过程无谓消耗了大量的网络流量;合理设置recovery相关参数则可以防范这种问题的发生。gateway.expected_nodes
gateway.expected_master_nodes
gateway.expected_data_nodes以上三个参数是说集群里一旦有多少个节点就立即开始recovery过程。 不同之处在于,第一个参数指的是master或者data都算在内,而后面两个参数则分指master和data node。
 
在期待的节点数条件满足之前, recovery过程会等待gateway.recover_after_time (默认5分钟) 这么长时间,一旦等待超时,则会根据以下条件判断是否启动:gateway.recover_after_nodes
gateway.recover_after_master_nodes
gateway.recover_after_data_nodes
 
举例来说,对于一个有10个data node的集群,如果有以下的设置:gateway.expected_data_nodes: 10
gateway.recover_after_time: 5m
gateway.recover_after_data_nodes: 8那么集群5分钟以内10个data node都加入了,或者5分钟以后8个以上的data node加入了,都会立即启动recovery过程。
 

减少主副本之间的数据复制

如果不是full restart,而是重启单个data node,仍然会造成数据在不同结点之间来回复制。为避免这个问题,可以在重启之前,先关闭集群的shard allocation:




然后在节点重启完成加入集群后,再重新打开:




这样在节点重启完成后,尽量多的从本地直接恢复数据。

但是在ES1.6版本之前,即使做了以上措施,仍然会发现有大量主副本之间的数据拷贝。从表面去看,这点很让人不能理解。 主副本数据完全一致,ES应该直接从副本本地恢复数据就好了,为什么要重新从主片再复制一遍呢? 原因在于recovery是简单对比主副本的segment file来判断哪些数据一致可以本地恢复,哪些不一致需要远端拷贝的。而不同节点的segment merge是完全独立运行的,可能导致主副本merge的深度不完全一样,从而造成即使文档集完全一样,产生的segment file却不完全一样。
 
为了解决这个问题,ES1.6版本以后加入了synced flush的新特性。 对于5分钟没有更新过的shard,会自动synced flush一下,实质是为对应的shard加了一个synced flush ID。这样当重启节点的时候,先对比一下shard的synced flush ID,就可以知道两个shard是否完全相同,避免了不必要的segment file拷贝,极大加快了冷索引的恢复速度。
 
需要注意的是synced flush只对冷索引有效,对于热索引(5分钟内有更新的索引)没有作用。 如果重启的结点包含有热索引,那么还是免不了大量的文件拷贝。因此在重启一个结点之前,最好按照以下步骤执行,recovery几乎可以瞬间完成:
暂停数据写入程序关闭集群shard allocation手动执行POST /_flush/synced重启节点重新开启集群shard allocation 等待recovery完成,集群health status变成green重新开启数据写入程序
 

特大热索引为何恢复慢

对于冷索引,由于数据不再更新,利用synced flush特性,可以快速直接从本地恢复数据。 而对于热索引,特别是shard很大的热索引,除了synced flush派不上用场需要大量跨节点拷贝segment file以外,translog recovery是导致慢的更重要的原因。
 
从主片恢复数据到副片需要经历3个阶段:
对主片上的segment file做一个快照,然后拷贝到复制片分配到的结点。数据拷贝期间,不会阻塞索引请求,新增索引操作记录到translog里。对translog做一个快照,此快照包含第一阶段新增的索引请求,然后重放快照里的索引操作。此阶段仍然不阻塞索引请求,新增索引操作记录到translog里。为了能达到主副片完全同步,阻塞掉新索引请求,然后重放阶段二新增的translog操作。
 
可见,在recovery完成之前,translog是不能够被清除掉的(禁用掉正常运作期间后台的flush操作)。如果shard比较大,第一阶段耗时很长,会导致此阶段产生的translog很大。重放translog比起简单的文件拷贝耗时要长得多,因此第二阶段的translog耗时也会显著增加。等到第三阶段,需要重放的translog可能会比第二阶段还要多。 而第三阶段是会阻塞新索引写入的,在对写入实时性要求很高的场合,就会非常影响用户体验。 因此,要加快大的热索引恢复速度,最好的方式是遵从上一节提到的方法: 暂停新数据写入,手动sync flush,等待数据恢复完成后,重新开启数据写入,这样可以将数据延迟影响可以降到最低。
 
万一遇到Recovery慢,想知道进度怎么办呢? CAT Recovery API可以显示详细的recovery各个阶段的状态。 这个API怎么用就不在这里赘述了,参考: CAT Recovery。
 

其他Recovery相关的专家级设置

还有其他一些专家级的设置(参见: recovery)可以影响recovery的速度,但提升速度的代价是更多的资源消耗,因此在生产集群上调整这些参数需要结合实际情况谨慎调整,一旦影响应用要立即调整回来。 对于搜索并发量要求高,延迟要求低的场合,默认设置一般就不要去动了。 对于日志实时分析类对于搜索延迟要求不高,但对于数据写入延迟期望比较低的场合,可以适当调大indices.recovery.max_bytes_per_sec,提升recovery速度,减少数据写入被阻塞的时长。
 
最后要说的一点是ES的版本迭代很快,对于Recovery的机制也在不断的优化中。 其中有一些版本甚至引入了一些bug,比如在ES1.4.x有严重的translog recovery bug,导致大的索引trans log recovery几乎无法完成 (issue #9226)  。因此实际使用中如果遇到问题,最好在Github的issue list里搜索一下,看是否使用的版本有其他人反映同样的问题。
分享阅读原文:http://elasticsearch.cn/article/38 查看全部


基础知识点


在Eleasticsearch中recovery指的就是一个索引的分片分配到另外一个节点的过程;一般在快照恢复、索引副本数变更、节点故障、节点重启时发生。由于master保存整个集群的状态信息,因此可以判断出哪些shard需要做再分配,以及分配到哪个结点,例如:
  1. 如果某个shard主分片在,副分片所在结点挂了,那么选择另外一个可用结点,将副分片分配(allocate)上去,然后进行主从分片的复制。
  2. 如果某个shard的主分片所在结点挂了,副分片还在,那么将副分片升级为主分片,然后做主从分片复制。
  3. 如果某个shard的主副分片所在结点都挂了,则暂时无法恢复,等待持有相关数据的结点重新加入集群后,从该结点上恢复主分片,再选择另外的结点复制副分片。

 
正常情况下,我们可以通过ES的health的API接口,查看整个集群的健康状态和整个集群数据的完整性:
EsHealth.png

状态及含义如下:
  • green: 所有的shard主副分片都是正常的;
  • yellow: 所有shard的主分片都完好,部分副分片没有或者不完整,数据完整性依然完好;
  • red: 某些shard的主副分片都没有了,对应的索引数据不完整。

 
recovery过程要消耗额外的资源,CPU、内存、结点之间的网络带宽等等。 这些额外的资源消耗,有可能会导致集群的服务性能下降,或者一部分功能暂时不可用。了解一些recovery的过程和相关的配置参数,对于减小recovery带来的资源消耗,加快集群恢复过程都是很有帮助的。
 


减少集群Full Restart造成的数据来回拷贝


ES集群可能会有整体重启的情况,比如需要升级硬件、升级操作系统或者升级ES大版本。重启所有结点可能带来的一个问题: 某些结点可能先于其他结点加入集群, 先加入集群的结点可能已经可以选举好master,并立即启动了recovery的过程,由于这个时候整个集群数据还不完整,master会指示一些结点之间相互开始复制数据。 那些晚到的结点,一旦发现本地的数据已经被复制到其他结点,则直接删除掉本地“失效”的数据。 当整个集群恢复完毕后,数据分布不均衡,显然是不均衡的,master会触发rebalance过程,将数据在节点之间挪动。整个过程无谓消耗了大量的网络流量;合理设置recovery相关参数则可以防范这种问题的发生。
gateway.expected_nodes
gateway.expected_master_nodes
gateway.expected_data_nodes
以上三个参数是说集群里一旦有多少个节点就立即开始recovery过程。 不同之处在于,第一个参数指的是master或者data都算在内,而后面两个参数则分指master和data node。
 
在期待的节点数条件满足之前, recovery过程会等待gateway.recover_after_time (默认5分钟) 这么长时间,一旦等待超时,则会根据以下条件判断是否启动:
gateway.recover_after_nodes
gateway.recover_after_master_nodes
gateway.recover_after_data_nodes
 
举例来说,对于一个有10个data node的集群,如果有以下的设置:
gateway.expected_data_nodes: 10
gateway.recover_after_time: 5m
gateway.recover_after_data_nodes: 8
那么集群5分钟以内10个data node都加入了,或者5分钟以后8个以上的data node加入了,都会立即启动recovery过程。
 


减少主副本之间的数据复制


如果不是full restart,而是重启单个data node,仍然会造成数据在不同结点之间来回复制。为避免这个问题,可以在重启之前,先关闭集群的shard allocation:
EsNone.png

然后在节点重启完成加入集群后,再重新打开:
EsAll.png

这样在节点重启完成后,尽量多的从本地直接恢复数据。

但是在ES1.6版本之前,即使做了以上措施,仍然会发现有大量主副本之间的数据拷贝。从表面去看,这点很让人不能理解。 主副本数据完全一致,ES应该直接从副本本地恢复数据就好了,为什么要重新从主片再复制一遍呢? 原因在于recovery是简单对比主副本的segment file来判断哪些数据一致可以本地恢复,哪些不一致需要远端拷贝的。而不同节点的segment merge是完全独立运行的,可能导致主副本merge的深度不完全一样,从而造成即使文档集完全一样,产生的segment file却不完全一样。
 
为了解决这个问题,ES1.6版本以后加入了synced flush的新特性。 对于5分钟没有更新过的shard,会自动synced flush一下,实质是为对应的shard加了一个synced flush ID。这样当重启节点的时候,先对比一下shard的synced flush ID,就可以知道两个shard是否完全相同,避免了不必要的segment file拷贝,极大加快了冷索引的恢复速度。
 
需要注意的是synced flush只对冷索引有效,对于热索引(5分钟内有更新的索引)没有作用。 如果重启的结点包含有热索引,那么还是免不了大量的文件拷贝。因此在重启一个结点之前,最好按照以下步骤执行,recovery几乎可以瞬间完成:
  1. 暂停数据写入程序
  2. 关闭集群shard allocation
  3. 手动执行POST /_flush/synced
  4. 重启节点
  5. 重新开启集群shard allocation 
  6. 等待recovery完成,集群health status变成green
  7. 重新开启数据写入程序

 


特大热索引为何恢复慢


对于冷索引,由于数据不再更新,利用synced flush特性,可以快速直接从本地恢复数据。 而对于热索引,特别是shard很大的热索引,除了synced flush派不上用场需要大量跨节点拷贝segment file以外,translog recovery是导致慢的更重要的原因。
 
从主片恢复数据到副片需要经历3个阶段:
  1. 对主片上的segment file做一个快照,然后拷贝到复制片分配到的结点。数据拷贝期间,不会阻塞索引请求,新增索引操作记录到translog里。
  2. 对translog做一个快照,此快照包含第一阶段新增的索引请求,然后重放快照里的索引操作。此阶段仍然不阻塞索引请求,新增索引操作记录到translog里。
  3. 为了能达到主副片完全同步,阻塞掉新索引请求,然后重放阶段二新增的translog操作。

 
可见,在recovery完成之前,translog是不能够被清除掉的(禁用掉正常运作期间后台的flush操作)。如果shard比较大,第一阶段耗时很长,会导致此阶段产生的translog很大。重放translog比起简单的文件拷贝耗时要长得多,因此第二阶段的translog耗时也会显著增加。等到第三阶段,需要重放的translog可能会比第二阶段还要多。 而第三阶段是会阻塞新索引写入的,在对写入实时性要求很高的场合,就会非常影响用户体验。 因此,要加快大的热索引恢复速度,最好的方式是遵从上一节提到的方法: 暂停新数据写入,手动sync flush,等待数据恢复完成后,重新开启数据写入,这样可以将数据延迟影响可以降到最低。
 
万一遇到Recovery慢,想知道进度怎么办呢? CAT Recovery API可以显示详细的recovery各个阶段的状态。 这个API怎么用就不在这里赘述了,参考: CAT Recovery
 


其他Recovery相关的专家级设置


还有其他一些专家级的设置(参见: recovery)可以影响recovery的速度,但提升速度的代价是更多的资源消耗,因此在生产集群上调整这些参数需要结合实际情况谨慎调整,一旦影响应用要立即调整回来。 对于搜索并发量要求高,延迟要求低的场合,默认设置一般就不要去动了。 对于日志实时分析类对于搜索延迟要求不高,但对于数据写入延迟期望比较低的场合,可以适当调大indices.recovery.max_bytes_per_sec,提升recovery速度,减少数据写入被阻塞的时长。
 
最后要说的一点是ES的版本迭代很快,对于Recovery的机制也在不断的优化中。 其中有一些版本甚至引入了一些bug,比如在ES1.4.x有严重的translog recovery bug,导致大的索引trans log recovery几乎无法完成 (issue #9226)  。因此实际使用中如果遇到问题,最好在Github的issue list里搜索一下,看是否使用的版本有其他人反映同样的问题。
分享阅读原文:http://elasticsearch.cn/article/38

怎么查看Elasticsearch关闭的索引和索引的状态

采菊篱下 回复了问题 • 2 人关注 • 1 个回复 • 670 次浏览 • 2016-09-02 16:31 • 来自相关话题

Kafka入门简介

chris 发表了文章 • 2 个评论 • 323 次浏览 • 2016-09-02 15:01 • 来自相关话题

Kafka背景

Kafka它本质上是一个消息系统,由当时从LinkedIn出来创业的三人小组开发,他们开发出了Apache Kafka实时信息队列技术,该技术致力于为各行各业的公司提供实时数据处理服务解决方案。Kafka为LinkedIn的中枢神经系统,管理从各个应用程序的汇聚,这些数据经过处理后再被分发到其他地方。Kafka不同于传统的企业信息队列系统,它是以近乎实时的方式处理流经一个公司的所有数据,目前已经服务于LinkedIn、Netflix、Uber以及Verizon,并为此建立了实时信息处理平台。

流水数据是所有站点对其网站使用情况做报表时都要用到的数据中最常用的一部分,流水数据包括PV,浏览内容信息以及搜索记录等。这些数据通常是先以日志文件的形式存在,然后有周期的去对这些日志文件进行统计分析处理,然后获得需要的KPI指标结果。
 

Kafka应用场景

我们在接触一门新技术或是新语言时,得明白这门技术(或是语言)的应用场景,也就说要明白它能做什么,服务的对象是谁,下面用一个图来说明,如下图所示:




首先,Kafka可以应用于消息系统,比如,当下较为热门的消息推送,这些消息推送系统的消息源,可以使用Kafka作为系统的核心组建来完成消息的生产和消息的消费。然后是网站的行迹,我们可以将企业的Portal,用户的操作记录等信息发送到Kafka中,按照实际业务需求,可以进行实时监控,或者做离线处理等。最后,一个是日志收集,类似于Flume套件这样的日志收集系统,但Kafka的设计架构采用push/pull,适合异构集群,Kafka可以批量提交消息,对Producer来说,在性能方面基本上是无消耗的,而在Consumer端中,我们可以使用HDFS这类的分布式文件存储系统进行存储。
 

Kafka架构原理

Kafka的设计之初是希望做一个统一的信息收集平台,能够实时的收集反馈信息,并且具有良好的容错能力。Kafka中我们最直观的感受就是它的消费者与生产者,如下图所示:

Producer And Consumer:




这里Kafka对消息的保存是根据Topic进行归类的,由消息生产者(Producer)和消息消费者(Consumer)组成,另外,每一个Server称为一个Broker。对于Kafka集群而言,Producer和Consumer都依赖于ZooKeeper来保证数据的一致性。

Topic:
在每条消息输送到Kafka集群后,消息都会由一个Type,这个Type被称为一个Topic,不同的Topic的消息是分开存储的。如下图所示:




一个Topic会被归类为一则消息,每个Topic可以被分割为多个Partition,在每条消息中,它在文件中的位置称为Offset,用于标记唯一一条消息。在Kafka中,消息被消费后,消息仍然会被保留一定时间后在删除,比如在配置信息中,文件信息保留7天,那么7天后,不管Kafka中的消息是否被消费,都会被删除;以此来释放磁盘空间,减少磁盘的IO消耗。

在Kafka中,一个Topic的多个分区,被分布在Kafka集群的多个Server上,每个Server负责分区中消息的读写操作。另外,Kafka还可以配置分区需要备份的个数,以便提高可用行。由于用到来ZK来协调,每个分区都有一个Server为Leader状态,服务对外响应(如读写操作),若该Leader宕机,会由其他的Follower来选举出新的Leader来保证集群的高可用性。
 

总结

kafka是一个非常强悍的消息队列系统,他支持持久化和数据偏移,从而实现多个消费者同时消费一份数据是没有问题的。简单的来说他就是一个生产者消费模型的架构,居于zookeeper来管理每个Topic的信息记录和偏移量,从而达到高效和多复用的概念,消息不会消失,除非过期。 查看全部


Kafka背景


Kafka它本质上是一个消息系统,由当时从LinkedIn出来创业的三人小组开发,他们开发出了Apache Kafka实时信息队列技术,该技术致力于为各行各业的公司提供实时数据处理服务解决方案。Kafka为LinkedIn的中枢神经系统,管理从各个应用程序的汇聚,这些数据经过处理后再被分发到其他地方。Kafka不同于传统的企业信息队列系统,它是以近乎实时的方式处理流经一个公司的所有数据,目前已经服务于LinkedIn、Netflix、Uber以及Verizon,并为此建立了实时信息处理平台。

流水数据是所有站点对其网站使用情况做报表时都要用到的数据中最常用的一部分,流水数据包括PV,浏览内容信息以及搜索记录等。这些数据通常是先以日志文件的形式存在,然后有周期的去对这些日志文件进行统计分析处理,然后获得需要的KPI指标结果。
 


Kafka应用场景


我们在接触一门新技术或是新语言时,得明白这门技术(或是语言)的应用场景,也就说要明白它能做什么,服务的对象是谁,下面用一个图来说明,如下图所示:
KafkaArch.png

首先,Kafka可以应用于消息系统,比如,当下较为热门的消息推送,这些消息推送系统的消息源,可以使用Kafka作为系统的核心组建来完成消息的生产和消息的消费。然后是网站的行迹,我们可以将企业的Portal,用户的操作记录等信息发送到Kafka中,按照实际业务需求,可以进行实时监控,或者做离线处理等。最后,一个是日志收集,类似于Flume套件这样的日志收集系统,但Kafka的设计架构采用push/pull,适合异构集群,Kafka可以批量提交消息,对Producer来说,在性能方面基本上是无消耗的,而在Consumer端中,我们可以使用HDFS这类的分布式文件存储系统进行存储。
 


Kafka架构原理


Kafka的设计之初是希望做一个统一的信息收集平台,能够实时的收集反馈信息,并且具有良好的容错能力。Kafka中我们最直观的感受就是它的消费者与生产者,如下图所示:

Producer And Consumer:
KafkaConsumer.png

这里Kafka对消息的保存是根据Topic进行归类的,由消息生产者(Producer)和消息消费者(Consumer)组成,另外,每一个Server称为一个Broker。对于Kafka集群而言,Producer和Consumer都依赖于ZooKeeper来保证数据的一致性。

Topic:
在每条消息输送到Kafka集群后,消息都会由一个Type,这个Type被称为一个Topic,不同的Topic的消息是分开存储的。如下图所示:
KafkaTopic.png

一个Topic会被归类为一则消息,每个Topic可以被分割为多个Partition,在每条消息中,它在文件中的位置称为Offset,用于标记唯一一条消息。在Kafka中,消息被消费后,消息仍然会被保留一定时间后在删除,比如在配置信息中,文件信息保留7天,那么7天后,不管Kafka中的消息是否被消费,都会被删除;以此来释放磁盘空间,减少磁盘的IO消耗。

在Kafka中,一个Topic的多个分区,被分布在Kafka集群的多个Server上,每个Server负责分区中消息的读写操作。另外,Kafka还可以配置分区需要备份的个数,以便提高可用行。由于用到来ZK来协调,每个分区都有一个Server为Leader状态,服务对外响应(如读写操作),若该Leader宕机,会由其他的Follower来选举出新的Leader来保证集群的高可用性。
 


总结


kafka是一个非常强悍的消息队列系统,他支持持久化和数据偏移,从而实现多个消费者同时消费一份数据是没有问题的。简单的来说他就是一个生产者消费模型的架构,居于zookeeper来管理每个Topic的信息记录和偏移量,从而达到高效和多复用的概念,消息不会消失,除非过期。

Hadoop 2.2.0的HA环境下journalnode一般搭建几个?

采菊篱下 回复了问题 • 2 人关注 • 1 个回复 • 412 次浏览 • 2016-08-12 15:11 • 来自相关话题

due to compression bugs - see issue #7210 for details

采菊篱下 回复了问题 • 2 人关注 • 1 个回复 • 440 次浏览 • 2016-08-08 17:37 • 来自相关话题

Hadoop 2.6.0修改配置PID文件路径

being 发表了文章 • 0 个评论 • 415 次浏览 • 2016-08-02 19:09 • 来自相关话题

修改原因

Hadoop启动后的PID文件默认配置是保存在 /tmp 目录下的,而linux下 /tmp 目录会定时清理,所以在集群运行一段时间后如果在停Hadoop相关服务是会出现类似:no datanode to stop 的错误提示,一般生产环境中我们需要重新修改PID的保存路径。关于Linux定期清理tmp目录参考我之前的文章:http://openskill.cn/article/413
 

Hadoop修改

1、HDFS
增加或修改:$HADOOP_HOME/etc/hadoop/hadoop-env.sh如下:# 修改为你想存放的路径
export HADOOP_PID_DIR=/data/hadoop/pids
export HADOOP_SECURE_DN_PID_DIR=${HADOOP_PID_DIR}
2、MapReduce
增加或修改:$HADOOP_HOME/etc/hadoop/mapred-env.sh如下:# 修改mapred的pid存放路径
export HADOOP_MAPRED_PID_DIR=/data/hadoop/pids**ps: 自定义存储目录需要先创建好。
 

Hbase 修改

增加或修改:$HBASE_HOME/conf/hbase-env.sh 如下:#hbase PID存放路径配置
export HBASE_PID_DIR=/data/hadoop/pids

命名规则说明

我阅读了一下stop-all.sh stop-dfs.sh,stop-yarn.sh脚本,发现原理都是通过一个pid文件来停止集群的。
 
这些进程的pid文件默认都是保存在系统的/tmp目录下面,Linux系统(Centos/RHEL等)每个一段时间就会清楚/tmp下面的内容,如果/tmp下没有相关的pid文件停止就会出错“no datanode to stop”
 
当我重启出现问题的时候我怕强制kill -9杀进程的话会破坏集群,于是我想到一个方法,按照pid文件的命名规则重新在/tmp目录下面创建这些pid文件,在翻看了一堆sbin目录下的脚本之后,找到了它们的命名规则。

比如hadoop相关进程的pid文件命名规则为:pid=$HADOOP_PID_DIR/hadoop-$HADOOP_IDENT_STRING-$command.pidYarn进程相关的PID文件:pid=$YARN_PID_DIR/yarn-$YARN_IDENT_STRING-$command.pid默认情况下$HADOOP_PID_DIR和$YARN_PID_DIR都为/tmp,$HADOOP_IDENT_STRING和$YARN_IDENT_STRING都为当前系统登录的用户名,比如我的用户名为root,$command为当前执行的命令:
 
比如执行了一个 hadoop-daemon.sh stop namenode,这时候就会去找/tmp/hadoop-root-namenode.pid文件拿到namenode进程的pid号,来停止namenode进程。
 
了解原理之后,于是我就开始手动创建这些文件,我首先jps把所有进程的pid都记录下来了,然后在/tmp目录按照命名规则创建好了这些进程的pid文件,然后再重新执行stop-all.sh命令,ok可以成功关闭集群了。这是我处理的一个过程,最后为了避免这种情况,我就做了如上修改的操作! 查看全部


修改原因


Hadoop启动后的PID文件默认配置是保存在 /tmp 目录下的,而linux下 /tmp 目录会定时清理,所以在集群运行一段时间后如果在停Hadoop相关服务是会出现类似:no datanode to stop 的错误提示,一般生产环境中我们需要重新修改PID的保存路径。关于Linux定期清理tmp目录参考我之前的文章:http://openskill.cn/article/413
 


Hadoop修改


1、HDFS
增加或修改:$HADOOP_HOME/etc/hadoop/hadoop-env.sh如下:
# 修改为你想存放的路径
export HADOOP_PID_DIR=/data/hadoop/pids
export HADOOP_SECURE_DN_PID_DIR=${HADOOP_PID_DIR}

2、MapReduce
增加或修改:$HADOOP_HOME/etc/hadoop/mapred-env.sh如下:
# 修改mapred的pid存放路径
export HADOOP_MAPRED_PID_DIR=/data/hadoop/pids
**ps: 自定义存储目录需要先创建好。
 


Hbase 修改


增加或修改:$HBASE_HOME/conf/hbase-env.sh 如下:
#hbase PID存放路径配置
export HBASE_PID_DIR=/data/hadoop/pids


命名规则说明


我阅读了一下stop-all.sh stop-dfs.sh,stop-yarn.sh脚本,发现原理都是通过一个pid文件来停止集群的。
 
这些进程的pid文件默认都是保存在系统的/tmp目录下面,Linux系统(Centos/RHEL等)每个一段时间就会清楚/tmp下面的内容,如果/tmp下没有相关的pid文件停止就会出错“no datanode to stop”
 
当我重启出现问题的时候我怕强制kill -9杀进程的话会破坏集群,于是我想到一个方法,按照pid文件的命名规则重新在/tmp目录下面创建这些pid文件,在翻看了一堆sbin目录下的脚本之后,找到了它们的命名规则。

比如hadoop相关进程的pid文件命名规则为:
pid=$HADOOP_PID_DIR/hadoop-$HADOOP_IDENT_STRING-$command.pid
Yarn进程相关的PID文件:
pid=$YARN_PID_DIR/yarn-$YARN_IDENT_STRING-$command.pid
默认情况下$HADOOP_PID_DIR和$YARN_PID_DIR都为/tmp,$HADOOP_IDENT_STRING和$YARN_IDENT_STRING都为当前系统登录的用户名,比如我的用户名为root,$command为当前执行的命令:
 
比如执行了一个 hadoop-daemon.sh stop namenode,这时候就会去找/tmp/hadoop-root-namenode.pid文件拿到namenode进程的pid号,来停止namenode进程。
 
了解原理之后,于是我就开始手动创建这些文件,我首先jps把所有进程的pid都记录下来了,然后在/tmp目录按照命名规则创建好了这些进程的pid文件,然后再重新执行stop-all.sh命令,ok可以成功关闭集群了。这是我处理的一个过程,最后为了避免这种情况,我就做了如上修改的操作!

Kafka consumer处理大消息数据问题分析

采菊篱下 发表了文章 • 0 个评论 • 708 次浏览 • 2016-07-22 22:34 • 来自相关话题

案例分析

今天在处理kafka consumer的程序的时候,发现如下错误:ERROR [2016-07-22 07:16:02,466] com.flow.kafka.consumer.main.KafkaConsumer: Unexpected Error Occurred
! kafka.common.MessageSizeTooLargeException: Found a message larger than the maximum fetch size of this consumer on topic codeTopic partition 3 at fetch offset 94. Increase the fetch size, or decrease the maximum message size the broker will allow.
! at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:91) ~[pip-kafka-consumer.jar:na]
! at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33) ~[pip-kafka-consumer.jar:na]
! at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66) ~[pip-kafka-consumer.jar:na]
! at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58) ~[pip-kafka-consumer.jar:na]
! at com.flow.kafka.consumer.main.KafkaConsumer$KafkaRiverFetcher.run(KafkaConsumer.java:291) ~[original-pip-kafka-consumer.jar:na]
! at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_51]
! at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_51]
! at java.lang.Thread.run(Thread.java:744) [na:1.7.0_51]如上log可以看出,问题就是有一个较大的消息数据在codeTopic的partition 3上,然后consumer未能消费,提示我可以减小broker允许进入的消息数据的大小,或者增大consumer程序消费数据的大小。
 
从log上来看一目了然,如果要解决当前问题的话,1、减小broker消息体大小(设置message.max.bytes
参数)。;2、增大consumer获取数据信息大小(设置fetch.message.max.bytes参数)。默认broker消息体大小为1000000字节即为1M大小。消费者方面:fetch.message.max.bytes——>这将决定消费者可以获取的数据大小。
broker方面:replica.fetch.max.bytes——>这将允许broker的副本发送消息在集群并确保消息被正确地复制。如果这是太小,则消息不会被复制,因此,消费者永远不会看到的消息,因为消息永远不会承诺(完全复制)。
broker方面:message.max.bytes——>可以接受数据生产者最大消息数据大小。
由我的场景来看较大的消息体已经进入到了kafka,我这里要解决这个问题,只需要增加consumer的fetch.message.max.bytes数值就好。我单独把那条数据消费出来,写到一个文件中发现那条消息大小为1.5M左右,为了避免再次发生这种问题我把consumer程序的fetch.message.max.bytes参数调节为了3072000即为3M,重启consumer程序,查看log一切正常,解决这个消费错误到此结束,下面介绍一下kafka针对大数据处理的思考。

kafka的设计初衷

Kafka设计的初衷是迅速处理小量的消息,一般10K大小的消息吞吐性能最好(可参见LinkedIn的kafka性能测试)。但有时候,我们需要处理更大的消息,比如XML文档或JSON内容,一个消息差不多有10-100M,这种情况下,Kakfa应该如何处理?
 
针对这个问题,可以参考如下建议:
[]  最好的方法是不直接传送这些大的数据。如果有共享存储,如NAS, HDFS, S3等,可以把这些大的文件存放到共享存储,然后使用Kafka来传送文件的位置信息。[/][]  第二个方法是,将大的消息数据切片或切块,在生产端将数据切片为10K大小,使用分区主键确保一个大消息的所有部分会被发送到同一个kafka分区(这样每一部分的拆分顺序得以保留),如此以来,当消费端使用时会将这些部分重新还原为原始的消息。[/][]  第三,Kafka的生产端可以压缩消息,如果原始消息是XML,当通过压缩之后,消息可能会变得不那么大。在生产端的配置参数中使用compression.codec和commpressed.topics可以开启压缩功能,压缩算法可以使用GZip或Snappy。[/]
 
不过如果上述方法都不是你需要的,而你最终还是希望传送大的消息,那么,则可以在kafka中设置下面一些参数:
broker 配置:
[]message.max.bytes (默认:1000000) – broker能接收消息的最大字节数,这个值应该比消费端的fetch.message.max.bytes更小才对,否则broker就会因为消费端无法使用这个消息而挂起。[/][]log.segment.bytes (默认: 1GB) – kafka数据文件的大小,确保这个数值大于一个消息的长度。一般说来使用默认值即可(一般一个消息很难大于1G,因为这是一个消息系统,而不是文件系统)。[/][]replica.fetch.max.bytes (默认: 1MB) – broker可复制的消息的最大字节数。这个值应该比message.max.bytes大,否则broker会接收此消息,但无法将此消息复制出去,从而造成数据丢失。[/]
 
Consumer 配置:
[]fetch.message.max.bytes (默认 1MB) – 消费者能读取的最大消息。这个值应该大于或等于message.max.bytes。所以,如果你一定要选择kafka来传送大的消息,还有些事项需要考虑。要传送大的消息,不是当出现问题之后再来考虑如何解决,而是在一开始设计的时候,就要考虑到大消息对集群和主题的影响。[/][]性能: 根据前面提到的性能测试,kafka在消息为10K时吞吐量达到最大,更大的消息会降低吞吐量,在设计集群的容量时,尤其要考虑这点。[/][]可用的内存和分区数:Brokers会为每个分区分配replica.fetch.max.bytes参数指定的内存空间,假设replica.fetch.max.bytes=1M,且有1000个分区,则需要差不多1G的内存,确保 分区数最大的消息不会超过服务器的内存,否则会报OOM错误。同样地,消费端的fetch.message.max.bytes指定了最大消息需要的内存空间,同样,分区数最大需要内存空间 不能超过服务器的内存。所以,如果你有大的消息要传送,则在内存一定的情况下,只能使用较少的分区数或者使用更大内存的服务器。[/][]垃圾回收:到现在为止,我在kafka的使用中还没发现过此问题,但这应该是一个需要考虑的潜在问题。更大的消息会让GC的时间更长(因为broker需要分配更大的块),随时关注GC的日志和服务器的日志信息。如果长时间的GC导致kafka丢失了zookeeper的会话,则需要配置zookeeper.session.timeout.ms参数为更大的超时时间。[/]
具体使用情况可以视具体情况而定。
参考:http://www.4byte.cn/question/288008/kafka-sending-a-15mb-message.html 查看全部


案例分析


今天在处理kafka consumer的程序的时候,发现如下错误:
ERROR [2016-07-22 07:16:02,466] com.flow.kafka.consumer.main.KafkaConsumer: Unexpected Error Occurred
! kafka.common.MessageSizeTooLargeException: Found a message larger than the maximum fetch size of this consumer on topic codeTopic partition 3 at fetch offset 94. Increase the fetch size, or decrease the maximum message size the broker will allow.
! at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:91) ~[pip-kafka-consumer.jar:na]
! at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33) ~[pip-kafka-consumer.jar:na]
! at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66) ~[pip-kafka-consumer.jar:na]
! at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58) ~[pip-kafka-consumer.jar:na]
! at com.flow.kafka.consumer.main.KafkaConsumer$KafkaRiverFetcher.run(KafkaConsumer.java:291) ~[original-pip-kafka-consumer.jar:na]
! at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_51]
! at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_51]
! at java.lang.Thread.run(Thread.java:744) [na:1.7.0_51]
如上log可以看出,问题就是有一个较大的消息数据在codeTopic的partition 3上,然后consumer未能消费,提示我可以减小broker允许进入的消息数据的大小,或者增大consumer程序消费数据的大小。
 
从log上来看一目了然,如果要解决当前问题的话,1、减小broker消息体大小(设置message.max.bytes
参数)。;2、增大consumer获取数据信息大小(设置fetch.message.max.bytes参数)。默认broker消息体大小为1000000字节即为1M大小。
消费者方面:fetch.message.max.bytes——>这将决定消费者可以获取的数据大小。
broker方面:replica.fetch.max.bytes——>这将允许broker的副本发送消息在集群并确保消息被正确地复制。如果这是太小,则消息不会被复制,因此,消费者永远不会看到的消息,因为消息永远不会承诺(完全复制)。
broker方面:message.max.bytes——>可以接受数据生产者最大消息数据大小。

由我的场景来看较大的消息体已经进入到了kafka,我这里要解决这个问题,只需要增加consumer的fetch.message.max.bytes数值就好。我单独把那条数据消费出来,写到一个文件中发现那条消息大小为1.5M左右,为了避免再次发生这种问题我把consumer程序的fetch.message.max.bytes参数调节为了3072000即为3M,重启consumer程序,查看log一切正常,解决这个消费错误到此结束,下面介绍一下kafka针对大数据处理的思考。


kafka的设计初衷


Kafka设计的初衷是迅速处理小量的消息,一般10K大小的消息吞吐性能最好(可参见LinkedIn的kafka性能测试)。但有时候,我们需要处理更大的消息,比如XML文档或JSON内容,一个消息差不多有10-100M,这种情况下,Kakfa应该如何处理?
 
针对这个问题,可以参考如下建议:
    []  最好的方法是不直接传送这些大的数据。如果有共享存储,如NAS, HDFS, S3等,可以把这些大的文件存放到共享存储,然后使用Kafka来传送文件的位置信息。[/][]  第二个方法是,将大的消息数据切片或切块,在生产端将数据切片为10K大小,使用分区主键确保一个大消息的所有部分会被发送到同一个kafka分区(这样每一部分的拆分顺序得以保留),如此以来,当消费端使用时会将这些部分重新还原为原始的消息。[/][]  第三,Kafka的生产端可以压缩消息,如果原始消息是XML,当通过压缩之后,消息可能会变得不那么大。在生产端的配置参数中使用compression.codec和commpressed.topics可以开启压缩功能,压缩算法可以使用GZip或Snappy。[/]

 
不过如果上述方法都不是你需要的,而你最终还是希望传送大的消息,那么,则可以在kafka中设置下面一些参数:
broker 配置:
    []message.max.bytes (默认:1000000) – broker能接收消息的最大字节数,这个值应该比消费端的fetch.message.max.bytes更小才对,否则broker就会因为消费端无法使用这个消息而挂起。[/][]log.segment.bytes (默认: 1GB) – kafka数据文件的大小,确保这个数值大于一个消息的长度。一般说来使用默认值即可(一般一个消息很难大于1G,因为这是一个消息系统,而不是文件系统)。[/][]replica.fetch.max.bytes (默认: 1MB) – broker可复制的消息的最大字节数。这个值应该比message.max.bytes大,否则broker会接收此消息,但无法将此消息复制出去,从而造成数据丢失。[/]

 
Consumer 配置:
    []fetch.message.max.bytes (默认 1MB) – 消费者能读取的最大消息。这个值应该大于或等于message.max.bytes。所以,如果你一定要选择kafka来传送大的消息,还有些事项需要考虑。要传送大的消息,不是当出现问题之后再来考虑如何解决,而是在一开始设计的时候,就要考虑到大消息对集群和主题的影响。[/][]性能: 根据前面提到的性能测试,kafka在消息为10K时吞吐量达到最大,更大的消息会降低吞吐量,在设计集群的容量时,尤其要考虑这点。[/][]可用的内存和分区数:Brokers会为每个分区分配replica.fetch.max.bytes参数指定的内存空间,假设replica.fetch.max.bytes=1M,且有1000个分区,则需要差不多1G的内存,确保 分区数最大的消息不会超过服务器的内存,否则会报OOM错误。同样地,消费端的fetch.message.max.bytes指定了最大消息需要的内存空间,同样,分区数最大需要内存空间 不能超过服务器的内存。所以,如果你有大的消息要传送,则在内存一定的情况下,只能使用较少的分区数或者使用更大内存的服务器。[/][]垃圾回收:到现在为止,我在kafka的使用中还没发现过此问题,但这应该是一个需要考虑的潜在问题。更大的消息会让GC的时间更长(因为broker需要分配更大的块),随时关注GC的日志和服务器的日志信息。如果长时间的GC导致kafka丢失了zookeeper的会话,则需要配置zookeeper.session.timeout.ms参数为更大的超时时间。[/]

具体使用情况可以视具体情况而定。
参考:http://www.4byte.cn/question/288008/kafka-sending-a-15mb-message.html

Flume(NG)架构设计要点及配置实践

Ansible 发表了文章 • 0 个评论 • 415 次浏览 • 2016-07-14 21:34 • 来自相关话题

Flume NG是一个分布式、可靠、可用的系统,它能够将不同数据源的海量日志数据进行高效收集、聚合、移动,最后存储到一个中心化数据存储系统中。由原来的Flume OG到现在的Flume NG,进行了架构重构,并且现在NG版本完全不兼容原来的OG版本。经过架构重构后,Flume NG更像是一个轻量的小工具,非常简单,容易适应各种方式日志收集,并支持failover和负载均衡。
 

架构设计要点

Flume的架构主要有一下几个核心概念:
[]Event:一个数据单元,带有一个可选的消息头[/][]Flow:Event从源点到达目的点的迁移的抽象[/][]Client:操作位于源点处的Event,将其发送到Flume Agent[/][]Agent:一个独立的Flume进程,包含组件Source、Channel、Sink[/][]Source:用来消费传递到该组件的Event[/][]Channel:中转Event的一个临时存储,保存有Source组件传递过来的Event[/][]Sink:从Channel中读取并移除Event,将Event传递到Flow Pipeline中的下一个Agent(如果有的话)[/]
 
Flume NG架构,如图所示:




外部系统产生日志,直接通过Flume的Agent的Source组件将事件(如日志行)发送到中间临时的channel组件,最后传递给Sink组件,HDFS Sink组件可以直接把数据存储到HDFS集群上。
一个最基本Flow的配置,格式如下:
# list the sources, sinks and channels for the agent
<Agent>.sources = <Source1> <Source2>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>

# set channel for source
<Agent>.sources.<Source1>.channels = <Channel1> <Channel2> ...
<Agent>.sources.<Source2>.channels = <Channel1> <Channel2> ...

# set channel for sink
<Agent>.sinks.<Sink1>.channel = <Channel1>
<Agent>.sinks.<Sink2>.channel = <Channel2>尖括号里面的,我们可以根据实际需求或业务来修改名称。下面详细说明:
表示配置一个Agent的名称,一个Agent肯定有一个名称。与是Agent的Source组件的名称,消费传递过来的Event。与是Agent的Channel组件的名称。与是Agent的Sink组件的名称,从Channel中消费(移除)Event。
上面配置内容中,第一组中配置Source、Sink、Channel,它们的值可以有1个或者多个;第二组中配置Source将把数据存储(Put)到哪一个Channel中,可以存储到1个或多个Channel中,同一个Source将数据存储到多个Channel中,实际上是Replication;第三组中配置Sink从哪一个Channel中取(Task)数据,一个Sink只能从一个Channel中取数据。

下面,根据官网文档,我们展示几种Flow Pipeline,各自适应于什么样的应用场景:
 
[]多个Agent顺序连接[/]




可以将多个Agent顺序连接起来,将最初的数据源经过收集,存储到最终的存储系统中。这是最简单的情况,一般情况下,应该控制这种顺序连接的Agent的数量,因为数据流经的路径变长了,如果不考虑failover的话,出现故障将影响整个Flow上的Agent收集服务。
 
[]多个Agent的数据汇聚到同一个Agent[/]




这种情况应用的场景比较多,比如要收集Web网站的用户行为日志,Web网站为了可用性使用的负载均衡的集群模式,每个节点都产生用户行为日志,可以为每个节点都配置一个Agent来单独收集日志数据,然后多个Agent将数据最终汇聚到一个用来存储数据存储系统,如HDFS上。
 
[]多路(Multiplexing)Agent[/]




这种模式,有两种方式,一种是用来复制(Replication),另一种是用来分流(Multiplexing)。Replication方式,可以将最前端的数据源复制多份,分别传递到多个channel中,每个channel接收到的数据都是相同的,配置格式,如下所示:
# List the sources, sinks and channels for the agent
<Agent>.sources = <Source1>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>

# set list of channels for source (separated by space)
<Agent>.sources.<Source1>.channels = <Channel1> <Channel2>

# set channel for sinks
<Agent>.sinks.<Sink1>.channel = <Channel1>
<Agent>.sinks.<Sink2>.channel = <Channel2>

<Agent>.sources.<Source1>.selector.type = replicating上面指定了selector的type的值为replication,其他的配置没有指定,使用的Replication方式,Source1会将数据分别存储到Channel1和Channel2,这两个channel里面存储的数据是相同的,然后数据被传递到Sink1和Sink2。

Multiplexing方式,selector可以根据header的值来确定数据传递到哪一个channel,配置格式,如下所示:
# Mapping for multiplexing selector
<Agent>.sources.<Source1>.selector.type = multiplexing
<Agent>.sources.<Source1>.selector.header = <someHeader>
<Agent>.sources.<Source1>.selector.mapping.<Value1> = <Channel1>
<Agent>.sources.<Source1>.selector.mapping.<Value2> = <Channel1> <Channel2>
<Agent>.sources.<Source1>.selector.mapping.<Value3> = <Channel2>
#...

<Agent>.sources.<Source1>.selector.default = <Channel2>上面selector的type的值为multiplexing,同时配置selector的header信息,还配置了多个selector的mapping的值,即header的值:如果header的值为Value1、Value2,数据从Source1路由到Channel1;如果header的值为Value2、Value3,数据从Source1路由到Channel2。
 
[]实现load balance功能[/]




Load balancing Sink Processor能够实现load balance功能,上图Agent1是一个路由节点,负责将Channel暂存的Event均衡到对应的多个Sink组件上,而每个Sink组件分别连接到一个独立的Agent上,示例配置,如下所示:
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2 k3
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin
a1.sinkgroups.g1.processor.selector.maxTimeOut=10000
[]实现failover功能[/]
Failover Sink Processor能够实现failover功能,具体流程类似load balance,但是内部处理机制与load balance完全不同:Failover Sink Processor维护一个优先级Sink组件列表,只要有一个Sink组件可用,Event就被传递到下一个组件。如果一个Sink能够成功处理Event,则会加入到一个Pool中,否则会被移出Pool并计算失败次数,设置一个惩罚因子,示例配置如下所示:
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2 k3
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 7
a1.sinkgroups.g1.processor.priority.k3 = 6
a1.sinkgroups.g1.processor.maxpenalty = 20000

基本功能

我们看一下,Flume NG都支持哪些功能(目前最新版本是1.5.0.1),了解它的功能集合,能够让我们在应用中更好地选择使用哪一种方案。说明Flume NG的功能,实际还是围绕着Agent的三个组件Source、Channel、Sink来看它能够支持哪些技术或协议。我们不再对各个组件支持的协议详细配置进行说明,通过列表的方式分别对三个组件进行概要说明:
 
[]Flume Source[/]




 
[]Flume Channel[/]




 
[]Flume Sink[/]




另外还有Channel Selector、Sink Processor、Event Serializer、Interceptor等组件,可以参考官网提供的用户手册。

应用实践

安装Flume NG非常简单,我们使用最新的1.5.0.1版本,执行如下命令:
cd /usr/local
wget http://mirror.bit.edu.cn/apache/flume/1.5.0.1/apache-flume-1.5.0.1-bin.tar.gz
tar xvzf apache-flume-1.5.0.1-bin.tar.gz
cd apache-flume-1.5.0.1-bin如果需要使用到Hadoop集群,保证Hadoop相关的环境变量都已经正确配置,并且Hadoop集群可用。下面,通过一些实际的配置实例,来了解Flume的使用。为了简单期间,channel我们使用Memory类型的channel。
 
[]Avro Source+Memory Channel+Logger Sink[/]
使用apache-flume-1.5.0.1自带的例子,使用Avro Source接收外部数据源,Logger作为sink,即通过Avro RPC调用,将数据缓存在channel中,然后通过Logger打印出调用发送的数据。

配置Agent,修改配置文件conf/flume-conf.properties,内容如下:
# Define a memory channel called ch1 on agent1
agent1.channels.ch1.type = memory

# Define an Avro source called avro-source1 on agent1 and tell it
# to bind to 0.0.0.0:41414. Connect it to channel ch1.
agent1.sources.avro-source1.channels = ch1
agent1.sources.avro-source1.type = avro
agent1.sources.avro-source1.bind = 0.0.0.0
agent1.sources.avro-source1.port = 41414

# Define a logger sink that simply logs all events it receives
# and connect it to the other end of the same channel.
agent1.sinks.log-sink1.channel = ch1
agent1.sinks.log-sink1.type = logger

# Finally, now that we've defined all of our components, tell
# agent1 which ones we want to activate.
agent1.channels = ch1
agent1.channels.ch1.capacity = 1000
agent1.sources = avro-source1
agent1.sinks = log-sink1首先,启动Agent进程:
bin/flume-ng agent -c ./conf/ -f conf/flume-conf.properties -Dflume.root.logger=DEBUG,console -n agent1然后,启动Avro Client,发送数据:
bin/flume-ng avro-client -c ./conf/ -H 0.0.0.0 -p 41414 -F /usr/local/programs/logs/sync.log -Dflume.root.logger=DEBUG,console
[]Avro Source+Memory Channel+HDFS Sink[/]
配置Agent,修改配置文件conf/flume-conf-hdfs.properties,内容如下:
# Define a source, channel, sink
agent1.sources = avro-source1
agent1.channels = ch1
agent1.sinks = hdfs-sink

# Configure channel
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 1000000
agent1.channels.ch1.transactionCapacity = 500000

# Define an Avro source called avro-source1 on agent1 and tell it
# to bind to 0.0.0.0:41414. Connect it to channel ch1.
agent1.sources.avro-source1.channels = ch1
agent1.sources.avro-source1.type = avro
agent1.sources.avro-source1.bind = 0.0.0.0
agent1.sources.avro-source1.port = 41414

# Define a logger sink that simply logs all events it receives
# and connect it to the other end of the same channel.
agent1.sinks.hdfs-sink1.channel = ch1
agent1.sinks.hdfs-sink1.type = hdfs
agent1.sinks.hdfs-sink1.hdfs.path = hdfs://h1:8020/data/flume/
agent1.sinks.hdfs-sink1.hdfs.filePrefix = sync_file
agent1.sinks.hdfs-sink1.hdfs.fileSuffix = .log
agent1.sinks.hdfs-sink1.hdfs.rollSize = 1048576
agent1.sinks.hdfs-sink1.rollInterval = 0
agent1.sinks.hdfs-sink1.hdfs.rollCount = 0
agent1.sinks.hdfs-sink1.hdfs.batchSize = 1500
agent1.sinks.hdfs-sink1.hdfs.round = true
agent1.sinks.hdfs-sink1.hdfs.roundUnit = minute
agent1.sinks.hdfs-sink1.hdfs.threadsPoolSize = 25
agent1.sinks.hdfs-sink1.hdfs.useLocalTimeStamp = true
agent1.sinks.hdfs-sink1.hdfs.minBlockReplicas = 1
agent1.sinks.hdfs-sink1.fileType = SequenceFile
agent1.sinks.hdfs-sink1.writeFormat = TEXT
首先,启动Agent:
bin/flume-ng agent -c ./conf/ -f conf/flume-conf-hdfs.properties -Dflume.root.logger=INFO,console -n agent1然后,启动Avro Client,发送数据:
bin/flume-ng avro-client -c ./conf/ -H 0.0.0.0 -p 41414 -F /usr/local/programs/logs/sync.log -Dflume.root.logger=DEBUG,console可以查看同步到HDFS上的数据:
hdfs dfs -ls /data/flume结果示例,如下所示:
-rw-r--r-- 3 shirdrn supergroup 1377617 2014-09-16 14:35 /data/flume/sync_file.1410849320761.log
-rw-r--r-- 3 shirdrn supergroup 1378137 2014-09-16 14:35 /data/flume/sync_file.1410849320762.log
-rw-r--r-- 3 shirdrn supergroup 259148 2014-09-16 14:35 /data/flume/sync_file.1410849320763.log
[]Spooling Directory Source+Memory Channel+HDFS Sink[/]
配置Agent,修改配置文件flume-conf-spool.properties,内容如下:
# Define source, channel, sink
agent1.sources = spool-source1
agent1.channels = ch1
agent1.sinks = hdfs-sink1

# Configure channel
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 1000000
agent1.channels.ch1.transactionCapacity = 500000

# Define and configure an Spool directory source
agent1.sources.spool-source1.channels = ch1
agent1.sources.spool-source1.type = spooldir
agent1.sources.spool-source1.spoolDir = /home/shirdrn/data/
agent1.sources.spool-source1.ignorePattern = event(_\d{4}\-\d{2}\-\d{2}_\d{2}_\d{2})?\.log(\.COMPLETED)?
agent1.sources.spool-source1.batchSize = 50
agent1.sources.spool-source1.inputCharset = UTF-8

# Define and configure a hdfs sink
agent1.sinks.hdfs-sink1.channel = ch1
agent1.sinks.hdfs-sink1.type = hdfs
agent1.sinks.hdfs-sink1.hdfs.path = hdfs://h1:8020/data/flume/
agent1.sinks.hdfs-sink1.hdfs.filePrefix = event_%y-%m-%d_%H_%M_%S
agent1.sinks.hdfs-sink1.hdfs.fileSuffix = .log
agent1.sinks.hdfs-sink1.hdfs.rollSize = 1048576
agent1.sinks.hdfs-sink1.hdfs.rollCount = 0
agent1.sinks.hdfs-sink1.hdfs.batchSize = 1500
agent1.sinks.hdfs-sink1.hdfs.round = true
agent1.sinks.hdfs-sink1.hdfs.roundUnit = minute
agent1.sinks.hdfs-sink1.hdfs.threadsPoolSize = 25
agent1.sinks.hdfs-sink1.hdfs.useLocalTimeStamp = true
agent1.sinks.hdfs-sink1.hdfs.minBlockReplicas = 1
agent1.sinks.hdfs-sink1.fileType = SequenceFile
agent1.sinks.hdfs-sink1.writeFormat = TEXT
agent1.sinks.hdfs-sink1.rollInterval = 0启动Agent进程,执行如下命令:
bin/flume-ng agent -c ./conf/ -f conf/flume-conf-spool.properties -Dflume.root.logger=INFO,console -n agent1可以查看HDFS上同步过来的数据:
hdfs dfs -ls /data/flume结果示例,如下所示:
-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:52 /data/flume/event_14-09-17_10_52_00.1410922355094.log
-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:52 /data/flume/event_14-09-17_10_52_00.1410922355095.log
-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:52 /data/flume/event_14-09-17_10_52_00.1410922355096.log
-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:52 /data/flume/event_14-09-17_10_52_00.1410922355097.log
-rw-r--r-- 3 shirdrn supergroup 1530 2014-09-17 10:53 /data/flume/event_14-09-17_10_52_00.1410922355098.log
-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380386.log
-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380387.log
-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380388.log
-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380389.log
-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380390.log
[]Exec Source+Memory Channel+File Roll Sink[/]
配置Agent,修改配置文件flume-conf-file.properties,内容如下:
# Define source, channel, sink
agent1.sources = tail-source1
agent1.channels = ch1
agent1.sinks = file-sink1

# Configure channel
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 1000000
agent1.channels.ch1.transactionCapacity = 500000

# Define and configure an Exec source
agent1.sources.tail-source1.channels = ch1
agent1.sources.tail-source1.type = exec
agent1.sources.tail-source1.command = tail -F /home/shirdrn/data/event.log
agent1.sources.tail-source1.shell = /bin/sh -c
agent1.sources.tail-source1.batchSize = 50

# Define and configure a File roll sink
# and connect it to the other end of the same channel.
agent1.sinks.file-sink1.channel = ch1
agent1.sinks.file-sink1.type = file_roll
agent1.sinks.file-sink1.batchSize = 100
agent1.sinks.file-sink1.serializer = TEXT
agent1.sinks.file-sink1.sink.directory = /home/shirdrn/sink_data启动Agent进程,执行如下命令:
bin/flume-ng agent -c ./conf/ -f conf/flume-conf-file.properties -Dflume.root.logger=INFO,console -n agent1可以查看File Roll Sink对应的本地文件系统目录/home/shirdrn/sink_data下,示例如下所示:
-rw-rw-r-- 1 shirdrn shirdrn 13944825 Sep 17 11:36 1410924990039-1
-rw-rw-r-- 1 shirdrn shirdrn 11288870 Sep 17 11:37 1410924990039-2
-rw-rw-r-- 1 shirdrn shirdrn 0 Sep 17 11:37 1410924990039-3
-rw-rw-r-- 1 shirdrn shirdrn 20517500 Sep 17 11:38 1410924990039-4
-rw-rw-r-- 1 shirdrn shirdrn 16343250 Sep 17 11:38 1410924990039-5有关Flume NG更多配置及其说明,请参考官方用户手册,非常详细。

参考链接:
http://flume.apache.org/FlumeUserGuide.html 
https://blogs.apache.org/flume/entry/flume_ng_architecture
 

分享阅读原文:http://shiyanjun.cn/archives/915.html​ 
作者:时延军 查看全部
Flume NG是一个分布式、可靠、可用的系统,它能够将不同数据源的海量日志数据进行高效收集、聚合、移动,最后存储到一个中心化数据存储系统中。由原来的Flume OG到现在的Flume NG,进行了架构重构,并且现在NG版本完全不兼容原来的OG版本。经过架构重构后,Flume NG更像是一个轻量的小工具,非常简单,容易适应各种方式日志收集,并支持failover和负载均衡。
 


架构设计要点


Flume的架构主要有一下几个核心概念:
    []Event:一个数据单元,带有一个可选的消息头[/][]Flow:Event从源点到达目的点的迁移的抽象[/][]Client:操作位于源点处的Event,将其发送到Flume Agent[/][]Agent:一个独立的Flume进程,包含组件Source、Channel、Sink[/][]Source:用来消费传递到该组件的Event[/][]Channel:中转Event的一个临时存储,保存有Source组件传递过来的Event[/][]Sink:从Channel中读取并移除Event,将Event传递到Flow Pipeline中的下一个Agent(如果有的话)[/]

 
Flume NG架构,如图所示:
FlumeArch.png

外部系统产生日志,直接通过Flume的Agent的Source组件将事件(如日志行)发送到中间临时的channel组件,最后传递给Sink组件,HDFS Sink组件可以直接把数据存储到HDFS集群上。
一个最基本Flow的配置,格式如下:
# list the sources, sinks and channels for the agent
<Agent>.sources = <Source1> <Source2>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>

# set channel for source
<Agent>.sources.<Source1>.channels = <Channel1> <Channel2> ...
<Agent>.sources.<Source2>.channels = <Channel1> <Channel2> ...

# set channel for sink
<Agent>.sinks.<Sink1>.channel = <Channel1>
<Agent>.sinks.<Sink2>.channel = <Channel2>
尖括号里面的,我们可以根据实际需求或业务来修改名称。下面详细说明:
表示配置一个Agent的名称,一个Agent肯定有一个名称。与是Agent的Source组件的名称,消费传递过来的Event。与是Agent的Channel组件的名称。与是Agent的Sink组件的名称,从Channel中消费(移除)Event。
上面配置内容中,第一组中配置Source、Sink、Channel,它们的值可以有1个或者多个;第二组中配置Source将把数据存储(Put)到哪一个Channel中,可以存储到1个或多个Channel中,同一个Source将数据存储到多个Channel中,实际上是Replication;第三组中配置Sink从哪一个Channel中取(Task)数据,一个Sink只能从一个Channel中取数据。

下面,根据官网文档,我们展示几种Flow Pipeline,各自适应于什么样的应用场景:
 
    []多个Agent顺序连接[/]

Order.png

可以将多个Agent顺序连接起来,将最初的数据源经过收集,存储到最终的存储系统中。这是最简单的情况,一般情况下,应该控制这种顺序连接的Agent的数量,因为数据流经的路径变长了,如果不考虑failover的话,出现故障将影响整个Flow上的Agent收集服务。
 
    []多个Agent的数据汇聚到同一个Agent[/]

fulu.png

这种情况应用的场景比较多,比如要收集Web网站的用户行为日志,Web网站为了可用性使用的负载均衡的集群模式,每个节点都产生用户行为日志,可以为每个节点都配置一个Agent来单独收集日志数据,然后多个Agent将数据最终汇聚到一个用来存储数据存储系统,如HDFS上。
 
    []多路(Multiplexing)Agent[/]

multip.png

这种模式,有两种方式,一种是用来复制(Replication),另一种是用来分流(Multiplexing)。Replication方式,可以将最前端的数据源复制多份,分别传递到多个channel中,每个channel接收到的数据都是相同的,配置格式,如下所示:
# List the sources, sinks and channels for the agent
<Agent>.sources = <Source1>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>

# set list of channels for source (separated by space)
<Agent>.sources.<Source1>.channels = <Channel1> <Channel2>

# set channel for sinks
<Agent>.sinks.<Sink1>.channel = <Channel1>
<Agent>.sinks.<Sink2>.channel = <Channel2>

<Agent>.sources.<Source1>.selector.type = replicating
上面指定了selector的type的值为replication,其他的配置没有指定,使用的Replication方式,Source1会将数据分别存储到Channel1和Channel2,这两个channel里面存储的数据是相同的,然后数据被传递到Sink1和Sink2。

Multiplexing方式,selector可以根据header的值来确定数据传递到哪一个channel,配置格式,如下所示:
# Mapping for multiplexing selector
<Agent>.sources.<Source1>.selector.type = multiplexing
<Agent>.sources.<Source1>.selector.header = <someHeader>
<Agent>.sources.<Source1>.selector.mapping.<Value1> = <Channel1>
<Agent>.sources.<Source1>.selector.mapping.<Value2> = <Channel1> <Channel2>
<Agent>.sources.<Source1>.selector.mapping.<Value3> = <Channel2>
#...

<Agent>.sources.<Source1>.selector.default = <Channel2>
上面selector的type的值为multiplexing,同时配置selector的header信息,还配置了多个selector的mapping的值,即header的值:如果header的值为Value1、Value2,数据从Source1路由到Channel1;如果header的值为Value2、Value3,数据从Source1路由到Channel2。
 
    []实现load balance功能[/]

loadbalance.png

Load balancing Sink Processor能够实现load balance功能,上图Agent1是一个路由节点,负责将Channel暂存的Event均衡到对应的多个Sink组件上,而每个Sink组件分别连接到一个独立的Agent上,示例配置,如下所示:
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2 k3
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin
a1.sinkgroups.g1.processor.selector.maxTimeOut=10000

    []实现failover功能[/]

Failover Sink Processor能够实现failover功能,具体流程类似load balance,但是内部处理机制与load balance完全不同:Failover Sink Processor维护一个优先级Sink组件列表,只要有一个Sink组件可用,Event就被传递到下一个组件。如果一个Sink能够成功处理Event,则会加入到一个Pool中,否则会被移出Pool并计算失败次数,设置一个惩罚因子,示例配置如下所示:
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2 k3
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 7
a1.sinkgroups.g1.processor.priority.k3 = 6
a1.sinkgroups.g1.processor.maxpenalty = 20000


基本功能


我们看一下,Flume NG都支持哪些功能(目前最新版本是1.5.0.1),了解它的功能集合,能够让我们在应用中更好地选择使用哪一种方案。说明Flume NG的功能,实际还是围绕着Agent的三个组件Source、Channel、Sink来看它能够支持哪些技术或协议。我们不再对各个组件支持的协议详细配置进行说明,通过列表的方式分别对三个组件进行概要说明:
 
    []Flume Source[/]

souce.png

 
    []Flume Channel[/]

channel.png

 
    []Flume Sink[/]

sink.png

另外还有Channel Selector、Sink Processor、Event Serializer、Interceptor等组件,可以参考官网提供的用户手册。


应用实践


安装Flume NG非常简单,我们使用最新的1.5.0.1版本,执行如下命令:
cd /usr/local
wget http://mirror.bit.edu.cn/apache/flume/1.5.0.1/apache-flume-1.5.0.1-bin.tar.gz
tar xvzf apache-flume-1.5.0.1-bin.tar.gz
cd apache-flume-1.5.0.1-bin
如果需要使用到Hadoop集群,保证Hadoop相关的环境变量都已经正确配置,并且Hadoop集群可用。下面,通过一些实际的配置实例,来了解Flume的使用。为了简单期间,channel我们使用Memory类型的channel。
 
    []Avro Source+Memory Channel+Logger Sink[/]

使用apache-flume-1.5.0.1自带的例子,使用Avro Source接收外部数据源,Logger作为sink,即通过Avro RPC调用,将数据缓存在channel中,然后通过Logger打印出调用发送的数据。

配置Agent,修改配置文件conf/flume-conf.properties,内容如下:
# Define a memory channel called ch1 on agent1
agent1.channels.ch1.type = memory

# Define an Avro source called avro-source1 on agent1 and tell it
# to bind to 0.0.0.0:41414. Connect it to channel ch1.
agent1.sources.avro-source1.channels = ch1
agent1.sources.avro-source1.type = avro
agent1.sources.avro-source1.bind = 0.0.0.0
agent1.sources.avro-source1.port = 41414

# Define a logger sink that simply logs all events it receives
# and connect it to the other end of the same channel.
agent1.sinks.log-sink1.channel = ch1
agent1.sinks.log-sink1.type = logger

# Finally, now that we've defined all of our components, tell
# agent1 which ones we want to activate.
agent1.channels = ch1
agent1.channels.ch1.capacity = 1000
agent1.sources = avro-source1
agent1.sinks = log-sink1
首先,启动Agent进程:
bin/flume-ng agent -c ./conf/ -f conf/flume-conf.properties -Dflume.root.logger=DEBUG,console -n agent1
然后,启动Avro Client,发送数据:
bin/flume-ng avro-client -c ./conf/ -H 0.0.0.0 -p 41414 -F /usr/local/programs/logs/sync.log -Dflume.root.logger=DEBUG,console

    []Avro Source+Memory Channel+HDFS Sink[/]

配置Agent,修改配置文件conf/flume-conf-hdfs.properties,内容如下:
# Define a source, channel, sink
agent1.sources = avro-source1
agent1.channels = ch1
agent1.sinks = hdfs-sink

# Configure channel
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 1000000
agent1.channels.ch1.transactionCapacity = 500000

# Define an Avro source called avro-source1 on agent1 and tell it
# to bind to 0.0.0.0:41414. Connect it to channel ch1.
agent1.sources.avro-source1.channels = ch1
agent1.sources.avro-source1.type = avro
agent1.sources.avro-source1.bind = 0.0.0.0
agent1.sources.avro-source1.port = 41414

# Define a logger sink that simply logs all events it receives
# and connect it to the other end of the same channel.
agent1.sinks.hdfs-sink1.channel = ch1
agent1.sinks.hdfs-sink1.type = hdfs
agent1.sinks.hdfs-sink1.hdfs.path = hdfs://h1:8020/data/flume/
agent1.sinks.hdfs-sink1.hdfs.filePrefix = sync_file
agent1.sinks.hdfs-sink1.hdfs.fileSuffix = .log
agent1.sinks.hdfs-sink1.hdfs.rollSize = 1048576
agent1.sinks.hdfs-sink1.rollInterval = 0
agent1.sinks.hdfs-sink1.hdfs.rollCount = 0
agent1.sinks.hdfs-sink1.hdfs.batchSize = 1500
agent1.sinks.hdfs-sink1.hdfs.round = true
agent1.sinks.hdfs-sink1.hdfs.roundUnit = minute
agent1.sinks.hdfs-sink1.hdfs.threadsPoolSize = 25
agent1.sinks.hdfs-sink1.hdfs.useLocalTimeStamp = true
agent1.sinks.hdfs-sink1.hdfs.minBlockReplicas = 1
agent1.sinks.hdfs-sink1.fileType = SequenceFile
agent1.sinks.hdfs-sink1.writeFormat = TEXT

首先,启动Agent:
bin/flume-ng agent -c ./conf/ -f conf/flume-conf-hdfs.properties -Dflume.root.logger=INFO,console -n agent1
然后,启动Avro Client,发送数据:
bin/flume-ng avro-client -c ./conf/ -H 0.0.0.0 -p 41414 -F /usr/local/programs/logs/sync.log -Dflume.root.logger=DEBUG,console
可以查看同步到HDFS上的数据:
hdfs dfs -ls /data/flume
结果示例,如下所示:
-rw-r--r--   3 shirdrn supergroup    1377617 2014-09-16 14:35 /data/flume/sync_file.1410849320761.log
-rw-r--r-- 3 shirdrn supergroup 1378137 2014-09-16 14:35 /data/flume/sync_file.1410849320762.log
-rw-r--r-- 3 shirdrn supergroup 259148 2014-09-16 14:35 /data/flume/sync_file.1410849320763.log

    []Spooling Directory Source+Memory Channel+HDFS Sink[/]

配置Agent,修改配置文件flume-conf-spool.properties,内容如下:
# Define source, channel, sink
agent1.sources = spool-source1
agent1.channels = ch1
agent1.sinks = hdfs-sink1

# Configure channel
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 1000000
agent1.channels.ch1.transactionCapacity = 500000

# Define and configure an Spool directory source
agent1.sources.spool-source1.channels = ch1
agent1.sources.spool-source1.type = spooldir
agent1.sources.spool-source1.spoolDir = /home/shirdrn/data/
agent1.sources.spool-source1.ignorePattern = event(_\d{4}\-\d{2}\-\d{2}_\d{2}_\d{2})?\.log(\.COMPLETED)?
agent1.sources.spool-source1.batchSize = 50
agent1.sources.spool-source1.inputCharset = UTF-8

# Define and configure a hdfs sink
agent1.sinks.hdfs-sink1.channel = ch1
agent1.sinks.hdfs-sink1.type = hdfs
agent1.sinks.hdfs-sink1.hdfs.path = hdfs://h1:8020/data/flume/
agent1.sinks.hdfs-sink1.hdfs.filePrefix = event_%y-%m-%d_%H_%M_%S
agent1.sinks.hdfs-sink1.hdfs.fileSuffix = .log
agent1.sinks.hdfs-sink1.hdfs.rollSize = 1048576
agent1.sinks.hdfs-sink1.hdfs.rollCount = 0
agent1.sinks.hdfs-sink1.hdfs.batchSize = 1500
agent1.sinks.hdfs-sink1.hdfs.round = true
agent1.sinks.hdfs-sink1.hdfs.roundUnit = minute
agent1.sinks.hdfs-sink1.hdfs.threadsPoolSize = 25
agent1.sinks.hdfs-sink1.hdfs.useLocalTimeStamp = true
agent1.sinks.hdfs-sink1.hdfs.minBlockReplicas = 1
agent1.sinks.hdfs-sink1.fileType = SequenceFile
agent1.sinks.hdfs-sink1.writeFormat = TEXT
agent1.sinks.hdfs-sink1.rollInterval = 0
启动Agent进程,执行如下命令:
bin/flume-ng agent -c ./conf/ -f conf/flume-conf-spool.properties -Dflume.root.logger=INFO,console -n agent1
可以查看HDFS上同步过来的数据:
hdfs dfs -ls /data/flume
结果示例,如下所示:
-rw-r--r--   3 shirdrn supergroup    1072265 2014-09-17 10:52 /data/flume/event_14-09-17_10_52_00.1410922355094.log
-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:52 /data/flume/event_14-09-17_10_52_00.1410922355095.log
-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:52 /data/flume/event_14-09-17_10_52_00.1410922355096.log
-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:52 /data/flume/event_14-09-17_10_52_00.1410922355097.log
-rw-r--r-- 3 shirdrn supergroup 1530 2014-09-17 10:53 /data/flume/event_14-09-17_10_52_00.1410922355098.log
-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380386.log
-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380387.log
-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380388.log
-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380389.log
-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380390.log

    []Exec Source+Memory Channel+File Roll Sink[/]

配置Agent,修改配置文件flume-conf-file.properties,内容如下:
# Define source, channel, sink
agent1.sources = tail-source1
agent1.channels = ch1
agent1.sinks = file-sink1

# Configure channel
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 1000000
agent1.channels.ch1.transactionCapacity = 500000

# Define and configure an Exec source
agent1.sources.tail-source1.channels = ch1
agent1.sources.tail-source1.type = exec
agent1.sources.tail-source1.command = tail -F /home/shirdrn/data/event.log
agent1.sources.tail-source1.shell = /bin/sh -c
agent1.sources.tail-source1.batchSize = 50

# Define and configure a File roll sink
# and connect it to the other end of the same channel.
agent1.sinks.file-sink1.channel = ch1
agent1.sinks.file-sink1.type = file_roll
agent1.sinks.file-sink1.batchSize = 100
agent1.sinks.file-sink1.serializer = TEXT
agent1.sinks.file-sink1.sink.directory = /home/shirdrn/sink_data
启动Agent进程,执行如下命令:
bin/flume-ng agent -c ./conf/ -f conf/flume-conf-file.properties -Dflume.root.logger=INFO,console -n agent1
可以查看File Roll Sink对应的本地文件系统目录/home/shirdrn/sink_data下,示例如下所示:
-rw-rw-r-- 1 shirdrn shirdrn 13944825 Sep 17 11:36 1410924990039-1
-rw-rw-r-- 1 shirdrn shirdrn 11288870 Sep 17 11:37 1410924990039-2
-rw-rw-r-- 1 shirdrn shirdrn 0 Sep 17 11:37 1410924990039-3
-rw-rw-r-- 1 shirdrn shirdrn 20517500 Sep 17 11:38 1410924990039-4
-rw-rw-r-- 1 shirdrn shirdrn 16343250 Sep 17 11:38 1410924990039-5
有关Flume NG更多配置及其说明,请参考官方用户手册,非常详细。

参考链接:
http://flume.apache.org/FlumeUserGuide.html 
https://blogs.apache.org/flume/entry/flume_ng_architecture
 


分享阅读原文:http://shiyanjun.cn/archives/915.html​ 
作者:时延军