Hadoop 2.6.0修改配置PID文件路径

being 发表了文章 0 个评论 4615 次浏览 2016-08-02 19:09 来自相关话题

修改原因 Hadoop启动后的PID文件默认配置是保存在 /tmp 目录下的,而linux下 /tmp 目录会定时清理,所以在集群运行一段时间后如果在停Hadoop相关服务是会出现类似:no datanode to stop 的错误提 ...查看全部


修改原因


Hadoop启动后的PID文件默认配置是保存在 /tmp 目录下的,而linux下 /tmp 目录会定时清理,所以在集群运行一段时间后如果在停Hadoop相关服务是会出现类似:no datanode to stop 的错误提示,一般生产环境中我们需要重新修改PID的保存路径。关于Linux定期清理tmp目录参考我之前的文章:http://openskill.cn/article/413
 


Hadoop修改


1、HDFS
增加或修改:$HADOOP_HOME/etc/hadoop/hadoop-env.sh如下:
# 修改为你想存放的路径
export HADOOP_PID_DIR=/data/hadoop/pids
export HADOOP_SECURE_DN_PID_DIR=${HADOOP_PID_DIR}

2、MapReduce
增加或修改:$HADOOP_HOME/etc/hadoop/mapred-env.sh如下:
# 修改mapred的pid存放路径
export HADOOP_MAPRED_PID_DIR=/data/hadoop/pids
**ps: 自定义存储目录需要先创建好。
 


Hbase 修改


增加或修改:$HBASE_HOME/conf/hbase-env.sh 如下:
#hbase PID存放路径配置
export HBASE_PID_DIR=/data/hadoop/pids


命名规则说明


我阅读了一下stop-all.sh stop-dfs.sh,stop-yarn.sh脚本,发现原理都是通过一个pid文件来停止集群的。
 
这些进程的pid文件默认都是保存在系统的/tmp目录下面,Linux系统(Centos/RHEL等)每个一段时间就会清楚/tmp下面的内容,如果/tmp下没有相关的pid文件停止就会出错“no datanode to stop”
 
当我重启出现问题的时候我怕强制kill -9杀进程的话会破坏集群,于是我想到一个方法,按照pid文件的命名规则重新在/tmp目录下面创建这些pid文件,在翻看了一堆sbin目录下的脚本之后,找到了它们的命名规则。

比如hadoop相关进程的pid文件命名规则为:
pid=$HADOOP_PID_DIR/hadoop-$HADOOP_IDENT_STRING-$command.pid
Yarn进程相关的PID文件:
pid=$YARN_PID_DIR/yarn-$YARN_IDENT_STRING-$command.pid
默认情况下$HADOOP_PID_DIR和$YARN_PID_DIR都为/tmp,$HADOOP_IDENT_STRING和$YARN_IDENT_STRING都为当前系统登录的用户名,比如我的用户名为root,$command为当前执行的命令:
 
比如执行了一个 hadoop-daemon.sh stop namenode,这时候就会去找/tmp/hadoop-root-namenode.pid文件拿到namenode进程的pid号,来停止namenode进程。
 
了解原理之后,于是我就开始手动创建这些文件,我首先jps把所有进程的pid都记录下来了,然后在/tmp目录按照命名规则创建好了这些进程的pid文件,然后再重新执行stop-all.sh命令,ok可以成功关闭集群了。这是我处理的一个过程,最后为了避免这种情况,我就做了如上修改的操作!

Kafka consumer处理大消息数据问题分析

空心菜 发表了文章 0 个评论 11608 次浏览 2016-07-22 22:34 来自相关话题

案例分析 今天在处理kafka consumer的程序的时候,发现如下错误:ERROR [2016-07-22 07:16:02,466] com.flow.kafka.consumer.main.KafkaConsumer: Une ...查看全部


案例分析


今天在处理kafka consumer的程序的时候,发现如下错误:
ERROR [2016-07-22 07:16:02,466] com.flow.kafka.consumer.main.KafkaConsumer: Unexpected Error Occurred
! kafka.common.MessageSizeTooLargeException: Found a message larger than the maximum fetch size of this consumer on topic codeTopic partition 3 at fetch offset 94. Increase the fetch size, or decrease the maximum message size the broker will allow.
! at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:91) ~[pip-kafka-consumer.jar:na]
! at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33) ~[pip-kafka-consumer.jar:na]
! at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66) ~[pip-kafka-consumer.jar:na]
! at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58) ~[pip-kafka-consumer.jar:na]
! at com.flow.kafka.consumer.main.KafkaConsumer$KafkaRiverFetcher.run(KafkaConsumer.java:291) ~[original-pip-kafka-consumer.jar:na]
! at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_51]
! at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_51]
! at java.lang.Thread.run(Thread.java:744) [na:1.7.0_51]
如上log可以看出,问题就是有一个较大的消息数据在codeTopic的partition 3上,然后consumer未能消费,提示我可以减小broker允许进入的消息数据的大小,或者增大consumer程序消费数据的大小。
 
从log上来看一目了然,如果要解决当前问题的话,1、减小broker消息体大小(设置message.max.bytes
参数)。;2、增大consumer获取数据信息大小(设置fetch.message.max.bytes参数)。默认broker消息体大小为1000000字节即为1M大小。
消费者方面:fetch.message.max.bytes——>这将决定消费者可以获取的数据大小。
broker方面:replica.fetch.max.bytes——>这将允许broker的副本发送消息在集群并确保消息被正确地复制。如果这是太小,则消息不会被复制,因此,消费者永远不会看到的消息,因为消息永远不会承诺(完全复制)。
broker方面:message.max.bytes——>可以接受数据生产者最大消息数据大小。

由我的场景来看较大的消息体已经进入到了kafka,我这里要解决这个问题,只需要增加consumer的fetch.message.max.bytes数值就好。我单独把那条数据消费出来,写到一个文件中发现那条消息大小为1.5M左右,为了避免再次发生这种问题我把consumer程序的fetch.message.max.bytes参数调节为了3072000即为3M,重启consumer程序,查看log一切正常,解决这个消费错误到此结束,下面介绍一下kafka针对大数据处理的思考。


kafka的设计初衷


Kafka设计的初衷是迅速处理小量的消息,一般10K大小的消息吞吐性能最好(可参见LinkedIn的kafka性能测试)。但有时候,我们需要处理更大的消息,比如XML文档或JSON内容,一个消息差不多有10-100M,这种情况下,Kakfa应该如何处理?
 
针对这个问题,可以参考如下建议:
    []  最好的方法是不直接传送这些大的数据。如果有共享存储,如NAS, HDFS, S3等,可以把这些大的文件存放到共享存储,然后使用Kafka来传送文件的位置信息。[/][]  第二个方法是,将大的消息数据切片或切块,在生产端将数据切片为10K大小,使用分区主键确保一个大消息的所有部分会被发送到同一个kafka分区(这样每一部分的拆分顺序得以保留),如此以来,当消费端使用时会将这些部分重新还原为原始的消息。[/][]  第三,Kafka的生产端可以压缩消息,如果原始消息是XML,当通过压缩之后,消息可能会变得不那么大。在生产端的配置参数中使用compression.codec和commpressed.topics可以开启压缩功能,压缩算法可以使用GZip或Snappy。[/]

 
