源码分析Elasticsearch Master选举过程

Geek小A 发表了文章 • 2 个评论 • 391 次浏览 • 2017-01-05 20:16 • 来自相关话题

ES 有Master节点和Data节点,Master节点什么意思呢? 就是主人节点,这个集群的主人,就是皇帝。ES同一时刻只有一个Master节点。小生一直看古装走火入魔,所谓心里有王朝,眼里就有王朝,看啥啥是王朝。就用王朝解释下集群,皇帝比喻下Master。我们先来看2个配置项。
 
node.master: true  就是皇子,意思是有资格成为Master,成为皇帝的人选,这是天生的,是无字天书,在elasticsearch.yml里写好的。

discovery.zen.minimum_master_nodes: 1 就是几个皇子在场的时候,才能选新皇帝,不然难以服众,容易脑裂(brain split)是吧。
 

Master的职责

只有皇帝才有资格发布圣旨ClusterState(集群状态)。他维护着这个王朝的状态,决定着这个王朝很多重要的大小事物。有一些事情必须皇帝才能执行,比如砍头(删除索引)。但是ES作为P2P集群,Master的职责,还是被弱化了一些。一张图看一下皇帝的工作内容。





 

什么时候选Master

只有在皇帝驾崩,和王朝诞生的时候,才选举Master皇帝,是吧,想让皇帝禅位,除非他死了,或者王朝被推翻了(所有节点重启)

 
节点启动,要加入一个集群的时候
 
