Elasticsearch内存分配设置详解

空心菜 发表了文章 0 个评论 11047 次浏览 2016-03-27 02:35 来自相关话题

Elasticsearch默认安装后设置的内存是1GB,对于任何一个现实业务来说,这个设置都太小了。如果你正在使用这个默认堆内存配置,你的集群配置可能会很快发生问题。   这里有两种方式修改Elasticsearch的堆内存(下面就说内 ...查看全部
Elasticsearch默认安装后设置的内存是1GB,对于任何一个现实业务来说,这个设置都太小了。如果你正在使用这个默认堆内存配置,你的集群配置可能会很快发生问题。
 
这里有两种方式修改Elasticsearch的堆内存(下面就说内存好了),最简单的一个方法就是指定ES_HEAP_SIZE环境变量。服务进程在启动时候会读取这个变量,并相应的设置堆的大小。设置命令如下:
export ES_HEAP_SIZE=10g
此外,你也可以通过命令行参数的形式,在程序启动的时候把内存大小传递给它:
./bin/elasticsearch -Xmx10g -Xms10g
备注:确保Xmx和Xms的大小是相同的,其目的是为了能够在java垃圾回收机制清理完堆区后不需要重新分隔计算堆区的大小而浪费资源,可以减轻伸缩堆大小带来的压力。
一般来说设置ES_HEAP_SIZE环境变量,比直接写-Xmx10g  -Xms10g更好一点。


把内存的一半给Lucene


一个常见的问题是配置一个大内存,假设你有一个64G内存的机器,按照正常思维思考,你可能会认为把64G内存都给Elasticsearch比较好,但现实是这样吗, 越大越好?
 
当然,内存对于Elasticsearch来说绝对是重要的,用于更多的内存数据提供更快的操作,而且还有一个内存消耗大户-Lucene。
 
Lucene的设计目的是把底层OS里的数据缓存到内存中。Lucene的段是分别存储到单个文件中的,这些文件都是不会变化的,所以很利于缓存,同时操作系统也会把这些段文件缓存起来,以便更快的访问。
 
Lucene的性能取决于和OS的交互,如果你把所有的内存都分配给Elasticsearch,不留一点给Lucene,那你的全文检索性能会很差的。
 
最后标准的建议是把50%的内存给elasticsearch,剩下的50%也不会没有用处的,Lucene会很快吞噬剩下的这部分内存用于文件缓存。


不要超过32G


这里有另外一个原因不分配大内存给Elasticsearch,事实上jvm在内存小于32G的时候会采用一个内存对象指针压缩技术。
 
在java中,所有的对象都分配在堆上,然后有一个指针引用它。指向这些对象的指针大小通常是CPU的字长的大小,不是32bit就是64bit,这取决于你的处理器,指针指向了你的值的精确位置。
 
对于32位系统,你的内存最大可使用4G。对于64系统可以使用更大的内存。但是64位的指针意味着更大的浪费,因为你的指针本身大了。浪费内存不算,更糟糕的是,更大的指针在主内存和缓存器(例如LLC, L1等)之间移动数据的时候,会占用更多的带宽。
 
Java 使用一个叫内存指针压缩的技术来解决这个问题。它的指针不再表示对象在内存中的精确位置,而是表示偏移量。这意味着32位的指针可以引用40亿个对象,而不是40亿个字节。最终,也就是说堆内存长到32G的物理内存,也可以用32bit的指针表示。
 
一旦你越过那个神奇的30-32G的边界,指针就会切回普通对象的指针,每个对象的指针都变长了,就会使用更多的CPU内存带宽,也就是说你实际上失去了更多的内存。事实上当内存到达40-50GB的时候,有效内存才相当于使用内存对象指针压缩技术时候的32G内存。
 
这段描述的意思就是说:即便你有足够的内存,也尽量不要超过32G,因为它浪费了内存,降低了CPU的性能,还要让GC应对大内存。


1TB内存的机器


32GB是ES一个内存设置限制,那如果你的机器有很大的内存怎么办呢?现在的机器内存普遍增长,你现在都可以看到有300-500GB内存的机器。

首先,我们建议编码使用这样的大型机
其次,如果你已经有了这样的机器,你有两个可选项:
    []你主要做全文检索吗?考虑给Elasticsearch 32G内存,剩下的交给Lucene用作操作系统的文件系统缓存,所有的segment都缓存起来,会加快全文检索。[/][]你需要更多的排序和聚合?你希望更大的堆内存。你可以考虑一台机器上创建两个或者更多ES节点,而不要部署一个使用32+GB内存的节点。仍然要 坚持50%原则,假设 你有个机器有128G内存,你可以创建两个node,使用32G内存。也就是说64G内存给ES的堆内存,剩下的64G给Lucene。[/]

如果你选择第二种,你需要配置cluster.routing.allocation.same_shard.host:true。这会防止同一个shard的主副本存在同一个物理机上(因为如果存在一个机器上,副本的高可用性就没有了)。


swapping是性能的坟墓


这是显而易见的,但是还是有必要说的更清楚一点,内存交换到磁盘对服务器性能来说是致命的。想想看一个内存的操作必须是快速的。
 
 
如果内存交换到磁盘上,一个100微秒的操作可能变成10毫秒,再想想那么多10微秒的操作时延累加起来。不难看出swapping对于性能是多么可怕。
 
最好的办法就是在你的操作系统中完全禁用swapping。这样可以暂时禁用:
swapoff -a
为了永久禁用它,你可能需要修改/etc/fstab文件,这要参考你的操作系统相关文档。
 
如果完全禁用swap,对你来说是不可行的。你可以降低swappiness 的值,这个值决定操作系统交换内存的频率。这可以预防正常情况下发生交换。但仍允许os在紧急情况下发生交换。
 
对于大部分Linux操作系统,可以在sysctl 中这样配置:
vm.swappiness = 1
备注:swappiness设置为1比设置为0要好,因为在一些内核版本,swappness=0会引发OOM(内存溢出)
 
最后,如果上面的方法都不能做到,你需要打开配置文件中的mlockall开关,它的作用就是运行JVM锁住内存,禁止OS交换出去。在elasticsearch.yml配置如下:
bootstrap.mlockall: true


参考地址:https://www.elastic.co/guide/en/elasticsearch/guide/current/heap-sizing.htm


为什么需要学习Docker?

being 发表了文章 0 个评论 3985 次浏览 2016-03-23 23:56 来自相关话题

Docker的历史和现状分析 曾经我们都在做一个称之为PAAS的平台  PAAS平台的范围和内容  ...查看全部


Docker的历史和现状分析


曾经我们都在做一个称之为PAAS的平台 
paas.png

PAAS平台的范围和内容 
fc.png

老一代的PAAS平台的局限性和困境 
    []主要提供应用的部署和托管[/][]针对应用开发者[/][]仅支持特定的IaaS基础技术 [/][]支持单种开发语言和框架 [/][]支持特定的服务,比如自定义的数据存储APIs [/][]没有很好的解决常用中间件的部署问题 [/][]难以解决应用和资源的隔离问题[/]
新一代的PAAS平台 新一代的云应用平台技术则实现全方位的应用生命周期管理,关注开放性、应用的可移植性和云间相互操作性, 其代表者包括Cloud Foundry, OpenShift, Docker, Heroku, MoPaaS等PaaS技术或服务,除了在第一代PaaS 技术对用户在实现应用交付的加速所提供的功能外,具备以下大多数特征: 多语言和框架:支持多语言和框架以及语言框架的扩展机制多服务:开放的核心服务以及服务的扩展机制多云和多IaaS技术:支持多种IaaS技术和多云的部署,包括公有云和私有云 
docker.png
dotCloud 不仅支持诸如 PHP、MySql 等传统技术框架,还包括 Node.js、MongoDB 等新兴技术。基于 dotCloud 提供 的开发工具和技术框架,你可以直接使用dotCloud的SDK编写代码和构建业务服务,并在联网的时候把这些代码 推送到云端,实现自动部署和测试。
dockerqy.png
2014.1 被BLACK DUCK 评选为2013年10大开源新项目“TOP 10 OPEN SOURCE ROOKIE OF THE YEAR” 2014.9获取4000万美元融资,此时已经累计融资6600万美元2015.4月获取了9500万美元融资, 已经确立了在第三代PaaS市场的主导地位。2015.6月DockerCon 2015大会上,Linux 基金会与行业巨头联手打造开放容器技术项目Open Container ProjectDocker生态圈 
stq.png
Docker已经收购了一系列创业公司,包括KiteMatic、Koality和Socketplane。完成新一轮投资后,Docker计划加 快产品推进计划,在本季度推出Docker Hub企业版,并且在接下来几个月在存储、网络和安全三个产品方向发力。
红帽在新的RHEL 7版本中增添了支持Docker的功能,IBM公开拥抱Docker和容器,亚马逊推出了EC2容器服 务,就连公认的竞争对手VMware也宣布支持Docker。
国内,不仅腾讯、阿里、百度、Ucloud、青云等云计算服务提供商已经将Docker用到实践,还有数家Docker初 创企业在获得了天使投资后正在辛勤的开发,准备将产品尽快发布到市场上。
腾讯云计算公司对外宣布成为中国首家支持Docker Machine的云计算厂商,并将自身定位于Docker基础设施的服 务商,迈出构建Docker“航运”基础设施的第一步。同时,在支持Docker Machine前提下,腾讯云也推出了常用 系统的标准版Docker镜像,以方便用户能够一键便捷创建容器。
Docker生态圈,2014-2015发展 
    []贡献者增长了183%;[/][]GitHub上关于Docker的项目增长了515%; [/][]Docker提供的工作机会增长了1720%; [/][]使用Docker构建的应用程序增长了934%(Boot2Docker下载量增 加了1456%); [/][]容器的下载量增加了18082%(这个是根据DockerHub上镜像下载 量来统计的)。[/][]Docker Hub业务的进展:用户数量增长至24万,Repo增长至 15万,Pull的数量超过5亿次[/]

Docker的技术原理介绍 

Docker就是虚拟化的一种轻量级替代技术。Docker的容器技术不依赖任何语言、框架或系统,可以将App变成一种 标准化的、可移植的、自管理的组件,并脱离服务器硬件在任何主流系统中开发、调试和运行
简单的说就是,在 Linux 系统上迅速创建一个容器(类似虚拟机)并在容器上部署和运行应用程序,并通过配置文件 可以轻松实现应用程序的自动化安装、部署和升级,非常方便。因为使用了容器,所以可以很方便的把生产环境和开发环境分开,互不影响,这是docker最普遍的一个玩法。
jsyl.png
Docker相关的核心技术 
hj.png
Docker相关的核心技术之cgroups 
Linux系统中经常有个需求就是希望能限制某个或者某些进程的分配资源。于是就出现了cgroups的概念, cgroup就是controller group ,在这个group中,有分配好的特定比例的cpu时间,IO时间,可用内存大小等。 cgroups是将任意进程进行分组化管理的Linux内核功能。最初由google的工程师提出,后来被整合进Linux内核中。
cgroups中的 重要概念是“子系统”,也就是资源控制器,每种子系统就是一个资源的分配器,比如cpu子系 统是控制cpu时间分配的。首先挂载子系统,然后才有control group的。比如先挂载memory子系统,然后在 memory子系统中创建一个cgroup节点,在这个节点中,将需要控制的进程id写入,并且将控制的属性写入, 这就完成了内存的资源限制。
cgroups 被Linux内核支持,有得天独厚的性能优势,发展势头迅猛。在很多领域可以取代虚拟化技术分割资源。 cgroup默认有诸多资源组,可以限制几乎所有服务器上的资源:cpu mem iops,iobandwide,net,device acess等
Docker相关的核心技术之LXC
LXC是Linux containers的简称,是一种基于容器的操作系统层级的虚拟化技术。借助于namespace的隔离机制和cgroup限额功能,LXC提供了一套统一的API和工具来建立和管理container。LXC跟其他操作系统层次的虚 拟化技术相比,最大的优势在于LXC被整合进内核,不用单独为内核打补丁
LXC 旨在提供一个共享kernel的 OS 级虚拟化方法,在执行时不用重复加载Kernel, 且container的kernel与host 共享,因此可以大大加快container的 启动过程,并显著减少内存消耗,容器在提供隔离的同时,还通过共享这 些资源节省开销,这意味着容器比真正的虚拟化的开销要小得多。 在实际测试中,基于LXC的虚拟化方法的IO和CPU性能几乎接近baremetal的性能。
虽然容器所使用的这种类型的隔离总的来说非常强大,然而是不是像运行在hypervisor上的虚拟机那么强壮仍具有 争议性。如果内核停止,那么所有的容器就会停止运行。
    []性能方面:LXC>>KVM>>XEN[/][]内存利用率:LXC>>KVM>>XEN [/]
    []隔离程度:XEN>>KVM>>LXC [/]
Docker相关的核心技术之AUFS 
什么是AUFS? AuFS是一个能透明覆盖一或多个现有文件系统的层状文件系统。 支持将不同目录挂载到同一 个虚拟文件系统下,可以把不同的目录联合在一起,组成一个单一的目录。这种是一种虚拟的文件系统,文 件系统不用格式化,直接挂载即可。
Docker一直在用AuFS作为容器的文件系统。当一个进程需要修改一个文件时,AuFS创建该文件的一个副本。 AuFS可以把多层合并成文件系统的单层表示。这个过程称为写入复制( copy on write )。
AuFS允许Docker把某些镜像作为容器的基础。例如,你可能有一个可以作为很多不同容器的基础的CentOS 系统镜像。多亏AuFS,只要一个CentOS镜像的副本就够了,这样既节省了存储和内存,也保证更快速的容器部署。
使用AuFS的另一个好处是Docker的版本容器镜像能力。每个新版本都是一个与之前版本的简单差异改动, 有效地保持镜像文件最小化。但,这也意味着你总是要有一个记录该容器从一个版本到另一个版本改动的审计跟踪。
build.png
Docker全生命周期开发模式 
zq.png
Docker正在迅速改变云计 算领域的运作规则,并彻 底颠覆云技术的发展前景 从持续集成/持续交付到微 服务、开源协作乃至 DevOps,Docker一路走来 已经给应用程序开发生命 周期以及云工程技术实践 带来了巨大变革。

Docker的基本概念

Docker Image 
    []Docker Image是一个极度精简版的Linux程序运行环境,比如vi这种基本 的工具没有,官网的Java镜像包括的东西更少,除非是镜像叠加方式的, 如Centos+Java7[/][]Docker Image是需要定制化Build的一个“安装包”,包括基础镜像+应 用的二进制部署包[/][]Docker Image内不建议有运行期需要修改的配置文件[/][]Dockerfile用来创建一个自定义的image,包含了用户指定的软件依赖等。当前目录下包含Dockerfile,使用命令build来创建新的image[/][]Docker Image的最佳实践之一是尽量重用和使用网上公开的基础镜像[/]
Docker Container 
    []Docker Container是Image的实例,共享内核[/][]Docker Container里可以运行不同Os的Image,比如Ubuntu的或者Centos[/][]Docker Container不建议内部开启一个SSHD服务,1.3版本后新增了docker exec命令进入容器排查问题。[/][]Docker Container没有IP地址,通常不会有服务端口暴露,是一个封闭的“盒子/沙箱”[/]
Docker Container的生命周期 
dbzq.png
Docker Daemon 
    []Docker Daemon是创建和运行Container的Linux守护进程,也是Docker 最主要的核心组件[/][]Docker Daemon 可以理解为Docker Container的Container[/][]Docker Daemon可以绑定本地端口并提供Rest API服务,用来远程访问和控制[/]
Docker Registry/Hub 
hub.png
Docker之所以这么吸引人,除了它的新颖的技术外,围绕官方Registry(Docker Hub)的生态圈也是相当吸引人眼球的地方。在Docker Hub上你可以很轻松下载 到大量已经容器化好的应用镜像,即拉即用。这些镜像中,有些是Docker官方维 护的,更多的是众多开发者自发上传分享的。而且 你还可以在Docker Hub中绑定 你的代码托管系统(目前支持Github和Bitbucket)配置自动生成镜像功能,这样 Docker Hub会在你代码更新时自动生成对应的Docker镜像。
问题点:
Docker Hub是dotCloud公司私有的国内曾有公司试图提供镜像服务,但被禁止 目前国内只有一个DaoCloud提供代理缓存服务
Docker Hub - our SaaS service for sharing and managing your application stacks. 
gx.png
    []Linux基本操作和基本知识:磁盘、文件、日志、用户、权限、安全、网络,建议centos/redhat[/][]虚机相关技能:vmware workstation/virtbox熟练使用,虚机clone,组网,host-only网络,nat网络等熟练操作[/]


文章分享:炼数为金-Leader-us 


Flume日志收集分层架构应用实践

Ansible 发表了文章 0 个评论 7259 次浏览 2016-03-19 18:54 来自相关话题

Flume作为一个日志收集工具,非常轻量级,基于一个个Flume Agent,能够构建一个很复杂很强大的日志收集系统,它的灵活性和优势,主要体现在如下几点: []模块化设计:在其Flume Agent内部可以定义三种组件:Source、Channe ...查看全部
Flume作为一个日志收集工具,非常轻量级,基于一个个Flume Agent,能够构建一个很复杂很强大的日志收集系统,它的灵活性和优势,主要体现在如下几点:
    []模块化设计:在其Flume Agent内部可以定义三种组件:Source、Channel、Sink[/][]组合式设计:可以在Flume Agent中根据业务需要组合Source、Channel、Sink三种组件,构建相对复杂的日志流管道[/][]插件式设计:可以通过配置文件来编排收集日志管道的流程,减少对Flume代码的侵入性[/][]可扩展性:我们可以根据自己业务的需要来定制实现某些组件(Source、Channel、Sink)[/][]支持集成各种主流系统和框架:像Hadoop、HBase、Hive、Kafka、ElasticSearch、Thrift、Avro等,都能够很好的和Flume集成[/][]高级特性:Failover、Load balancing、Interceptor等[/]

为什么要对Flume日志收集系统进行分层设计

基于Flume设计实现分层日志收集系统,到底有什么好处呢?我们可以先看一下,如果不分层,会带来哪些问题:
    []如果需要通过Kafka去缓冲上游基于Flume收集而构建的日志流,对于数据平台内部服务器产生的数据还好,但是如果日志数据是跨业务组,甚至是跨部门,那么就需要将Kafka相关信息暴露给外部,这样对Kafka的访问便不是数据平台内部可控的[/][]如果是外部日志进入平台内部HDFS,这样如果需要对Hadoop系统进行升级或例行维护,这种直连的方式会影响到上游部署Flume的日志流的始端日志收集服务[/][]如果数据平台内部某些系统,如Kafka集群、HDFS集群所在节点的机房位置变更,数据迁移,会使得依赖日志数据的外部系统受到不同程度的影响,外部系统需要相关开发或运维人员参与进来[/][]由于收集日志的数据源端可能是外部一些服务器(多个单个的节点),一些业务集群(相互协作的多节点组),也可能是内部一些提供收集服务的服务节点,这些所有的服务器上部署的Flume Agent都处于一层中,比较难于分组管理[/][]由于所有数据源端Flume Agent收集的日志进入数据平台的时候,没有一个统一的类似总线的组件,很难因为某些业务扩展而独立地去升级数据平台内部的接收层服务节点,可能为了升级数据平台内部某个系统或服务而导致影响了其他的接收层服务节点[/]
通过下图我们可以看出,这种单层日志收集系统设计,存在太多的问题,而且系统或服务越多导致整个日志收集系统越难以控制:
flume-without-layered-architecture.png
上图中,无论是外部还是内部,只要部署了Flume Agent的节点,都直接同内部的Kafka集群和Hadoop集群相连,所以在数据平台内部只能尽量保持Kafka和Hadoop集群正常稳定运行,也要为外部日志收集Flume Agent的数据流量的陡增和异常变化做好防控准备。再者,如需停机维护或者升级某一个集群,可能都需要通知外部所有Flume Agent所在节点的业务方,做好应对(停机)准备。
接着看,如果我们基于Flume使用分层的方式来设计日志收集系统,又有哪些优势,如下图所示:
flume-with-layered-architecture.png
上图中,Flume日志收集系统采用两层架构设计:第一层(L1)是日志收集层,第二层(L2)是数据平台缓冲层(汇聚层)。通过这种方式,使得日志收集系统有如下特点:
    []针对数据平台外部的业务系统,根据需要分析的数据业务类型进行分组,属于同一种类型的业务日志,在数据平台前端增加了一个Flume汇聚层节点组,该组节点只影响到它对应的L1层的业务数据[/][]如果Hadoop集群、Kafka需要停机维护或升级,对外部L1层Flume Agent没有影响,只需要在L2层做好数据的接收与缓冲即可,待维护或升级结束,继续将L2层缓存的数据导入到数据存储系统[/][]如果外部某个类型的业务日志数据节点需要扩容,直接在L1层将数据流指向数据平台内部与之相对应的L2层Flume Agent节点组即可,能够对外部因业务变化发生的新增日志收集需求,进行快速地响应和部署[/][]对于数据平台内部,因为收集日志的节点非常可控,可以直接通过L1层Flume Agent使日志数据流入HDFS或Kafka,当然为了架构统一和管理,最好也是通过L2层Flume Agent节点组来汇聚/缓冲L1层Flume Agent收集的日志数据[/]
通过上面分析可见,分层无非是为了使的日志数据源节点的Flume Agent服务与数据平台的存储系统(Kafka/HDFS)进行解耦,同时能够更好地对同类型业务多节点的日志流进行一个聚合操作,并分离开独立管理。另外,可以根据实际业务需要,适当增加Flume系统分层,满足日志流数据的汇聚需要。

应用整体架构

我们看一下,Flume日志收集系统,在我们这个示例应用中处于一个什么位置,我简单画了一下图,加了一些有关数据处理和分析的节点/组件,如下图所示:
flume-platform-overall-architecture.png
这里,简单了解一下上图即可,由于日志收集在整个应用系统中是很重要的一个环节,所以必须保证日志收集系统设计的可靠、可用、灵活、稳定,通过上面在日志收集系统收集日志之后,数据平台所做的大量分析处理,来凸显日志收集系统的重要性,这里其他内容不做过多说明。

Flume分层架构实践

这里,我们主要以实时收集日志为例,说明如何构建一个相对复杂的Flume分层日志收集系统。首先,简要说明一下日志收集需求:
    []手机客户端上报的用户行为事件(App User Event),通过数据平台内部定义好的接口格式,从Nginx日志里面实时流入数据平台,这对应于Flume日志收集系统L1层[/][]通过组织各种活动,来推广某些App的产品特性,会定向向用户推送通知,单独使用推送点击(Push Click)Agent来收集这些点击行为数据[/][]App所依赖的一些基础内容,会以服务的形式开放给外部第三方调用,对于由第三方App带来的用户的行为点击事件(Thirdparty Click),单独使用L1层Flume Agent进行收集[/][]第三方会在App中根据不同的内容,投放广告(Ad),对于广告曝光/点击行为的数据,与上述提到的数据收集单独分离出来,因为该日志数据后期可能会大规模推广,会有爆发性增长,在L1层进行收集[/][]在L2层主要是汇聚或缓冲L1层流入的日志数据[/][]同时,为了防止L2层Flume Agent因为故障或例行停机维护等,所以使用了Flume的Failover特性,亦即L1层每一个Sink同时指向L2层的2个相同的Flume Agent[/][]L1层的Flume Agent在收集日志的过程中应该不允许在Channel中累积过多数据(但是还要防止数据流速过慢导致内存Channel数据溢出),还要能够尽量降低读写磁盘的开销,所以使用内存类型的Channel[/][]L2层为了保证数据能够可靠地缓冲(在允许的一段时间内累积保存数据),如Hadoop或Kafka故障停机或停机维护升级,采用文件类型的Channel,还要尽量调大容量,也不能因为多应用共享磁盘而造成数据处理延迟,所以对于不同的Channel分别使用独立的磁盘[/]
详细分层设计如下图所示:
flume-layered-architecture.png
上图是从实际的整个数据平台中拿出来一部分,简单便于解释说明。有关上图中所涉及到的Flume Agent的配置详情,下面会根据Flume分层的结构(L1层、L2层)来详细配置说明。由于L1层的10.10.1.101和10.10.1.102节点上部署的Flume Agent是对称的,所以下面只拿出其中一个来说明配置,不同的是,这两个节点上Flume Agent的Sink使用Failover功能,分别交叉指向L2层Flume Agent,也能够起到一定的负载均衡的作用。

上游Flume日志收集层

下面,分别针对10.10.1.101节点上的3个Flume Agent的配置内容,分别进行说明如下:
    []L1层:App用户行为事件(App User Event)日志收集[/]
Flume Agent名称为a1,使用Exec Source、Memory Channel、Avro Sink,这里我们的Nginx日志文件始终指向/data/nginx/logs/app_user_events.log,即使日切或小时切文件,使用tail -F就能保证日志内容都被收集。具体配置内容如下所示:
a1.sources = s1a1.channels = mc1a1.sinks = k1 k2# Configure sourcea1.sources.s1.channels = mc1a1.sources.s1.type = execa1.sources.s1.command = tail -F /data/nginx/logs/app_user_events.log# Configure channela1.channels.mc1.type = memorya1.channels.mc1.transactionCapacity = 50000a1.channels.mc1.capacity = 100000# Configure sinksa1.sinks.k1.channel = mc1a1.sinks.k1.type = avroa1.sinks.k1.hostname = 10.10.1.122a1.sinks.k1.port = 44446a1.sinks.k2.channel = mc1a1.sinks.k2.type = avroa1.sinks.k2.hostname = 10.10.1.121a1.sinks.k2.port = 44446# Configure failovera1.sinkgroups = g1a1.sinkgroups.g1.sinks = k1 k2a1.sinkgroups.g1.processor.type = failovera1.sinkgroups.g1.processor.priority.k1 = 9a1.sinkgroups.g1.processor.priority.k2 = 7a1.sinkgroups.g1.processor.maxpenalty = 10000
    []L1层:推送点击事件(Push Click Event)日志收集[/]
a2.sources = s2a2.channels = mc2a2.sinks = k3 k4# Configure sourcea2.sources.s2.channels = mc2a2.sources.s2.type = execa2.sources.s2.command = tail -F /data/nginx/logs/push_click_events.log# Configure channela2.channels.mc2.type = memorya2.channels.mc2.capacity = 50000a2.channels.mc2.transactionCapacity = 100000# Configure sinksa2.sinks.k3.channel = mc2a2.sinks.k3.type = avroa2.sinks.k3.hostname = 10.10.1.121a2.sinks.k3.port = 44447a2.sinks.k4.channel = mc2a2.sinks.k4.type = avroa2.sinks.k4.hostname = 10.10.1.122a2.sinks.k4.port = 44447# Configure failovera2.sinkgroups = g2a2.sinkgroups.g2.sinks = k3 k4a2.sinkgroups.g2.processor.type = failovera2.sinkgroups.g2.processor.priority.k3 = 9a2.sinkgroups.g2.processor.priority.k4 = 7a2.sinkgroups.g2.processor.maxpenalty = 10000
    []L1层:第三方点击事件(Thirdparty Click Event)日志收集[/]
第三方点击事件通过统一的接口上传数据,那么配置起来也比较容易,如下所示:
a3.sources = s3a3.channels = mc3a3.sinks = k5 k6# Configure sourcea3.sources.s3.channels = mc3a3.sources.s3.type = execa3.sources.s3.command = tail -F /data/nginx/logs/thirdparty_click_events.log# Configure channela3.channels.mc3.type = memorya3.channels.mc3.transactionCapacity = 50000a3.channels.mc3.capacity = 100000# Configure sinksa3.sinks.k5.channel = mc3a3.sinks.k5.type = avroa3.sinks.k5.hostname = 10.10.1.121a3.sinks.k5.port = 44446a3.sinks.k6.channel = mc3a3.sinks.k6.type = avroa3.sinks.k6.hostname = 10.10.1.122a3.sinks.k6.port = 44446# Configure failovera3.sinkgroups = g3a3.sinkgroups.g3.sinks = k5 k6a3.sinkgroups.g3.processor.type = failovera3.sinkgroups.g3.processor.priority.k5 = 9a3.sinkgroups.g3.processor.priority.k6 = 7a3.sinkgroups.g3.processor.maxpenalty = 10000
    []L1层:广告点击事件(Ad Click Event)日志收集[/]
广告点击事件日志收集配置,如下所示:
a4.sources = s4a4.channels = mc4a4.sinks = k7 k8# Configure sourcea4.sources.s4.channels = mc4a4.sources.s4.type = execa4.sources.s4.command = tail -F /data/nginx/logs/ad.log# Configure channela4.channels.mc4.type = memorya4.channels.mc4.transactionCapacity = 50000a4.channels.mc4.capacity = 100000# Configure sinksa4.sinks.k7.channel = mc4a4.sinks.k7.type = avroa4.sinks.k7.hostname = 10.10.1.121a4.sinks.k7.port = 44448a4.sinks.k8.channel = mc4a4.sinks.k8.type = avroa4.sinks.k8.hostname = 10.10.1.122a4.sinks.k8.port = 44448# Configure failovera4.sinkgroups = g4a4.sinkgroups.g4.sinks = k7 k8a4.sinkgroups.g4.processor.type = failovera4.sinkgroups.g4.processor.priority.k7 = 10a4.sinkgroups.g4.processor.priority.k8 = 8a4.sinkgroups.g4.processor.maxpenalty = 10000

下游Flume日志收集汇聚层

    []L2层:App用户事件+推送点击事件日志合并收集[/]
这种业务需求是:把App用户事件和推送点击事件合并写入文件,最后都会写入HDFS,从而进一步在Hive中进行离线分析;同时又要使这两种事件分别独立地走实时计算的流程,App用户事件实时计算流程需要实时统计用户使用App过程中行为特征,而推送点击事件实时计算需要针对某一次活动来实时分析和展示用户的参与情况。具体配置内容,如下所示:
a1.sources = s1 s2a1.channels = fc1 fc2 fc3a1.sinks = kk1 fk2 kk3# Configure source:# Configure app user event source: s1 -> fc1+fc2a1.sources.s1.channels = fc1 fc2a1.sources.s1.type = avroa1.sources.s1.bind = 10.10.1.121a1.sources.s1.port = 44446a1.sources.s1.threads = 8# Configure source# Configure push click event source: s2 -> fc2+fc3a1.sources.s2.channels = fc2 fc3a1.sources.s2.type = avroa1.sources.s2.bind = 10.10.1.122a1.sources.s2.port = 44447a1.sources.s2.threads = 4# Configure file channel(/data1)# Configure app user event channel: fc1 ->kk1a1.channels.fc1.type = filea1.channels.fc1.checkpointDir = /data1/flume/channels/app_user_event/checkpointa1.channels.fc1.useDualCheckpoints = truea1.channels.fc1.backupCheckpointDir = /data1/flume/channels/app_user_event/backupa1.channels.fc1.dataDirs = /data1/flume/channels/app_user_event/dataa1.channels.fc1.transactionCapacity = 100000a1.channels.fc1.capacity = 500000a1.channels.fc1.checkpointInterval = 60000a1.channels.fc1.keep-alive = 5a1.channels.fc1.maxFileSize = 5368709120# Configure file channel(/data2)# Configure app user event + push click event: fc2 - > fk2a1.channels.fc2.type = filea1.channels.fc2.checkpointDir = /data2/flume/channels/offline_file_event/checkpointa1.channels.fc2.useDualCheckpoints = truea1.channels.fc2.backupCheckpointDir = /data2/flume/channels/offline_file_event/backupa1.channels.fc2.dataDirs = /data2/flume/channels/offline_file_event/dataa1.channels.fc2.transactionCapacity = 100000a1.channels.fc2.capacity = 500000a1.channels.fc2.checkpointInterval = 60000a1.channels.fc2.keep-alive = 5a1.channels.fc2.maxFileSize = 5368709120# Configure file channel(/data3)# Configure push click channel: fc3 ->kk3a1.channels.fc3.type = filea1.channels.fc3.checkpointDir = /data3/flume/channels/push_click_event/checkpointa1.channels.fc3.useDualCheckpoints = truea1.channels.fc3.backupCheckpointDir = /data3/flume/channels/push_click_event/backupa1.channels.fc3.dataDirs = /data3/flume/channels/push_click_event/dataa1.channels.fc3.transactionCapacity = 100000a1.channels.fc3.capacity = 500000a1.channels.fc3.checkpointInterval = 60000a1.channels.fc3.keep-alive = 5a1.channels.fc3.maxFileSize = 5368709120# Configure sink: RealtimeMessageSink(app user event)a1.sinks.kk1.type = org.shirdrn.flume.sink.RealtimeMessageSinka1.sinks.kk1.channel = fc1a1.sinks.kk1.metadata.broker.list = kafka01:9092,kafka02:9092,kafka03:9092a1.sinks.kk1.topic = json_user_eventa1.sinks.kk1.serializer.class = kafka.serializer.StringEncodera1.sinks.kk1.producer.type = asynca1.sinks.kk1.message.send.max.retries = 3a1.sinks.kk1.client.id = flume_app_user_event_2_1a1.sinks.kk1.event.decoder.count = 8a1.sinks.kk1.output.stat.event.batch.size = 2000a1.sinks.kk1.event.decoder.queue.size = 1000# Configure sink: RichRollingFileSinka1.sinks.fk2.type = org.shirdrn.flume.sink.RichRollingFileSinka1.sinks.fk2.channel = fc2a1.sinks.fk2.batchSize = 100a1.sinks.fk2.serializer = TEXTa1.sinks.fk2.sink.rollInterval = 60a1.sinks.fk2.sink.directory = /data/flume/rolling_filesa1.sinks.fk2.sink.file.prefix = eventa1.sinks.fk2.sink.file.suffix = .loga1.sinks.fk2.sink.file.pattern = yyyyMMddHHmmss# Configure sink: RealtimeMessageSink(push click)a1.sinks.kk3.type = org.shirdrn.flume.sink.RealtimeMessageSinka1.sinks.kk3.channel = fc3a1.sinks.kk3.metadata.broker.list = kafka01:9092,kafka02:9092,kafka03:9092a1.sinks.kk3.topic = json_push_click_eventa1.sinks.kk3.serializer.class = kafka.serializer.StringEncodera1.sinks.kk3.producer.type = asynca1.sinks.kk3.message.send.max.retries = 3a1.sinks.kk3.client.id = flume_push_click_2_1a1.sinks.kk3.event.decoder.count = 4a1.sinks.kk3.output.stat.event.batch.size = 2000a1.sinks.kk3.event.decoder.queue.size = 1000
上面,可以看到我们自己实现的org.shirdrn.flume.sink.RealtimeMessageSink,该Sink主要是使Flume收集的日志写入Kafka中,在Flume 1.5.0版本中还没有内置实现,所以我们自己实现了,并在其中加入了适合我们业务的处理逻辑,比如,将Nginx日志记录行解析,然后根据实时计算需要,过滤掉不需要进入Kafka(最终在Storm集群中处理)事件数据,最后转成JSON字符串的格式,写入到Kafka中的Topic里。通过上面的配置也可以看出,可以配置很多参数,例如解析线程数、队列大小等。
由于我们需要将写入本地文件系统的文件按照我们自己的方式来定义,所以基于Flume内置的file_roll实现进行修改,实现了自己的org.shirdrn.flume.sink.RichRollingFileSink,该Sink主要是对文件名字符串进行格式化,能够通过文件名来获取到文件生成的时间(人类可读格式)。
    []L2层:广告点击事件日志收集[/]
上面的图中,L1层可以根据需要扩展到更多的服务器节点,在L2层根据需要进行汇聚/缓冲,具体配置内容如下所示:
a2.sources = s3a2.channels = fc4a2.sinks = kk4# Configure source: s3 -> fc4a2.sources.s3.channels = fc4a2.sources.s3.type = avroa2.sources.s3.bind = 10.10.1.121a2.sources.s3.port = 44448a2.sources.s3.threads = 2# Configure channel(/data4)# Configure Ad channel: fc4 ->kk4a2.channels.fc4.type = filea2.channels.fc4.checkpointDir = /data4/flume/channels/ad/checkpointa2.channels.fc4.useDualCheckpoints = truea2.channels.fc4.backupCheckpointDir = /data4/flume/channels/ad/backupa2.channels.fc4.dataDirs = /data4/flume/channels/ad/dataa2.channels.fc4.transactionCapacity = 100000a2.channels.fc4.capacity = 500000a2.channels.fc4.checkpointInterval = 60000a2.channels.fc4.keep-alive = 5a2.channels.fc1.maxFileSize = 5368709120# Configure sinks: RealtimeAdKafkaSinka2.sinks.kk4.type = org.shirdrn.flume.sink.RealtimeAdKafkaSinka2.sinks.kk4.channel = fc4a2.sinks.kk4.metadata.broker.list = kafka01:9092,kafka02:9092,kafka03:9092a2.sinks.kk4.topic = json_ad_eventa2.sinks.kk4.serializer.class = kafka.serializer.StringEncodera2.sinks.kk4.producer.type = asynca2.sinks.kk4.message.send.max.retries = 3a2.sinks.kk4.client.id = flume_ad_2_1a2.sinks.kk4.event.decoder.count = 4a2.sinks.kk4.output.stat.event.batch.size = 2500a2.sinks.kk4.event.decoder.queue.size = 5000

实践总结

这里我们简单总结一些内容,如下所示:
    []Flume监控[/]
简单一点的监控,直接在启动的时候,开启一个Web端口,通过端口来获取Flume Agent服务的一些相关数据,命令类似:
bin/flume-ng agent -n a1 -c conf -f conf/config.conf -Dflume.monitoring.type=http -Dflume.monitoring.port=34545
这样便可以在Flume Agent服务节点上,浏览Web端口34545来查看,数据以JSON格式表示,比较重要的一些元数据,如channel容量、当前使用量等等,通过这些数据可以了解当前Flume的工作状态,是否需要升级扩容等等。 
另外,也可以通过Ganglia来收集并分析Flume Agent服务运行状态,能够更加详细地展示Flume Agent服务的状态,因为Ganglia配置相对复杂,这里就不做过多解释,感兴趣可以尝试一下。
    []Flume内存调优[/]
因为Flume使用Java实现的,所以就会遇到有关JVM调优的问题,这个也比较容易。默认情况下,Flume Agent进程的堆内存设置比较小,在日志数据量比较大的情况下就需要修改并调试这些参数,以满足业务需要。设置JVM相关参数,可以修改conf/flume-env.sh文件(也可以直接在启动Flume Agent服务时指定JVM选项参数),例如修改JAVA_OPTS变量,示例如下所示:
JAVA_OPTS="-server -Xms1024m -Xmx4096m -Dcom.sun.management.jmxremote -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:ParallelGCThreads=4 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:/data/flume/logs/gc-ad.log"
这样,可以方便地修改GC策略,一般由于Flume实时收集日志比较注重实时性,希望能够快速地响应,尽量减少GC导致暂停业务线程被挂起的时间,所以可以将GC设置为ParNew+CMS策略。将GC日志输出,在一定程度上能够更加方便地观察Flume Agent服务运行过程中JVM GC的详细情况,通过诊断来优化服务运行。
    []下游L2层接收消息调优[/]
通常,在开始部署Flume日志收集系统时,上游L1层服务节点比较少,在L2层汇聚时使用默认的配置可能效果也会不错,但是如果L1层Flume Agent越来越多,就能看到L2层处理速度慢下来。L2层的Flume Agent服务一般会远远小于L1层Flume Agent服务数,这种情况下,如果L2层Flume Agent服务使用Avro Source,可以调大Avro接收线程数,示例如下:
a1.sources.s1.type = avroa1.sources.s1.bind = 10.10.1.121a1.sources.s1.port = 44446a1.sources.s1.threads = 8
上面默认情况下threads参数的值1,可以将该值调大,否则的话,L1层就会堆积日志记录,严重可能导致数据丢失。
    []Flume处理业务逻辑约束[/]

Flume的易扩展性使得我们可以根据自己的业务特点来实现一些组件,那么我们在将实际业务逻辑掺杂进Flume中时,需要考虑是否非得必须这么做?如果这么做是否会影响Flume实时传输日志的速度和效率? 
Flume作为一个轻量级的日志收集工具,个人认为最好将相对复杂的业务逻辑(尤其是需要与一些存储系统,如MySQL、Redis交互时)后移,放在Storm集群中去处理,或者自己实现的业务处理集群中,而Flume就让它去做其擅长的事情——路由消息。
当然,有些业务场景可能必须在Flume日志收集层去做,如根据原始非结构化的消息,无法控制不同类型的消息路由到不同的目的地,那么可能需要在收集层做一个简单的解析或格式化,实际上这是在
Flume层做了一个简单的日志分发。无论如何,如果想在Flume层插入业务逻辑处理,尽量避免过于复杂的处理而影响整个日志传输速度,如果后端有实时推荐需求,日志中事件的实时性大大延迟,就会影响实施个性化推荐。


分享阅读原文:http://shiyanjun.cn/archives/1497.html
文章作者:时延军


下一代分布式消息系统Apache Kafka

chris 发表了文章 0 个评论 3317 次浏览 2016-03-18 01:12 来自相关话题

一、开篇思考 消息语义:持久性 安全 淘汰 交付 路由 批量 消息过滤 排队标准 已收通知 IO: ...查看全部
ka1.png


一、开篇思考


ka2.png

消息语义:
持久性 安全 淘汰 交付 路由 批量 消息过滤 排队标准 已收通知
ka3.png

IO:
ka4.png

机械磁盘慢
现代操作系统优化 
    []使用read-ahead和write-behind技术,预读取成块数据,将微小琐碎的逻辑写入组织成一次较大的物理写入[/][]常用空闲内存用作磁盘缓存[/][]线性的访问磁盘,很多时候比随机的内存访问快得多[/]

JVM 2个事实
    []Java对象占用空间非常大,差不多要存储的数据的两倍甚至更高[/][]随着堆中数据量的增加,GC(垃圾回收)变得越来越困难[/]

JVM 1个假设
    []在64G内存的机器上,不得不使用到50G~56G的内存空间[/][]当系统重启的时候,又必须要将数据刷到内存中(每分钟1GB内存),即使冷刷新(在使用数据的时候发现没有再刷到内存)也会导致最初的时候性能非常慢[/]
图解零拷贝
lkb.jpg
lkb2.png
lkb3.png
设计常量复杂度的磁盘操纵B树的复杂度是O(logN),通常被认为就是常量复杂度但对于磁盘操作来说并非如此:磁盘进行一次搜索需要10ms,每个磁盘在同一时间只能进行一次搜索,并发处理困难 对树结构的性能的观察结果表明:其性能往往随着数据的增长而线性下降,数据增长一倍,速度就会降低一倍[list=1][] 线性访问减小磁盘寻道[/][] 压缩数据减小IO压力[/][] 使用零拷贝(zero copy)技术[/]

二、Kafka特性 & 原理

一个高性能分布式(Distribution),可分区(Partitioned),可备份(Replicated),基于Zookeeper协调的发布/订阅消息队列系统
快速持久化
以时间复杂度为O(1)的方式提供消息持久化能力
高吞吐率
在一台普通的服务器上可以达到10W/s级的消息处理
分布式负载均衡
Broker/Producer/Consumer 都支持分布式和负载均衡
水平扩展
支持在线平滑水平扩展
kafka名词解释:[list=1][]Broker & Controller & Producer & Consumer & Consumer Group 2. Topic & Partition & Segment & Offset[/][]Replication & Replication Leader & Replication Follower[/][]Assigned Replications & Preferred Replication[/][]Message & Message Set[/]
wltp.png
主题解析
topic.png
部分文件实现
sfile.png
Broker & Topic Partition
broker.png
消息交付
message.png
zk.png
同步语义[list=1][]每一个Broker节点必须维护和Zookeeper的连接Session,Zookeeper通过心跳机 制检查每个结点的连接 [/][]Follower Broker节点必须及时同步Leader Broker节点,不能落后Leader Broker 节点太多 [/]副本和提交log[list=1][]当且仅当Message被所有的Replication写入到Log中,才算"Committed"[/][]只有Committed的Message,才会被Consumer读取  [/]Persistence & Efficiency [list=1][]每一个Follower都只从Leader Pull数据[/][]每一个Follower收到数据后,立即向Leader发送ACK,而非等到数据写入Log后[/]Consumer  & Partition [list=1][]同一Consumer Group中Consumer竞争Partition,即队列语义 [/][]不同Consumer Group中Consumer共享Partition,即主题语义 [/]消息传递语义[list=1][]At most once - 消息可能会丢,但绝不会重复传输[/][]At least once - 消息绝不会丢,但可能会重复传输[/][]Exactly once - 每一条消息肯定会被传输一次且仅传输一次,理想状态 [/]Leader Election算法 [list=1][]Leader Election[/][]In-Sync Replicas Approach VS. Majority Vote[/][]某一个Partition所有Replication不工作[/]Controller思考 [list=1][]选举Broker Leader最简单最直观的方案[/][]该选举Broker Leader的方案引入了哪些问题 [/]Partition 思考[list=1][]Partition的数据结构,逻辑上/物理上的存储结构[/][]Broker&Topic&Partition关系[/][]Partition&Consumer&Consumer Group关系 [/]Producer思考[list=1][]Load balancing[/][]Asynchronous send[/]Consumer思考[list=1][]Push VS. Pull[/][]Offset归属,存储[/][]触发Partition Rebalance的条件及问题 [/][]减轻了Broker设计的复杂度[/]

三、Kafka设计 & 实现

Broker内部
brokerint.png
Broker故障转移
brokerfa.png
插件删除Topic
datopic.png
LeaderAndIsrRequest 响应
request.png
Partition重新分配案例
partionfp.png
Replication工具[list=1][]Topic Tool[/][]Replica verification Tool[/][]Preferred Replica Leader Election Tool[/][]Kafka Reassign Partitions Tool[/][]State Change Log Merge Tool[/]
Preferred Replica Leader Election Tool
在8个Broker的kafka集群上,创建topic为my_topic,replication-factor为3,partition-factor为8,Partition/Replica分布
etool.png

手动停止Broker 1/2/4,Partition/Replica分布
rptool.png

手动启动Broker 1,Partition/Replica分布
p1tool.png

运行该工具后,Partition/Replica分布
tttool.png

手动启动Broker 2/4,Partition/Replica分布
24tool.png

运行该工具后,Partition/Replica分布
jgtool.png

将Topic的所有Partition重新分配到Broker 4/5/6/7上,初始Partition/Replica分布
4567tool.png

将Topic的所有Partition重新分配到Broker 4/5/6/7上,最终Partition/Replica分布
zztool.png


分享作者:王先明


基于Zookeeper的服务注册与发现案例分析

空心菜 发表了文章 0 个评论 4253 次浏览 2016-03-16 17:22 来自相关话题

背景 大多数系统都是从一个单一系统开始起步的,随着公司业务的快速发展,这个单一系统变得越来越庞大,带来几个问题: []随着访问量的不断攀升,纯粹通过提升机器的性能来已经不能解决问题,系统无法进行有效的水平扩展[/][]维护 ...查看全部


背景


大多数系统都是从一个单一系统开始起步的,随着公司业务的快速发展,这个单一系统变得越来越庞大,带来几个问题:
    []随着访问量的不断攀升,纯粹通过提升机器的性能来已经不能解决问题,系统无法进行有效的水平扩展[/][]维护这个单一系统,变得越来越复杂[/][]同时,随着业务场景的不同以及大研发的招兵买马带来了不同技术背景的工程师,在原有达达Python技术栈的基础上,引入了Java技术栈。[/]

如何来解决这些问题?业务服务化是个有效的手段来解决大规模系统的性能瓶颈和复杂性。通过系统拆分将原有的单一庞大系统拆分成小系统,它带来了如下好处:
    []原来系统的压力得到很好的分流,有效地解决了原先系统的瓶颈,同时带来了更好的扩展性[/][]独立的代码库,更少的业务逻辑,系统的维护性得到极大的增强[/]

同时,也带来了一系列问题:
    []随着系统服务的越来越多,如何来管理这些服务?[/][]如何分发请求到提供同一服务的多台主机上(负载均衡如何来做)[/][]如果提供服务的Endpoint发生变化,如何将这些信息通知服务的调用方?[/]

最初的解决方案

Linkedin的创始人里德霍夫曼曾经说过:
成立一家初创公司就像把自己从悬崖上扔下来,在降落过程中去组装一架飞机。
这对于初创公司达达也是一样的,业务在以火箭般的速度发展着。技术在业务发展中作用就是保障业务的稳定运行,快速地“组装一架飞机”。所以,在业务服务化的早期,我们采用了Nginx+本地hosts文件的方式进行内部服务的注册与发现,架构图如下:
zknginx.png
各系统组件角色如下:[list=1][]服务消费者通过本地hosts中的服务提供者域名与Nginx的IP绑定信息来调用服务[/][]Nginx用来对服务提供者提供的服务进行存活检查和负载均衡[/][]服务提供者提供服务给服务消费者访问,并通过Nginx来进行请求分发[/]这在内部系统比较少,访问量比较小的情况下,解决了服务的注册,发现与负载均衡等问题。但是,随着内部服务越来愈多,访问量越来越大的情况下,该架构的隐患逐渐暴露出来:
    []最明显的问题是Nginx存在单点故障(SPOF),同时随着访问量的提升,会成为一个性能瓶颈[/][]随着内部服务的越来越多,不同的服务消费方需要配置不同的hosts,很容易在增加新的主机时忘记配置hosts导致服务不能调用问题,增加了运维负担[/][]服务的配置信息分散在各个主机hosts中,难以保持一致性,不便于服务的管理[/][]服务主机的发布和下线需要手工的修改Nginx upstream配置,修改的配置需要上线,不利于服务的快速部署[/]

如何解决

在谈如何来解决之前,现梳理一下服务注册与发现的目标:
    []服务的注册信息应该统一保存,方便于服务的管理[/][]自动通过服务的名称去发现服务,而不必了解这个服务提供的end-point到底是哪台主机[/][]支持服务的负载均衡及fail-over[/][]增加或移除某个服务的end-point时,对于服务的消费者来说是透明的[/][]支持Python和Java[/]
备选方案一: DNS
DNS作为服务注册发现的一种方案,它比较简单。只要在DNS服务上,配置一个DNS名称与IP对应关系即可。定位一个服务只需要连接到DNS服务器上,随机返回一个IP地址即可。由于存在DNS缓存,所以DNS服务器本身不会成为一个瓶颈。
这种基于Pull的方式不能及时获取服务的状态的更新(例如:服务的IP更新等)。如果服务的提供者出现故障,由于DNS缓存的存在,服务的调用方会仍然将请求转发给出现故障的服务提供方;反之亦然。
备选方案二:DubboDubbo是阿里巴巴推出的分布式服务框架,致力于解决服务的注册与发现,编排,治理。它的优点如下:[list=1][]功能全面,易于扩展[/][]支持各种序列化协议(JSON,Hession,java序列化等)[/][]支持各种RPC协议(HTTP,Java RMI,Dubbo自身的RPC协议等)[/][]支持多种负载均衡算法[/][]其他高级特性:服务编排,服务治理,服务监控等[/]缺点如下:[list=1][]只支持Java,对于Python没有相应的支持[/][]虽然已经开源,但是没有成熟的社区来运营和维护,未来升级可能是个麻烦[/][]重量级的解决方案带来新的复杂性[/]备选方案三:ZookeeperZookeeper是什么?按照Apache官网的描述是:
ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services.
参照官网的定义,它能够做:[list=1][]作为配置信息的存储的中心服务器[/][]命名服务[/][]分布式的协调[/][]Mater选举等[/]在定义中特别提到了命名服务。在调研之后,Zookeeper作为服务注册与发现的解决方案;它有如下优点:[list=1][]它提供的简单API[/][]已有互联网公司(例如:Pinterest,Airbnb)使用它来进行服务注册与发现[/][]支持多语言的客户端[/][]通过Watcher机制实现Push模型,服务注册信息的变更能够及时通知服务消费方[/]缺点是:[list=1][]引入新的Zookeeper组件,带来新的复杂性和运维问题[/][]需自己通过它提供的API来实现服务注册与发现逻辑(包含Python与Java版本)[/]针对对上述几个方案的优缺点权衡之后,决定采用了基于Zookeeper实现自己的服务注册与发现。

基于Zookeeper的服务注册与发现架构

zkregsine.png
在此架构中有三类角色:服务提供者,服务注册中心,服务消费者。服务提供者服务提供者作为服务的提供方将自身的服务信息注册到服务注册中心中。服务信息包含:
    []隶属于哪个系统[/][]服务的IP,端口[/][]服务的请求URL[/][]服务的权重等等[/]
服务注册中心服务注册中心主要提供所有服务注册信息的中心存储,同时负责将服务注册信息的更新通知实时的Push给服务消费者(主要是通过Zookeeper的Watcher机制来实现的)。服务消费者服务消费者主要职责如下:[list=1][]服务消费者在启动时从服务注册中心获取需要的服务注册信息[/][]将服务注册信息缓存在本地[/][]监听服务注册信息的变更,如接收到服务注册中心的服务变更通知,则在本地缓存中更新服务的注册信息[/][]根据本地缓存中的服务注册信息构建服务调用请求,并根据负载均衡策略(随机负载均衡,Round-Robin负载均衡等)来转发请求[/][]对服务提供方的存活进行检测,如果出现服务不可用的服务提供方,将从本地缓存中剔除[/]
服务消费者只在自己初始化以及服务变更时会依赖服务注册中心,在此阶段的单点故障通过Zookeeper集群来进行保障。在整个服务调用过程中,服务消费者不依赖于任何第三方服务。

实现机制介绍​

Zookeeper数据模型介绍在整个服务注册与发现的设计中,最重要是如何来存储服务的注册信息。在设计基于Zookeeper的服务注册结构之前,我们先来看一下Zookeeper的数据模型。Zookeeper的数据模型如下图所示:
zluo.png
Zookeeper数据模型结构与Unix文件系统很类似,是一个树状层次结构。每个节点叫做Znode,节点可以拥有子节点,同时允许将少量数据存储在该节点下。客户端可以通过监听节点的数据变更和子节点变更来实时获取Znode的变更(Wather机制)。
服务注册结构
dd.png
服务注册结构如上图所示。
    []/dada来标示公司名称dada,同时能方便与其它应用的目录区分开(例如:Kafka的brokers注册信息放置在/brokers下)[/][]/dada/services将所有服务提供者都放置该目录下[/][]/dada/services/category1目录定义具体的服务提供者的id:category1,同时该Znode节点中允许存放该服务提供者的一些元数据信息,例如:名称,服务提供者的Owner,上下文路径(Java Web项目),健康检查路径等。该信息可以根据实际需要进行自由扩展。[/][]/dada/services/category1/helloworld节点定义了服务提供者category1下的一个服务:helloworld。其中helloworld为该服务的ID,同时允许将该服务的元数据信息存储在该Znode下,例如图中标示的:服务名称,服务描述,服务路径,服务的调用的schema,服务的调用的HTTP METHOD等。该信息可以根据实际需要进行自由扩展。[/][]/dada/services/category1/helloworld/providers节点定义了服务提供者的父节点。在这里其实可以将服务提供者的IP和端口直接放置在helloworld节点下,在这里单独放一个节点,是为了将来可以将服务消费者的消息挂载在helloworld节点下,进行一些扩展,例如命名为:/dada/services/category1/helloworld/consumers。[/][]/dada/services/category__1/helloworld/providers/192.168.1.1:8080该节点定义了服务提供者的IP和端口,同时在节点中定义了该服务提供者的权重。[/]

实现机制

由于目前服务注册通过我们的服务注册中心UI来进行注册,这部分逻辑比较简单,即通过UI界面来构造上述定义的服务注册结构。下面着重介绍一下我们的服务发现是如何工作的:
lj.png
在上述类图中,类ServiceDiscovery主要通过Zookeeper的API(Python/Java版本)来获取服务信息,同时对服务注册结构中的每个服务的providers节点增加Watcher,来监控节点变化。获取的服务注册信息保存在变量service_repos中。通过在初始化时设置LoadBalanceStrategy的实现(Round-Robin算法,Radmon算法)来实现服务提供者的负载均衡。主要方法:[list=1][]init获取Zookeeper的服务注册信息,并缓存在service_repos[/][]get_service_repos方法获取实例变量service_repos[/][]get_service_endpoint根据init构建好的service_repos,以及lb_strategy提供的负载均衡策略返回某个服务的URL地址[/][]update_service_repos通过Zookeeper的Watcher机制来实时更新本地缓存service_repos[/][]heartbeat_monitor是一个心跳检测线程,用来进行服务提供者的健康存活检测,如果出现问题,将该服务提供者从该服务的提供者列表中移除;反之,则加入到服务的提供者列表中[/]
LoadBalanceStrategy定义了根据服务提供者的信息返回对应的服务Host和IP,即决定由那台主机+端口来提供服务。
RoundRobinStrategy和RandomStrategy分别实现了Round-Robin和随机的负载均衡算法


未来展望


目前达达基于Zookeeper的服务注册与发现的架构还处于初期,很多功能还未完善,例如:服务的路由功能,与部署平台的集成,服务的监控等等。
当然基于Zookeeper还能做其它许多事情,例如:实时动态配置系统。目前,我们已经基于Zookeeper实现了实时动态配置系统。


分享原文地址:https://tech.imdada.cn/2015/12/03/service-registry-and-discovery-with-zk/
作者:杨 骏  达达CTO


Elasticsearch数据迁移与备份

空心菜 发表了文章 5 个评论 6567 次浏览 2016-03-08 23:43 来自相关话题

虽然ES提供了replicas shards的机制来保证数据的完整性不会因为几个节点的奔溃而被破坏,但是定期的数据备份以备不时之需依然重要。此外,通过备份与恢复也可实现数据在不同集群间的迁移(直接复制data目录下的索引文件的做法我尝试过,但没有成功)。 ...查看全部
虽然ES提供了replicas shards的机制来保证数据的完整性不会因为几个节点的奔溃而被破坏,但是定期的数据备份以备不时之需依然重要。此外,通过备份与恢复也可实现数据在不同集群间的迁移(直接复制data目录下的索引文件的做法我尝试过,但没有成功)。
 
备份的方式在官方文档里有清楚的交代:先创建仓库(repository),再往仓库里添加一个快照(snapshot),查看备份状态,搞定。虽然官方文档很轻描淡写,但我在第一步就卡住了,创建仓库时需要一个共享文件系统(每个ES节点都需要能访问),我只是想把数据从线上集群迁移到线下进行更全面的测试,为了这么点事去找系统部走流程等待共享服务器是多么头疼啊……
 
一阵Google之后,决定使用sshfs在ES集群中每个节点的相同位置挂载一个共享目录,以下是操作命令:
// 在每个节点上安装sshfs
yum install fuse sshfs

// 选定一个节点的一个目录作为共享目录(不要放在系统盘所在目录)
mkdir /data0/es_backup

// 在每个节点的相同位置创建目录,并挂载共享目录
mkdir /mnt/backup
sshfs root@192.168.x.x:/data0/es_backup /mnt/backup -o allow_other

// 测试运行ES的用户是否有对共享目录的写权限
sudo -u elasticsearch touch /mnt/backup/test
这里最大的坑是写权限问题,我试过在创建/mnt/backup时把owner改成elasticsearch或者在挂载的时候用-o uid= gid= 这样参数更改目录的owner,然并卵……折腾了一下午。最后总算在Stackoverflow找到了这个参数-o allow_other,但其实这样做比较粗鲁,机器上的任何用户都可以访问这个目录了,有更优雅实现方式的同学请赐教。
 
解决了共享目录的问题之后,就可以像官方文档一样轻描淡写啦:
// 在_plugin/marvel/sense里

// 创建仓库
PUT _snapshot/my_backup
{
"type": "fs",
"settings": {
"location": "/mnt/backup",
"compress": true
}
}

// 针对具体的index创建快照备份
PUT _snapshot/my_backup/snapshot_test
{
"indices": "index_1, index_2"
}

// 查看备份状态
GET _snapshot/my_backup/snapshot_test/_status
现在可以开始进行迁移了:
// 备份创建好之后,在共享目录/root/backup里是这样的:
-rw-r--r-- 1 root root 31 12月 15 22:14 index
drwxr-xr-x 3 root root 4096 12月 15 22:14 indices
-rw-r--r-- 1 root root 83 12月 15 22:14 metadata-snapshot_test
-rw-r--r-- 1 root root 181 12月 15 22:14 snapshot-snapshot_test

// 在迁移目标的集群上重复上面创建仓库的操作

// 将源集群的备份内容(/root/backup里的所有文件),复制到迁移目标的集群仓库目录里

// 在sense中使用RESTful API进行备份的恢复
POST _snapshot/my_backup/snapshot_test/_restore

// 查看恢复的状态
GET _snapshot/my_backup/snapshot_test/_status
以上就是参照官方文档实施的ES数据备份与迁移,希望对大家有帮助,欢迎留言与交流。


分享阅读原文:http://logos.name/archives/515


使用./bin/graceful_stop.sh had1停止一个hbase regionserver失败

空心菜 回复了问题 3 人关注 1 个回复 6945 次浏览 2016-03-02 21:40 来自相关话题

Zookeeper基本概念详解

空心菜 发表了文章 0 个评论 5091 次浏览 2016-03-02 01:31 来自相关话题

根据如上思维导图,我们来展开对Zookeeper的基本的一些概念解释。 一、集群角色 LeaderLeader服务器是整个Zookeeper集群工作机制中的核心 FollowerFollower服务器是Zoo ...查看全部
zk_logic.png

根据如上思维导图,我们来展开对Zookeeper的基本的一些概念解释。


一、集群角色


Leader
Leader服务器是整个Zookeeper集群工作机制中的核心 
Follower
Follower服务器是Zookeeper集群状态的跟随者
Observer
Observer服务器充当一个观察者的角色
Leader,Follower 设计模式;Observer 观察者设计模式


二、会话


会话是指客户端和ZooKeeper服务器的连接,ZooKeeper中的会话叫Session,客户端靠与服务器建立一个TCP的长连接;
来维持一个Session,客户端在启动的时候首先会与服务器建立一个TCP连接,通过这个连接,客户端能够通过心跳检测与服务器保持有效的会话,也能向ZK服务器发送请求并获得响应。


三、数据节点


Zookeeper中的节点有两类:
    []集群中的一台机器称为一个节点[/][]数据模型中的数据单元Znode,分为持久节点和临时节点[/]

 
Zookeeper的数据模型是一棵树,树的节点就是Znode,Znode中可以保存信息。
如下图所示:
tree.png
ZK大致数据结构跟上图是一致的,如上图所示这个图就像一棵树,这个树有个根节点,然后其下有些子节点,然后每个子节点其下又可以有子节点,大多数的开发就是跟zk的这些数据节点打交道,来读写这些数据节点,来完成任务。


四、版本


ZK中的版本,是用来记录节点数据或者是节点的子节点列表或者是权限信息的修改次数,注意是这里是修改次数。如果一个节点的version是1,那就代表说这个节点从创建以来被修改了一次,那么这个版本怎么用呢,典型的我们可以利用版本来实现分布式的锁服务。我们知道在数据库中,一般有两种锁,一种是悲观锁一种是乐观锁。


悲观锁
悲观锁又叫悲观并发锁,是数据库中一种非常严格的锁策略,具有强烈的排他性,能够避免不同事务对同一数据并发更新造成的数据不一致性,在上一个事务没有完成之前,下一个事务不能访问相同的资源,适合数据更新竞争非常激烈的场景;
乐观锁
相比悲观锁,乐观锁使用的场景会更多,悲观锁认为事务访问相同数据的时候一定会出现相互的干扰,所以简单粗暴的使用排他访问的方式,而乐观锁认为不同事务访问相同资源是很少出现相互干扰的情况,因此在事务处理期间不需要进行并发控制,当然乐观锁也是锁,它还是会有并发的控制!对于数据库我们通常的做法是在每个表中增加一个version版本字段,事务修改数据之前先读出数据,当然版号也顺势读取出来,然后把这个读取出来的版本号加入到更新语句的条件中,比如,读取出来的版本号是1,我们修改数据的语句可以这样写,update 某某表 set 字段一=某某值 where id=1 and version=1,那如果更新失败了说明以后其他事务已经修改过数据了,那系统需要抛出异常给客户端,让客户端自行处理,客户端可以选择重试。
锁,ZK中版本有类式的作用。
ZK的版本类型有三种:version cversion aversion
table.png


五、Watcher


Watcher我们可以理解为他是一个事件监听器
ZooKeeper允许用户在指定节点上注册一些Watcher,当数据节点发生变化的时候,ZooKeeper服务器会把这个变化的通知发送给感兴趣的客户端。
watcher.png
两个客户端都在zookeeper集群中注册了watcher(事件监听器),那么当zk中的节点数据发生变化的时候,zk会把这一变化的通知发送给客户端,当客户端收到这个变化通知的时候,它可以再回到zk中,去取得这个数据的详细信息。


六、ACL权限控制


 
ACL是Access Control Lists 的简写, ZooKeeper采用ACL策略来进行权限控制,有以下权限:
    []CREATE: 创建子节点的权限[/][]READ: 获取节点数据和子节点列表的权限[/][]WRITE: 更新节点数据的权限[/][]DELETE: 删除子节点的权限[/][]ADMIN: 设置节点ACL的权限[/]

上面的权限有点类似我们信息系统的权限管理,我们在开发系统的时候一般也会对数据做这些权限管理,一个zk集群可能会服务很多的业务,尤其是一些大公司,zk集群的节点中会保存重要的信息,那么这些信息通常只能对一部分的访问者开放,通过acl我们可以对某些节点的访问进行授权,从而来保证数据的安全。

Zookeeper介绍

空心菜 发表了文章 0 个评论 5081 次浏览 2016-02-28 18:32 来自相关话题

根据如上思维导图,我来展开对Zookeeper的介绍 一、Zookeeper背景 随着互联网技术的高速发展,企业对计算机系统的计算、存储能力要求越来越高,最简单的证明就是出现了一些诸如:高并发,海量存储这样的 ...查看全部
zookeeper.png

根据如上思维导图,我来展开对Zookeeper的介绍


一、Zookeeper背景


随着互联网技术的高速发展,企业对计算机系统的计算、存储能力要求越来越高,最简单的证明就是出现了一些诸如:高并发,海量存储这样的词汇。在这样的背景下,单纯依靠少量高性能主机来完成计算任务也就不能满足现有大部分企业的需求了,企业的IT架构逐步从集中式向分布式过度,所谓的分布式是指:把一个计算任务分解成若干个计算单元,并且分配到若干不同的计算机中去执行,然后汇总计算结果的过程。
这好比公司里面的某个团队,接到公司派发的任务,首先团队的主管,要把任务进行拆分,然后安排下去,划分给团队中不同的人去完成,并随时跟进任务的进展。如果团队主管离职了,那我们可能就会在团队中挑选一个对业务比较熟悉的人来接管主管位置。最后各个组员把任务完成,主管进行汇总,并上报给公司。在团队内部需要制定多个工作流程,来保证工作的有序开展。在分布式系统中同样需要设置这么一个协作规范。zookeeper可以很好的帮助我们来实现这个目的。


二、Zookeeper是什么?


ZooKeeper是一个开放源码的分布式协调服务,由知名互联网公司雅虎创建,是基于Google Chubby开源实现。(Google chubby是google公司开源的一个锁服务。)
ZooKeeper是一个高性能的分布式数据一致性解决方案,它将那些复杂的、容易出错的分布式一致性服务封装起来了,构成了一个高效可靠的源语集,并提供一系列简单易用的接口给用户。
ZooKeeper致力于提供一个高性能、高可用、且具有严格的顺序访问控制能力的分布式协调服务。分布式应用可以基于它实现诸如数据发布/订阅、负载均衡、命名服务、分布式协调/通知,集群管理、Master 选举、分布式锁和分布式队列等功能。
Zookeeper知识点:
1、源代码开放
开源意味着我们可以免费的获取和使用zk,并且可以深入研究zk的源代码,甚至可以根据自己业务特性和要求进行二次开发修改。
2、是分布式协调服务,它解决分布式数据一致性问题
A、顺序一致性
所谓的顺序一致性是指从一个客户端发起一个请求,最终会严格按照发起的顺序应用到zk中。
B、原子性
原子性是指所有事物请求的处理结果在整个集群的所有机器上的应用情况是一致的。
C、单一视图
单一视图是指无任客户端连接到哪个zk的服务器,它看到的服务端数据都是一致的。
D、可靠性
可靠性是指一旦服务端成功的应用了一个事物并完成了对客户端的响应,那么这个事物所引起的服务端状态的变更会一直保留下来,除非有另外一个事物又对它进行了修改。
E、实时性
实时性是指zk保证在一段时间内客户端一定能从服务端读取最新的数据状态。
3、高性能
zk具有很高的吞吐量,一个三台服务器的集群可以达到12w-13w的QPS。
4、我们可以通过调用zk提供的接口解决一些分布式易用中的实际问题。


三、Zookeeper的典型应用场景


Zookeeper包括但不限于如下应用场景
 
3.1、数据发布/订阅
顾名思义就是一方把数据发布出来,另一方通过某种方式可以得到这些数据;
通常数据订阅有两种方式:推送模式和拉取模式
推送模式一般是服务器主动向客户端推送信息, 拉取模式是客户端主动去服务器获取数据(通常是采用定时轮询的方式),ZK采用两种方式相结合;
发布者将数据发布到ZK集群节点上,订阅者通过一定的方法告诉服务器,我对哪个节点的数据感兴趣,那服务器在这些节点的数据发生变化时,就通知客户端,客户端得到通知后可以去服务器获取数据信息。
3.2、负载均衡
zk_arch.png

实现过程:
1、首先DB在启动的时候先把自己在ZK上注册成一个临时节点,ZK的节点后面我们会讲到有两种,一种是永久节点,一类是临时节点临时节点在服务器出现问题的时候,节点会自动的从ZK上删除,那么这样ZK上的服务器列表就是最新的可用的列表。
2、客户端在需要读写数据库的时候首先它去ZooKeeper得到所有可用的DB的连接信息(一张列表),得到可用的数据列表。
3、客户端随机的算法,随机选择一个与之建立连接,每次会跟不同的数据库连接,就达到简单的复杂均衡。
4、当客户端发现连接不可用的时候可再次从ZK上获取可用的DB连接信息,当然也可以在刚获取的那个列表里移除掉不可用的连接后再随机选择一个DB与之连接。
3.3、命名服务
顾名思义,就是提供名称的服务,例如数据库表格ID,一般用得比较多的有两种ID,一种是自动增长的ID,一种是UUID(9291d71a-0354-4d8e-acd8-64f7393c64ae),两种ID各自都有缺陷,自动增长的ID局限在单库单表中使用,不能在分布式中使用,UUID可以在分布式中使用但是由于ID没有规律难于理解,我们可以借用ZK来生成一个顺序增长的,可以在集群环境下使用的,命名易于理解的ID。
3.4、分布式协调/通知
心跳检测,在分布式系统中,我们常常需要知道某个机器是否可用,传统的开发中,可以通过Ping某个主机来实现,Ping得通说明对方是可用的,相反是不可用的;
ZK中我们让所有的机其都注册一个临时节点,我们判断一个机器是否可用,我们只需要判断这个节点在ZK中是否存在就可以了,不需要直接去连接需要检查的机器 ,降低系统的复杂度。


四、Zookeeper的优势


    []源代码开放[/][]已经被证实是高性能,易用稳定的工业级产品。[/][]有着广泛的应用:Hadoop,HBase,Storm,Solr。[/]

转载请注明来自开源技术社区http://openskill.cn/article/281

hbase RegionServer节点启动失败

空心菜 回复了问题 2 人关注 2 个回复 7109 次浏览 2016-02-27 16:21 来自相关话题