不过如果上述方法都不是你需要的,而你最终还是希望传送大的消息,那么,则可以在kafka中设置下面一些参数:
broker 配置:
    []message.max.bytes (默认:1000000) – broker能接收消息的最大字节数,这个值应该比消费端的fetch.message.max.bytes更小才对,否则broker就会因为消费端无法使用这个消息而挂起。[/][]log.segment.bytes (默认: 1GB) – kafka数据文件的大小,确保这个数值大于一个消息的长度。一般说来使用默认值即可(一般一个消息很难大于1G,因为这是一个消息系统,而不是文件系统)。[/][]replica.fetch.max.bytes (默认: 1MB) – broker可复制的消息的最大字节数。这个值应该比message.max.bytes大,否则broker会接收此消息,但无法将此消息复制出去,从而造成数据丢失。[/]
 Consumer 配置:
    []fetch.message.max.bytes (默认 1MB) – 消费者能读取的最大消息。这个值应该大于或等于message.max.bytes。所以,如果你一定要选择kafka来传送大的消息,还有些事项需要考虑。要传送大的消息,不是当出现问题之后再来考虑如何解决,而是在一开始设计的时候,就要考虑到大消息对集群和主题的影响。[/][]性能: 根据前面提到的性能测试,kafka在消息为10K时吞吐量达到最大,更大的消息会降低吞吐量,在设计集群的容量时,尤其要考虑这点。[/][]可用的内存和分区数:Brokers会为每个分区分配replica.fetch.max.bytes参数指定的内存空间,假设replica.fetch.max.bytes=1M,且有1000个分区,则需要差不多1G的内存,确保 分区数最大的消息不会超过服务器的内存,否则会报OOM错误。同样地,消费端的fetch.message.max.bytes指定了最大消息需要的内存空间,同样,分区数最大需要内存空间 不能超过服务器的内存。所以,如果你有大的消息要传送,则在内存一定的情况下,只能使用较少的分区数或者使用更大内存的服务器。[/][]垃圾回收:到现在为止,我在kafka的使用中还没发现过此问题,但这应该是一个需要考虑的潜在问题。更大的消息会让GC的时间更长(因为broker需要分配更大的块),随时关注GC的日志和服务器的日志信息。如果长时间的GC导致kafka丢失了zookeeper的会话,则需要配置zookeeper.session.timeout.ms参数为更大的超时时间。[/]

具体使用情况可以视具体情况而定。
参考:http://www.4byte.cn/question/288008/kafka-sending-a-15mb-message.html

Flume(NG)架构设计要点及配置实践

Ansible 发表了文章 0 个评论 2739 次浏览 2016-07-14 21:34 来自相关话题

Flume NG是一个分布式、可靠、可用的系统,它能够将不同数据源的海量日志数据进行高效收集、聚合、移动,最后存储到一个中心化数据存储系统中。由原来的Flume OG到现在的Flume NG,进行了架构重构,并且现在NG版本完全不兼容原来的OG版本。经过架构重 ...查看全部
Flume NG是一个分布式、可靠、可用的系统,它能够将不同数据源的海量日志数据进行高效收集、聚合、移动,最后存储到一个中心化数据存储系统中。由原来的Flume OG到现在的Flume NG,进行了架构重构,并且现在NG版本完全不兼容原来的OG版本。经过架构重构后,Flume NG更像是一个轻量的小工具,非常简单,容易适应各种方式日志收集,并支持failover和负载均衡。
 


架构设计要点


Flume的架构主要有一下几个核心概念:
    []Event:一个数据单元,带有一个可选的消息头[/][]Flow:Event从源点到达目的点的迁移的抽象[/][]Client:操作位于源点处的Event,将其发送到Flume Agent[/][]Agent:一个独立的Flume进程,包含组件Source、Channel、Sink[/][]Source:用来消费传递到该组件的Event[/][]Channel:中转Event的一个临时存储,保存有Source组件传递过来的Event[/][]Sink:从Channel中读取并移除Event,将Event传递到Flow Pipeline中的下一个Agent(如果有的话)[/]
 Flume NG架构,如图所示:
FlumeArch.png
外部系统产生日志,直接通过Flume的Agent的Source组件将事件(如日志行)发送到中间临时的channel组件,最后传递给Sink组件,HDFS Sink组件可以直接把数据存储到HDFS集群上。一个最基本Flow的配置,格式如下:
# list the sources, sinks and channels for the agent.sources =  .sinks =  .channels =  # set channel for source.sources..channels =   ....sources..channels =   ...# set channel for sink.sinks..channel = .sinks..channel = 
尖括号里面的,我们可以根据实际需求或业务来修改名称。下面详细说明:表示配置一个Agent的名称,一个Agent肯定有一个名称。与是Agent的Source组件的名称,消费传递过来的Event。与是Agent的Channel组件的名称。与是Agent的Sink组件的名称,从Channel中消费(移除)Event。上面配置内容中,第一组中配置Source、Sink、Channel,它们的值可以有1个或者多个;第二组中配置Source将把数据存储(Put)到哪一个Channel中,可以存储到1个或多个Channel中,同一个Source将数据存储到多个Channel中,实际上是Replication;第三组中配置Sink从哪一个Channel中取(Task)数据,一个Sink只能从一个Channel中取数据。下面,根据官网文档,我们展示几种Flow Pipeline,各自适应于什么样的应用场景: 
    []多个Agent顺序连接[/]
Order.png
可以将多个Agent顺序连接起来,将最初的数据源经过收集,存储到最终的存储系统中。这是最简单的情况,一般情况下,应该控制这种顺序连接的Agent的数量,因为数据流经的路径变长了,如果不考虑failover的话,出现故障将影响整个Flow上的Agent收集服务。 
    []多个Agent的数据汇聚到同一个Agent[/]
fulu.png
这种情况应用的场景比较多,比如要收集Web网站的用户行为日志,Web网站为了可用性使用的负载均衡的集群模式,每个节点都产生用户行为日志,可以为每个节点都配置一个Agent来单独收集日志数据,然后多个Agent将数据最终汇聚到一个用来存储数据存储系统,如HDFS上。 
    []多路(Multiplexing)Agent[/]
multip.png
这种模式,有两种方式,一种是用来复制(Replication),另一种是用来分流(Multiplexing)。Replication方式,可以将最前端的数据源复制多份,分别传递到多个channel中,每个channel接收到的数据都是相同的,配置格式,如下所示:
# List the sources, sinks and channels for the agent.sources = .sinks =  .channels =  # set list of channels for source (separated by space).sources..channels =  # set channel for sinks.sinks..channel = .sinks..channel = .sources..selector.type = replicating
上面指定了selector的type的值为replication,其他的配置没有指定,使用的Replication方式,Source1会将数据分别存储到Channel1和Channel2,这两个channel里面存储的数据是相同的,然后数据被传递到Sink1和Sink2。Multiplexing方式,selector可以根据header的值来确定数据传递到哪一个channel,配置格式,如下所示:
# Mapping for multiplexing selector.sources..selector.type = multiplexing.sources..selector.header = .sources..selector.mapping. = .sources..selector.mapping. =  .sources..selector.mapping. = #....sources..selector.default = 
上面selector的type的值为multiplexing,同时配置selector的header信息,还配置了多个selector的mapping的值,即header的值:如果header的值为Value1、Value2,数据从Source1路由到Channel1;如果header的值为Value2、Value3,数据从Source1路由到Channel2。 
    []实现load balance功能[/]
loadbalance.png
Load balancing Sink Processor能够实现load balance功能,上图Agent1是一个路由节点,负责将Channel暂存的Event均衡到对应的多个Sink组件上,而每个Sink组件分别连接到一个独立的Agent上,示例配置,如下所示:
a1.sinkgroups = g1a1.sinkgroups.g1.sinks = k1 k2 k3a1.sinkgroups.g1.processor.type = load_balancea1.sinkgroups.g1.processor.backoff = truea1.sinkgroups.g1.processor.selector = round_robina1.sinkgroups.g1.processor.selector.maxTimeOut=10000
    []实现failover功能[/]