//ZenDiscovery
private void innterJoinCluster() {
boolean retry = true;
while (retry) {
if (lifecycle.stoppedOrClosed()) {
return;
}
retry = false;
DiscoveryNode masterNode = findMaster(); //找一个节点出来当皇帝
if (masterNode == null) {
logger.trace("no masterNode returned");
retry = true;
continue;
}
//....
或者节点关闭 Master Gone
private void handleMasterGone(final DiscoveryNode masterNode, final String reason) {
if (lifecycleState() != Lifecycle.State.STARTED) {
return;
}
if (master) {
return;
}
logger.info("master_left [{}], reason [{}]", masterNode, reason);
clusterService.submitStateUpdateTask("zen-disco-master_failed (" + masterNode + ")", Priority.HIGH, new ProcessedClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
if (!masterNode.id().equals(currentState.nodes().masterNodeId())) {
return currentState;
}
DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.newNodesBuilder()
.putAll(currentState.nodes())
.remove(masterNode.id())
.masterNodeId(null);
if (!electMaster.hasEnoughMasterNodes(nodesBuilder.build())) {
return rejoin(ClusterState.builder().state(currentState).nodes(nodesBuilder).build(), "not enough master nodes after master left (reason = " + reason + ")");
}
final DiscoveryNode electedMaster = electMaster.electMaster(nodesBuilder.build()); // 选举Master
 

Master选举

首先必须是皇子(node.master: true),具体哪皇子成为皇帝呢? 看天意啊,最先启动的那个节点。老臣认为。当立嫡长子为太子,成为皇帝啊,这样江山社稷才能稳固啊,(一阵激动)省略上万句。。。好了,演完戏了,看代码。

 
ZenDiscovery模块启动的时候,要加入集群。findMaster 方法里,Ping一堆节点出来,Ping就是发现节点,这里的Ping不是Linux的命令Ping,是向ES的9300端口发送数据的意思。Linux的Ping是可以禁止的,不能因为命令Ping不通机器,就认为相互不能发现节点。Ping有组播MulticastZenPing和单播UnicastZenPing 两种。如果节点少,用单播也可以。组播在一些环境下可能无法相互发现节点,或者被安全软件识别为恶意程序。节点列表确定后。交给 ElectMasterService 去选举,快排后的第一个节点
//ElectMasterService
/**
* Elects a new master out of the possible nodes, returning it. Returns <tt>null</tt>
* if no master has been elected.
*/
public DiscoveryNode electMaster(Iterable<DiscoveryNode> nodes) {
List<DiscoveryNode> sortedNodes = sortedMasterNodes(nodes);
if (sortedNodes == null || sortedNodes.isEmpty()) {
return null;
}
return sortedNodes.get(0);
}

private List<DiscoveryNode> sortedMasterNodes(Iterable<DiscoveryNode> nodes) {
List<DiscoveryNode> possibleNodes = Lists.newArrayList(nodes);
if (possibleNodes.isEmpty()) {
return null;
}
// clean non master nodes
for (Iterator<DiscoveryNode> it = possibleNodes.iterator(); it.hasNext(); ) {
DiscoveryNode node = it.next();
if (!node.masterNode()) {
it.remove();
}
}
CollectionUtil.quickSort(possibleNodes, nodeComparator);
return possibleNodes;
}对了,皇帝上位以后,第一件事情是发布圣旨,昭告天下,以后寡人就是皇帝了。

怎么看,现在谁是皇帝呢?
 
curl http://localhost:9200/_cat/master?v
脑裂问题
 
关于brain split脑裂问题,可以看这个:
如何避免脑裂: http://blog.trifork.com/2013/10/24/how-to-avoid-the-split-brain-problem-in-elasticsearch/  
官方讨论:https://github.com/elasticsearch/elasticsearch/issues/2488 
 
最后,为了让大家对皇帝有个感性的认识,赠图一张,不谢!




原文分享:http://www.codeweblog.com/elasticsearch-%E6%BA%90%E4%BB%A3%E7%A0%81%E5%88%86%E6%9E%90%E4%B9%8Bmaster%E9%80%89%E4%B8%BE/  查看全部
ES 有Master节点和Data节点,Master节点什么意思呢? 就是主人节点,这个集群的主人,就是皇帝。ES同一时刻只有一个Master节点。小生一直看古装走火入魔,所谓心里有王朝,眼里就有王朝,看啥啥是王朝。就用王朝解释下集群,皇帝比喻下Master。我们先来看2个配置项。
 
node.master: true  就是皇子,意思是有资格成为Master,成为皇帝的人选,这是天生的,是无字天书,在elasticsearch.yml里写好的。

discovery.zen.minimum_master_nodes: 1 就是几个皇子在场的时候,才能选新皇帝,不然难以服众,容易脑裂(brain split)是吧。
 


Master的职责


只有皇帝才有资格发布圣旨ClusterState(集群状态)。他维护着这个王朝的状态,决定着这个王朝很多重要的大小事物。有一些事情必须皇帝才能执行,比如砍头(删除索引)。但是ES作为P2P集群,Master的职责,还是被弱化了一些。一张图看一下皇帝的工作内容。

Masterkpi.png

 


什么时候选Master


只有在皇帝驾崩,和王朝诞生的时候,才选举Master皇帝,是吧,想让皇帝禅位,除非他死了,或者王朝被推翻了(所有节点重启)

 
节点启动,要加入一个集群的时候
 
//ZenDiscovery
private void innterJoinCluster() {
boolean retry = true;
while (retry) {
if (lifecycle.stoppedOrClosed()) {
return;
}
retry = false;
DiscoveryNode masterNode = findMaster(); //找一个节点出来当皇帝
if (masterNode == null) {
logger.trace("no masterNode returned");
retry = true;
continue;
}
//....

或者节点关闭 Master Gone
private void handleMasterGone(final DiscoveryNode masterNode, final String reason) {
if (lifecycleState() != Lifecycle.State.STARTED) {
return;
}
if (master) {
return;
}
logger.info("master_left [{}], reason [{}]", masterNode, reason);
clusterService.submitStateUpdateTask("zen-disco-master_failed (" + masterNode + ")", Priority.HIGH, new ProcessedClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
if (!masterNode.id().equals(currentState.nodes().masterNodeId())) {
return currentState;
}
DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.newNodesBuilder()
.putAll(currentState.nodes())
.remove(masterNode.id())
.masterNodeId(null);
if (!electMaster.hasEnoughMasterNodes(nodesBuilder.build())) {
return rejoin(ClusterState.builder().state(currentState).nodes(nodesBuilder).build(), "not enough master nodes after master left (reason = " + reason + ")");
}
final DiscoveryNode electedMaster = electMaster.electMaster(nodesBuilder.build()); // 选举Master

 


Master选举


首先必须是皇子(node.master: true),具体哪皇子成为皇帝呢? 看天意啊,最先启动的那个节点。老臣认为。当立嫡长子为太子,成为皇帝啊,这样江山社稷才能稳固啊,(一阵激动)省略上万句。。。好了,演完戏了,看代码。

 
ZenDiscovery模块启动的时候,要加入集群。findMaster 方法里,Ping一堆节点出来,Ping就是发现节点,这里的Ping不是Linux的命令Ping,是向ES的9300端口发送数据的意思。Linux的Ping是可以禁止的,不能因为命令Ping不通机器,就认为相互不能发现节点。Ping有组播MulticastZenPing和单播UnicastZenPing 两种。如果节点少,用单播也可以。组播在一些环境下可能无法相互发现节点,或者被安全软件识别为恶意程序。节点列表确定后。交给 ElectMasterService 去选举,快排后的第一个节点
//ElectMasterService
/**
* Elects a new master out of the possible nodes, returning it. Returns <tt>null</tt>
* if no master has been elected.
*/
public DiscoveryNode electMaster(Iterable<DiscoveryNode> nodes) {
List<DiscoveryNode> sortedNodes = sortedMasterNodes(nodes);
if (sortedNodes == null || sortedNodes.isEmpty()) {
return null;
}
return sortedNodes.get(0);
}

private List<DiscoveryNode> sortedMasterNodes(Iterable<DiscoveryNode> nodes) {
List<DiscoveryNode> possibleNodes = Lists.newArrayList(nodes);
if (possibleNodes.isEmpty()) {
return null;
}
// clean non master nodes
for (Iterator<DiscoveryNode> it = possibleNodes.iterator(); it.hasNext(); ) {
DiscoveryNode node = it.next();
if (!node.masterNode()) {
it.remove();
}
}
CollectionUtil.quickSort(possibleNodes, nodeComparator);
return possibleNodes;
}
对了,皇帝上位以后,第一件事情是发布圣旨,昭告天下,以后寡人就是皇帝了。

怎么看,现在谁是皇帝呢?
 
curl http://localhost:9200/_cat/master?v

脑裂问题
 
关于brain split脑裂问题,可以看这个:
如何避免脑裂: http://blog.trifork.com/2013/10/24/how-to-avoid-the-split-brain-problem-in-elasticsearch/  
官方讨论:https://github.com/elasticsearch/elasticsearch/issues/2488 
 
最后,为了让大家对皇帝有个感性的认识,赠图一张,不谢!
hd.png

原文分享:http://www.codeweblog.com/elasticsearch-%E6%BA%90%E4%BB%A3%E7%A0%81%E5%88%86%E6%9E%90%E4%B9%8Bmaster%E9%80%89%E4%B8%BE/ 

消费kafka数据出错Found a message larger than the maximum fetch size

采菊篱下 回复了问题 • 2 人关注 • 1 个回复 • 353 次浏览 • 2017-01-05 14:18 • 来自相关话题

如何为Kafka集群选择合适的主题和分区数量

Ansible 发表了文章 • 0 个评论 • 326 次浏览 • 2016-12-26 00:03 • 来自相关话题

这是许多Kafka用户提出的常见问题。 这篇文章的目的是解释几个重要的决定因素,并提供一些简单的公式。
 

更多的分区提供更高的吞吐量

首先要理解的是,主题分区是Kafka中并行性的单位。 在生产者和代理端,对不同分区的写入可以完全并行完成。 因此,昂贵的操作(例如压缩)可以利用更多的硬件资源。 在消费者方面,Kafka总是将一个分区的数据提供给一个消费者线程。 因此,消费者(在消费者组内)的并行度受所消耗的分区的数量的限制。 因此,一般来说,Kafka集群中的分区越多,吞吐量就越高。

用于选择分区数量的粗略公式基于吞吐量。 你衡量整个,你可以在生产单个分区(称之为P)和消费实现(称为C)。 比方说,你的目标吞吐量吨 。 然后,你需要至少有MAX(T / P,T / C)分区。 人们可以在生产者上实现的每分区吞吐量取决于诸如批处理大小,压缩编解码器,确认类型,复制因子等等的配置。然而,一般来说,可以在10秒的MB /如在此所示单个分区的基准 。 消费者吞吐量通常是与应用相关的,因为它对应于消费者逻辑可以如何快速地处理每个消息。 所以,你真的需要测量它。

虽然可以随着时间增加分区的数量,但是如果使用密钥生成消息,则必须小心。 当发布带密钥的消息时,Kafka基于密钥的散列将消息确定性地映射到分区。 这提供了保证具有相同密钥的消息总是路由到同一分区。 此保证对于某些应用可能是重要的,因为分区内的消息总是按顺序递送给消费者。 如果分区的数量改变,则这样的保证可能不再保持。 为了避免这种情况,一种常见的做法是过度分区。 基本上,您可以根据未来的目标吞吐量确定分区数,比如一两年后。 最初,您可以根据您的当前吞吐量只有一个小的Kafka集群。 随着时间的推移,您可以向集群添加更多代理,并按比例将现有分区的一部分移动到新代理(可以在线完成)。 这样,当使用密钥时,您可以跟上吞吐量增长,而不会破坏应用程序中的语义。

除了吞吐量,还有一些其他因素,在选择分区数时值得考虑。 正如你将看到的,在某些情况下,分区太多也可能产生负面影响。
 

更多分区需要打开更多的文件句柄

每个分区映射到broker中文件系统中的目录。 在该日志目录中,每个日志段将有两个文件(一个用于索引,另一个用于实际数据)。 目前,在Kafka中,每个broker打开每个日志段的索引和数据文件的文件句柄。 因此,分区越多,底层操作系统中需要配置打开文件句柄限制的分区越多。 这大多只是一个配置问题。 我们已经看到生产Kafka集群中,运行的每个broker打开的文件句柄数超过30000。
 

更多分区可能是不可用性增加

Kafka支持群集内复制 ,它提供更高的可用性和耐用性。 分区可以有多个副本,每个副本存储在不同的代理上。 其中一个副本被指定为领导者,其余的副本是追随者。 在内部,Kafka自动管理所有这些副本,并确保它们保持同步。 生产者和消费者对分区的请求都在领导副本上提供。 当代理失败时,该代理上具有leader的分区变得暂时不可用。 Kafka将自动将那些不可用分区的领导者移动到其他一些副本以继续服务客户端请求。 这个过程由指定为控制器的Kafka代理之一完成。 它涉及在ZooKeeper中为每个受影响的分区读取和写入一些元数据。 目前,ZooKeeper的操作是在控制器中串行完成的。

在通常情况下,当代理被干净地关闭时,控制器将主动地将领导者一次一个地关闭关闭代理。 单个领导者的移动只需要几毫秒。 因此,从客户的角度来看,在干净的代理关闭期间只有一小窗口的不可用性。

然而,当代理不干净地关闭(例如,kill -9)时,所观察到的不可用性可能与分区的数量成比例。 假设代理具有总共2000个分区,每个具有2个副本。 大致来说,这个代理将是大约1000个分区的领导者。 当这个代理不干净失败时,所有这1000个分区完全不能同时使用。 假设为单个分区选择新的领导需要5毫秒。 对于所有1000个分区,选择新的领导将需要5秒钟。 因此,对于一些分区,它们的观察到的不可用性可以是5秒加上检测故障所花费的时间。

如果一个是不幸的,失败的代理可能是控制器。 在这种情况下,选举新领导者的过程将不会开始,直到控制器故障转移到新的代理。 控制器故障转移自动发生,但需要新控制器在初始化期间从ZooKeeper读取每个分区的一些元数据。 例如,如果Kafka集群中有10,000个分区,并且从ZooKeeper初始化元数据每个分区需要2ms,则可以向不可用性窗口添加20多秒。

一般来说,不洁的故障是罕见的。 但是,如果在这种极少数情况下关心可用性,则可能最好将每个代理的分区数限制为两个到四千个,集群中的分区总数限制到几万个。
 

更多分区可能会增加端到端延迟

Kafka中的端到端延迟由从生产者发布消息到消费者读取消息的时间来定义。 Kafka仅在提交后向消费者公开消息,即,当消息被复制到所有同步副本时。 因此,提交消息的时间可以是端到端等待时间的显着部分。 默认情况下,对于在两个代理之间共享副本的所有分区,Kafka代理仅使用单个线程从另一个代理复制数据。 我们的实验表明,将1000个分区从一个代理复制到另一个代理可以添加大约20毫秒的延迟,这意味着端到端延迟至少为20毫秒。 这对于一些实时应用来说可能太高。

请注意,此问题在较大的群集上减轻。 例如,假设代理上有1000个分区引导,并且在同一个Kafka集群中有10个其他代理。 剩余的10个代理中的每个仅需要从第一代理平均获取100个分区。 因此,由于提交消息而增加的延迟将仅为几毫秒,而不是几十毫秒。

作为一个经验法则,如果你关心的延迟,它可能是一个好主意,每个经纪人的分区数量限制为100 * b * r,其中b是经纪人在kafka集群的数量和r是复制因子。
 

更多分区可能需要更多的内存在客户端

在最新的0.8.2版本,我们汇合到我们平台1.0,我们已经开发出一种更有效的Java生产商。 新生产者的一个好的功能是它允许用户设置用于缓冲传入消息的内存量的上限。 在内部,生产者缓冲每个分区的消息。 在累积了足够的数据或已经过去足够的时间之后,累积的消息从缓冲器中移除并发送到代理。

如果增加分区的数量,消息将在生产者中的更多分区中累积。 所使用的内存总量现在可能超过配置的内存限制。 当这种情况发生时,生产者必须阻止或丢弃任何新的消息,这两者都不是理想的。 为了防止这种情况发生,需要重新配置具有较大内存大小的生产者。

作为经验法则,为了实现良好的吞吐量,应该在生成器中分配至少几十KB的每个分区,并且如果分区的数量显着增加,则调整存储器的总量。

消费者也存在类似的问题。 消费者每个分区获取一批消息。 消费者消耗的分区越多,需要的内存就越多。 然而,这通常只是不是实时的消费者的问题。
 

总结

通常,Kafka集群中的更多分区导致更高的吞吐量。 然而,人们不得不意识到在总体上或每个代理中具有过多分区对可用性和等待时间的潜在影响。 在未来,我们计划改进一些限制,使Kafka在分区数方面更具可扩展性。
 

翻译原文:https://www.confluent.io/blog/how-to-choose-the-number-of-topicspartitions-in-a-kafka-cluster/  
作者:饶俊 查看全部
kafka.png

这是许多Kafka用户提出的常见问题。 这篇文章的目的是解释几个重要的决定因素,并提供一些简单的公式。
 


更多的分区提供更高的吞吐量


首先要理解的是,主题分区是Kafka中并行性的单位。 在生产者和代理端,对不同分区的写入可以完全并行完成。 因此,昂贵的操作(例如压缩)可以利用更多的硬件资源。 在消费者方面,Kafka总是将一个分区的数据提供给一个消费者线程。 因此,消费者(在消费者组内)的并行度受所消耗的分区的数量的限制。 因此,一般来说,Kafka集群中的分区越多,吞吐量就越高。

用于选择分区数量的粗略公式基于吞吐量。 你衡量整个,你可以在生产单个分区(称之为P)和消费实现(称为C)。 比方说,你的目标吞吐量吨 。 然后,你需要至少有MAX(T / P,T / C)分区。 人们可以在生产者上实现的每分区吞吐量取决于诸如批处理大小,压缩编解码器,确认类型,复制因子等等的配置。然而,一般来说,可以在10秒的MB /如在此所示单个分区的基准 。 消费者吞吐量通常是与应用相关的,因为它对应于消费者逻辑可以如何快速地处理每个消息。 所以,你真的需要测量它。

虽然可以随着时间增加分区的数量,但是如果使用密钥生成消息,则必须小心。 当发布带密钥的消息时,Kafka基于密钥的散列将消息确定性地映射到分区。 这提供了保证具有相同密钥的消息总是路由到同一分区。 此保证对于某些应用可能是重要的,因为分区内的消息总是按顺序递送给消费者。 如果分区的数量改变,则这样的保证可能不再保持。 为了避免这种情况,一种常见的做法是过度分区。 基本上,您可以根据未来的目标吞吐量确定分区数,比如一两年后。 最初,您可以根据您的当前吞吐量只有一个小的Kafka集群。 随着时间的推移,您可以向集群添加更多代理,并按比例将现有分区的一部分移动到新代理(可以在线完成)。 这样,当使用密钥时,您可以跟上吞吐量增长,而不会破坏应用程序中的语义。

除了吞吐量,还有一些其他因素,在选择分区数时值得考虑。 正如你将看到的,在某些情况下,分区太多也可能产生负面影响。
 


更多分区需要打开更多的文件句柄


每个分区映射到broker中文件系统中的目录。 在该日志目录中,每个日志段将有两个文件(一个用于索引,另一个用于实际数据)。 目前,在Kafka中,每个broker打开每个日志段的索引和数据文件的文件句柄。 因此,分区越多,底层操作系统中需要配置打开文件句柄限制的分区越多。 这大多只是一个配置问题。 我们已经看到生产Kafka集群中,运行的每个broker打开的文件句柄数超过30000。
 


更多分区可能是不可用性增加


Kafka支持群集内复制 ,它提供更高的可用性和耐用性。 分区可以有多个副本,每个副本存储在不同的代理上。 其中一个副本被指定为领导者,其余的副本是追随者。 在内部,Kafka自动管理所有这些副本,并确保它们保持同步。 生产者和消费者对分区的请求都在领导副本上提供。 当代理失败时,该代理上具有leader的分区变得暂时不可用。 Kafka将自动将那些不可用分区的领导者移动到其他一些副本以继续服务客户端请求。 这个过程由指定为控制器的Kafka代理之一完成。 它涉及在ZooKeeper中为每个受影响的分区读取和写入一些元数据。 目前,ZooKeeper的操作是在控制器中串行完成的。

在通常情况下,当代理被干净地关闭时,控制器将主动地将领导者一次一个地关闭关闭代理。 单个领导者的移动只需要几毫秒。 因此,从客户的角度来看,在干净的代理关闭期间只有一小窗口的不可用性。

然而,当代理不干净地关闭(例如,kill -9)时,所观察到的不可用性可能与分区的数量成比例。 假设代理具有总共2000个分区,每个具有2个副本。 大致来说,这个代理将是大约1000个分区的领导者。 当这个代理不干净失败时,所有这1000个分区完全不能同时使用。 假设为单个分区选择新的领导需要5毫秒。 对于所有1000个分区,选择新的领导将需要5秒钟。 因此,对于一些分区,它们的观察到的不可用性可以是5秒加上检测故障所花费的时间。

如果一个是不幸的,失败的代理可能是控制器。 在这种情况下,选举新领导者的过程将不会开始,直到控制器故障转移到新的代理。 控制器故障转移自动发生,但需要新控制器在初始化期间从ZooKeeper读取每个分区的一些元数据。 例如,如果Kafka集群中有10,000个分区,并且从ZooKeeper初始化元数据每个分区需要2ms,则可以向不可用性窗口添加20多秒。

一般来说,不洁的故障是罕见的。 但是,如果在这种极少数情况下关心可用性,则可能最好将每个代理的分区数限制为两个到四千个,集群中的分区总数限制到几万个。
 


更多分区可能会增加端到端延迟


Kafka中的端到端延迟由从生产者发布消息到消费者读取消息的时间来定义。 Kafka仅在提交后向消费者公开消息,即,当消息被复制到所有同步副本时。 因此,提交消息的时间可以是端到端等待时间的显着部分。 默认情况下,对于在两个代理之间共享副本的所有分区,Kafka代理仅使用单个线程从另一个代理复制数据。 我们的实验表明,将1000个分区从一个代理复制到另一个代理可以添加大约20毫秒的延迟,这意味着端到端延迟至少为20毫秒。 这对于一些实时应用来说可能太高。

请注意,此问题在较大的群集上减轻。 例如,假设代理上有1000个分区引导,并且在同一个Kafka集群中有10个其他代理。 剩余的10个代理中的每个仅需要从第一代理平均获取100个分区。 因此,由于提交消息而增加的延迟将仅为几毫秒,而不是几十毫秒。

作为一个经验法则,如果你关心的延迟,它可能是一个好主意,每个经纪人的分区数量限制为100 * b * r,其中b是经纪人在kafka集群的数量和r是复制因子。
 


更多分区可能需要更多的内存在客户端


在最新的0.8.2版本,我们汇合到我们平台1.0,我们已经开发出一种更有效的Java生产商。 新生产者的一个好的功能是它允许用户设置用于缓冲传入消息的内存量的上限。 在内部,生产者缓冲每个分区的消息。 在累积了足够的数据或已经过去足够的时间之后,累积的消息从缓冲器中移除并发送到代理。

如果增加分区的数量,消息将在生产者中的更多分区中累积。 所使用的内存总量现在可能超过配置的内存限制。 当这种情况发生时,生产者必须阻止或丢弃任何新的消息,这两者都不是理想的。 为了防止这种情况发生,需要重新配置具有较大内存大小的生产者。

作为经验法则,为了实现良好的吞吐量,应该在生成器中分配至少几十KB的每个分区,并且如果分区的数量显着增加,则调整存储器的总量。

消费者也存在类似的问题。 消费者每个分区获取一批消息。 消费者消耗的分区越多,需要的内存就越多。 然而,这通常只是不是实时的消费者的问题。
 


总结


通常,Kafka集群中的更多分区导致更高的吞吐量。 然而,人们不得不意识到在总体上或每个代理中具有过多分区对可用性和等待时间的潜在影响。 在未来,我们计划改进一些限制,使Kafka在分区数方面更具可扩展性。
 


翻译原文:https://www.confluent.io/blog/how-to-choose-the-number-of-topicspartitions-in-a-kafka-cluster/  
作者:饶俊


Druid每个任务内存调整

回复

采菊篱下 发起了问题 • 1 人关注 • 0 个回复 • 371 次浏览 • 2016-12-08 16:22 • 来自相关话题

Elasticsearch如何防止内存被交换出去

采菊篱下 回复了问题 • 2 人关注 • 1 个回复 • 349 次浏览 • 2016-12-07 11:00 • 来自相关话题

云主机IO性能测试报告

Nock 发表了文章 • 0 个评论 • 339 次浏览 • 2016-12-01 18:49 • 来自相关话题

不同厂商的云主机在性能、设计、操作方面都存在很大的差异化。
 

吐槽云商

1、阿里云 -- 地主黑势力
阿里的包月包年捆绑式销售,创建完成后你不能及时删除释放主机,只能默默的等到期了,记得我第一次使用阿里云的时候,一口气开了10台云主机,后面我发现我不需要这么多,预算出错,但是阿里告诉我的是你人生中只有一次反悔的机会,我只能删掉一台主机。
 
其他的主机我只能等他默认到期了,如果实在没有多大作用我只能浪费我的银两,然后阿里坐收消费,我就当打赏王坚一个小红包了。
 
2、Ucloud -- 清廉百姓官
Ucloud的业务是我最喜欢的,尤其是喜欢他们的服务和UI设计。在国内做市场,我想你最应该的是调研国内技术人员的操作习惯,然后把云产品的使用达到极简、易使用,而不是黑灯找芝麻。在我的心理Ucloud是国内在云主机的操作和体验上给我的感受是最佳的。 还有就是他们的服务,他们可以一对一的帮你解决问题,耐心的解答,没有阿里那财大气粗的蛮横,不会绑架你。
 
3、Azure -- 没长齐毛的鸟
为什么我说Azure是没有长齐毛的鸟,因为他的产品设计还没有完善,操作困难,可能Azure还没有大力投入做中国市场吧,好多功能都依赖于PowerShell去完成操作,比如挂盘、弹性公网ip的绑定申请等,我想说的是我是个运维,脑子里面记了一大堆命令,我使用你们产品,挂个盘你还得让我去记一大堆命令和规则,我疯了。so.......
 
我使用过Ucloud、阿里云、Azure、Google云、AWS等诸多厂商的云产品服务,具体的其他功能我就不一一对比,下面我就介绍一下,我最近做的IO测试几家厂商的报告。
 

IO测试

测试命令如下:
fio -filename=/data/test.out1 -direct=1 -rw=read -bs=4k -size=10g -numjobs=4 -runtime=1200 -group_reporting -name=test1

fio -filename=/data/test.out2 -direct=1 -rw=write -bs=4k -size=10g -numjobs=4 -runtime=1200 -group_reporting -name=test2

fio -filename=/data/test.out3 -direct=1 -rw=randread -bs=4k -size=10g -numjobs=4 -runtime=1200 -group_reporting -name=test3

fio -filename=/data/test.out4 -direct=1 -rw=randwrite -bs=4k -size=10g -numjobs=4 -runtime=1200 -group_reporting -name=test4
测试结果如下:
















上面分别是Ucloud、阿里、Azure、谷歌云的IO测试结果。
 

速率测试

测试命令如下:
dd iflag=direct </data/test.out3 >/dev/null bs=16k count=102400
dd oflag=direct </dev/zero >/data/test2 bs=16k count=102400
测试结果如下:




 

总结

废话不多说,以上的测试结果仅供大家参考,新时代的云运维者,必须要有语言基础了,没有语言基础,就算Ucloud的操作再怎么人性化,你也不能自动化,所以从云时代的趋势来看作为运维者掌握一门语言还是很有必要的了。云平台有很多坑,在没有充分了解厂商的情况下,你会发现云时代的运维更难。 查看全部
不同厂商的云主机在性能、设计、操作方面都存在很大的差异化。
 


吐槽云商


1、阿里云 -- 地主黑势力
阿里的包月包年捆绑式销售,创建完成后你不能及时删除释放主机,只能默默的等到期了,记得我第一次使用阿里云的时候,一口气开了10台云主机,后面我发现我不需要这么多,预算出错,但是阿里告诉我的是你人生中只有一次反悔的机会,我只能删掉一台主机。
 
其他的主机我只能等他默认到期了,如果实在没有多大作用我只能浪费我的银两,然后阿里坐收消费,我就当打赏王坚一个小红包了。
 
2、Ucloud -- 清廉百姓官
Ucloud的业务是我最喜欢的,尤其是喜欢他们的服务和UI设计。在国内做市场,我想你最应该的是调研国内技术人员的操作习惯,然后把云产品的使用达到极简、易使用,而不是黑灯找芝麻。在我的心理Ucloud是国内在云主机的操作和体验上给我的感受是最佳的。 还有就是他们的服务,他们可以一对一的帮你解决问题,耐心的解答,没有阿里那财大气粗的蛮横,不会绑架你。
 
3、Azure -- 没长齐毛的鸟
为什么我说Azure是没有长齐毛的鸟,因为他的产品设计还没有完善,操作困难,可能Azure还没有大力投入做中国市场吧,好多功能都依赖于PowerShell去完成操作,比如挂盘、弹性公网ip的绑定申请等,我想说的是我是个运维,脑子里面记了一大堆命令,我使用你们产品,挂个盘你还得让我去记一大堆命令和规则,我疯了。so.......
 
我使用过Ucloud、阿里云、Azure、Google云、AWS等诸多厂商的云产品服务,具体的其他功能我就不一一对比,下面我就介绍一下,我最近做的IO测试几家厂商的报告。
 


IO测试


测试命令如下:
fio -filename=/data/test.out1 -direct=1 -rw=read -bs=4k -size=10g -numjobs=4 -runtime=1200 -group_reporting -name=test1

fio -filename=/data/test.out2 -direct=1 -rw=write -bs=4k -size=10g -numjobs=4 -runtime=1200 -group_reporting -name=test2

fio -filename=/data/test.out3 -direct=1 -rw=randread -bs=4k -size=10g -numjobs=4 -runtime=1200 -group_reporting -name=test3

fio -filename=/data/test.out4 -direct=1 -rw=randwrite -bs=4k -size=10g -numjobs=4 -runtime=1200 -group_reporting -name=test4

测试结果如下:
Ucloud.png

ali.png

Azure.png

GoogleCloud.png

上面分别是Ucloud、阿里、Azure、谷歌云的IO测试结果。
 


速率测试


测试命令如下:
dd iflag=direct </data/test.out3 >/dev/null bs=16k count=102400
dd oflag=direct </dev/zero >/data/test2 bs=16k count=102400

测试结果如下:
mancloud.png

 


总结


废话不多说,以上的测试结果仅供大家参考,新时代的云运维者,必须要有语言基础了,没有语言基础,就算Ucloud的操作再怎么人性化,你也不能自动化,所以从云时代的趋势来看作为运维者掌握一门语言还是很有必要的了。云平台有很多坑,在没有充分了解厂商的情况下,你会发现云时代的运维更难。

关于Elasticsearch性能优化几个点

小白菜 发表了文章 • 0 个评论 • 953 次浏览 • 2016-11-13 12:07 • 来自相关话题

Elasticsearch简述

ElasticSearch是现在技术前沿的大数据引擎,常见的组合有ES+Logstash+Kibana作为一套成熟的日志系统,其中Logstash是ETL工具,Kibana是数据分析展示平台。Elasticsearch让人惊艳的是他强大的搜索相关能力和灾备策略,Elastcisearch开放了一些接口供开发者研发自己的插件,Elasticsearch结合中文分词的插件会给Elasticsearch的搜索和分析起到很大的推动作用。ElasticSearch是使用开源全文检索库Apache Lucene进行索引和搜索的,所以Elasticsearch底层是依赖的Lucene。
 
关于Lucene:
Apache Lucene将写入索引的所有信息组织成一种倒排索引(Inverted Index)的结构之中,该结构是种将词项映射到文档的数据结构。其工作方式与传统的关系数据库不同,大致来说倒排索引是面向词项而不是面向文档的。且Lucene索引之中还存储了很多其他的信息,如词向量等等,每个Lucene都是由多个段构成的,每个段只会被创建一次但会被查询多次,段一旦创建就不会再被修改。多个段会在段合并的阶段合并在一起,何时合并由Lucene的内在机制决定,段合并后数量会变少,但是相应的段本身会变大。段合并的过程是非常消耗I/O的,且与之同时会有些不再使用的信息被清理掉。在Lucene中,将数据转化为倒排索引,将完整串转化为可用于搜索的词项的过程叫做分析。文本分析由分析器(Analyzer)来执行,分析其由分词器(Tokenizer),过滤器(Filter)和字符映射器(Character Mapper)组成,其各个功能显而易见。除此之外,Lucene有自己的一套完整的查询语言来帮助我们进行搜索和读写。
 
*注:Elasticsearch中的索引指的是查询/寻址时URI中的一个字段如:[host]:[port(9200)]/[index]/[type]/[ID]?[option],而Lucene中的索引更多地和ES中的分片的概念相对应。
 
Elasticsearch架构设计理念特性如下:
合理的默认配置:只需修改节点中的Yaml配置文件,就可以快速配置。这和Spring4中对配置的简化有相似的地方。分布式工作模式:Elasticsearch强大的Zen发现机制不仅支持组广播也支持点单播,且有“知一点即知天下”之妙。对等架构:节点之间自动备份分片,且使分片本身和样本之间尽量”远离“,可以避免单点故障。且Master节点和Data节点几乎完全等价。易于向集群扩充新节点:大大简化研发或运维将新节点加入集群所需的工作。不对索引中的数据结构增加任何限制:ES支持在一个索引之中存在多种数据类型。准实时:搜索和版本同步,由于ES是分布式应用,一个重大的挑战就是一致性问题,无论索引还是文档数据,然而事实证明ES表现优秀。
 

分片策略

选择合适的分片数和副本数:
ES的分片分为两种,主分片(Primary Shard)和副本(Replicas)。默认情况下,ES会为每个索引创建5个分片,即使是在单机环境下,这种冗余被称作过度分配(Over Allocation),目前看来这么做完全没有必要,仅在散布文档到分片和处理查询的过程中就增加了更多的复杂性,好在ES的优秀性能掩盖了这一点。假设一个索引由一个分片构成,那么当索引的大小超过单个节点的容量的时候,ES不能将索引分割成多份,因此必须在创建索引的时候就指定好需要的分片数量。此时我们所能做的就是创建一个新的索引,并在初始设定之中指定这个索引拥有更多的分片。反之如果过度分配,就增大了Lucene在合并分片查询结果时的复杂度,从而增大了耗时,所以我们得到了以下结论: 我们应该使用最少的分片!
 
主分片,副本和节点最大数之间数量存在以下关系:
节点数 <= 主分片数 *(副本数+1) NodeNum <= PNum * ( Rnum + 1 )这个关系,其实就是保持最好一个数据节点,最好保存一个索引的一个分片(不管主副)。
 
控制分片分配行为:
以上是在创建每个索引的时候需要考虑的优化方法,然而在索引已创建好的前提下,是否就是没有办法从分片的角度提高了性能了呢?当然不是,首先能做的是调整分片分配器的类型,具体是在elasticsearch.yml中设置cluster.routing.allocation.type属性,共有两种分片器even_shard,balanced(默认)。even_shard是尽量保证每个节点都具有相同数量的分片,balanced是基于可控制的权重进行分配,相对于前一个分配器,它更暴漏了一些参数而引入调整分配过程的能力。
 
每次ES的分片调整都是在ES上的数据分布发生了变化的时候进行的,最有代表性的就是有新的数据节点加入了集群的时候。当然调整分片的时机并不是由某个阈值触发的,ES内置十一个裁决者来决定是否触发分片调整,这里暂不赘述。另外,这些分配部署策略都是可以在运行时更新的,更多配置分片的属性也请大家自行Google。
 

路由优化

ES中所谓的路由和IP网络不同,是一个类似于Tag的东西。在创建文档的时候,可以通过字段为文档增加一个路由属性的Tag。ES内在机制决定了拥有相同路由属性的文档,一定会被分配到同一个分片上,无论是主分片还是副本。那么,在查询的过程中,一旦指定了感兴趣的路由属性,ES就可以直接到相应的分片所在的机器上进行搜索,而避免了复杂的分布式协同的一些工作,从而提升了ES的性能。于此同时,假设机器1上存有路由属性A的文档,机器2上存有路由属性为B的文档,那么我在查询的时候一旦指定目标路由属性为A,即使机器2故障瘫痪,对机器1构不成很大影响,所以这么做对灾况下的查询也提出了解决方案。所谓的路由,本质上是一个分桶(Bucketing)操作。当然,查询中也可以指定多个路由属性,机制大同小异。
 

Elasticsearch GC调优

ElasticSearch本质上是个Java程序,所以配置JVM垃圾回收器本身也是一个很有意义的工作。我们使用JVM的Xms和Xmx参数来提供指定内存大小,本质上提供的是JVM的堆空间大小,当JVM的堆空间不足的时候就会触发致命的OutOfMemoryException。这意味着要么内存不足,要么出现了内存泄露。处理GC问题,首先要确定问题的源头,一般有三种方案:
开启ElasticSearch上的GC日志 使用jstat命令生成内存Dump
第一条,在ES的配置文件elasticsearch.yml中有相关的属性可以配置,关于每个属性的用途这里当然说不完。
第二条,jstat命令可以帮助我们查看JVM堆中各个区的使用情况和GC的耗时情况。
第三条,最后的办法就是将JVM的堆空间转储到文件中去,实质上是对JVM堆空间的一个快照。

想了解更多关于JVM本身GC调优方法请参考:http://www.oracle.com/technetwork/java/javase/gc-tuning-6-140523.html 

另外,通过修改ES节点的启动参数,也可以调整GC的方式,但是实质上和上述方法是等同的。
 

避免内存交换

这一点很简单,由于操作系统的虚拟内存页交换机制,会给性能带来障碍,如数据写满内存会写入Linux中的Swap分区。

可以通过在elasticsearch.yml文件中的bootstrap.mlockall设置为true来实现,但是需要管理员权限,需要修改操作系统的相关配置文件。
 

控制索引合并

上文提到过,ES中的分片和副本本质上都是Lucene索引,而Lucene索引又基于多个索引段构建(至少一个),索引文件中的绝大多数都是只被写一次,读多次,在Lucene内在机制控制下,当满足某种条件的时候多个索引段会被合并到一个更大的索引段,而那些旧的索引段会被抛弃并移除磁盘,这个操作叫做段合并。 

Lucene要执行段合并的理由很简单充分:索引段粒度越小,查询性能越低且耗费的内存越多。频繁的文档更改操作会导致大量的小索引段,从而导致文件句柄打开过多的问题,如修改系统配置,增大系统允许的最大文件打开数。总的来讲,当索引段由多一个合并为一个的时候,会减少索引段的数量从而提高ES性能。对于研发者来讲,我们所能做的就是选择合适的合并策略,尽管段合并完全是Lucene的任务,但随着Lucene开放更多配置借口,新版本的ES还是提供了三种合并的策略tiered,log_byte_size,log_doc。另外,ES也提供了两种Lucene索引段合并的调度器:concurrent和serial。其中各者具体区别,这里暂不赘述,只是抛砖引玉。

分享阅读原文:http://www.cnblogs.com/guguli/p/5218297.html 查看全部


Elasticsearch简述


ElasticSearch是现在技术前沿的大数据引擎,常见的组合有ES+Logstash+Kibana作为一套成熟的日志系统,其中Logstash是ETL工具,Kibana是数据分析展示平台。Elasticsearch让人惊艳的是他强大的搜索相关能力和灾备策略,Elastcisearch开放了一些接口供开发者研发自己的插件,Elasticsearch结合中文分词的插件会给Elasticsearch的搜索和分析起到很大的推动作用。ElasticSearch是使用开源全文检索库Apache Lucene进行索引和搜索的,所以Elasticsearch底层是依赖的Lucene。
 
关于Lucene:
Apache Lucene将写入索引的所有信息组织成一种倒排索引(Inverted Index)的结构之中,该结构是种将词项映射到文档的数据结构。其工作方式与传统的关系数据库不同,大致来说倒排索引是面向词项而不是面向文档的。且Lucene索引之中还存储了很多其他的信息,如词向量等等,每个Lucene都是由多个段构成的,每个段只会被创建一次但会被查询多次,段一旦创建就不会再被修改。多个段会在段合并的阶段合并在一起,何时合并由Lucene的内在机制决定,段合并后数量会变少,但是相应的段本身会变大。段合并的过程是非常消耗I/O的,且与之同时会有些不再使用的信息被清理掉。在Lucene中,将数据转化为倒排索引,将完整串转化为可用于搜索的词项的过程叫做分析。文本分析由分析器(Analyzer)来执行,分析其由分词器(Tokenizer),过滤器(Filter)和字符映射器(Character Mapper)组成,其各个功能显而易见。除此之外,Lucene有自己的一套完整的查询语言来帮助我们进行搜索和读写。
 
*注:Elasticsearch中的索引指的是查询/寻址时URI中的一个字段如:[host]:[port(9200)]/[index]/[type]/[ID]?[option],而Lucene中的索引更多地和ES中的分片的概念相对应。
 
Elasticsearch架构设计理念特性如下:
  1. 合理的默认配置:只需修改节点中的Yaml配置文件,就可以快速配置。这和Spring4中对配置的简化有相似的地方。
  2. 分布式工作模式:Elasticsearch强大的Zen发现机制不仅支持组广播也支持点单播,且有“知一点即知天下”之妙。
  3. 对等架构:节点之间自动备份分片,且使分片本身和样本之间尽量”远离“,可以避免单点故障。且Master节点和Data节点几乎完全等价。
  4. 易于向集群扩充新节点:大大简化研发或运维将新节点加入集群所需的工作。
  5. 不对索引中的数据结构增加任何限制:ES支持在一个索引之中存在多种数据类型。
  6. 准实时:搜索和版本同步,由于ES是分布式应用,一个重大的挑战就是一致性问题,无论索引还是文档数据,然而事实证明ES表现优秀。

 


分片策略


选择合适的分片数和副本数:
ES的分片分为两种,主分片(Primary Shard)和副本(Replicas)。默认情况下,ES会为每个索引创建5个分片,即使是在单机环境下,这种冗余被称作过度分配(Over Allocation),目前看来这么做完全没有必要,仅在散布文档到分片和处理查询的过程中就增加了更多的复杂性,好在ES的优秀性能掩盖了这一点。假设一个索引由一个分片构成,那么当索引的大小超过单个节点的容量的时候,ES不能将索引分割成多份,因此必须在创建索引的时候就指定好需要的分片数量。此时我们所能做的就是创建一个新的索引,并在初始设定之中指定这个索引拥有更多的分片。反之如果过度分配,就增大了Lucene在合并分片查询结果时的复杂度,从而增大了耗时,所以我们得到了以下结论: 我们应该使用最少的分片!
 
主分片,副本和节点最大数之间数量存在以下关系:
节点数 <= 主分片数 *(副本数+1)    NodeNum <= PNum * ( Rnum + 1 )
这个关系,其实就是保持最好一个数据节点,最好保存一个索引的一个分片(不管主副)。
 
控制分片分配行为:
以上是在创建每个索引的时候需要考虑的优化方法,然而在索引已创建好的前提下,是否就是没有办法从分片的角度提高了性能了呢?当然不是,首先能做的是调整分片分配器的类型,具体是在elasticsearch.yml中设置cluster.routing.allocation.type属性,共有两种分片器even_shard,balanced(默认)。even_shard是尽量保证每个节点都具有相同数量的分片,balanced是基于可控制的权重进行分配,相对于前一个分配器,它更暴漏了一些参数而引入调整分配过程的能力。
 
每次ES的分片调整都是在ES上的数据分布发生了变化的时候进行的,最有代表性的就是有新的数据节点加入了集群的时候。当然调整分片的时机并不是由某个阈值触发的,ES内置十一个裁决者来决定是否触发分片调整,这里暂不赘述。另外,这些分配部署策略都是可以在运行时更新的,更多配置分片的属性也请大家自行Google。
 


路由优化


ES中所谓的路由和IP网络不同,是一个类似于Tag的东西。在创建文档的时候,可以通过字段为文档增加一个路由属性的Tag。ES内在机制决定了拥有相同路由属性的文档,一定会被分配到同一个分片上,无论是主分片还是副本。那么,在查询的过程中,一旦指定了感兴趣的路由属性,ES就可以直接到相应的分片所在的机器上进行搜索,而避免了复杂的分布式协同的一些工作,从而提升了ES的性能。于此同时,假设机器1上存有路由属性A的文档,机器2上存有路由属性为B的文档,那么我在查询的时候一旦指定目标路由属性为A,即使机器2故障瘫痪,对机器1构不成很大影响,所以这么做对灾况下的查询也提出了解决方案。所谓的路由,本质上是一个分桶(Bucketing)操作。当然,查询中也可以指定多个路由属性,机制大同小异。
 


Elasticsearch GC调优


ElasticSearch本质上是个Java程序,所以配置JVM垃圾回收器本身也是一个很有意义的工作。我们使用JVM的Xms和Xmx参数来提供指定内存大小,本质上提供的是JVM的堆空间大小,当JVM的堆空间不足的时候就会触发致命的OutOfMemoryException。这意味着要么内存不足,要么出现了内存泄露。处理GC问题,首先要确定问题的源头,一般有三种方案:
  1. 开启ElasticSearch上的GC日志
  2.  使用jstat命令
  3. 生成内存Dump

第一条,在ES的配置文件elasticsearch.yml中有相关的属性可以配置,关于每个属性的用途这里当然说不完。
第二条,jstat命令可以帮助我们查看JVM堆中各个区的使用情况和GC的耗时情况。
第三条,最后的办法就是将JVM的堆空间转储到文件中去,实质上是对JVM堆空间的一个快照。

想了解更多关于JVM本身GC调优方法请参考:http://www.oracle.com/technetwork/java/javase/gc-tuning-6-140523.html 

另外,通过修改ES节点的启动参数,也可以调整GC的方式,但是实质上和上述方法是等同的。
 


避免内存交换


这一点很简单,由于操作系统的虚拟内存页交换机制,会给性能带来障碍,如数据写满内存会写入Linux中的Swap分区。

可以通过在elasticsearch.yml文件中的bootstrap.mlockall设置为true来实现,但是需要管理员权限,需要修改操作系统的相关配置文件。
 


控制索引合并


上文提到过,ES中的分片和副本本质上都是Lucene索引,而Lucene索引又基于多个索引段构建(至少一个),索引文件中的绝大多数都是只被写一次,读多次,在Lucene内在机制控制下,当满足某种条件的时候多个索引段会被合并到一个更大的索引段,而那些旧的索引段会被抛弃并移除磁盘,这个操作叫做段合并。 

Lucene要执行段合并的理由很简单充分:索引段粒度越小,查询性能越低且耗费的内存越多。频繁的文档更改操作会导致大量的小索引段,从而导致文件句柄打开过多的问题,如修改系统配置,增大系统允许的最大文件打开数。总的来讲,当索引段由多一个合并为一个的时候,会减少索引段的数量从而提高ES性能。对于研发者来讲,我们所能做的就是选择合适的合并策略,尽管段合并完全是Lucene的任务,但随着Lucene开放更多配置借口,新版本的ES还是提供了三种合并的策略tiered,log_byte_size,log_doc。另外,ES也提供了两种Lucene索引段合并的调度器:concurrent和serial。其中各者具体区别,这里暂不赘述,只是抛砖引玉。


分享阅读原文:http://www.cnblogs.com/guguli/p/5218297.html


ElasticsearchIllegalArgumentException 错误

采菊篱下 回复了问题 • 2 人关注 • 1 个回复 • 467 次浏览 • 2016-11-08 19:47 • 来自相关话题

Elasticsearch的UNASSIGNED shareds怎么恢复?

采菊篱下 回复了问题 • 2 人关注 • 2 个回复 • 507 次浏览 • 2016-11-08 10:54 • 来自相关话题

CDH Hadoop + HBase HA 部署详解

采菊篱下 发表了文章 • 0 个评论 • 637 次浏览 • 2016-11-07 21:07 • 来自相关话题

CDH 的部署和 Apache Hadoop 的部署是没有任何区别的。这里着重的是 HA的部署,需要特殊说明的是NameNode HA 需要依赖 Zookeeper
 

准备

Hosts文件配置:
cat > /etc/hosts << _HOSTS_
127.0.0.1 localhost
10.0.2.59 cdh-m1
10.0.2.60 cdh-m2
10.0.2.61 cdh-s1
_HOSTS_各个节点服务情况
cdh-m1 Zookeeper JournalNode NameNode DFSZKFailoverController HMaster
cdh-m2 Zookeeper JournalNode NameNode DFSZKFailoverController HMaster
cdh-s1 Zookeeper JournalNode DataNode HRegionServer对几个新服务说明下: 
JournalNode 用于同步 NameNode 元数据,和 Zookeeper 一样需要 2N+1个节点存活集群才可用。DFSZKFailoverController(ZKFC) 用于主备切换,类似 Keepalived 所扮演的角色。
 
NTP 服务
设置时区
rm -f /etc/localtime
ln -s /usr/share/zoneinfo/UTC /etc/localtime配置NTP Server
yum install -y ntp
cat > /etc/ntp.conf << _NTP_
driftfile /var/lib/ntp/drift

restrict default nomodify
restrict -6 default nomodify

server cn.ntp.org.cn prefer
server news.neu.edu.cn iburst
server dns.sjtu.edu.cn iburst
server 127.127.1.1 iburst

tinker dispersion 100
tinker step 1800
tinker stepout 3600
includefile /etc/ntp/crypto/pw

keys /etc/ntp/keys
_NTP_

# NTP启动时立即同步
cat >> /etc/ntp/step-tickers << _NTP_
server cn.ntp.org.cn prefer
server news.neu.edu.cn iburst
server dns.sjtu.edu.cn iburst
_NTP_

# 同步硬件时钟
cat >> /etc/sysconfig/ntpd << _NTPHW_
SYNC_HWCLOCK=yes
_NTPHW_启动并设置开机自启动
/etc/init.d/ntpd start
chkconfig ntpd on配置 NTP Client
yum install -y ntp
# 注意修改内网NTP Server地址
cat > /etc/ntp.conf << _NTP_
driftfile /var/lib/ntp/drift

restrict default nomodify
restrict -6 default nomodify

restrict 127.0.0.1
restrict -6 ::1

server 10.0.2.59 prefer

tinker dispersion 100
tinker step 1800
tinker stepout 3600
includefile /etc/ntp/crypto/pw

keys /etc/ntp/keys
_NTP_

# NTP启动时立即同步
cat >> /etc/ntp/step-tickers << _NTP_
server 10.0.2.59 prefer
_NTP_

# 同步硬件时钟
cat >> /etc/sysconfig/ntpd << _NTPHW_
SYNC_HWCLOCK=yes
_NTPHW_启动并设置开机自启动
/etc/init.d/ntpd start
chkconfig ntpd on检查 NTP 同步
ntpq -p

# 结果
remote refid st t when poll reach delay offset jitter
==============================================================================
*time7.aliyun.co 10.137.38.86 2 u 17 64 3 44.995 5.178 0.177
news.neu.edu.cn .INIT. 16 u - 64 0 0.000 0.000 0.000
202.120.2.90 .INIT. 16 u - 64 0 0.000 0.000 0.000JDK
创建目录
mkdir -p /data/{install,app,logs,pid,appData}
mkdir /data/appData/tmpcd /data/install
wget -c http://oracle.com/jdk-7u51-linux-x64.gz
tar xf jdk-7u51-linux-x64.gz -C /data/app
cd /data/app
ln -s jdk1.7.0_51 jdk1.7
cat >> /etc/profile << _PATH_
export JAVA_HOME=/data/app/jdk1.7
export CLASSPATH=.:\$JAVA_HOME/lib/dt.jar:\$JAVA_HOME/lib/tools.jar
export PATH=\$JAVA_HOME/bin:\$PATH
_PATH_
source /etc/profile
创建运行账户
useradd -u 600 run安装包
http://archive.cloudera.com/cdh5/cdh/5/
cd /data/install
wget -c http://archive.cloudera.com/cdh5/cdh/5/hadoop-2.6.0-cdh5.4.5.tar.gz
wget -c http://archive.apache.org/dist/zookeeper/zookeeper-3.4.5/zookeeper-3.4.5.tar.gz
wget -c http://archive.cloudera.com/cdh5/cdh/5/hbase-1.0.0-cdh5.4.5.tar.gz

安装 Zookeeper

cd /data/install
tar xf zookeeper-3.4.5.tar.gz -C /data/app
cd /data/app
ln -s zookeeper-3.4.5 zookeeper设置环境变量
sed -i '/^export PATH=/i\export ZOOKEEPER_HOME=/data/app/zookeeper' /etc/profile
sed -i 's#export PATH=#&\$ZOOKEEPER_HOME/bin:#' /etc/profile
source /etc/profile删除无用文件
cd $ZOOKEEPER_HOME
rm -rf *xml *txt zookeeper-3.4.5.jar.* src recipes docs dist-maven contrib
rm -f $ZOOKEEPER_HOME/bin/*.cmd $ZOOKEEPER_HOME/bin/*.txt
rm -f $ZOOKEEPER_HOME/conf/zoo_sample.cfg创建数据目录
mkdir -p /data/appData/zookeeper/{data,logs}配置
cat > $ZOOKEEPER_HOME/conf/zoo.cfg << _ZOO_
tickTime=2000
initLimit=10
syncLimit=5
clientPort=2181
dataDir=/data/appData/zookeeper/data
dataLogDir=/data/appData/zookeeper/logs
server.1=cdh-m1:2888:3888
server.2=cdh-m2:2888:3888
server.3=cdh-s1:2888:3888
_ZOO_修改Zookeeper的日志打印方式,与日志路径设置
编辑
$ZOOKEEPER_HOME/bin/zkEnv.sh在27行后加入两个变量
ZOO_LOG_DIR=/data/logs/zookeeper
ZOO_LOG4J_PROP="INFO,ROLLINGFILE"创建 myid文件
# 注意myid与配置文件保持一致
echo 1 >/data/appData/zookeeper/data/myid设置目录权限
chown -R run.run /data/{app,appData,logs}启动、停止
# 启动
runuser - run -c 'zkServer.sh start'
# 停止
runuser - run -c 'zkServer.sh stop'

安装 Hadoop

tar xf hadoop-2.6.0-cdh5.4.5.tar.gz -C /data/app
cd /data/app
ln -s hadoop-2.6.0-cdh5.4.5 hadoop设置环境变量
sed -i '/^export PATH=/i\export HADOOP_HOME=/data/app/hadoop' /etc/profile
sed -i 's#export PATH=#&\$HADOOP_HOME/bin:\$HADOOP_HOME/sbin:#' /etc/profile
source /etc/profile删除无用文件
cd $HADOOP_HOME
rm -rf *txt share/doc src examples* include bin-mapreduce1 cloudera
find . -name "*.cmd"|xargs rm -f新建数据目录
mkdir -p /data/appData/hdfs/{name,edits,data,jn,tmp}配置
切换到配置文件目录
cd $HADOOP_HOME/etc/hadoop编辑 core-site.xml
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<!-- HDFS 集群名称,可指定端口 -->
<property>
<name>fs.defaultFS</name>
<value>hdfs://hdfs-cdh</value>
</property>

<!-- 临时文件目录 -->
<property>
<name>hadoop.tmp.dir</name>
<value>/data/appData/hdfs/tmp</value>
</property>

<!-- 回收站设置,0不启用回收站,1440 表示1440分钟后删除 -->
<property>
<name>fs.trash.interval</name>
<value>1440</value>
</property>

<!-- SequenceFiles在读写中可以使用的缓存大小,单位 bytes 默认 4096 -->
<property>
<name>io.file.buffer.size</name>
<value>131072</value>
</property>

<!-- 可用压缩算法,启用在hdfs-site.xml中,需要编译动态链接库才能用 -->
<property>
<name>io.compression.codecs</name>
<value>org.apache.hadoop.io.compress.SnappyCodec</value>
</property>
</configuration>编辑 hdfs-site.xml
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<!-- 指定hdfs 集群名称,需要和core-site.xml中的保持一致 -->
<property>
<name>dfs.nameservices</name>
<value>hdfs-cdh</value>
</property>

<!-- 指定 Zookeeper 用于NameNode HA,默认官方配置在core-site.xml中,为了查看清晰配置到hdfs-site.xml也是可用的 -->
<property>
<name>ha.zookeeper.quorum</name>
<value>cdh-m1:2181,cdh-m2:2181,cdh-s1:2181</value>
</property>

<!-- hdfs-cdh 下有两个NameNode,分别为 nn1,nn2 -->
<property>
<name>dfs.ha.namenodes.hdfs-cdh</name>
<value>nn1,nn2</value>
</property>

<!-- nn1 RPC通信地址 -->
<property>
<name>dfs.namenode.rpc-address.hdfs-cdh.nn1</name>
<value>cdh-m1:9000</value>
</property>

<!-- nn1 HTTP通信地址 -->
<property>
<name>dfs.namenode.http-address.hdfs-cdh.nn1</name>
<value>cdh-m1:50070</value>
</property>

<!-- nn2 RPC通信地址 -->
<property>
<name>dfs.namenode.rpc-address.hdfs-cdh.nn2</name>
<value>cdh-m2:9000</value>
</property>

<!-- nn2 HTTP通信地址 -->
<property>
<name>dfs.namenode.http-address.hdfs-cdh.nn2</name>
<value>cdh-m2:50070</value>
</property>

<!-- 指定NameNode元数据在JournalNode上的存储路径 -->
<property>
<name>dfs.namenode.shared.edits.dir</name>
<value>qjournal://cdh-m1:8485;cdh-m2:8485;cdh-s1:8485;/hdfs-cdh</value>
</property>

<!-- 开启NameNode失败自动切换 -->
<property>
<name>dfs.ha.automatic-failover.enabled</name>
<value>true</value>
</property>

<!-- 配置主备切换实现方式 -->
<property>
<name>dfs.client.failover.proxy.provider.hdfs-cdh</name>
<value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>

<!-- 配置主备切换方法,每个方法一行-->
<property>
<name>dfs.ha.fencing.methods</name>
<value>
sshfence
shell(/bin/true)
</value>
</property>

<!-- 指定运行用户的秘钥,需要NameNode双向免密码登录,用于主备自动切换 -->
<property>
<name>dfs.ha.fencing.ssh.private-key-files</name>
<value>/home/run/.ssh/id_rsa</value>
</property>

<!-- 配置sshfence 超时时间 -->
<property>
<name>dfs.ha.fencing.ssh.connect-timeout</name>
<value>50000</value>
</property>

<!-- NameNode 数据本地存储路径 -->
<property>
<name>dfs.namenode.name.dir</name>
<value>/data/appData/hdfs/name</value>
</property>

<!-- DataNode 数据本地存储路径 -->
<property>
<name>dfs.datanode.data.dir</name>
<value>/data/appData/hdfs/data</value>
</property>

<!-- JournalNode 数据本地存储路径 -->
<property>
<name>dfs.journalnode.edits.dir</name>
<value>/data/appData/hdfs/jn</value>
</property>

<!-- 修改文件存储到edits,定期同步到DataNode -->
<property>
<name>dfs.namenode.edits.noeditlogchannelflush</name>
<value>true</value>
</property>

<!-- edits 数据本地存储路径 -->
<property>
<name>dfs.namenode.edits.dir</name>
<value>/data/appData/hdfs/edits</value>
</property>

<!-- 开启Block Location metadata允许impala知道数据块在哪块磁盘上 默认关闭 -->
<property>
<name>dfs.datanode.hdfs-blocks-metadata.enabled</name>
<value>true</value>
</property>

<!-- 权限检查 默认开启 -->
<property>
<name>dfs.permissions.enabled</name>
<value>false</value>
</property>

<!-- block 大小设置 -->
<property>
<name>dfs.blocksize</name>
<value>64m</value>
</property>
</configuration>小于5个DataNode建议添加如下配置
<!-- 数据副本数量,不能超过DataNode数量,大集群建议使用默认值 默认 3 -->
<property>
<name>dfs.replication</name>
<value>2</value>
</property>

<!-- 当副本写入失败时不分配新节点,小集群适用 -->
<property>
<name>dfs.client.block.write.replace-datanode-on-failure.policy</name>
<value>NEVER</value>
</property>在 hadoop-env.sh 中添加如下变量
export JAVA_HOME=/data/app/jdk1.7
export HADOOP_LOG_DIR=/data/logs/hadoop
export HADOOP_PID_DIR=/data/pid
# SSH端口 可选
export HADOOP_SSH_OPTS="-p 36000"Heap 设置,单位 MB
export HADOOP_HEAPSIZE=1024权限设置
chown -R run.run /data/{app,appData,logs}
chmod 777 /data/pid格式化
格式化只需要执行一次,格式化之前启动Zookeeper
 
切换用户
su - run启动所有 JournalNode
hadoop-daemon.sh start journalnode格式化 Zookeeper(为 ZKFC 创建znode)
hdfs zkfc -formatZKNameNode 主节点格式化并启动
hdfs namenode -format
hadoop-daemon.sh start namenodeNameNode 备节点同步数据并启动
hdfs namenode -bootstrapStandby
hadoop-daemon.sh start namenode启动 ZKFC
hadoop-daemon.sh start zkfc启动 DataNode
hadoop-daemon.sh start datanode启动与停止
切换用户
su - run集群批量启动
需要配置运行用户ssh-key免密码登录,与$HADOOP_HOME/etc/hadoop/slaves
# 启动
start-dfs.sh
# 停止
stop-dfs.sh单服务启动停止
启动HDFS
hadoop-daemon.sh start journalnode
hadoop-daemon.sh start namenode
hadoop-daemon.sh start zkfc
hadoop-daemon.sh start datanode停止HDFS
hadoop-daemon.sh stop datanode
hadoop-daemon.sh stop namenode
hadoop-daemon.sh stop journalnode
hadoop-daemon.sh stop zkfc
测试
HDFS HA 测试
打开 NameNode 状态页:
http://cdh-m1:50010
http://cdh-m2:50010 

在 Overview 后面能看见 active 或 standby,active 为当前 Master,停止 active 上的 NameNode,检查 standby是否为 active。
 
HDFS 测试
hadoop fs -mkdir /test
hadoop fs -put /etc/hosts /test
hadoop fs -ls /test结果:
-rw-r--r-- 2 java supergroup 89 2016-06-15 10:30 /test/hosts
# 其中权限后面的列(这里的2)代表文件总数,即副本数量。HDFS 管理命令
# 动态加载 hdfs-site.xml
hadoop dfsadmin -refreshNodes

HBase安装配置

cd /data/install
tar xf hbase-1.0.0-cdh5.4.5.tar.gz -C /data/app
cd /data/app
ln -s hbase-1.0.0-cdh5.4.5 hbase设置环境变量
sed -i '/^export PATH=/i\export HBASE_HOME=/data/app/hbase' /etc/profile
sed -i 's#export PATH=#&\$HBASE_HOME/bin:#' /etc/profile
source /etc/profile删除无用文件
cd $HBASE_HOME
rm -rf *.txt pom.xml src docs cloudera dev-support hbase-annotations hbase-assembly hbase-checkstyle hbase-client hbase-common hbase-examples hbase-hadoop2-compat hbase-hadoop-compat hbase-it hbase-prefix-tree hbase-protocol hbase-rest hbase-server hbase-shell hbase-testing-util hbase-thrift
find . -name "*.cmd"|xargs rm -f配置
进入配置文件目录
cd $HBASE_HOME/conf编辑 hbase-site.xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<!-- HBase 数据存储路径 -->
<property>
<name>hbase.rootdir</name>
<value>hdfs://hdfs-cdh/hbase</value>
</property>

<!-- 完全分布式模式 -->
<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>

<!-- HMaster 节点 -->
<property>
<name>hbase.master</name>
<value>cdh-m1:60000,cdh-m2:60000</value>
</property>

<!-- Zookeeper 节点 -->
<property>
<name>hbase.zookeeper.quorum</name>
<value>cdh-m1:2181,cdh-m2:2181,cdh-s1:2181</value>
</property>

<!-- znode 路径,Zookeeper集群中有多个HBase集群需要设置不同znode -->
<property>
<name>zookeeper.znode.parent</name>
<value>/hbase</value>
</property>

<!-- HBase 协处理器 -->
<property>
<name>hbase.coprocessor.user.region.classes</name>
<value>org.apache.hadoop.hbase.coprocessor.AggregateImplementation</value>
</property>
</configuration>在 hbase-env.sh 中添加如下变量
export JAVA_HOME=/data/app/jdk1.7
export HBASE_LOG_DIR=/data/logs/hbase
export HBASE_PID_DIR=/data/pid
export HBASE_MANAGES_ZK=false
# SSH 默认端口 可选
export HBASE_SSH_OPTS="-o ConnectTimeout=1 -p 36000"Heap 设置,单位 MB
export HBASE_HEAPSIZE=1024可选设置 regionservers 中添加所有RegionServer主机名,用于集群批量启动、停止
 
启动与停止
切换用户
su - run集群批量启动
需要配置运行用户ssh-key免密码登录,与$HBASE_HOME/conf/regionservers
# 启动
start-hbase.sh
# 停止
stop-hbase.sh单服务启动停止
HMaster
# 启动
hbase-daemon.sh start master
# 停止
hbase-daemon.sh stop masterHRegionServer
# 启动
hbase-daemon.sh start regionserver
# 停止
hbase-daemon.sh stop regionserver
测试
HBase HA 测试
浏览器打开两个HMaster状态页:
http://cdh-m1:60010
http://cdh-m2:60010 

可以在Master后面看见其中一个主机名,Backup Masters中看见另一个。
停止当前Master,刷新另一个HMaster状态页会发现Master后面已经切换,HA成功。
 
HBase 测试
进入hbase shell 执行:
create 'users','user_id','address','info'
list
put 'users','anton','info:age','24'
get 'users','anton'

# 最终结果
COLUMN CELL
info:age timestamp=1465972035945, value=24
1 row(s) in 0.0170 seconds清除测试数据:
disable 'users'
drop 'users'到这里安装就全部完成,不懂的地方可以留言交流! 查看全部
hadoop.jpg

CDH 的部署和 Apache Hadoop 的部署是没有任何区别的。这里着重的是 HA的部署,需要特殊说明的是NameNode HA 需要依赖 Zookeeper
 


准备


Hosts文件配置:
cat > /etc/hosts << _HOSTS_
127.0.0.1 localhost
10.0.2.59 cdh-m1
10.0.2.60 cdh-m2
10.0.2.61 cdh-s1
_HOSTS_
各个节点服务情况
cdh-m1 Zookeeper JournalNode NameNode DFSZKFailoverController HMaster
cdh-m2 Zookeeper JournalNode NameNode DFSZKFailoverController HMaster
cdh-s1 Zookeeper JournalNode DataNode HRegionServer
对几个新服务说明下: 
  • JournalNode 用于同步 NameNode 元数据,和 Zookeeper 一样需要 2N+1个节点存活集群才可用。
  • DFSZKFailoverController(ZKFC) 用于主备切换,类似 Keepalived 所扮演的角色。

 
NTP 服务
设置时区
rm -f /etc/localtime
ln -s /usr/share/zoneinfo/UTC /etc/localtime
配置NTP Server
yum install -y ntp
cat > /etc/ntp.conf << _NTP_
driftfile /var/lib/ntp/drift

restrict default nomodify
restrict -6 default nomodify

server cn.ntp.org.cn prefer
server news.neu.edu.cn iburst
server dns.sjtu.edu.cn iburst
server 127.127.1.1 iburst

tinker dispersion 100
tinker step 1800
tinker stepout 3600
includefile /etc/ntp/crypto/pw

keys /etc/ntp/keys
_NTP_

# NTP启动时立即同步
cat >> /etc/ntp/step-tickers << _NTP_
server cn.ntp.org.cn prefer
server news.neu.edu.cn iburst
server dns.sjtu.edu.cn iburst
_NTP_

# 同步硬件时钟
cat >> /etc/sysconfig/ntpd << _NTPHW_
SYNC_HWCLOCK=yes
_NTPHW_
启动并设置开机自启动
/etc/init.d/ntpd start
chkconfig ntpd on
配置 NTP Client
yum install -y ntp
# 注意修改内网NTP Server地址
cat > /etc/ntp.conf << _NTP_
driftfile /var/lib/ntp/drift

restrict default nomodify
restrict -6 default nomodify

restrict 127.0.0.1
restrict -6 ::1

server 10.0.2.59 prefer

tinker dispersion 100
tinker step 1800
tinker stepout 3600
includefile /etc/ntp/crypto/pw

keys /etc/ntp/keys
_NTP_

# NTP启动时立即同步
cat >> /etc/ntp/step-tickers << _NTP_
server 10.0.2.59 prefer
_NTP_

# 同步硬件时钟
cat >> /etc/sysconfig/ntpd << _NTPHW_
SYNC_HWCLOCK=yes
_NTPHW_
启动并设置开机自启动
/etc/init.d/ntpd start
chkconfig ntpd on
检查 NTP 同步
ntpq -p

# 结果
remote refid st t when poll reach delay offset jitter
==============================================================================
*time7.aliyun.co 10.137.38.86 2 u 17 64 3 44.995 5.178 0.177
news.neu.edu.cn .INIT. 16 u - 64 0 0.000 0.000 0.000
202.120.2.90 .INIT. 16 u - 64 0 0.000 0.000 0.000
JDK
创建目录
mkdir -p /data/{install,app,logs,pid,appData}
mkdir /data/appData/tmp
cd /data/install
wget -c http://oracle.com/jdk-7u51-linux-x64.gz
tar xf jdk-7u51-linux-x64.gz -C /data/app
cd /data/app
ln -s jdk1.7.0_51 jdk1.7
cat >> /etc/profile << _PATH_
export JAVA_HOME=/data/app/jdk1.7
export CLASSPATH=.:\$JAVA_HOME/lib/dt.jar:\$JAVA_HOME/lib/tools.jar
export PATH=\$JAVA_HOME/bin:\$PATH
_PATH_
source /etc/profile

创建运行账户
useradd -u 600 run
安装包
http://archive.cloudera.com/cdh5/cdh/5/
cd /data/install
wget -c http://archive.cloudera.com/cdh5/cdh/5/hadoop-2.6.0-cdh5.4.5.tar.gz
wget -c http://archive.apache.org/dist/zookeeper/zookeeper-3.4.5/zookeeper-3.4.5.tar.gz
wget -c http://archive.cloudera.com/cdh5/cdh/5/hbase-1.0.0-cdh5.4.5.tar.gz


安装 Zookeeper


cd /data/install
tar xf zookeeper-3.4.5.tar.gz -C /data/app
cd /data/app
ln -s zookeeper-3.4.5 zookeeper
设置环境变量
sed -i '/^export PATH=/i\export ZOOKEEPER_HOME=/data/app/zookeeper' /etc/profile
sed -i 's#export PATH=#&\$ZOOKEEPER_HOME/bin:#' /etc/profile
source /etc/profile
删除无用文件
cd $ZOOKEEPER_HOME
rm -rf *xml *txt zookeeper-3.4.5.jar.* src recipes docs dist-maven contrib
rm -f $ZOOKEEPER_HOME/bin/*.cmd $ZOOKEEPER_HOME/bin/*.txt
rm -f $ZOOKEEPER_HOME/conf/zoo_sample.cfg
创建数据目录
mkdir -p /data/appData/zookeeper/{data,logs}
配置
cat > $ZOOKEEPER_HOME/conf/zoo.cfg << _ZOO_
tickTime=2000
initLimit=10
syncLimit=5
clientPort=2181
dataDir=/data/appData/zookeeper/data
dataLogDir=/data/appData/zookeeper/logs
server.1=cdh-m1:2888:3888
server.2=cdh-m2:2888:3888
server.3=cdh-s1:2888:3888
_ZOO_
修改Zookeeper的日志打印方式,与日志路径设置
编辑
$ZOOKEEPER_HOME/bin/zkEnv.sh
在27行后加入两个变量
ZOO_LOG_DIR=/data/logs/zookeeper
ZOO_LOG4J_PROP="INFO,ROLLINGFILE"
创建 myid文件
# 注意myid与配置文件保持一致
echo 1 >/data/appData/zookeeper/data/myid
设置目录权限
chown -R run.run /data/{app,appData,logs}
启动、停止
# 启动
runuser - run -c 'zkServer.sh start'
# 停止
runuser - run -c 'zkServer.sh stop'


安装 Hadoop


tar xf hadoop-2.6.0-cdh5.4.5.tar.gz -C /data/app
cd /data/app
ln -s hadoop-2.6.0-cdh5.4.5 hadoop
设置环境变量
sed -i '/^export PATH=/i\export HADOOP_HOME=/data/app/hadoop' /etc/profile
sed -i 's#export PATH=#&\$HADOOP_HOME/bin:\$HADOOP_HOME/sbin:#' /etc/profile
source /etc/profile
删除无用文件
cd $HADOOP_HOME
rm -rf *txt share/doc src examples* include bin-mapreduce1 cloudera
find . -name "*.cmd"|xargs rm -f
新建数据目录
mkdir -p /data/appData/hdfs/{name,edits,data,jn,tmp}
配置
切换到配置文件目录
cd $HADOOP_HOME/etc/hadoop
编辑 core-site.xml
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<!-- HDFS 集群名称,可指定端口 -->
<property>
<name>fs.defaultFS</name>
<value>hdfs://hdfs-cdh</value>
</property>

<!-- 临时文件目录 -->
<property>
<name>hadoop.tmp.dir</name>
<value>/data/appData/hdfs/tmp</value>
</property>

<!-- 回收站设置,0不启用回收站,1440 表示1440分钟后删除 -->
<property>
<name>fs.trash.interval</name>
<value>1440</value>
</property>

<!-- SequenceFiles在读写中可以使用的缓存大小,单位 bytes 默认 4096 -->
<property>
<name>io.file.buffer.size</name>
<value>131072</value>
</property>

<!-- 可用压缩算法,启用在hdfs-site.xml中,需要编译动态链接库才能用 -->
<property>
<name>io.compression.codecs</name>
<value>org.apache.hadoop.io.compress.SnappyCodec</value>
</property>
</configuration>
编辑 hdfs-site.xml
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<!-- 指定hdfs 集群名称,需要和core-site.xml中的保持一致 -->
<property>
<name>dfs.nameservices</name>
<value>hdfs-cdh</value>
</property>

<!-- 指定 Zookeeper 用于NameNode HA,默认官方配置在core-site.xml中,为了查看清晰配置到hdfs-site.xml也是可用的 -->
<property>
<name>ha.zookeeper.quorum</name>
<value>cdh-m1:2181,cdh-m2:2181,cdh-s1:2181</value>
</property>

<!-- hdfs-cdh 下有两个NameNode,分别为 nn1,nn2 -->
<property>
<name>dfs.ha.namenodes.hdfs-cdh</name>
<value>nn1,nn2</value>
</property>

<!-- nn1 RPC通信地址 -->
<property>
<name>dfs.namenode.rpc-address.hdfs-cdh.nn1</name>
<value>cdh-m1:9000</value>
</property>

<!-- nn1 HTTP通信地址 -->
<property>
<name>dfs.namenode.http-address.hdfs-cdh.nn1</name>
<value>cdh-m1:50070</value>
</property>

<!-- nn2 RPC通信地址 -->
<property>
<name>dfs.namenode.rpc-address.hdfs-cdh.nn2</name>
<value>cdh-m2:9000</value>
</property>

<!-- nn2 HTTP通信地址 -->
<property>
<name>dfs.namenode.http-address.hdfs-cdh.nn2</name>
<value>cdh-m2:50070</value>
</property>

<!-- 指定NameNode元数据在JournalNode上的存储路径 -->
<property>
<name>dfs.namenode.shared.edits.dir</name>
<value>qjournal://cdh-m1:8485;cdh-m2:8485;cdh-s1:8485;/hdfs-cdh</value>
</property>

<!-- 开启NameNode失败自动切换 -->
<property>
<name>dfs.ha.automatic-failover.enabled</name>
<value>true</value>
</property>

<!-- 配置主备切换实现方式 -->
<property>
<name>dfs.client.failover.proxy.provider.hdfs-cdh</name>
<value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>

<!-- 配置主备切换方法,每个方法一行-->
<property>
<name>dfs.ha.fencing.methods</name>
<value>
sshfence
shell(/bin/true)
</value>
</property>

<!-- 指定运行用户的秘钥,需要NameNode双向免密码登录,用于主备自动切换 -->
<property>
<name>dfs.ha.fencing.ssh.private-key-files</name>
<value>/home/run/.ssh/id_rsa</value>
</property>

<!-- 配置sshfence 超时时间 -->
<property>
<name>dfs.ha.fencing.ssh.connect-timeout</name>
<value>50000</value>
</property>

<!-- NameNode 数据本地存储路径 -->
<property>
<name>dfs.namenode.name.dir</name>
<value>/data/appData/hdfs/name</value>
</property>

<!-- DataNode 数据本地存储路径 -->
<property>
<name>dfs.datanode.data.dir</name>
<value>/data/appData/hdfs/data</value>
</property>

<!-- JournalNode 数据本地存储路径 -->
<property>
<name>dfs.journalnode.edits.dir</name>
<value>/data/appData/hdfs/jn</value>
</property>

<!-- 修改文件存储到edits,定期同步到DataNode -->
<property>
<name>dfs.namenode.edits.noeditlogchannelflush</name>
<value>true</value>
</property>

<!-- edits 数据本地存储路径 -->
<property>
<name>dfs.namenode.edits.dir</name>
<value>/data/appData/hdfs/edits</value>
</property>

<!-- 开启Block Location metadata允许impala知道数据块在哪块磁盘上 默认关闭 -->
<property>
<name>dfs.datanode.hdfs-blocks-metadata.enabled</name>
<value>true</value>
</property>

<!-- 权限检查 默认开启 -->
<property>
<name>dfs.permissions.enabled</name>
<value>false</value>
</property>

<!-- block 大小设置 -->
<property>
<name>dfs.blocksize</name>
<value>64m</value>
</property>
</configuration>
小于5个DataNode建议添加如下配置
<!-- 数据副本数量,不能超过DataNode数量,大集群建议使用默认值 默认 3 -->
<property>
<name>dfs.replication</name>
<value>2</value>
</property>

<!-- 当副本写入失败时不分配新节点,小集群适用 -->
<property>
<name>dfs.client.block.write.replace-datanode-on-failure.policy</name>
<value>NEVER</value>
</property>
在 hadoop-env.sh 中添加如下变量
export JAVA_HOME=/data/app/jdk1.7
export HADOOP_LOG_DIR=/data/logs/hadoop
export HADOOP_PID_DIR=/data/pid
# SSH端口 可选
export HADOOP_SSH_OPTS="-p 36000"
Heap 设置,单位 MB
export HADOOP_HEAPSIZE=1024
权限设置
chown -R run.run /data/{app,appData,logs}
chmod 777 /data/pid
格式化
格式化只需要执行一次,格式化之前启动Zookeeper
 
切换用户
su - run
启动所有 JournalNode
hadoop-daemon.sh start journalnode
格式化 Zookeeper(为 ZKFC 创建znode)
hdfs zkfc -formatZK
NameNode 主节点格式化并启动
hdfs namenode -format
hadoop-daemon.sh start namenode
NameNode 备节点同步数据并启动
hdfs namenode -bootstrapStandby
hadoop-daemon.sh start namenode
启动 ZKFC
hadoop-daemon.sh start zkfc
启动 DataNode
hadoop-daemon.sh start datanode
启动与停止
切换用户
su - run
集群批量启动
需要配置运行用户ssh-key免密码登录,与$HADOOP_HOME/etc/hadoop/slaves
# 启动
start-dfs.sh
# 停止
stop-dfs.sh
单服务启动停止
启动HDFS
hadoop-daemon.sh start journalnode
hadoop-daemon.sh start namenode
hadoop-daemon.sh start zkfc
hadoop-daemon.sh start datanode
停止HDFS
hadoop-daemon.sh stop datanode
hadoop-daemon.sh stop namenode
hadoop-daemon.sh stop journalnode
hadoop-daemon.sh stop zkfc

测试
HDFS HA 测试
打开 NameNode 状态页:
http://cdh-m1:50010
http://cdh-m2:50010 

在 Overview 后面能看见 active 或 standby,active 为当前 Master,停止 active 上的 NameNode,检查 standby是否为 active。
 
HDFS 测试
hadoop fs -mkdir /test
hadoop fs -put /etc/hosts /test
hadoop fs -ls /test
结果:
-rw-r--r--   2 java supergroup         89 2016-06-15 10:30 /test/hosts
# 其中权限后面的列(这里的2)代表文件总数,即副本数量。
HDFS 管理命令
# 动态加载 hdfs-site.xml
hadoop dfsadmin -refreshNodes


HBase安装配置


cd /data/install
tar xf hbase-1.0.0-cdh5.4.5.tar.gz -C /data/app
cd /data/app
ln -s hbase-1.0.0-cdh5.4.5 hbase
设置环境变量
sed -i '/^export PATH=/i\export HBASE_HOME=/data/app/hbase' /etc/profile
sed -i 's#export PATH=#&\$HBASE_HOME/bin:#' /etc/profile
source /etc/profile
删除无用文件
cd $HBASE_HOME
rm -rf *.txt pom.xml src docs cloudera dev-support hbase-annotations hbase-assembly hbase-checkstyle hbase-client hbase-common hbase-examples hbase-hadoop2-compat hbase-hadoop-compat hbase-it hbase-prefix-tree hbase-protocol hbase-rest hbase-server hbase-shell hbase-testing-util hbase-thrift
find . -name "*.cmd"|xargs rm -f
配置
进入配置文件目录
cd $HBASE_HOME/conf
编辑 hbase-site.xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<!-- HBase 数据存储路径 -->
<property>
<name>hbase.rootdir</name>
<value>hdfs://hdfs-cdh/hbase</value>
</property>

<!-- 完全分布式模式 -->
<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>

<!-- HMaster 节点 -->
<property>
<name>hbase.master</name>
<value>cdh-m1:60000,cdh-m2:60000</value>
</property>

<!-- Zookeeper 节点 -->
<property>
<name>hbase.zookeeper.quorum</name>
<value>cdh-m1:2181,cdh-m2:2181,cdh-s1:2181</value>
</property>

<!-- znode 路径,Zookeeper集群中有多个HBase集群需要设置不同znode -->
<property>
<name>zookeeper.znode.parent</name>
<value>/hbase</value>
</property>

<!-- HBase 协处理器 -->
<property>
<name>hbase.coprocessor.user.region.classes</name>
<value>org.apache.hadoop.hbase.coprocessor.AggregateImplementation</value>
</property>
</configuration>
在 hbase-env.sh 中添加如下变量
export JAVA_HOME=/data/app/jdk1.7
export HBASE_LOG_DIR=/data/logs/hbase
export HBASE_PID_DIR=/data/pid
export HBASE_MANAGES_ZK=false
# SSH 默认端口 可选
export HBASE_SSH_OPTS="-o ConnectTimeout=1 -p 36000"
Heap 设置,单位 MB
export HBASE_HEAPSIZE=1024
可选设置 regionservers 中添加所有RegionServer主机名,用于集群批量启动、停止
 
启动与停止
切换用户
su - run
集群批量启动
需要配置运行用户ssh-key免密码登录,与$HBASE_HOME/conf/regionservers
# 启动
start-hbase.sh
# 停止
stop-hbase.sh
单服务启动停止
HMaster
# 启动
hbase-daemon.sh start master
# 停止
hbase-daemon.sh stop master
HRegionServer
# 启动
hbase-daemon.sh start regionserver
# 停止
hbase-daemon.sh stop regionserver

测试
HBase HA 测试
浏览器打开两个HMaster状态页:
http://cdh-m1:60010
http://cdh-m2:60010 

可以在Master后面看见其中一个主机名,Backup Masters中看见另一个。
停止当前Master,刷新另一个HMaster状态页会发现Master后面已经切换,HA成功。
 
HBase 测试
进入hbase shell 执行:
create 'users','user_id','address','info'
list
put 'users','anton','info:age','24'
get 'users','anton'

# 最终结果
COLUMN CELL
info:age timestamp=1465972035945, value=24
1 row(s) in 0.0170 seconds
清除测试数据:
disable 'users'
drop 'users'
到这里安装就全部完成,不懂的地方可以留言交流!