Failover Sink Processor能够实现failover功能,具体流程类似load balance,但是内部处理机制与load balance完全不同:Failover Sink Processor维护一个优先级Sink组件列表,只要有一个Sink组件可用,Event就被传递到下一个组件。如果一个Sink能够成功处理Event,则会加入到一个Pool中,否则会被移出Pool并计算失败次数,设置一个惩罚因子,示例配置如下所示:
a1.sinkgroups = g1a1.sinkgroups.g1.sinks = k1 k2 k3a1.sinkgroups.g1.processor.type = failovera1.sinkgroups.g1.processor.priority.k1 = 5a1.sinkgroups.g1.processor.priority.k2 = 7a1.sinkgroups.g1.processor.priority.k3 = 6a1.sinkgroups.g1.processor.maxpenalty = 20000

基本功能

我们看一下,Flume NG都支持哪些功能(目前最新版本是1.5.0.1),了解它的功能集合,能够让我们在应用中更好地选择使用哪一种方案。说明Flume NG的功能,实际还是围绕着Agent的三个组件Source、Channel、Sink来看它能够支持哪些技术或协议。我们不再对各个组件支持的协议详细配置进行说明,通过列表的方式分别对三个组件进行概要说明: 
    []Flume Source[/]
souce.png
 
    []Flume Channel[/]
channel.png
 
    []Flume Sink[/]
sink.png
另外还有Channel Selector、Sink Processor、Event Serializer、Interceptor等组件,可以参考官网提供的用户手册。

应用实践

安装Flume NG非常简单,我们使用最新的1.5.0.1版本,执行如下命令:
cd /usr/localwget http://mirror.bit.edu.cn/apache/flume/1.5.0.1/apache-flume-1.5.0.1-bin.tar.gztar xvzf apache-flume-1.5.0.1-bin.tar.gzcd apache-flume-1.5.0.1-bin
如果需要使用到Hadoop集群,保证Hadoop相关的环境变量都已经正确配置,并且Hadoop集群可用。下面,通过一些实际的配置实例,来了解Flume的使用。为了简单期间,channel我们使用Memory类型的channel。 
    []Avro Source+Memory Channel+Logger Sink[/]
使用apache-flume-1.5.0.1自带的例子,使用Avro Source接收外部数据源,Logger作为sink,即通过Avro RPC调用,将数据缓存在channel中,然后通过Logger打印出调用发送的数据。配置Agent,修改配置文件conf/flume-conf.properties,内容如下:
# Define a memory channel called ch1 on agent1agent1.channels.ch1.type = memory# Define an Avro source called avro-source1 on agent1 and tell it# to bind to 0.0.0.0:41414. Connect it to channel ch1.agent1.sources.avro-source1.channels = ch1agent1.sources.avro-source1.type = avroagent1.sources.avro-source1.bind = 0.0.0.0agent1.sources.avro-source1.port = 41414# Define a logger sink that simply logs all events it receives# and connect it to the other end of the same channel.agent1.sinks.log-sink1.channel = ch1agent1.sinks.log-sink1.type = logger# Finally, now that we've defined all of our components, tell# agent1 which ones we want to activate.agent1.channels = ch1agent1.channels.ch1.capacity = 1000agent1.sources = avro-source1agent1.sinks = log-sink1
首先,启动Agent进程:
bin/flume-ng agent -c ./conf/ -f conf/flume-conf.properties -Dflume.root.logger=DEBUG,console -n agent1
然后,启动Avro Client,发送数据:
bin/flume-ng avro-client -c ./conf/ -H 0.0.0.0 -p 41414 -F /usr/local/programs/logs/sync.log -Dflume.root.logger=DEBUG,console
    []Avro Source+Memory Channel+HDFS Sink[/]
配置Agent,修改配置文件conf/flume-conf-hdfs.properties,内容如下:
# Define a source, channel, sinkagent1.sources = avro-source1agent1.channels = ch1agent1.sinks = hdfs-sink# Configure channelagent1.channels.ch1.type = memoryagent1.channels.ch1.capacity = 1000000agent1.channels.ch1.transactionCapacity = 500000# Define an Avro source called avro-source1 on agent1 and tell it# to bind to 0.0.0.0:41414. Connect it to channel ch1.agent1.sources.avro-source1.channels = ch1agent1.sources.avro-source1.type = avroagent1.sources.avro-source1.bind = 0.0.0.0agent1.sources.avro-source1.port = 41414# Define a logger sink that simply logs all events it receives# and connect it to the other end of the same channel.agent1.sinks.hdfs-sink1.channel = ch1agent1.sinks.hdfs-sink1.type = hdfsagent1.sinks.hdfs-sink1.hdfs.path = hdfs://h1:8020/data/flume/agent1.sinks.hdfs-sink1.hdfs.filePrefix = sync_fileagent1.sinks.hdfs-sink1.hdfs.fileSuffix = .logagent1.sinks.hdfs-sink1.hdfs.rollSize = 1048576agent1.sinks.hdfs-sink1.rollInterval = 0agent1.sinks.hdfs-sink1.hdfs.rollCount = 0agent1.sinks.hdfs-sink1.hdfs.batchSize = 1500agent1.sinks.hdfs-sink1.hdfs.round = trueagent1.sinks.hdfs-sink1.hdfs.roundUnit = minuteagent1.sinks.hdfs-sink1.hdfs.threadsPoolSize = 25agent1.sinks.hdfs-sink1.hdfs.useLocalTimeStamp = trueagent1.sinks.hdfs-sink1.hdfs.minBlockReplicas = 1agent1.sinks.hdfs-sink1.fileType = SequenceFileagent1.sinks.hdfs-sink1.writeFormat = TEXT
首先,启动Agent:
bin/flume-ng agent -c ./conf/ -f conf/flume-conf-hdfs.properties -Dflume.root.logger=INFO,console -n agent1
然后,启动Avro Client,发送数据:
bin/flume-ng avro-client -c ./conf/ -H 0.0.0.0 -p 41414 -F /usr/local/programs/logs/sync.log -Dflume.root.logger=DEBUG,console
可以查看同步到HDFS上的数据:
hdfs dfs -ls /data/flume
结果示例,如下所示:
-rw-r--r--   3 shirdrn supergroup    1377617 2014-09-16 14:35 /data/flume/sync_file.1410849320761.log-rw-r--r--   3 shirdrn supergroup    1378137 2014-09-16 14:35 /data/flume/sync_file.1410849320762.log-rw-r--r--   3 shirdrn supergroup     259148 2014-09-16 14:35 /data/flume/sync_file.1410849320763.log
    []Spooling Directory Source+Memory Channel+HDFS Sink[/]
配置Agent,修改配置文件flume-conf-spool.properties,内容如下:
# Define source, channel, sinkagent1.sources = spool-source1agent1.channels = ch1agent1.sinks = hdfs-sink1# Configure channelagent1.channels.ch1.type = memoryagent1.channels.ch1.capacity = 1000000agent1.channels.ch1.transactionCapacity = 500000# Define and configure an Spool directory sourceagent1.sources.spool-source1.channels = ch1agent1.sources.spool-source1.type = spooldiragent1.sources.spool-source1.spoolDir = /home/shirdrn/data/agent1.sources.spool-source1.ignorePattern = event(_\d{4}\-\d{2}\-\d{2}_\d{2}_\d{2})?\.log(\.COMPLETED)?agent1.sources.spool-source1.batchSize = 50agent1.sources.spool-source1.inputCharset = UTF-8# Define and configure a hdfs sinkagent1.sinks.hdfs-sink1.channel = ch1agent1.sinks.hdfs-sink1.type = hdfsagent1.sinks.hdfs-sink1.hdfs.path = hdfs://h1:8020/data/flume/agent1.sinks.hdfs-sink1.hdfs.filePrefix = event_%y-%m-%d_%H_%M_%Sagent1.sinks.hdfs-sink1.hdfs.fileSuffix = .logagent1.sinks.hdfs-sink1.hdfs.rollSize = 1048576agent1.sinks.hdfs-sink1.hdfs.rollCount = 0agent1.sinks.hdfs-sink1.hdfs.batchSize = 1500agent1.sinks.hdfs-sink1.hdfs.round = trueagent1.sinks.hdfs-sink1.hdfs.roundUnit = minuteagent1.sinks.hdfs-sink1.hdfs.threadsPoolSize = 25agent1.sinks.hdfs-sink1.hdfs.useLocalTimeStamp = trueagent1.sinks.hdfs-sink1.hdfs.minBlockReplicas = 1agent1.sinks.hdfs-sink1.fileType = SequenceFileagent1.sinks.hdfs-sink1.writeFormat = TEXTagent1.sinks.hdfs-sink1.rollInterval = 0
启动Agent进程,执行如下命令:
bin/flume-ng agent -c ./conf/ -f conf/flume-conf-spool.properties -Dflume.root.logger=INFO,console -n agent1
可以查看HDFS上同步过来的数据:
hdfs dfs -ls /data/flume
结果示例,如下所示:
-rw-r--r--   3 shirdrn supergroup    1072265 2014-09-17 10:52 /data/flume/event_14-09-17_10_52_00.1410922355094.log-rw-r--r--   3 shirdrn supergroup    1072265 2014-09-17 10:52 /data/flume/event_14-09-17_10_52_00.1410922355095.log-rw-r--r--   3 shirdrn supergroup    1072265 2014-09-17 10:52 /data/flume/event_14-09-17_10_52_00.1410922355096.log-rw-r--r--   3 shirdrn supergroup    1072265 2014-09-17 10:52 /data/flume/event_14-09-17_10_52_00.1410922355097.log-rw-r--r--   3 shirdrn supergroup       1530 2014-09-17 10:53 /data/flume/event_14-09-17_10_52_00.1410922355098.log-rw-r--r--   3 shirdrn supergroup    1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380386.log-rw-r--r--   3 shirdrn supergroup    1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380387.log-rw-r--r--   3 shirdrn supergroup    1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380388.log-rw-r--r--   3 shirdrn supergroup    1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380389.log-rw-r--r--   3 shirdrn supergroup    1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380390.log
    []Exec Source+Memory Channel+File Roll Sink[/]

配置Agent,修改配置文件flume-conf-file.properties,内容如下:
# Define source, channel, sink
agent1.sources = tail-source1
agent1.channels = ch1
agent1.sinks = file-sink1

# Configure channel
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 1000000
agent1.channels.ch1.transactionCapacity = 500000

# Define and configure an Exec source
agent1.sources.tail-source1.channels = ch1
agent1.sources.tail-source1.type = exec
agent1.sources.tail-source1.command = tail -F /home/shirdrn/data/event.log
agent1.sources.tail-source1.shell = /bin/sh -c
agent1.sources.tail-source1.batchSize = 50

# Define and configure a File roll sink
# and connect it to the other end of the same channel.
agent1.sinks.file-sink1.channel = ch1
agent1.sinks.file-sink1.type = file_roll
agent1.sinks.file-sink1.batchSize = 100
agent1.sinks.file-sink1.serializer = TEXT
agent1.sinks.file-sink1.sink.directory = /home/shirdrn/sink_data
启动Agent进程,执行如下命令:
bin/flume-ng agent -c ./conf/ -f conf/flume-conf-file.properties -Dflume.root.logger=INFO,console -n agent1
可以查看File Roll Sink对应的本地文件系统目录/home/shirdrn/sink_data下,示例如下所示:
-rw-rw-r-- 1 shirdrn shirdrn 13944825 Sep 17 11:36 1410924990039-1
-rw-rw-r-- 1 shirdrn shirdrn 11288870 Sep 17 11:37 1410924990039-2
-rw-rw-r-- 1 shirdrn shirdrn 0 Sep 17 11:37 1410924990039-3
-rw-rw-r-- 1 shirdrn shirdrn 20517500 Sep 17 11:38 1410924990039-4
-rw-rw-r-- 1 shirdrn shirdrn 16343250 Sep 17 11:38 1410924990039-5
有关Flume NG更多配置及其说明,请参考官方用户手册,非常详细。

参考链接:
http://flume.apache.org/FlumeUserGuide.html 
https://blogs.apache.org/flume/entry/flume_ng_architecture
 


分享阅读原文:http://shiyanjun.cn/archives/915.html​ 
作者:时延军


Elasticsearch Monitoring

空心菜 发表了文章 0 个评论 3259 次浏览 2016-07-02 14:02 来自相关话题

集群查看集群状态 curl -s '127.0.0.1:9200/_cluster/health?pretty' 查看集群中所有索引的状态 curl -s '127.0.0.1:92 ...查看全部
集群
查看集群状态

curl -s '127.0.0.1:9200/_cluster/health?pretty'

查看集群中所有索引的状态

curl -s '127.0.0.1:9200/_cluster/health?pretty&level=indices'

查看集群中所有索引分片的状态

curl -s '127.0.0.1:9200/_cluster/health?pretty&level=shards'

节点
查看集群中所有节点的配置信息

curl -s '127.0.0.1:9200/_nodes/?pretty'

查看集群中指定节点的配置信息

curl -s '127.0.0.1:9200/_nodes/{ip|name}?pretty'

查看集群中所有节点的状态信息

curl -s '127.0.0.1:9200/_nodes/stats/?pretty'

查看集群中指定节点的状态信息

curl -s '127.0.0.1:9200/_nodes/{ip|name}/stats/?pretty'

查看集群中所有节点的指定状态信息

curl -s '127.0.0.1:9200/_nodes/stats/{indices|os|process|jvm|thread_pool|fs}?pretty'

查看集群中指定节点的指定状态信息

curl -s '127.0.0.1:9200/_nodes/{ip|name}/stats/{indices|os|process|jvm|thread_pool|fs}?pretty'

cat
curl -s '127.0.0.1:9200/_cat/?'

索引
查看指定索引的状态信息

curl -s 127.0.0.1:9200/_cat/indices/{index}?v

查看指定索引的分片信息

curl -s 127.0.0.1:9200/_cat/shards/{index}?v

查看指定索引的段信息

curl -s 127.0.0.1:9200/_cat/segments/{index}?v
原文作者:erica.zhou
原文地址:http://www.z-dig.com/elasticsearch-monitoring.html

Elasticsearch 2.3.1配置文件说明

Rock 发表了文章 0 个评论 3011 次浏览 2016-06-28 10:46 来自相关话题

elasticsearch的config文件夹里面有两个配置文件:elasticsearch.yml和logging.yml,第一个是es的基本配置文件,第二个是日志配置文件,es也是使用log4j来记录日志的,所以logging.yml里的设置按普通log4 ...查看全部
elasticsearch的config文件夹里面有两个配置文件:elasticsearch.yml和logging.yml,第一个是es的基本配置文件,第二个是日志配置文件,es也是使用log4j来记录日志的,所以logging.yml里的设置按普通log4j配置文件来设置就行了。下面主要讲解下elasticsearch.yml这个文件中可配置的东西。

cluster.name:elasticsearch
配置es的集群名称,默认是elasticsearch,es会自动发现在同一网段下的es,如果在同一网段下有多个集群,就可以用这个属性来区分不同的集群。

node.name:"FranzKafka"
节点名,默认随机指定一个name列表中名字,该列表在es的jar包中config文件夹里name.txt文件中,其中有很多作者添加的有趣名字。

node.master:true
指定该节点是否有资格被选举成为node,默认是true,es是默认集群中的第一台机器为master,如果这台机挂了就会重新选举master。

node.data:true
指定该节点是否存储索引数据,默认为true。

index.number_of_shards:5
设置默认索引分片个数,默认为5片。

index.number_of_replicas:1
设置默认索引副本个数,默认为1个副本。

path.conf:/path/to/conf
设置配置文件的存储路径,默认是es根目录下的config文件夹。

path.data:/path/to/data
设置索引数据的存储路径,默认是es根目录下的data文件夹,可以设置多个存储路径,用逗号隔开,例:
path.data:/path/to/data1,/path/to/data2

path.work:/path/to/work
设置临时文件的存储路径,默认是es根目录下的work文件夹。

path.logs:/path/to/logs
设置日志文件的存储路径,默认是es根目录下的logs文件夹

path.plugins:/path/to/plugins
设置插件的存放路径,默认是es根目录下的plugins文件夹

bootstrap.mlockall:true
设置为true来锁住内存。因为当jvm开始swapping时es的效率会降低,所以要保证它不swap,可以把ES_MIN_MEM和ES_MAX_MEM两个环境变量设置成同一个值,并且保证机器有足够的内存分配给es。同时也要允许elasticsearch的进程可以锁住内存,linux下可以通过`ulimit-l unlimited`命令。

network.bind_host:192.168.0.1
设置绑定的ip地址,可以是ipv4或ipv6的,默认为0.0.0.0。


network.publish_host:192.168.0.1
设置其它节点和该节点交互的ip地址,如果不设置它会自动判断,值必须是个真实的ip地址。

network.host:192.168.0.1
这个参数是用来同时设置bind_host和publish_host上面两个参数。

transport.tcp.port:9300
设置节点间交互的tcp端口,默认是9300。

transport.tcp.compress:true
设置是否压缩tcp传输时的数据,默认为false,不压缩。

http.port:9200
设置对外服务的http端口,默认为9200。

http.max_content_length:100mb
设置内容的最大容量,默认100mb

http.enabled:false
是否使用http协议对外提供服务,默认为true,开启。

gateway.type:local
gateway的类型,默认为local即为本地文件系统,可以设置为本地文件系统,分布式文件系统,hadoop的HDFS,和amazon的s3服务器,其它文件系统的设置方法下次再详细说。

gateway.recover_after_nodes:1
设置集群中N个节点启动时进行数据恢复,默认为1。

gateway.recover_after_time:5m
设置初始化数据恢复进程的超时时间,默认是5分钟。

gateway.expected_nodes:2
设置这个集群中节点的数量,默认为2,一旦这N个节点启动,就会立即进行数据恢复。

cluster.routing.allocation.node_initial_primaries_recoveries:4
初始化数据恢复时,并发恢复线程的个数,默认为4。

cluster.routing.allocation.node_concurrent_recoveries:2
添加删除节点或负载均衡时并发恢复线程的个数,默认为4。

indices.recovery.max_size_per_sec:0
设置数据恢复时限制的带宽,如入100mb,默认为0,即无限制。

indices.recovery.concurrent_streams:5
设置这个参数来限制从其它分片恢复数据时最大同时打开并发流的个数,默认为5。

discovery.zen.minimum_master_nodes:1
设置这个参数来保证集群中的节点可以知道其它N个有master资格的节点。默认为1,对于大的集群来说,可以设置大一点的值(2-4)

discovery.zen.ping.timeout:3s
设置集群中自动发现其它节点时ping连接超时时间,默认为3秒,对于比较差的网络环境可以高点的值来防止自动发现时出错。

discovery.zen.ping.multicast.enabled:false
设置是否打开多播发现节点,默认是true。

discovery.zen.ping.unicast.hosts:["host1", "host2:port","host3[portX-portY]"]
设置集群中master节点的初始列表,可以通过这些节点来自动发现新加入集群的节点

Kafka Offset Monitor页面字段解释

Ansible 发表了文章 0 个评论 4780 次浏览 2016-06-24 17:43 来自相关话题

如上图所示: id:kafka broker实例的id号 host:kafka 实例的ip地址 port:kafka 实例服务端口 Topic:Topic的名字 Partition:Topic包含的分 ...查看全部
kafkatopic.png

如上图所示:
id:kafka broker实例的id号
host:kafka 实例的ip地址
port:kafka 实例服务端口
Topic:Topic的名字
Partition:Topic包含的分区,上图中pythonTopic中包含4个分区(默认分区从0开始)
Offset:Kafka Consumer已经消费分区上的消息数
logSize:已经写到该分区的消息数
Lag:未读取也就是未消费的消息数(Lag = logSize - Offset)
Owner:改分区位于哪个Broker上,上图中只有一个Broker(consumer group name + consumer hostname + broker.id)
Created:分区创建时间
Last Seen:Offset 和 logSize 数字最后一次刷新时间间隔
有时候刚启动kafka的时候,可能发现Lag为负值,这明显是不对的,但是当Producer 和 Consumer对kafka进行写和读的时候,数字会显示正常,具体为什么会出现负值,我也不从而知,我估计是bug。

Python可用的分布式协调系统(ZooKeeper,Consul, etcd)介绍

push 发表了文章 0 个评论 8417 次浏览 2016-06-19 22:52 来自相关话题

随着大数据时代的到来,分布式是解决大数据问题的一个主要手段,随着越来越多的分布式的服务,如何在分布式的系统中对这些服务做协调变成了一个很棘手的问题。今天我们就来看看如何使用Python,利用开源对分布式服务做协调。   在对分布式的应用 ...查看全部
随着大数据时代的到来,分布式是解决大数据问题的一个主要手段,随着越来越多的分布式的服务,如何在分布式的系统中对这些服务做协调变成了一个很棘手的问题。今天我们就来看看如何使用Python,利用开源对分布式服务做协调。
 
在对分布式的应用做协调的时候,主要会碰到以下的应用场景:
    []业务发现(service discovery)[/]
           找到分布式系统中存在那些可用的服务和节点
    []名字服务 (name service)[/]
           通过给定的名字知道到对应的资源
    []配置管理 (configuration management)[/]
           如何在分布式的节点中共享配置文件,保证一致性。
    []故障发现和故障转移 (failure detection and failover)[/]
           当某一个节点出故障的时候,如何检测到并通知其它节点, 或者把想用的服务转移到其它的可用节点
    []领导选举(leader election)[/]
           如何在众多的节点中选举一个领导者,来协调所有的节点
    []分布式的锁 (distributed exclusive lock)[/]
           如何通过锁在分布式的服务中进行同步
    []消息和通知服务 (message queue and notification)[/]
           如何在分布式的服务中传递消息,以通知的形式对事件作出主动的响应有许多的开源软件试图解决以上的全部或者部分问题,例如ZooKeeper,consul,doozerd等等,我们现在就看看它们是如何做的。

ZooKeeper

 Zookeeper 是使用最广泛,也是最有名的解决分布式服务的协调问题的开源软件了,它最早和Hadoop一起开发,后来成为了Apache的顶级项目,很多开源的项目都在使用ZooKeeper,例如大名鼎鼎的Kafka。 Zookeeper本身是一个分布式的应用,通过对共享的数据的管理来实现对分布式应用的协调。ZooKeeper使用一个树形目录作为数据模型,这个目录和文件目录类似,目录上的每一个节点被称作ZNodes。
zk.png
ZooKeeper提供基本的API来操纵和控制Znodes,包括对节点的创建,删除,设置和获取数据,获得子节点等。除了这些基本的操作,ZooKeeper还提供了一些配方(Recipe),其实就是一些常见的用例,例如锁,两阶段提交,领导选举等等。ZooKeeper本身是用Java开发的,所以对Java的支持是最自然的。它同时还提供了C语言的绑定。 Kazoo 是一个非常成熟的Zookeeper Python客户端,我们这就看看如果使用Python来调用ZooKeeper。(注意,运行以下的例子,需要在本地启动ZooKeeper的服务) 基本操作 以下的例子现实了对Znode的基本操作,首先要创建一个客户端的连接,并启动客户端。然后我们可以利用该客户端对Znode做增删改,取内容的操作。最后退出客户端。
from kazoo.client import KazooClientimport logginglogging.basicConfig()zk = KazooClient(hosts='127.0.0.1:2181')zk.start()# Ensure a path, create if necessaryzk.ensure_path("/test/zk1")# Create a node with datazk.create("/test/zk1/node", b"a test value")# Determine if a node existsif zk.exists("/test/zk1"):    print "the node exist"# Print the version of a node and its datadata, stat = zk.get("/test/zk1")print("Version: %s, data: %s" % (stat.version, data.decode("utf-8")))# List the childrenchildren = zk.get_children("/test/zk1")print("There are %s children with names %s" % (len(children), children))zk.stop()
通过对ZNode的操作,我们可以完成一些分布式服务协调的基本需求,包括名字服务,配置服务,分组等等。 故障检测(Failure Detection)在分布式系统中,一个最基本的需求就是当某一个服务出问题的时候,能够通知其它的节点或者某个管理节点。ZooKeeper提供ephemeral Node的概念,当创建该Node的服务退出或者异常中止的时候,该Node会被删除,所以我们就可以利用这种行为来监控服务运行状态。以下是worker的代码
from kazoo.client import KazooClientimport timeimport logginglogging.basicConfig()zk = KazooClient(hosts='127.0.0.1:2181')zk.start()# Ensure a path, create if necessaryzk.ensure_path("/test/failure_detection")# Create a node with datazk.create("/test/failure_detection/worker",          value=b"a test value", ephemeral=True)while True:    print "I am alive!"    time.sleep(3)zk.stop()
以下的monitor 代码,监控worker服务是否运行。
from kazoo.client import KazooClientimport timeimport logginglogging.basicConfig()zk = KazooClient(hosts='127.0.0.1:2181')zk.start()# Determine if a node existswhile True:    if zk.exists("/test/failure_detection/worker"):        print "the worker is alive!"    else:        print "the worker is dead!"        break    time.sleep(3)zk.stop()
领导选举Kazoo直接提供了领导选举的API,使用起来非常方便。
from kazoo.client import KazooClientimport timeimport uuidimport logginglogging.basicConfig()my_id = uuid.uuid4()def leader_func():    print "I am the leader {}".format(str(my_id))    while True:        print "{} is working! ".format(str(my_id))        time.sleep(3)zk = KazooClient(hosts='127.0.0.1:2181')zk.start()election = zk.Election("/electionpath")# blocks until the election is won, then calls# leader_func()election.run(leader_func)zk.stop()
你可以同时运行多个worker,其中一个会获得Leader,当你杀死当前的leader后,会有一个新的leader被选出。 分布式锁锁的概念大家都熟悉,当我们希望某一件事在同一时间只有一个服务在做,或者某一个资源在同一时间只有一个服务能访问,这个时候,我们就需要用到锁。
from kazoo.client import KazooClientimport timeimport uuidimport logginglogging.basicConfig()my_id = uuid.uuid4()def work():    print "{} is working! ".format(str(my_id))zk = KazooClient(hosts='127.0.0.1:2181')zk.start()lock = zk.Lock("/lockpath", str(my_id))print "I am {}".format(str(my_id))while True:    with lock:        work()    time.sleep(3)    zk.stop()
当你运行多个worker的时候,不同的worker会试图获取同一个锁,然而只有一个worker会工作,其它的worker必须等待获得锁后才能执行。 监视ZooKeeper提供了监视(Watch)的功能,当节点的数据被修改的时候,监控的function会被调用。我们可以利用这一点进行配置文件的同步,发消息,或其他需要通知的功能。
from kazoo.client import KazooClientimport timeimport logginglogging.basicConfig()zk = KazooClient(hosts='127.0.0.1:2181')zk.start()@zk.DataWatch('/path/to/watch')def my_func(data, stat):    if data:        print "Data is %s" % data        print "Version is %s" % stat.version     else :        print "data is not available"while True:    time.sleep(10)zk.stop()
除了我们上面列举的内容外,Kazoo还提供了许多其他的功能,例如:计数,租约,队列等等,大家有兴趣可以参考它的文档。 

Consul

Consul 是用Go开发的分布式服务协调管理的工具,它提供了服务发现,健康检查,Key/Value存储等功能,并且支持跨数据中心的功能。Consul提供ZooKeeper类似的功能,它的基于HTTP的API可以方便的和各种语言进行绑定。自然Python也在列。与Zookeeper有所差异的是Consul通过基于Client/Server架构的Agent部署来支持跨Data Center的功能。
ConsulArch.png
Consul在Cluster伤的每一个节点都运行一个Agent,这个Agent可以使Server或者Client模式。Client负责到Server的高效通信,相对为无状态的。 Server负责包括选举领导节点,维护cluster的状态,对所有的查询做响应,跨数据中心的通信等等。 KV基本操作 类似于Zookeeper,Consul支持对KV的增删查改的操作。
import consulc = consul.Consul()# set data for key fooc.kv.put('foo', 'bar')# poll a key for updatesindex = Nonewhile True:    index, data = c.kv.get('foo', index=index)    print data['Value']    c.kv.delete('foo')
这里和ZooKeeper对Znode的操作几乎是一样的。 服务发现(Service Discovery)和健康检查(Health Check) Consul的另一个主要的功能是用于对分布式的服务做管理,用户可以注册一个服务,同时还提供对服务做健康检测的功能。首先,用户需要定义一个服务。
{  "service": {    "name": "redis",    "tags": ["master"],    "address": "127.0.0.1",    "port": 8000,    "checks": [      {        "script": "/usr/local/bin/check_redis.py",        "interval": "10s"      }    ]  }}
其中,服务的名字是必须的,其它的字段可以自选,包括了服务的地址,端口,相应的健康检查的脚本。当用户注册了一个服务后,就可以通过Consul来查询该服务,获得该服务的状态。 Consul支持三种Check的模式:
    []调用一个外部脚本(Script),在该模式下,consul定时会调用一个外部脚本,通过脚本的返回内容获得对应服务的健康状态。[/][]调用HTTP,在该模式下,consul定时会调用一个HTTP请求,返回2XX,则为健康;429 (Too many request)是警告。其它均为不健康[/][]主动上报,在该模式下,服务需要主动调用一个consul提供的HTTP PUT请求,上报健康状态。[/]
 Python API提供对应的接口,大家可以参考 http://python-consul.readthedocs.org/en/latest/
    []Consul.Agent.Service[/][]Consul.Agent.Check[/]
 Consul的Health Check和Zookeeper的Failure Detection略有不同,ZooKeeper可以利用ephemeral Node来检测服务的状态,Consul的Health Check,通过调用脚本,HTTP或者主动上报的方式检查服务的状态,更为灵活,可以获得等多的信息,但是也需要做更多的工作。 故障检测(Failure Detection) Consul提供Session的概念,利用Session可以检查服务是否存活。对每一个服务我们都可以创建一个session对象,注意这里我们设置了ttl,consul会以ttl的数值为间隔时间,持续的对session的存活做检查。对应的在服务中,我们需要持续的renew session,保证session是合法的。
import consulimport timec = consul.Consul()s = c.session.create(name="worker",behavior='delete',ttl=10)print "session id is {}".format(s)while True:    c.session.renew(s)    print "I am alive ..."    time.sleep(3)
Moniter代码用于监控worker相关联的session的状态,但发现worker session已经不存在了,就做出响应的处理。
import consulimport timedef is_session_exist(name, sessions):    for s in sessions:        if s['Name'] == name:            return True    return Falsec = consul.Consul()while True:    index, sessions = c.session.list()    if is_session_exist('worker', sessions):        print "worker is alive ..."    else:        print 'worker is dead!'        break    time.sleep(3)
这里注意,因为是基于ttl(最小10秒)的检测,从业务中断到被检测到,至少有10秒的时延,对应需要实时响应的情景,并不适用。Zookeeper使用ephemeral Node的方式时延相对短一点,但也非实时。 领导选举和分布式的锁 无论是Consul本身还是Python客户端,都不直接提供Leader Election的功能,但是这篇文档介绍了如何利用Consul的KV存储来实现Leader Election,利用Consul的KV功能,可以很方便的实现领导选举和锁的功能。当对某一个Key做put操作的时候,可以创建一个session对象,设置一个acquire标志为该 session,这样就获得了一个锁,获得所得客户则是被选举的leader。代码如下:
import consulimport timec = consul.Consul()def request_lead(namespace, session_id):    lock = c.kv.put(leader_namespace,"leader check", acquire=session_id)    return lockdef release_lead(session_id):    c.session.destroy(session_id)def whois_lead(namespace):    index,value = c.kv.get(namespace)    session = value.get('Session')    if session is None:        print 'No one is leading, maybe in electing'    else:        index, value = c.session.info(session)        print '{} is leading'.format(value['ID'])def work_non_block():    print "working"def work_block():    while True:        print "working"        time.sleep(3)leader_namespace = 'leader/test'[size=16] initialize leader key/value node[/size]leader_index, leader_node = c.kv.get(leader_namespace)if leader_node is None:    c.kv.put(leader_namespace,"a leader test")while True:    whois_lead(leader_namespace)    session_id = c.session.create(ttl=10)    if request_lead(leader_namespace,session_id):        print "I am now the leader"        work_block()        release_lead(session_id)    else:        print "wait leader elected!"    time.sleep(3)
利用同样的机制,可以方便的实现锁,信号量等分布式的同步操作。 监视Consul的Agent提供了Watch的功能,然而Python客户端并没有相应的接口。  

Etcd

Etcd 是另一个用GO开发的分布式协调应用,它提供一个分布式的Key/Value存储来进行共享的配置管理和服务发现。 同样的etcd使用基于HTTP的API,可以灵活的进行不同语言的绑定,我们用的是这个客户端https://github.com/jplana/python-etcd 基本操作
import etcdclient = etcd.Client() client.write('/nodes/n1', 1)print client.read('/nodes/n1').value
etcd对节点的操作和ZooKeeper类似,不过etcd不支持ZooKeeper的ephemeral Node的概念,要监控服务的状态似乎比较麻烦。 分布式锁etcd支持分布式锁,以下是一个例子。
import syssys.path.append("../../")import etcdimport uuidimport timemy_id = uuid.uuid4()def work():    print "I get the lock {}".format(str(my_id))client = etcd.Client() lock = etcd.Lock(client, '/customerlock', ttl=60)with lock as my_lock:    work()    lock.is_locked()  # True    lock.renew(60)lock.is_locked()  # False
老版本的etcd支持leader election,但是在最新版该功能被deprecated了,参见https://coreos.com/etcd/docs/0.4.7/etcd-modules/ 其它 我们针对分布式协调的功能讨论了三个不同的开源应用,其实还有许多其它的选择,我这里就不一一介绍,大家有兴趣可以访问以下的链接:           Netflix开发的定位服务,应用于fail over和load balance的功能           基于ZooKeeper的更高层次的封装           基于GO的高可靠,分布式的数据存储,过去两年已经不活跃           基于Python开发的,面向对象的接口的分布式应用协调的工具           serf提供轻量级的cluster成员管理,故障检测(failure detection)和协调。开发基于GO语言。Consul使用了serf提供的功能           基于ruby的ZooKeeper实现,过去三年不活跃
            基于日志的分布式协调的框架,使用Java开发
 


总结


ZooKeeper无疑是分布式协调应用的最佳选择,功能全,社区活跃,用户群体很大,对所有典型的用例都有很好的封装,支持不同语言的绑定。缺点是,整个应用比较重,依赖于Java,不支持跨数据中心。

Consul作为使用Go语言开发的分布式协调,对业务发现的管理提供很好的支持,他的HTTP API也能很好的和不同的语言绑定,并支持跨数据中心的应用。缺点是相对较新,适合喜欢尝试新事物的用户。

etcd是一个更轻量级的分布式协调的应用,提供了基本的功能,更适合一些轻量级的应用来使用。
 
参考
如果大家对于分布式系统的协调想要进行更多的了解,可以阅读一下的链接:
http://stackoverflow.com/questions/6047917/zookeeper-alternatives-cluster-coordination-service 

http://txt.fliglio.com/2014/05/encapsulated-services-with-consul-and-confd/ 

http://txt.fliglio.com/2013/12/service-discovery-with-docker-docker-links-and-beyond/ 

http://www.serfdom.io/intro/vs-zookeeper.html 

http://devo.ps/blog/zookeeper-vs-doozer-vs-etcd/ 

https://www.digitalocean.com/community/articles/how-to-set-up-a-serf-cluster-on-several-ubuntu-vps 

http://www.slideshare.net/JyrkiPulliainen/taming-pythons-with-zoo-keeper-ep2013?qid=e1267f58-090d-4147-9909-ec673525e76b&v=qf1&b=&from_search=8 

http://muratbuffalo.blogspot.com/2014/09/paper-summary-tango-distributed-data.html 

https://developer.yahoo.com/blogs/hadoop/apache-zookeeper-making-417.html 

http://www.knewton.com/tech/blog/2014/12/eureka-shouldnt-use-zookeeper-service-discovery 

http://codahale.com/you-cant-sacrifice-partition-tolerance/
原文地址:http://my.oschina.net/taogang/blog/410864

大数据与物联网

Nock 发表了文章 0 个评论 2751 次浏览 2016-06-05 11:31 来自相关话题

物联网是新一代信息技术的重要组成部分,其英文名称是"The Internet of things"。顾名思义,"物联网就是物物相连的互联网"。这有两层意思: []物联网的核心和基础仍然 是互联网,是在互联网基础上的延伸和扩展的网络;[/][]其用户 ...查看全部
物联网是新一代信息技术的重要组成部分,其英文名称是"The Internet of things"。顾名思义,"物联网就是物物相连的互联网"。这有两层意思:
    []物联网的核心和基础仍然 是互联网,是在互联网基础上的延伸和扩展的网络;[/][]其用户端延伸和扩展到了任何物 品与物品之间,进行信息交换和通信。物联网就是"物物相连的互联网"。[/]

 
物联网通过智能感知、识别技术与普适计算、泛在网络的融合应用,被称为继计算机、互联网之后世界信息 产业发展的第三次浪潮。物联网是互联网的应用拓展,与其说物联网是网络,不如说物联网是业务和应用。因此,应用创新是物联网发展的核心,以用户体验为核心的创新 2.0 是物联网发展的灵魂。 
 
物联网架构可分为三层,包括感知层、网络层和应用层:
    []感知层:由各种传感器构成,包括温湿度传感器、二维码标签、RFID标签和读写 器、摄像头、GPS 等感知终端。感知层是物联网识别物体、采集信息的来源; [/][]网络层:由各种网络,包括互联网、广电网、网络管理系统和云计算平台等组成, 是整个物联网的中枢,负责传递和处理感知层获取的信息;[/][]应用层:是物联网和用户的接口,它与行业需求结合,实现物联网的智能应用。 物联网用途广泛,遍及智能交通、环境保护、政府工作、公共安全、平安家居、智能消防、工业监测、环境监测、路灯照明管控、景观照明管控、楼宇照明管控、广场照明管控、 老人护理、个人健康、花卉栽培、水系监测、食品溯源、敌情侦查和情报搜集等多个领域。 [/]

 
国际电信联盟于 2005 年的报告曾

强制清除Elasticsearch中已删除的文件

空心菜 发表了文章 0 个评论 11039 次浏览 2016-06-05 01:14 来自相关话题

Elasticsearch是建立在Apache Lucene基础上的实时分布式搜索引擎,Lucene为了提高搜索的实时性,采用不可 ...查看全部

Elasticsearch是建立在Apache Lucene基础上的实时分布式搜索引擎,Lucene为了提高搜索的实时性,采用不可再修改(immutable)方式将文档存储在一个个segment中。


也就是说,一个segment在写入到存储系统之后,将不可以再修改。那么Lucene是如何从一个segment中删除一个被索引的文档呢?


简单的讲,当用户发出命令删除一个被索引的文档#ABC时,该文档并不会被马上从相应的存储它的segment中删除掉,而是通过一个特殊的文件来标记该文档已被删除。


当用户再次搜索到#ABC时,Elasticsearch在segment中仍能找到#ABC,但由于#ABC文档已经被标记为删除,所以Lucene会从发回给用户的搜索结果中剔除#ABC,所以给用户感觉的是#ABC已经被删除了。


Elasticseach会有后台线程根据Lucene的合并规则定期进行Segment Merging合并操作,一般不需要用户担心或者采取任何行动。


被删除的文档在segment合并时,才会被真正删除掉。在此之前,它仍然会占用着JVM heap和操作系统的文件cache等资源。在某些情况下,我们需要强制Elasticsearch进行segment merging,已释放其占用的大量系统资源。


 POST /{index}/_optimize?only_expunge_deletes=true&wait_for_completion=true

_optimize命令可强制进行segment合并,并删除所有标记为删除的文档。Segment merging要消耗CPU,以及大量的I/O资源,所以一定要在你的ElasticSearch集群处于维护窗口期间,并且有足够的I/O空间的(如:SSD)的条件下进行;否则很可能造成集群崩溃和数据丢失。


下图展示了我们在进行强制expunge时,所观察到的CPU和磁盘I/O的使用情况。该集群运行在微软的Azure云平台IaaS虚拟机上,所有的数据节点都采用 D13 虚拟机,数据存储在本地的SSD磁盘中。该集群是一个备份集群,为了保证合并顺利进行,在此期间暂停了所有对其进行的写操作,仅有少量的读操作。这里需要注意: expunge操作是一种不得已而为之的操作,即在Elasticsearch不能有效自动清除删除文件的情况下才执行该操作。同时建议在此操作期间,最好停止对集群的所有读/写操作,并暂停止shard的自动分配 ( cluster.routing.allocation.enable= none ),以防有节点被踢出后shard自动分配造成的数据丢失。


下面两个设置可以用于控制清除时的处理速度,其中给出值是默认值,可以根据需求进行调整,具体请参见Merge


此外, 还可以临时将所有索引的replica设置为0,这样只用针对Primary进行expunge,以减小I/O压力。


PUT /{index}/_settings
{
"settings": {
"index.merge.policy.expunge_deletes_allowed": "10",
"index.merge.policy.max_merge_at_once_explicit" : "30"
}
}

参考资料:Lucene‘s Handling of Deleted Documents
分享阅读:http://blog.csdn.net/quicknet/article/details/46421505

Elasticsearch 使用optimize强制合并segment测试

空心菜 发表了文章 1 个评论 10166 次浏览 2016-06-03 17:43 来自相关话题

首先我操作如下状态的索引:health index pri rep docs.count docs.deleted store.size pri.store.size green javaindex_20160 ...查看全部
首先我操作如下状态的索引:
health index                  pri rep docs.count docs.deleted store.size pri.store.size 
green javaindex_20160518 5 1 23330821 0 15.8gb 7.9gb

合并前:
[root@betaing nock]# curl -s "http://localhost:9200/_cat/segments/javaindex_20160518?v&h=shard,segment,size,size.memory" | awk '{sum += $NF} END {print sum}'
456833526
合并操作:
curl -XPOST 'http://localhost:9200/javaindex_20160518/_optimize?max_num_segments=1'
合并后:
[root@betaing nock]# curl -s "http://localhost:9200/_cat/segments/javaindex_20160518?v&h=shard,segment,size,size.memory" | awk '{sum += $NF} END {print sum}'
369622567
合并后减少的segment memory为:
>>> print (456833526 - 369622567)
87210959 ----> 87.2M 缩减大小
百分比:
>>> print (456833526 - 369622567) / 456833526.0 
0.190903149696 ----> 19% 缩减百分比

换个更大的索引测试一遍,同样也是合并为一个segment
索引大小:
health index                  pri rep docs.count docs.deleted store.size pri.store.size 
green javaindex_20160520 5 1 103324505 0 70.3gb 35.1gb
合并前:
[root@betaing nock]# curl -s "http://localhost:9200/_cat/segments/javaindex_20160520?v&h=shard,segment,size,size.memory" | awk '{sum += $NF} END {print sum}'
1698117764
合并操作:
curl -XPOST 'http://localhost:9200/javaindex_20160520/_optimize?max_num_segments=1'
合并后:
[root@betaing index]# curl -s "http://localhost:9200/_cat/segments/javaindex_20160520?v&h=shard,segment,size,size.memory" | awk '{sum += $NF} END {print sum}'
1622962469
>>> print ( 1698117764 - 1622962469 ) / 1698117764.0
0.0442579994116

压缩后释放了4.4%的内存,大小就是75.2M
总结:
从上面的例子,可以看出来索引越大,反而释放的segment memory效率越低!
 
下面我们针对单个索引,做个合并segment个数不同来对比一下效率:
合并前:
health index                  pri rep docs.count docs.deleted store.size pri.store.size 
green phpindex_20160526 5 1 260338401 0 96.5gb 48.2gb
[root@betaing nock]# curl -s "http://localhost:9200/_cat/segments/phpindex_20160526?v&h=shard,segment,size,size.memory" | awk '{sum += $NF} END {print sum}'
3955994758
合并为10个segment:
curl -XPOST 'http://localhost:9200/phpindex_20160526/_optimize?max_num_segments=10'
[root@betaing nock]# curl -s "http://localhost:9200/_cat/segments/phpindex_20160526?v&h=shard,segment,size,size.memory" | awk '{sum += $NF} END {print sum}'
3929919062
>>> print ( 3955994758 - 3929919062 ) / 3955994758.0
0.00659143846115
[quote]>> print ( 3955994758 - 3929919062 )
26075696
合并后memory减少了26M,百分比为0.66%
 
合并为5个segment:
curl -XPOST 'http://localhost:9200/phpindex_20160526/_optimize?max_num_segments=5'
{"_shards":{"total":10,"successful":10,"failed":0}}
[root@betaing nock]# curl -s "http://localhost:9200/_cat/segments/phpindex_20160526?v&h=shard,segment,size,size.memory" | awk '{sum += $NF} END {print sum}'
3899949448
合并后效率:
>>> print ( 3955994758 - 3899949448 ) / 3955994758.0
0.0141671851022
>>> print ( 3955994758 - 3899949448 )
56045310
合并后减少了56M segment memory,效率为1.42%[/quote]

合并为1个segment:
[root@betaing nock]# curl -XPOST 'http://localhost:9200/phpindex_20160526/_optimize?max_num_segments=1'
{"_shards":{"total":10,"successful":10,"failed":0}}
[root@betaing nock]# curl -s "http://localhost:9200/_cat/segments/phpindex_20160526?v&h=shard,segment,size,size.memory" | awk '{sum += $NF} END {print sum}'
3892073433
合并后效率:
>>> print ( 3955994758 - 3892073433 ) / 3955994758.0
0.0161580914309
[quote]>> print ( 3955994758 - 3892073433 )
63921325
合并后减少了64M segment memory,效率为1.6%
 
总结:
随着合并数的减少,释放的segment memory增加,效率增大,但是不是成倍的。
 
性能如下:
Cpu.png
[/quote]