Flume
Flume (NG) Architecture Design Essentials and Configuration Practice
Big Data | Posted by Ansible | 0 comments | 2605 views | 2016-07-14 21:34
Architecture design essentials
Flume's architecture is built around the following core concepts:
- Event: a unit of data, carrying an optional message header
- Flow: an abstraction of an Event's migration from its point of origin to its destination
- Client: operates on Events at the point of origin and sends them to a Flume Agent
- Agent: an independent Flume process containing the Source, Channel, and Sink components
- Source: consumes the Events delivered to it
- Channel: a temporary store that relays Events, holding those handed over by the Source
- Sink: reads and removes Events from the Channel, passing them on to the next Agent in the flow pipeline (if there is one)
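To make the Event concept concrete, here is a minimal Python sketch (illustrative only, not Flume code) of an event as an optional header map plus a byte payload:

```python
from dataclasses import dataclass, field

@dataclass
class Event:
    """A Flume-style event: a byte payload plus optional string headers."""
    body: bytes
    headers: dict = field(default_factory=dict)

# A client builds events like this and hands them to an agent's source;
# header values can later drive routing decisions inside the agent.
evt = Event(body=b"GET /index.html 200", headers={"host": "web01", "type": "nginx"})
print(evt.headers.get("type"))  # nginx
```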
Configuring a Flume Agent starts from a skeleton like the following (the angle-bracket placeholders follow the official documentation):

# list the sources, sinks and channels for the agent
<Agent>.sources = <Source1> <Source2>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>

# set channel for source
<Agent>.sources.<Source1>.channels = <Channel1> <Channel2> ...
<Agent>.sources.<Source2>.channels = <Channel1> <Channel2> ...

# set channel for sink
<Agent>.sinks.<Sink1>.channel = <Channel1>
<Agent>.sinks.<Sink2>.channel = <Channel2>

The names in angle brackets can be changed to match actual needs or business. In detail: <Agent> is the name given to this Agent; every Agent must have a name. <Source1> and <Source2> name the Agent's Source components, which consume the Events passed to them. <Channel1> and <Channel2> name the Agent's Channel components. <Sink1> and <Sink2> name the Agent's Sink components, which consume (remove) Events from a Channel. In the first group of properties above, the Source, Sink, and Channel lists may each contain one or more names. The second group configures which Channel(s) a Source puts its data into; a Source may store into one or several Channels, and storing the same Source's data into multiple Channels is effectively replication. The third group configures which Channel a Sink takes its data from; a Sink can only take data from a single Channel. Below, following the official documentation, we show several flow pipelines and the application scenarios each one suits:
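As a quick sanity check of how the three groups of properties wire together, this Python sketch (the component names s1/k1/c1 are hypothetical) parses a minimal agent definition and prints the sink-to-channel wiring:

```python
config = """
agent1.sources = s1
agent1.sinks = k1 k2
agent1.channels = c1 c2
agent1.sources.s1.channels = c1 c2
agent1.sinks.k1.channel = c1
agent1.sinks.k2.channel = c2
"""

props = {}
for line in config.strip().splitlines():
    key, value = (part.strip() for part in line.split("=", 1))
    props[key] = value.split()  # space-separated lists of component names

# One source replicating into two channels; each sink draining exactly one.
for sink in props["agent1.sinks"]:
    print(sink, "drains", props[f"agent1.sinks.{sink}.channel"][0])
```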
- Multiple Agents connected in sequence
- Data from multiple Agents converging into one Agent
- Multiplexing Agent
# List the sources, sinks and channels for the agent
<Agent>.sources = <Source1>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>

# set list of channels for source (separated by space)
<Agent>.sources.<Source1>.channels = <Channel1> <Channel2>

# set channel for sinks
<Agent>.sinks.<Sink1>.channel = <Channel1>
<Agent>.sinks.<Sink2>.channel = <Channel2>

<Agent>.sources.<Source1>.selector.type = replicating

Here the selector's type is replicating and nothing else is configured, so replication mode is used: Source1 stores the data into both Channel1 and Channel2, the two channels hold identical data, and the data is then delivered to Sink1 and Sink2. In multiplexing mode, the selector decides which channel the data goes to based on a header value; the configuration format is as follows:
# Mapping for multiplexing selector
<Agent>.sources.<Source1>.selector.type = multiplexing
<Agent>.sources.<Source1>.selector.header = <someHeader>
<Agent>.sources.<Source1>.selector.mapping.<Value1> = <Channel1>
<Agent>.sources.<Source1>.selector.mapping.<Value2> = <Channel1> <Channel2>
<Agent>.sources.<Source1>.selector.mapping.<Value3> = <Channel2>
#...
<Agent>.sources.<Source1>.selector.default = <Channel2>

Here the selector's type is multiplexing; we configure the selector's header along with several mapping values, i.e. the possible values of that header: if the header's value is Value1 or Value2, data is routed from Source1 to Channel1; if it is Value2 or Value3, data is routed from Source1 to Channel2.
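The routing rule just described can be mimicked in a few lines of Python. This only simulates the selector's behavior (the header name and values are placeholders); it is not Flume code:

```python
# Mapping mirroring the multiplexing example above: Value1/Value2 -> Channel1,
# Value2/Value3 -> Channel2, anything else -> the default channel.
mapping = {
    "Value1": ["Channel1"],
    "Value2": ["Channel1", "Channel2"],
    "Value3": ["Channel2"],
}
default = ["Channel2"]

def route(event_headers, header_name="type"):
    """Return the channels an event is put into, like a multiplexing selector."""
    return mapping.get(event_headers.get(header_name), default)

print(route({"type": "Value1"}))   # ['Channel1']
print(route({"type": "Value2"}))   # ['Channel1', 'Channel2']
print(route({"type": "unknown"}))  # ['Channel2'] (the default)
```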
- Load balancing
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2 k3
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin
a1.sinkgroups.g1.processor.selector.maxTimeOut = 10000
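Conceptually, the round_robin selector with backoff = true cycles through the group's sinks and temporarily skips any sink that recently failed. A toy sketch of that behavior (the timing and bookkeeping are simplified assumptions, not Flume's actual implementation):

```python
import itertools
import time

class RoundRobinSelector:
    """Toy round-robin sink selector with backoff, mimicking load_balance."""
    def __init__(self, sinks, backoff_seconds=10):
        self.cycle = itertools.cycle(sinks)
        self.blacklist = {}               # sink -> time when it may be retried
        self.backoff_seconds = backoff_seconds
        self.n = len(sinks)

    def mark_failed(self, sink):
        self.blacklist[sink] = time.time() + self.backoff_seconds

    def next_sink(self):
        for _ in range(self.n):
            sink = next(self.cycle)
            if self.blacklist.get(sink, 0) <= time.time():
                return sink
        raise RuntimeError("all sinks backed off")

selector = RoundRobinSelector(["k1", "k2", "k3"])
print(selector.next_sink())  # k1
selector.mark_failed("k2")
print(selector.next_sink())  # k3 (k2 is skipped while backed off)
```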
- Failover
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2 k3
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 7
a1.sinkgroups.g1.processor.priority.k3 = 6
a1.sinkgroups.g1.processor.maxpenalty = 20000
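The failover processor always routes events to the live sink with the highest priority; lower-priority sinks take over only when the ones above them fail. A minimal simulation of that selection rule:

```python
def pick_sink(priorities, failed):
    """Choose the highest-priority sink not currently failed (failover semantics)."""
    live = {sink: prio for sink, prio in priorities.items() if sink not in failed}
    if not live:
        raise RuntimeError("no live sinks")
    return max(live, key=live.get)

priorities = {"k1": 5, "k2": 7, "k3": 6}           # matches the config above
print(pick_sink(priorities, failed=set()))          # k2 (priority 7)
print(pick_sink(priorities, failed={"k2"}))         # k3 (priority 6)
print(pick_sink(priorities, failed={"k2", "k3"}))   # k1
```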
Basic features
Let's look at what Flume NG supports (the latest version at the time of writing is 1.5.0.1); understanding its feature set lets us choose the best approach for our applications. Describing Flume NG's features really comes down to which technologies and protocols are supported by the Agent's three components: Source, Channel, and Sink. Rather than detailing each component's protocol configuration, we give a brief list-style overview of the three components:
- Flume Source
- Flume Channel
- Flume Sink
Application practice
Installing Flume NG is very simple. Using the latest 1.5.0.1 release, run the following commands:
cd /usr/local
wget http://mirror.bit.edu.cn/apache/flume/1.5.0.1/apache-flume-1.5.0.1-bin.tar.gz
tar xvzf apache-flume-1.5.0.1-bin.tar.gz
cd apache-flume-1.5.0.1-bin
If a Hadoop cluster will be used, make sure the Hadoop-related environment variables are configured correctly and the cluster is up. Below, we get to know Flume through some practical configuration examples. For simplicity's sake, we use Memory-type channels.
- Avro Source + Memory Channel + Logger Sink
# Define a memory channel called ch1 on agent1
agent1.channels.ch1.type = memory
# Define an Avro source called avro-source1 on agent1 and tell it
# to bind to 0.0.0.0:41414. Connect it to channel ch1.
agent1.sources.avro-source1.channels = ch1
agent1.sources.avro-source1.type = avro
agent1.sources.avro-source1.bind = 0.0.0.0
agent1.sources.avro-source1.port = 41414
# Define a logger sink that simply logs all events it receives
# and connect it to the other end of the same channel.
agent1.sinks.log-sink1.channel = ch1
agent1.sinks.log-sink1.type = logger
# Finally, now that we've defined all of our components, tell
# agent1 which ones we want to activate.
agent1.channels = ch1
agent1.channels.ch1.capacity = 1000
agent1.sources = avro-source1
agent1.sinks = log-sink1
First, start the Agent process:
bin/flume-ng agent -c ./conf/ -f conf/flume-conf.properties -Dflume.root.logger=DEBUG,console -n agent1
Then start the Avro client and send data:
bin/flume-ng avro-client -c ./conf/ -H 0.0.0.0 -p 41414 -F /usr/local/programs/logs/sync.log -Dflume.root.logger=DEBUG,console
- Avro Source + Memory Channel + HDFS Sink
# Define a source, channel, sink
agent1.sources = avro-source1
agent1.channels = ch1
agent1.sinks = hdfs-sink1
# Configure channel
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 1000000
agent1.channels.ch1.transactionCapacity = 500000
# Define an Avro source called avro-source1 on agent1 and tell it
# to bind to 0.0.0.0:41414. Connect it to channel ch1.
agent1.sources.avro-source1.channels = ch1
agent1.sources.avro-source1.type = avro
agent1.sources.avro-source1.bind = 0.0.0.0
agent1.sources.avro-source1.port = 41414
# Define an HDFS sink and connect it to the other end of the same channel.
agent1.sinks.hdfs-sink1.channel = ch1
agent1.sinks.hdfs-sink1.type = hdfs
agent1.sinks.hdfs-sink1.hdfs.path = hdfs://h1:8020/data/flume/
agent1.sinks.hdfs-sink1.hdfs.filePrefix = sync_file
agent1.sinks.hdfs-sink1.hdfs.fileSuffix = .log
agent1.sinks.hdfs-sink1.hdfs.rollSize = 1048576
agent1.sinks.hdfs-sink1.hdfs.rollInterval = 0
agent1.sinks.hdfs-sink1.hdfs.rollCount = 0
agent1.sinks.hdfs-sink1.hdfs.batchSize = 1500
agent1.sinks.hdfs-sink1.hdfs.round = true
agent1.sinks.hdfs-sink1.hdfs.roundUnit = minute
agent1.sinks.hdfs-sink1.hdfs.threadsPoolSize = 25
agent1.sinks.hdfs-sink1.hdfs.useLocalTimeStamp = true
agent1.sinks.hdfs-sink1.hdfs.minBlockReplicas = 1
agent1.sinks.hdfs-sink1.hdfs.fileType = SequenceFile
agent1.sinks.hdfs-sink1.hdfs.writeFormat = Text
First, start the Agent:
bin/flume-ng agent -c ./conf/ -f conf/flume-conf-hdfs.properties -Dflume.root.logger=INFO,console -n agent1
Then start the Avro client and send data:
bin/flume-ng avro-client -c ./conf/ -H 0.0.0.0 -p 41414 -F /usr/local/programs/logs/sync.log -Dflume.root.logger=DEBUG,console
You can then inspect the data synced to HDFS:
hdfs dfs -ls /data/flume
Sample output:
-rw-r--r-- 3 shirdrn supergroup 1377617 2014-09-16 14:35 /data/flume/sync_file.1410849320761.log
-rw-r--r-- 3 shirdrn supergroup 1378137 2014-09-16 14:35 /data/flume/sync_file.1410849320762.log
-rw-r--r-- 3 shirdrn supergroup  259148 2014-09-16 14:35 /data/flume/sync_file.1410849320763.log
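A note on the hdfs.round settings used above: as described in the user guide, with round = true the timestamp substituted into the path's escape sequences is rounded down to the nearest multiple of roundValue units (roundValue is not set in the config above, so its default applies). A small Python sketch of that rounding, with assumed parameters:

```python
# Illustration of hdfs.round/roundUnit/roundValue semantics: round the event
# timestamp DOWN to the nearest round_value units before filling in escapes
# such as %H_%M in the HDFS path.
def round_down(epoch_seconds, round_value=1, unit_seconds=60):
    bucket = round_value * unit_seconds       # e.g. 1 minute or 10 minutes
    return epoch_seconds - (epoch_seconds % bucket)

ts = 1410922355  # an example epoch-seconds timestamp
print(round_down(ts, round_value=1))   # rounded down to the whole minute
print(round_down(ts, round_value=10))  # rounded down to a 10-minute bucket
```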
- Spooling Directory Source + Memory Channel + HDFS Sink
# Define source, channel, sink
agent1.sources = spool-source1
agent1.channels = ch1
agent1.sinks = hdfs-sink1
# Configure channel
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 1000000
agent1.channels.ch1.transactionCapacity = 500000
# Define and configure a Spooling Directory source
agent1.sources.spool-source1.channels = ch1
agent1.sources.spool-source1.type = spooldir
agent1.sources.spool-source1.spoolDir = /home/shirdrn/data/
agent1.sources.spool-source1.ignorePattern = event(_\d{4}\-\d{2}\-\d{2}_\d{2}_\d{2})?\.log(\.COMPLETED)?
agent1.sources.spool-source1.batchSize = 50
agent1.sources.spool-source1.inputCharset = UTF-8
# Define and configure an HDFS sink
agent1.sinks.hdfs-sink1.channel = ch1
agent1.sinks.hdfs-sink1.type = hdfs
agent1.sinks.hdfs-sink1.hdfs.path = hdfs://h1:8020/data/flume/
agent1.sinks.hdfs-sink1.hdfs.filePrefix = event_%y-%m-%d_%H_%M_%S
agent1.sinks.hdfs-sink1.hdfs.fileSuffix = .log
agent1.sinks.hdfs-sink1.hdfs.rollSize = 1048576
agent1.sinks.hdfs-sink1.hdfs.rollCount = 0
agent1.sinks.hdfs-sink1.hdfs.batchSize = 1500
agent1.sinks.hdfs-sink1.hdfs.round = true
agent1.sinks.hdfs-sink1.hdfs.roundUnit = minute
agent1.sinks.hdfs-sink1.hdfs.threadsPoolSize = 25
agent1.sinks.hdfs-sink1.hdfs.useLocalTimeStamp = true
agent1.sinks.hdfs-sink1.hdfs.minBlockReplicas = 1
agent1.sinks.hdfs-sink1.hdfs.fileType = SequenceFile
agent1.sinks.hdfs-sink1.hdfs.writeFormat = Text
agent1.sinks.hdfs-sink1.hdfs.rollInterval = 0
Start the Agent process with the following command:
bin/flume-ng agent -c ./conf/ -f conf/flume-conf-spool.properties -Dflume.root.logger=INFO,console -n agent1
You can inspect the data synced over to HDFS:
hdfs dfs -ls /data/flume
Sample output:
-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:52 /data/flume/event_14-09-17_10_52_00.1410922355094.log
-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:52 /data/flume/event_14-09-17_10_52_00.1410922355095.log
-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:52 /data/flume/event_14-09-17_10_52_00.1410922355096.log
-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:52 /data/flume/event_14-09-17_10_52_00.1410922355097.log
-rw-r--r-- 3 shirdrn supergroup    1530 2014-09-17 10:53 /data/flume/event_14-09-17_10_52_00.1410922355098.log
-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380386.log
-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380387.log
-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380388.log
-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380389.log
-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380390.log
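One detail worth sanity-checking in the spooldir config above is the ignorePattern regular expression. The following Python snippet (assuming, as a rough model, that Flume matches the pattern against the full file name) shows which names would be skipped:

```python
import re

# The ignorePattern from the spooldir source configuration above.
pattern = re.compile(r"event(_\d{4}\-\d{2}\-\d{2}_\d{2}_\d{2})?\.log(\.COMPLETED)?")

for name in ["event.log",
             "event_2014-09-17_10_52.log",
             "event_2014-09-17_10_52.log.COMPLETED",
             "other_2014-09-17.log"]:
    ignored = pattern.fullmatch(name) is not None
    print(f"{name}: {'ignored' if ignored else 'collected'}")
```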
- Exec Source + Memory Channel + File Roll Sink
Configure the Agent by editing flume-conf-file.properties with the following content:
# Define source, channel, sink
agent1.sources = tail-source1
agent1.channels = ch1
agent1.sinks = file-sink1
# Configure channel
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 1000000
agent1.channels.ch1.transactionCapacity = 500000
# Define and configure an Exec source
agent1.sources.tail-source1.channels = ch1
agent1.sources.tail-source1.type = exec
agent1.sources.tail-source1.command = tail -F /home/shirdrn/data/event.log
agent1.sources.tail-source1.shell = /bin/sh -c
agent1.sources.tail-source1.batchSize = 50
# Define and configure a File roll sink
# and connect it to the other end of the same channel.
agent1.sinks.file-sink1.channel = ch1
agent1.sinks.file-sink1.type = file_roll
agent1.sinks.file-sink1.batchSize = 100
agent1.sinks.file-sink1.serializer = TEXT
agent1.sinks.file-sink1.sink.directory = /home/shirdrn/sink_data
Start the Agent process with the following command:
bin/flume-ng agent -c ./conf/ -f conf/flume-conf-file.properties -Dflume.root.logger=INFO,console -n agent1
You can then look at the local directory /home/shirdrn/sink_data that the File Roll Sink writes to, for example:
-rw-rw-r-- 1 shirdrn shirdrn 13944825 Sep 17 11:36 1410924990039-1
-rw-rw-r-- 1 shirdrn shirdrn 11288870 Sep 17 11:37 1410924990039-2
-rw-rw-r-- 1 shirdrn shirdrn        0 Sep 17 11:37 1410924990039-3
-rw-rw-r-- 1 shirdrn shirdrn 20517500 Sep 17 11:38 1410924990039-4
-rw-rw-r-- 1 shirdrn shirdrn 16343250 Sep 17 11:38 1410924990039-5
For many more Flume NG configuration options and their descriptions, refer to the official user guide; it is very thorough.
References:
http://flume.apache.org/FlumeUserGuide.html
https://blogs.apache.org/flume/entry/flume_ng_architecture
Read the original article: http://shiyanjun.cn/archives/915.html
Author: Shi Yanjun (时延军)
A Layered Architecture for Flume Log Collection in Practice
Big Data | Posted by Ansible | 0 comments | 7191 views | 2016-03-19 18:54
- Modular design: a Flume Agent internally defines three kinds of components: Source, Channel, Sink
- Composable design: Source, Channel, and Sink components can be combined within a Flume Agent as the business requires, building fairly complex log-flow pipelines
- Plugin-style design: log collection pipelines are orchestrated through configuration files, minimizing intrusion into Flume's code
- Extensibility: components (Source, Channel, Sink) can be custom-implemented to fit our own business needs
- Integration with mainstream systems and frameworks: Hadoop, HBase, Hive, Kafka, ElasticSearch, Thrift, Avro, and more all integrate well with Flume
- Advanced features: failover, load balancing, interceptors, and so on
Why design the Flume log collection system in layers
What do we actually gain from building a layered log collection system on Flume? Let's first look at the problems that arise without layering:
- If Kafka is used to buffer the log streams collected upstream by Flume, that is fine for data generated by servers inside the data platform; but when the log data crosses business groups or even departments, Kafka connection details must be exposed externally, so access to Kafka is no longer under the data platform's internal control
- If external logs flow directly into the platform's internal HDFS, then whenever the Hadoop system needs an upgrade or routine maintenance, this direct-connection setup disrupts the log collection service at the upstream, Flume-deployed origin of the stream
- If internal systems such as the Kafka cluster or the nodes of the HDFS cluster change machine-room location or migrate data, external systems that depend on the log data are affected to varying degrees, and their developers or operators have to get involved
- Since the data sources may be external servers (many individual nodes), business clusters (groups of cooperating nodes), or internal nodes providing collection services, having all of these Flume Agents sit in a single layer makes them hard to manage in groups
- With no unified, bus-like component at the point where all source-side Flume Agents enter the data platform, it is hard to independently upgrade the platform's internal receiving-layer service nodes as individual businesses grow; upgrading one internal system or service may end up impacting other receiving-layer nodes
In the figure above, every node on which a Flume Agent is deployed, whether external or internal, connects directly to the internal Kafka and Hadoop clusters. The data platform therefore not only has to keep Kafka and Hadoop running stably, but must also be prepared for sudden surges and abnormal changes in the traffic from external log-collecting Flume Agents. Furthermore, taking any one cluster down for maintenance or an upgrade may require notifying the business owners of every node running an external Flume Agent, so that they can prepare for the downtime. Now consider the advantages of designing the log collection system in layers on top of Flume, as shown in the figure below: here the Flume log collection system adopts a two-tier architecture. The first tier (L1) is the log collection layer; the second tier (L2) is the data platform's buffering (aggregation) layer. This approach gives the log collection system the following characteristics:
- Business systems outside the data platform are grouped by the type of business data to be analyzed; for each type of business log, a Flume aggregation node group is added at the front of the data platform, and that group only affects the L1-layer business data mapped to it
- If the Hadoop cluster or Kafka needs downtime for maintenance or an upgrade, the external L1-layer Flume Agents are unaffected; the L2 layer simply keeps receiving and buffering the data, and once the maintenance or upgrade finishes, the data cached at L2 is loaded into the storage systems
- If the nodes for some type of external business log need to grow, we just point the data flow at L1 to the corresponding L2-layer Flume Agent node group inside the platform, so new log collection needs arising from external business changes can be responded to and deployed quickly
- Inside the data platform, the log-collecting nodes are fully under our control, so L1-layer Flume Agents could send log data straight into HDFS or Kafka; but for architectural uniformity and manageability, it is still best to aggregate/buffer the logs collected by L1-layer Agents through an L2-layer Flume Agent node group
As the analysis above shows, layering is really about decoupling the Flume Agent services on the log-source nodes from the data platform's storage systems (Kafka/HDFS), while also making it possible to aggregate the log streams of multiple same-type business nodes and manage them separately and independently. Additional Flume layers can be added as the business requires, to satisfy further aggregation needs of the log stream.
Overall application architecture
Let's see where the Flume log collection system sits in this example application. I sketched a simple diagram and added some of the nodes/components involved in data processing and analysis, as shown below:
A quick look at the figure above is enough here. Log collection is a vital link in the overall application system, so the collection system must be designed to be reliable, available, flexible, and stable. The large amount of analysis and processing the data platform performs after logs are collected underlines the importance of the log collection system; the other parts are not described further here.
Flume layered architecture in practice
Here we take real-time log collection as the main example of how to build a relatively complex layered Flume log collection system. First, a brief statement of the requirements:
- User behavior events reported by the mobile client (App User Event) flow in real time from the Nginx logs into the data platform, through an interface format defined inside the platform; this corresponds to the L1 layer of the Flume log collection system
- Campaigns organized to promote certain App product features push targeted notifications to users; a dedicated Push Click agent collects these click-behavior data
- Some basic content the App depends on is exposed as services for third parties to call; user click events brought in by third-party Apps (Thirdparty Click) are collected by a dedicated L1-layer Flume Agent
- Third parties place advertisements (Ad) in the App against different content; the ad impression/click data is collected at the L1 layer, separately from the data above, because this log data may later be promoted at scale and grow explosively
- The L2 layer mainly aggregates or buffers the log data flowing in from L1
- To guard against an L2-layer Flume Agent failing or being taken down for routine maintenance, Flume's failover feature is used: each sink at the L1 layer simultaneously points at 2 identical L2-layer Flume Agents
- L1-layer Flume Agents should not accumulate too much data in their channels while collecting logs (while still guarding against a memory channel overflowing when the flow drains too slowly), and should minimize disk read/write overhead, so memory-type channels are used
- The L2 layer must buffer data reliably (accumulating it for an allowed period, e.g. while Hadoop or Kafka is down for a failure or a maintenance upgrade), so file-type channels are used with capacities set as large as possible; and to avoid processing delays caused by multiple applications sharing a disk, each channel gets its own dedicated disk
The figure above is an excerpt of the full data platform, simplified for ease of explanation. The configuration of each Flume Agent involved is detailed below, following the layered structure (L1 layer, then L2 layer). Since the Flume Agents deployed on the L1-layer nodes 10.10.1.101 and 10.10.1.102 are symmetric, only one of them is described; the difference is that the sinks of the Agents on these two nodes use the failover feature and point crosswise at the L2-layer Flume Agents, which also provides a degree of load balancing.
Upstream Flume log collection layer
The configurations of the Flume Agents on node 10.10.1.101 are described one by one below:
- L1: App User Event log collection
a1.sources = s1
a1.channels = mc1
a1.sinks = k1 k2
# Configure source
a1.sources.s1.channels = mc1
a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /data/nginx/logs/app_user_events.log
# Configure channel
a1.channels.mc1.type = memory
a1.channels.mc1.transactionCapacity = 50000
a1.channels.mc1.capacity = 100000
# Configure sinks
a1.sinks.k1.channel = mc1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 10.10.1.122
a1.sinks.k1.port = 44446
a1.sinks.k2.channel = mc1
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = 10.10.1.121
a1.sinks.k2.port = 44446
# Configure failover
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 9
a1.sinkgroups.g1.processor.priority.k2 = 7
a1.sinkgroups.g1.processor.maxpenalty = 10000
- L1: Push Click Event log collection
a2.sources = s2
a2.channels = mc2
a2.sinks = k3 k4
# Configure source
a2.sources.s2.channels = mc2
a2.sources.s2.type = exec
a2.sources.s2.command = tail -F /data/nginx/logs/push_click_events.log
# Configure channel
a2.channels.mc2.type = memory
a2.channels.mc2.transactionCapacity = 50000
a2.channels.mc2.capacity = 100000
# Configure sinks
a2.sinks.k3.channel = mc2
a2.sinks.k3.type = avro
a2.sinks.k3.hostname = 10.10.1.121
a2.sinks.k3.port = 44447
a2.sinks.k4.channel = mc2
a2.sinks.k4.type = avro
a2.sinks.k4.hostname = 10.10.1.122
a2.sinks.k4.port = 44447
# Configure failover
a2.sinkgroups = g2
a2.sinkgroups.g2.sinks = k3 k4
a2.sinkgroups.g2.processor.type = failover
a2.sinkgroups.g2.processor.priority.k3 = 9
a2.sinkgroups.g2.processor.priority.k4 = 7
a2.sinkgroups.g2.processor.maxpenalty = 10000
- L1: Thirdparty Click Event log collection
a3.sources = s3
a3.channels = mc3
a3.sinks = k5 k6
# Configure source
a3.sources.s3.channels = mc3
a3.sources.s3.type = exec
a3.sources.s3.command = tail -F /data/nginx/logs/thirdparty_click_events.log
# Configure channel
a3.channels.mc3.type = memory
a3.channels.mc3.transactionCapacity = 50000
a3.channels.mc3.capacity = 100000
# Configure sinks
a3.sinks.k5.channel = mc3
a3.sinks.k5.type = avro
a3.sinks.k5.hostname = 10.10.1.121
a3.sinks.k5.port = 44446
a3.sinks.k6.channel = mc3
a3.sinks.k6.type = avro
a3.sinks.k6.hostname = 10.10.1.122
a3.sinks.k6.port = 44446
# Configure failover
a3.sinkgroups = g3
a3.sinkgroups.g3.sinks = k5 k6
a3.sinkgroups.g3.processor.type = failover
a3.sinkgroups.g3.processor.priority.k5 = 9
a3.sinkgroups.g3.processor.priority.k6 = 7
a3.sinkgroups.g3.processor.maxpenalty = 10000
- L1: Ad Click Event log collection
a4.sources = s4
a4.channels = mc4
a4.sinks = k7 k8
# Configure source
a4.sources.s4.channels = mc4
a4.sources.s4.type = exec
a4.sources.s4.command = tail -F /data/nginx/logs/ad.log
# Configure channel
a4.channels.mc4.type = memory
a4.channels.mc4.transactionCapacity = 50000
a4.channels.mc4.capacity = 100000
# Configure sinks
a4.sinks.k7.channel = mc4
a4.sinks.k7.type = avro
a4.sinks.k7.hostname = 10.10.1.121
a4.sinks.k7.port = 44448
a4.sinks.k8.channel = mc4
a4.sinks.k8.type = avro
a4.sinks.k8.hostname = 10.10.1.122
a4.sinks.k8.port = 44448
# Configure failover
a4.sinkgroups = g4
a4.sinkgroups.g4.sinks = k7 k8
a4.sinkgroups.g4.processor.type = failover
a4.sinkgroups.g4.processor.priority.k7 = 10
a4.sinkgroups.g4.processor.priority.k8 = 8
a4.sinkgroups.g4.processor.maxpenalty = 10000
Downstream Flume aggregation layer
- L2: combined collection of App user event and push click event logs
a1.sources = s1 s2
a1.channels = fc1 fc2 fc3
a1.sinks = kk1 fk2 kk3
# Configure app user event source: s1 -> fc1+fc2
a1.sources.s1.channels = fc1 fc2
a1.sources.s1.type = avro
a1.sources.s1.bind = 10.10.1.121
a1.sources.s1.port = 44446
a1.sources.s1.threads = 8
# Configure push click event source: s2 -> fc2+fc3
a1.sources.s2.channels = fc2 fc3
a1.sources.s2.type = avro
a1.sources.s2.bind = 10.10.1.122
a1.sources.s2.port = 44447
a1.sources.s2.threads = 4
# Configure file channel (/data1)
# Configure app user event channel: fc1 -> kk1
a1.channels.fc1.type = file
a1.channels.fc1.checkpointDir = /data1/flume/channels/app_user_event/checkpoint
a1.channels.fc1.useDualCheckpoints = true
a1.channels.fc1.backupCheckpointDir = /data1/flume/channels/app_user_event/backup
a1.channels.fc1.dataDirs = /data1/flume/channels/app_user_event/data
a1.channels.fc1.transactionCapacity = 100000
a1.channels.fc1.capacity = 500000
a1.channels.fc1.checkpointInterval = 60000
a1.channels.fc1.keep-alive = 5
a1.channels.fc1.maxFileSize = 5368709120
# Configure file channel (/data2)
# Configure app user event + push click event channel: fc2 -> fk2
a1.channels.fc2.type = file
a1.channels.fc2.checkpointDir = /data2/flume/channels/offline_file_event/checkpoint
a1.channels.fc2.useDualCheckpoints = true
a1.channels.fc2.backupCheckpointDir = /data2/flume/channels/offline_file_event/backup
a1.channels.fc2.dataDirs = /data2/flume/channels/offline_file_event/data
a1.channels.fc2.transactionCapacity = 100000
a1.channels.fc2.capacity = 500000
a1.channels.fc2.checkpointInterval = 60000
a1.channels.fc2.keep-alive = 5
a1.channels.fc2.maxFileSize = 5368709120
# Configure file channel (/data3)
# Configure push click channel: fc3 -> kk3
a1.channels.fc3.type = file
a1.channels.fc3.checkpointDir = /data3/flume/channels/push_click_event/checkpoint
a1.channels.fc3.useDualCheckpoints = true
a1.channels.fc3.backupCheckpointDir = /data3/flume/channels/push_click_event/backup
a1.channels.fc3.dataDirs = /data3/flume/channels/push_click_event/data
a1.channels.fc3.transactionCapacity = 100000
a1.channels.fc3.capacity = 500000
a1.channels.fc3.checkpointInterval = 60000
a1.channels.fc3.keep-alive = 5
a1.channels.fc3.maxFileSize = 5368709120
# Configure sink: RealtimeMessageSink (app user event)
a1.sinks.kk1.type = org.shirdrn.flume.sink.RealtimeMessageSink
a1.sinks.kk1.channel = fc1
a1.sinks.kk1.metadata.broker.list = kafka01:9092,kafka02:9092,kafka03:9092
a1.sinks.kk1.topic = json_user_event
a1.sinks.kk1.serializer.class = kafka.serializer.StringEncoder
a1.sinks.kk1.producer.type = async
a1.sinks.kk1.message.send.max.retries = 3
a1.sinks.kk1.client.id = flume_app_user_event_2_1
a1.sinks.kk1.event.decoder.count = 8
a1.sinks.kk1.output.stat.event.batch.size = 2000
a1.sinks.kk1.event.decoder.queue.size = 1000
# Configure sink: RichRollingFileSink
a1.sinks.fk2.type = org.shirdrn.flume.sink.RichRollingFileSink
a1.sinks.fk2.channel = fc2
a1.sinks.fk2.batchSize = 100
a1.sinks.fk2.serializer = TEXT
a1.sinks.fk2.sink.rollInterval = 60
a1.sinks.fk2.sink.directory = /data/flume/rolling_files
a1.sinks.fk2.sink.file.prefix = event
a1.sinks.fk2.sink.file.suffix = .log
a1.sinks.fk2.sink.file.pattern = yyyyMMddHHmmss
# Configure sink: RealtimeMessageSink (push click)
a1.sinks.kk3.type = org.shirdrn.flume.sink.RealtimeMessageSink
a1.sinks.kk3.channel = fc3
a1.sinks.kk3.metadata.broker.list = kafka01:9092,kafka02:9092,kafka03:9092
a1.sinks.kk3.topic = json_push_click_event
a1.sinks.kk3.serializer.class = kafka.serializer.StringEncoder
a1.sinks.kk3.producer.type = async
a1.sinks.kk3.message.send.max.retries = 3
a1.sinks.kk3.client.id = flume_push_click_2_1
a1.sinks.kk3.event.decoder.count = 4
a1.sinks.kk3.output.stat.event.batch.size = 2000
a1.sinks.kk3.event.decoder.queue.size = 1000
Above you can see our own implementation, org.shirdrn.flume.sink.RealtimeMessageSink. This sink writes the logs Flume collects into Kafka; Flume 1.5.0 had no built-in sink for this, so we implemented one ourselves and added processing logic suited to our business: parsing the Nginx log lines, filtering out event data that does not need to enter Kafka (the data is ultimately processed in the Storm cluster) according to the needs of real-time computation, and finally converting each record into a JSON string written to a Kafka topic. As the configuration above shows, many parameters can be tuned, such as the number of decoder threads and the queue sizes.
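The sink's core transformation, parsing an Nginx log record, filtering it, and emitting a JSON string, can be sketched roughly as follows. This is an illustrative Python sketch only: the field layout, names, and filtering rule are hypothetical stand-ins, since the real sink's logic is internal to the project (and written in Java):

```python
import json

def to_kafka_message(nginx_line, wanted_events=("app_user_event", "push_click")):
    """Parse a (hypothetical) tab-separated Nginx log line and return the JSON
    string to publish to Kafka, or None if the record is filtered out."""
    try:
        ts, event_type, user_id, payload = nginx_line.rstrip("\n").split("\t", 3)
    except ValueError:
        return None                        # malformed record: drop it
    if event_type not in wanted_events:    # not needed by the real-time jobs
        return None
    return json.dumps({"ts": ts, "type": event_type,
                       "user": user_id, "payload": payload})

msg = to_kafka_message("1410922355\tapp_user_event\tu42\tscreen=home")
print(msg)
```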
Since we needed to name the files written to the local filesystem in our own way, we modified Flume's built-in file_roll implementation into our own org.shirdrn.flume.sink.RichRollingFileSink, which formats the file name so that a file's creation time can be read from its name (in a human-readable format).
- L2: Ad Click Event log collection
a2.sources = s3
a2.channels = fc4
a2.sinks = kk4
# Configure source: s3 -> fc4
a2.sources.s3.channels = fc4
a2.sources.s3.type = avro
a2.sources.s3.bind = 10.10.1.121
a2.sources.s3.port = 44448
a2.sources.s3.threads = 2
# Configure channel (/data4)
# Configure Ad channel: fc4 -> kk4
a2.channels.fc4.type = file
a2.channels.fc4.checkpointDir = /data4/flume/channels/ad/checkpoint
a2.channels.fc4.useDualCheckpoints = true
a2.channels.fc4.backupCheckpointDir = /data4/flume/channels/ad/backup
a2.channels.fc4.dataDirs = /data4/flume/channels/ad/data
a2.channels.fc4.transactionCapacity = 100000
a2.channels.fc4.capacity = 500000
a2.channels.fc4.checkpointInterval = 60000
a2.channels.fc4.keep-alive = 5
a2.channels.fc4.maxFileSize = 5368709120
# Configure sink: RealtimeAdKafkaSink
a2.sinks.kk4.type = org.shirdrn.flume.sink.RealtimeAdKafkaSink
a2.sinks.kk4.channel = fc4
a2.sinks.kk4.metadata.broker.list = kafka01:9092,kafka02:9092,kafka03:9092
a2.sinks.kk4.topic = json_ad_event
a2.sinks.kk4.serializer.class = kafka.serializer.StringEncoder
a2.sinks.kk4.producer.type = async
a2.sinks.kk4.message.send.max.retries = 3
a2.sinks.kk4.client.id = flume_ad_2_1
a2.sinks.kk4.event.decoder.count = 4
a2.sinks.kk4.output.stat.event.batch.size = 2500
a2.sinks.kk4.event.decoder.queue.size = 5000
Practice summary
Here we briefly summarize a few points:
- Flume monitoring
bin/flume-ng agent -n a1 -c conf -f conf/config.conf -Dflume.monitoring.type=http -Dflume.monitoring.port=34545
With this, you can browse web port 34545 on the Flume Agent node to view the data, presented as JSON. It includes fairly important metadata such as channel capacity and current usage; this data tells you how Flume is currently doing and whether it needs to be scaled up, and so on.
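The JSON returned by the monitoring endpoint can be post-processed with a few lines of script. The CHANNEL field names below follow Flume's JSON reporting format, but the sample values are made up for illustration:

```python
import json

# Sample of what an agent's monitoring endpoint (e.g. port 34545 above) returns;
# field names per Flume's JSON reporting format, values invented for the demo.
metrics_json = """{
  "CHANNEL.fc1": {"Type": "CHANNEL", "ChannelCapacity": "500000",
                  "ChannelSize": "125000", "ChannelFillPercentage": "25.0"}
}"""

metrics = json.loads(metrics_json)
for name, stats in metrics.items():
    if stats.get("Type") == "CHANNEL":
        pct = float(stats["ChannelFillPercentage"])
        flag = "WARN" if pct > 80 else "ok"
        print(f"{name}: {stats['ChannelSize']}/{stats['ChannelCapacity']} ({pct:.1f}%) {flag}")
```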
Alternatively, Ganglia can collect and analyze the running state of a Flume Agent service and display it in much more detail. Since Ganglia is relatively complex to configure, it is not covered here; try it out if you are interested.
- Flume memory tuning
JAVA_OPTS="-server -Xms1024m -Xmx4096m -Dcom.sun.management.jmxremote -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:ParallelGCThreads=4 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:/data/flume/logs/gc-ad.log"
This makes it easy to adjust the GC strategy. Real-time log collection with Flume generally prioritizes low latency and fast response, so we want to minimize the time business threads are paused by GC; the collector can therefore be set to the ParNew+CMS strategy. Writing out the GC log makes it much easier to observe the JVM's GC behavior in detail while the Flume Agent service runs, and to optimize the service through diagnosis.
- Tuning message reception in the downstream L2 layer
a1.sources.s1.type = avro
a1.sources.s1.bind = 10.10.1.121
a1.sources.s1.port = 44446
a1.sources.s1.threads = 8
The threads parameter defaults to 1; it should be increased, or log records will pile up at the L1 layer and, in severe cases, data may be lost.
- Constraints on handling business logic in Flume
Flume's extensibility lets us implement components tailored to our own business. But before mixing actual business logic into Flume, consider: does it really have to be done there? And will doing so hurt the speed and efficiency of Flume's real-time log transport?
Flume is a lightweight log collection tool. In my view, relatively complex business logic (especially logic that must interact with storage systems such as MySQL or Redis) is best pushed downstream, into the Storm cluster or into a business-processing cluster of your own, while Flume is left to do what it does best: routing messages.
Of course, some scenarios may require work in the Flume collection layer, for example when raw, unstructured messages cannot otherwise be routed by type to different destinations; then a simple parse or formatting step may be needed at the collection layer, which is really just simple log dispatching at the Flume layer. In any case, if you do insert business-logic processing at the Flume layer, avoid anything so complex that it slows the overall log transport; if the backend has real-time recommendation needs, a large delay in the freshness of log events will hurt personalized recommendation.
Read the original article: http://shiyanjun.cn/archives/1497.html
Author: Shi Yanjun (时延军)
Flume(NG)架构设计要点及配置实践
大数据 Ansible 发表了文章 0 个评论 2605 次浏览 2016-07-14 21:34
架构设计要点
Flume的架构主要有一下几个核心概念:
- []Event:一个数据单元,带有一个可选的消息头[/][]Flow:Event从源点到达目的点的迁移的抽象[/][]Client:操作位于源点处的Event,将其发送到Flume Agent[/][]Agent:一个独立的Flume进程,包含组件Source、Channel、Sink[/][]Source:用来消费传递到该组件的Event[/][]Channel:中转Event的一个临时存储,保存有Source组件传递过来的Event[/][]Sink:从Channel中读取并移除Event,将Event传递到Flow Pipeline中的下一个Agent(如果有的话)[/]
# list the sources, sinks and channels for the agent尖括号里面的,我们可以根据实际需求或业务来修改名称。下面详细说明:表示配置一个Agent的名称,一个Agent肯定有一个名称。与是Agent的Source组件的名称,消费传递过来的Event。与是Agent的Channel组件的名称。与是Agent的Sink组件的名称,从Channel中消费(移除)Event。上面配置内容中,第一组中配置Source、Sink、Channel,它们的值可以有1个或者多个;第二组中配置Source将把数据存储(Put)到哪一个Channel中,可以存储到1个或多个Channel中,同一个Source将数据存储到多个Channel中,实际上是Replication;第三组中配置Sink从哪一个Channel中取(Task)数据,一个Sink只能从一个Channel中取数据。下面,根据官网文档,我们展示几种Flow Pipeline,各自适应于什么样的应用场景:.sources = .sinks = .channels = # set channel for source .sources. .channels = ... .sources. .channels = ...# set channel for sink .sinks. .channel = .sinks. .channel =
- []多个Agent顺序连接[/]
- []多个Agent的数据汇聚到同一个Agent[/]
- []多路(Multiplexing)Agent[/]
# List the sources, sinks and channels for the agent上面指定了selector的type的值为replication,其他的配置没有指定,使用的Replication方式,Source1会将数据分别存储到Channel1和Channel2,这两个channel里面存储的数据是相同的,然后数据被传递到Sink1和Sink2。Multiplexing方式,selector可以根据header的值来确定数据传递到哪一个channel,配置格式,如下所示:.sources = .sinks = .channels = # set list of channels for source (separated by space) .sources. .channels = # set channel for sinks .sinks. .channel = .sinks. .channel = .sources. .selector.type = replicating
# Mapping for multiplexing selector上面selector的type的值为multiplexing,同时配置selector的header信息,还配置了多个selector的mapping的值,即header的值:如果header的值为Value1、Value2,数据从Source1路由到Channel1;如果header的值为Value2、Value3,数据从Source1路由到Channel2。.sources. .selector.type = multiplexing .sources. .selector.header = .sources. .selector.mapping. = .sources. .selector.mapping. = .sources. .selector.mapping. = #... .sources. .selector.default =
- []实现load balance功能[/]
a1.sinkgroups = g1a1.sinkgroups.g1.sinks = k1 k2 k3a1.sinkgroups.g1.processor.type = load_balancea1.sinkgroups.g1.processor.backoff = truea1.sinkgroups.g1.processor.selector = round_robina1.sinkgroups.g1.processor.selector.maxTimeOut=10000
- []实现failover功能[/]
a1.sinkgroups = g1a1.sinkgroups.g1.sinks = k1 k2 k3a1.sinkgroups.g1.processor.type = failovera1.sinkgroups.g1.processor.priority.k1 = 5a1.sinkgroups.g1.processor.priority.k2 = 7a1.sinkgroups.g1.processor.priority.k3 = 6a1.sinkgroups.g1.processor.maxpenalty = 20000
我们看一下,Flume NG都支持哪些功能(目前最新版本是1.5.0.1),了解它的功能集合,能够让我们在应用中更好地选择使用哪一种方案。说明Flume NG的功能,实际还是围绕着Agent的三个组件Source、Channel、Sink来看它能够支持哪些技术或协议。我们不再对各个组件支持的协议详细配置进行说明,通过列表的方式分别对三个组件进行概要说明:基本功能
- []Flume Source[/]
- []Flume Channel[/]
- []Flume Sink[/]
安装Flume NG非常简单,我们使用最新的1.5.0.1版本,执行如下命令:应用实践
cd /usr/localwget http://mirror.bit.edu.cn/apache/flume/1.5.0.1/apache-flume-1.5.0.1-bin.tar.gztar xvzf apache-flume-1.5.0.1-bin.tar.gzcd apache-flume-1.5.0.1-bin如果需要使用到Hadoop集群,保证Hadoop相关的环境变量都已经正确配置,并且Hadoop集群可用。下面,通过一些实际的配置实例,来了解Flume的使用。为了简单期间,channel我们使用Memory类型的channel。
- []Avro Source+Memory Channel+Logger Sink[/]
# Define a memory channel called ch1 on agent1agent1.channels.ch1.type = memory# Define an Avro source called avro-source1 on agent1 and tell it# to bind to 0.0.0.0:41414. Connect it to channel ch1.agent1.sources.avro-source1.channels = ch1agent1.sources.avro-source1.type = avroagent1.sources.avro-source1.bind = 0.0.0.0agent1.sources.avro-source1.port = 41414# Define a logger sink that simply logs all events it receives# and connect it to the other end of the same channel.agent1.sinks.log-sink1.channel = ch1agent1.sinks.log-sink1.type = logger# Finally, now that we've defined all of our components, tell# agent1 which ones we want to activate.agent1.channels = ch1agent1.channels.ch1.capacity = 1000agent1.sources = avro-source1agent1.sinks = log-sink1首先,启动Agent进程:
bin/flume-ng agent -c ./conf/ -f conf/flume-conf.properties -Dflume.root.logger=DEBUG,console -n agent1然后,启动Avro Client,发送数据:
bin/flume-ng avro-client -c ./conf/ -H 0.0.0.0 -p 41414 -F /usr/local/programs/logs/sync.log -Dflume.root.logger=DEBUG,console
- []Avro Source+Memory Channel+HDFS Sink[/]
# Define a source, channel, sinkagent1.sources = avro-source1agent1.channels = ch1agent1.sinks = hdfs-sink# Configure channelagent1.channels.ch1.type = memoryagent1.channels.ch1.capacity = 1000000agent1.channels.ch1.transactionCapacity = 500000# Define an Avro source called avro-source1 on agent1 and tell it# to bind to 0.0.0.0:41414. Connect it to channel ch1.agent1.sources.avro-source1.channels = ch1agent1.sources.avro-source1.type = avroagent1.sources.avro-source1.bind = 0.0.0.0agent1.sources.avro-source1.port = 41414# Define a logger sink that simply logs all events it receives# and connect it to the other end of the same channel.agent1.sinks.hdfs-sink1.channel = ch1agent1.sinks.hdfs-sink1.type = hdfsagent1.sinks.hdfs-sink1.hdfs.path = hdfs://h1:8020/data/flume/agent1.sinks.hdfs-sink1.hdfs.filePrefix = sync_fileagent1.sinks.hdfs-sink1.hdfs.fileSuffix = .logagent1.sinks.hdfs-sink1.hdfs.rollSize = 1048576agent1.sinks.hdfs-sink1.rollInterval = 0agent1.sinks.hdfs-sink1.hdfs.rollCount = 0agent1.sinks.hdfs-sink1.hdfs.batchSize = 1500agent1.sinks.hdfs-sink1.hdfs.round = trueagent1.sinks.hdfs-sink1.hdfs.roundUnit = minuteagent1.sinks.hdfs-sink1.hdfs.threadsPoolSize = 25agent1.sinks.hdfs-sink1.hdfs.useLocalTimeStamp = trueagent1.sinks.hdfs-sink1.hdfs.minBlockReplicas = 1agent1.sinks.hdfs-sink1.fileType = SequenceFileagent1.sinks.hdfs-sink1.writeFormat = TEXT首先,启动Agent:
bin/flume-ng agent -c ./conf/ -f conf/flume-conf-hdfs.properties -Dflume.root.logger=INFO,console -n agent1然后,启动Avro Client,发送数据:
bin/flume-ng avro-client -c ./conf/ -H 0.0.0.0 -p 41414 -F /usr/local/programs/logs/sync.log -Dflume.root.logger=DEBUG,console可以查看同步到HDFS上的数据:
hdfs dfs -ls /data/flume结果示例,如下所示:
-rw-r--r-- 3 shirdrn supergroup 1377617 2014-09-16 14:35 /data/flume/sync_file.1410849320761.log-rw-r--r-- 3 shirdrn supergroup 1378137 2014-09-16 14:35 /data/flume/sync_file.1410849320762.log-rw-r--r-- 3 shirdrn supergroup 259148 2014-09-16 14:35 /data/flume/sync_file.1410849320763.log
- []Spooling Directory Source+Memory Channel+HDFS Sink[/]
# Define source, channel, sinkagent1.sources = spool-source1agent1.channels = ch1agent1.sinks = hdfs-sink1# Configure channelagent1.channels.ch1.type = memoryagent1.channels.ch1.capacity = 1000000agent1.channels.ch1.transactionCapacity = 500000# Define and configure an Spool directory sourceagent1.sources.spool-source1.channels = ch1agent1.sources.spool-source1.type = spooldiragent1.sources.spool-source1.spoolDir = /home/shirdrn/data/agent1.sources.spool-source1.ignorePattern = event(_\d{4}\-\d{2}\-\d{2}_\d{2}_\d{2})?\.log(\.COMPLETED)?agent1.sources.spool-source1.batchSize = 50agent1.sources.spool-source1.inputCharset = UTF-8# Define and configure a hdfs sinkagent1.sinks.hdfs-sink1.channel = ch1agent1.sinks.hdfs-sink1.type = hdfsagent1.sinks.hdfs-sink1.hdfs.path = hdfs://h1:8020/data/flume/agent1.sinks.hdfs-sink1.hdfs.filePrefix = event_%y-%m-%d_%H_%M_%Sagent1.sinks.hdfs-sink1.hdfs.fileSuffix = .logagent1.sinks.hdfs-sink1.hdfs.rollSize = 1048576agent1.sinks.hdfs-sink1.hdfs.rollCount = 0agent1.sinks.hdfs-sink1.hdfs.batchSize = 1500agent1.sinks.hdfs-sink1.hdfs.round = trueagent1.sinks.hdfs-sink1.hdfs.roundUnit = minuteagent1.sinks.hdfs-sink1.hdfs.threadsPoolSize = 25agent1.sinks.hdfs-sink1.hdfs.useLocalTimeStamp = trueagent1.sinks.hdfs-sink1.hdfs.minBlockReplicas = 1agent1.sinks.hdfs-sink1.fileType = SequenceFileagent1.sinks.hdfs-sink1.writeFormat = TEXTagent1.sinks.hdfs-sink1.rollInterval = 0启动Agent进程,执行如下命令:
bin/flume-ng agent -c ./conf/ -f conf/flume-conf-spool.properties -Dflume.root.logger=INFO,console -n agent1可以查看HDFS上同步过来的数据:
hdfs dfs -ls /data/flume结果示例,如下所示:
-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:52 /data/flume/event_14-09-17_10_52_00.1410922355094.log-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:52 /data/flume/event_14-09-17_10_52_00.1410922355095.log-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:52 /data/flume/event_14-09-17_10_52_00.1410922355096.log-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:52 /data/flume/event_14-09-17_10_52_00.1410922355097.log-rw-r--r-- 3 shirdrn supergroup 1530 2014-09-17 10:53 /data/flume/event_14-09-17_10_52_00.1410922355098.log-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380386.log-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380387.log-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380388.log-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380389.log-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380390.log
- Exec Source + Memory Channel + File Roll Sink
Configure the Agent by editing the configuration file flume-conf-file.properties as follows:
# Define source, channel, sink
agent1.sources = tail-source1
agent1.channels = ch1
agent1.sinks = file-sink1
# Configure channel
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 1000000
agent1.channels.ch1.transactionCapacity = 500000
# Define and configure an Exec source
agent1.sources.tail-source1.channels = ch1
agent1.sources.tail-source1.type = exec
agent1.sources.tail-source1.command = tail -F /home/shirdrn/data/event.log
agent1.sources.tail-source1.shell = /bin/sh -c
agent1.sources.tail-source1.batchSize = 50
# Define and configure a File roll sink
# and connect it to the other end of the same channel.
agent1.sinks.file-sink1.channel = ch1
agent1.sinks.file-sink1.type = file_roll
agent1.sinks.file-sink1.batchSize = 100
agent1.sinks.file-sink1.serializer = TEXT
agent1.sinks.file-sink1.sink.directory = /home/shirdrn/sink_data
Start the Agent process with the following command:
bin/flume-ng agent -c ./conf/ -f conf/flume-conf-file.properties -Dflume.root.logger=INFO,console -n agent1
Then check the File Roll Sink's local directory /home/shirdrn/sink_data; a sample listing:
-rw-rw-r-- 1 shirdrn shirdrn 13944825 Sep 17 11:36 1410924990039-1
-rw-rw-r-- 1 shirdrn shirdrn 11288870 Sep 17 11:37 1410924990039-2
-rw-rw-r-- 1 shirdrn shirdrn 0 Sep 17 11:37 1410924990039-3
-rw-rw-r-- 1 shirdrn shirdrn 20517500 Sep 17 11:38 1410924990039-4
-rw-rw-r-- 1 shirdrn shirdrn 16343250 Sep 17 11:38 1410924990039-5
For many more Flume NG configuration options and their descriptions, refer to the official user guide, which is very detailed.
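The file names produced by the built-in file_roll sink, as shown above, are `<epoch-milliseconds>-<counter>`, so the roll time is not human-readable at a glance. A small Python sketch to decode one:

```python
from datetime import datetime, timezone

def decode_roll_name(name: str) -> str:
    """Turn a file_roll name like '1410924990039-4' into a readable timestamp."""
    millis, counter = name.split("-")
    ts = datetime.fromtimestamp(int(millis) / 1000.0, tz=timezone.utc)
    return f"{ts:%Y-%m-%d %H:%M:%S} UTC (roll #{counter})"

print(decode_roll_name("1410924990039-4"))  # 2014-09-17 03:36:30 UTC (roll #4)
```

The prefix is the sink's start time in epoch milliseconds and the suffix a per-run counter, which matches the listing above (03:36 UTC is 11:36 in UTC+8).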
References:
http://flume.apache.org/FlumeUserGuide.html
https://blogs.apache.org/flume/entry/flume_ng_architecture
Original article: http://shiyanjun.cn/archives/915.html
Author: 时延军
Flume Log Collection: Layered Architecture in Practice
Big Data · published by Ansible · 0 comments · 7191 views · 2016-03-19 18:54
- Modular design: a Flume Agent internally defines three kinds of components: Source, Channel, Sink
- Composable design: within an Agent, Source, Channel, and Sink components can be combined as the business requires, building relatively complex log-flow pipelines
- Plugin-style configuration: the log collection pipeline is wired up through configuration files, minimizing intrusion into Flume's code
- Extensibility: custom components (Source, Channel, Sink) can be implemented to suit specific business needs
- Integration with mainstream systems and frameworks: Hadoop, HBase, Hive, Kafka, ElasticSearch, Thrift, Avro, and others all integrate well with Flume
- Advanced features: failover, load balancing, interceptors, and more
Why design the Flume log collection system in layers
What do we actually gain by building the log collection system on Flume in layers? Consider first the problems that arise without layering:
- If Kafka is used to buffer the log streams collected upstream by Flume, data produced by servers inside the data platform is manageable, but when log data crosses business groups or even departments, Kafka connection details must be exposed externally, so access to Kafka is no longer under the data platform's control
- If external logs flow directly into the platform's internal HDFS, then any Hadoop upgrade or routine maintenance, given this direct-connect approach, affects the log collection services at the head of the upstream Flume pipelines
- If internal systems such as the Kafka cluster or the HDFS cluster move to a different machine room, or data is migrated, external systems that depend on the log data are affected to varying degrees, and their developers or operators have to get involved
- The data sources may be external servers (many standalone nodes), business clusters (groups of cooperating nodes), or internal nodes providing collection services; with every Flume Agent deployed on all of these servers sitting in a single flat layer, they are hard to manage in groups
- With no unified, bus-like component at the point where all source-side Flume Agents deliver logs into the platform, it is hard to upgrade individual receiving-layer service nodes independently as some business expands; upgrading one internal system or service may end up affecting other receiving-layer nodes
In the figure above, every node running a Flume Agent, whether external or internal, connects directly to the internal Kafka and Hadoop clusters. The data platform can therefore only try to keep Kafka and Hadoop running stably, while also bracing for sudden spikes and abnormal swings in traffic from the external collection agents. Moreover, taking any one cluster down for maintenance or an upgrade may require notifying the business owners of every external node running a Flume Agent, so they can prepare for the outage.
Now consider the advantages of designing the log collection system in layers on top of Flume, as shown in the figure below. There, the system uses a two-layer architecture: the first layer (L1) is the log collection layer, and the second layer (L2) is the data platform's buffering (aggregation) layer. This gives the log collection system the following characteristics:
- Business systems outside the data platform are grouped by the type of data to be analyzed; logs of the same business type pass through a dedicated group of Flume aggregation nodes added at the front of the platform, and that node group only affects its corresponding L1-layer business data
- If the Hadoop cluster or Kafka needs downtime for maintenance or an upgrade, the external L1-layer Flume Agents are unaffected; the L2 layer just keeps receiving and buffering data, and once the maintenance or upgrade is finished, the data cached at L2 is loaded into the storage systems
- If the nodes producing one type of external business log need to scale out, the new data flow at L1 is simply pointed at the matching L2-layer Flume Agent group inside the platform, allowing fast response and deployment for new collection requirements arising from business changes
- Inside the data platform, where the collecting nodes are fully controlled, logs could flow from L1-layer Flume Agents directly into HDFS or Kafka; for architectural consistency and easier management, however, it is still better to aggregate/buffer the logs collected at L1 through the L2-layer Flume Agent node group
The analysis above shows that layering is essentially about decoupling the Flume Agent services on the log-source nodes from the data platform's storage systems (Kafka/HDFS), while also making it possible to aggregate the log streams of multiple same-type business nodes and manage them independently. Further Flume layers can be added as actual business needs for aggregating log-stream data dictate.
Overall application architecture
The figure below sketches where the Flume log collection system sits in this example application; I added some of the surrounding data processing and analysis nodes/components:
A quick look at the figure is enough here. Log collection is a critical link in the overall application, so the collection system must be designed to be reliable, available, flexible, and stable; the heavy analysis and processing the data platform performs on the collected logs only underscores its importance. The other components are not discussed further.
The Flume layered architecture in practice
Taking real-time log collection as the example, this section shows how to build a relatively complex layered Flume log collection system. The collection requirements, briefly:
- User behavior events reported by the mobile client (App User Event) flow in real time from the Nginx logs into the data platform, using an interface format defined inside the platform; this corresponds to the L1 layer of the Flume log collection system
- Campaigns are run to promote certain App product features, pushing notifications to targeted users; a dedicated Push Click agent collects these click events
- Some basic content the App depends on is exposed as services to external third parties; user click events brought in by third-party Apps (Thirdparty Click) are collected by a dedicated L1-layer Flume Agent
- Third parties place ads (Ad) in the App against different content; ad impression/click data is collected at L1 separately from the streams above, because this log data may later be promoted at scale and grow explosively
- The L2 layer mainly aggregates or buffers the log data flowing in from L1
- To guard against L2-layer Flume Agent failures or scheduled maintenance, Flume's failover feature is used: each L1-layer Sink points at 2 identical L2-layer Flume Agents
- L1-layer Flume Agents should not let too much data accumulate in their channels while collecting (while still guarding against memory-channel overflow when the flow slows down), and should keep disk I/O overhead as low as possible, so they use memory channels
- The L2 layer must buffer data reliably (accumulating it for an allowed period, e.g. while Hadoop or Kafka is down for maintenance or upgrades), so it uses file channels with capacities tuned as large as possible; and to avoid processing delays caused by multiple applications sharing a disk, each channel gets its own dedicated disk
The figure above is a simplified excerpt of the full data platform, for ease of explanation. The Flume Agent configurations it involves are detailed below, organized by layer (L1, L2). The Flume Agents deployed on L1-layer nodes 10.10.1.101 and 10.10.1.102 are symmetric, so only one node's configuration is shown. The difference is that the Sinks of the Agents on these two nodes use the failover feature and point crosswise at the L2-layer Flume Agents, which also provides a degree of load balancing.
Upstream: the Flume log collection layer
The configurations of the Flume Agents on node 10.10.1.101 are described one by one below:
- L1: App user behavior event (App User Event) log collection
a1.sources = s1
a1.channels = mc1
a1.sinks = k1 k2

# Configure source
a1.sources.s1.channels = mc1
a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /data/nginx/logs/app_user_events.log

# Configure channel
a1.channels.mc1.type = memory
a1.channels.mc1.transactionCapacity = 50000
a1.channels.mc1.capacity = 100000

# Configure sinks
a1.sinks.k1.channel = mc1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 10.10.1.122
a1.sinks.k1.port = 44446

a1.sinks.k2.channel = mc1
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = 10.10.1.121
a1.sinks.k2.port = 44446

# Configure failover
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 9
a1.sinkgroups.g1.processor.priority.k2 = 7
a1.sinkgroups.g1.processor.maxpenalty = 10000
- L1: push click event (Push Click Event) log collection
a2.sources = s2
a2.channels = mc2
a2.sinks = k3 k4

# Configure source
a2.sources.s2.channels = mc2
a2.sources.s2.type = exec
a2.sources.s2.command = tail -F /data/nginx/logs/push_click_events.log

# Configure channel
a2.channels.mc2.type = memory
a2.channels.mc2.transactionCapacity = 50000
a2.channels.mc2.capacity = 100000

# Configure sinks
a2.sinks.k3.channel = mc2
a2.sinks.k3.type = avro
a2.sinks.k3.hostname = 10.10.1.121
a2.sinks.k3.port = 44447

a2.sinks.k4.channel = mc2
a2.sinks.k4.type = avro
a2.sinks.k4.hostname = 10.10.1.122
a2.sinks.k4.port = 44447

# Configure failover
a2.sinkgroups = g2
a2.sinkgroups.g2.sinks = k3 k4
a2.sinkgroups.g2.processor.type = failover
a2.sinkgroups.g2.processor.priority.k3 = 9
a2.sinkgroups.g2.processor.priority.k4 = 7
a2.sinkgroups.g2.processor.maxpenalty = 10000
- L1: third-party click event (Thirdparty Click Event) log collection
a3.sources = s3
a3.channels = mc3
a3.sinks = k5 k6

# Configure source
a3.sources.s3.channels = mc3
a3.sources.s3.type = exec
a3.sources.s3.command = tail -F /data/nginx/logs/thirdparty_click_events.log

# Configure channel
a3.channels.mc3.type = memory
a3.channels.mc3.transactionCapacity = 50000
a3.channels.mc3.capacity = 100000

# Configure sinks
a3.sinks.k5.channel = mc3
a3.sinks.k5.type = avro
a3.sinks.k5.hostname = 10.10.1.121
a3.sinks.k5.port = 44446

a3.sinks.k6.channel = mc3
a3.sinks.k6.type = avro
a3.sinks.k6.hostname = 10.10.1.122
a3.sinks.k6.port = 44446

# Configure failover
a3.sinkgroups = g3
a3.sinkgroups.g3.sinks = k5 k6
a3.sinkgroups.g3.processor.type = failover
a3.sinkgroups.g3.processor.priority.k5 = 9
a3.sinkgroups.g3.processor.priority.k6 = 7
a3.sinkgroups.g3.processor.maxpenalty = 10000
- L1: ad click event (Ad Click Event) log collection
a4.sources = s4
a4.channels = mc4
a4.sinks = k7 k8

# Configure source
a4.sources.s4.channels = mc4
a4.sources.s4.type = exec
a4.sources.s4.command = tail -F /data/nginx/logs/ad.log

# Configure channel
a4.channels.mc4.type = memory
a4.channels.mc4.transactionCapacity = 50000
a4.channels.mc4.capacity = 100000

# Configure sinks
a4.sinks.k7.channel = mc4
a4.sinks.k7.type = avro
a4.sinks.k7.hostname = 10.10.1.121
a4.sinks.k7.port = 44448

a4.sinks.k8.channel = mc4
a4.sinks.k8.type = avro
a4.sinks.k8.hostname = 10.10.1.122
a4.sinks.k8.port = 44448

# Configure failover
a4.sinkgroups = g4
a4.sinkgroups.g4.sinks = k7 k8
a4.sinkgroups.g4.processor.type = failover
a4.sinkgroups.g4.processor.priority.k7 = 10
a4.sinkgroups.g4.processor.priority.k8 = 8
a4.sinkgroups.g4.processor.maxpenalty = 10000
Downstream: the Flume log aggregation layer
- L2: merged collection of App user event + push click event logs
a1.sources = s1 s2
a1.channels = fc1 fc2 fc3
a1.sinks = kk1 fk2 kk3

# Configure app user event source: s1 -> fc1+fc2
a1.sources.s1.channels = fc1 fc2
a1.sources.s1.type = avro
a1.sources.s1.bind = 10.10.1.121
a1.sources.s1.port = 44446
a1.sources.s1.threads = 8

# Configure push click event source: s2 -> fc2+fc3
a1.sources.s2.channels = fc2 fc3
a1.sources.s2.type = avro
a1.sources.s2.bind = 10.10.1.122
a1.sources.s2.port = 44447
a1.sources.s2.threads = 4

# Configure file channel (/data1)
# App user event channel: fc1 -> kk1
a1.channels.fc1.type = file
a1.channels.fc1.checkpointDir = /data1/flume/channels/app_user_event/checkpoint
a1.channels.fc1.useDualCheckpoints = true
a1.channels.fc1.backupCheckpointDir = /data1/flume/channels/app_user_event/backup
a1.channels.fc1.dataDirs = /data1/flume/channels/app_user_event/data
a1.channels.fc1.transactionCapacity = 100000
a1.channels.fc1.capacity = 500000
a1.channels.fc1.checkpointInterval = 60000
a1.channels.fc1.keep-alive = 5
a1.channels.fc1.maxFileSize = 5368709120

# Configure file channel (/data2)
# App user event + push click event: fc2 -> fk2
a1.channels.fc2.type = file
a1.channels.fc2.checkpointDir = /data2/flume/channels/offline_file_event/checkpoint
a1.channels.fc2.useDualCheckpoints = true
a1.channels.fc2.backupCheckpointDir = /data2/flume/channels/offline_file_event/backup
a1.channels.fc2.dataDirs = /data2/flume/channels/offline_file_event/data
a1.channels.fc2.transactionCapacity = 100000
a1.channels.fc2.capacity = 500000
a1.channels.fc2.checkpointInterval = 60000
a1.channels.fc2.keep-alive = 5
a1.channels.fc2.maxFileSize = 5368709120

# Configure file channel (/data3)
# Push click channel: fc3 -> kk3
a1.channels.fc3.type = file
a1.channels.fc3.checkpointDir = /data3/flume/channels/push_click_event/checkpoint
a1.channels.fc3.useDualCheckpoints = true
a1.channels.fc3.backupCheckpointDir = /data3/flume/channels/push_click_event/backup
a1.channels.fc3.dataDirs = /data3/flume/channels/push_click_event/data
a1.channels.fc3.transactionCapacity = 100000
a1.channels.fc3.capacity = 500000
a1.channels.fc3.checkpointInterval = 60000
a1.channels.fc3.keep-alive = 5
a1.channels.fc3.maxFileSize = 5368709120

# Configure sink: RealtimeMessageSink (app user event)
a1.sinks.kk1.type = org.shirdrn.flume.sink.RealtimeMessageSink
a1.sinks.kk1.channel = fc1
a1.sinks.kk1.metadata.broker.list = kafka01:9092,kafka02:9092,kafka03:9092
a1.sinks.kk1.topic = json_user_event
a1.sinks.kk1.serializer.class = kafka.serializer.StringEncoder
a1.sinks.kk1.producer.type = async
a1.sinks.kk1.message.send.max.retries = 3
a1.sinks.kk1.client.id = flume_app_user_event_2_1
a1.sinks.kk1.event.decoder.count = 8
a1.sinks.kk1.output.stat.event.batch.size = 2000
a1.sinks.kk1.event.decoder.queue.size = 1000

# Configure sink: RichRollingFileSink
a1.sinks.fk2.type = org.shirdrn.flume.sink.RichRollingFileSink
a1.sinks.fk2.channel = fc2
a1.sinks.fk2.batchSize = 100
a1.sinks.fk2.serializer = TEXT
a1.sinks.fk2.sink.rollInterval = 60
a1.sinks.fk2.sink.directory = /data/flume/rolling_files
a1.sinks.fk2.sink.file.prefix = event
a1.sinks.fk2.sink.file.suffix = .log
a1.sinks.fk2.sink.file.pattern = yyyyMMddHHmmss

# Configure sink: RealtimeMessageSink (push click)
a1.sinks.kk3.type = org.shirdrn.flume.sink.RealtimeMessageSink
a1.sinks.kk3.channel = fc3
a1.sinks.kk3.metadata.broker.list = kafka01:9092,kafka02:9092,kafka03:9092
a1.sinks.kk3.topic = json_push_click_event
a1.sinks.kk3.serializer.class = kafka.serializer.StringEncoder
a1.sinks.kk3.producer.type = async
a1.sinks.kk3.message.send.max.retries = 3
a1.sinks.kk3.client.id = flume_push_click_2_1
a1.sinks.kk3.event.decoder.count = 4
a1.sinks.kk3.output.stat.event.batch.size = 2000
a1.sinks.kk3.event.decoder.queue.size = 1000
Note the custom org.shirdrn.flume.sink.RealtimeMessageSink above, whose job is to write the logs Flume collects into Kafka. Flume 1.5.0 did not yet ship a built-in implementation for this, so we wrote our own and embedded processing logic suited to our business: each Nginx log line is parsed; events that do not need to enter Kafka (for eventual processing in the Storm cluster) are filtered out according to the real-time computation requirements; the rest are converted to JSON strings and written to a Kafka topic. As the configuration shows, many parameters are tunable, such as the number of decoder threads and queue sizes.
Because the files written to the local file system must be named our own way, we modified Flume's built-in file_roll implementation into our own org.shirdrn.flume.sink.RichRollingFileSink, which formats the file-name string so that the file's creation time can be read from the name in a human-readable form.
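The per-event logic inside RealtimeMessageSink is not shown in the article (the real implementation is a Java Flume sink). The Python sketch below only illustrates the parse-filter-serialize step described above; the tab-separated log format, field names, and event-type whitelist are all hypothetical, not the author's actual code:

```python
import json

# Hypothetical whitelist: only these event types are needed downstream in Storm
REALTIME_EVENT_TYPES = {"app_user_event", "push_click"}

def to_kafka_message(nginx_line: str):
    """Parse one (hypothetical) tab-separated Nginx log line, drop events that
    real-time computation does not need, and emit a JSON string for Kafka."""
    try:
        ts, event_type, user_id, payload = nginx_line.rstrip("\n").split("\t")
    except ValueError:
        return None  # malformed line: drop it
    if event_type not in REALTIME_EVENT_TYPES:
        return None  # not needed downstream: filter it out
    return json.dumps(
        {"time": ts, "type": event_type, "user": user_id, "data": payload}
    )

print(to_kafka_message("1410922355\tpush_click\tu42\tcampaign:7"))
print(to_kafka_message("1410922355\tother_event\tu42\tx"))  # None (filtered)
```

In the real sink this work is spread over a pool of decoder threads and a bounded queue, which is what the event.decoder.count and event.decoder.queue.size parameters above control.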
- L2: ad click event log collection
a2.sources = s3
a2.channels = fc4
a2.sinks = kk4

# Configure source: s3 -> fc4
a2.sources.s3.channels = fc4
a2.sources.s3.type = avro
a2.sources.s3.bind = 10.10.1.121
a2.sources.s3.port = 44448
a2.sources.s3.threads = 2

# Configure file channel (/data4)
# Ad channel: fc4 -> kk4
a2.channels.fc4.type = file
a2.channels.fc4.checkpointDir = /data4/flume/channels/ad/checkpoint
a2.channels.fc4.useDualCheckpoints = true
a2.channels.fc4.backupCheckpointDir = /data4/flume/channels/ad/backup
a2.channels.fc4.dataDirs = /data4/flume/channels/ad/data
a2.channels.fc4.transactionCapacity = 100000
a2.channels.fc4.capacity = 500000
a2.channels.fc4.checkpointInterval = 60000
a2.channels.fc4.keep-alive = 5
a2.channels.fc4.maxFileSize = 5368709120

# Configure sink: RealtimeAdKafkaSink
a2.sinks.kk4.type = org.shirdrn.flume.sink.RealtimeAdKafkaSink
a2.sinks.kk4.channel = fc4
a2.sinks.kk4.metadata.broker.list = kafka01:9092,kafka02:9092,kafka03:9092
a2.sinks.kk4.topic = json_ad_event
a2.sinks.kk4.serializer.class = kafka.serializer.StringEncoder
a2.sinks.kk4.producer.type = async
a2.sinks.kk4.message.send.max.retries = 3
a2.sinks.kk4.client.id = flume_ad_2_1
a2.sinks.kk4.event.decoder.count = 4
a2.sinks.kk4.output.stat.event.batch.size = 2500
a2.sinks.kk4.event.decoder.queue.size = 5000
Practical notes
A few points worth summarizing:
- Flume monitoring
bin/flume-ng agent -n a1 -c conf -f conf/config.conf -Dflume.monitoring.type=http -Dflume.monitoring.port=34545
With this, you can browse web port 34545 on the Flume Agent's node to view the metrics, presented as JSON. They include important metadata such as channel capacity and current channel usage; from these numbers you can judge the agent's current working state and whether it needs to be scaled up.
Alternatively, Ganglia can collect and analyze the Flume Agent's runtime state, presenting it in even greater detail. Ganglia configuration is relatively involved and is not covered here; it is worth trying out if you are interested.
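Flume's HTTP monitoring serves the counters as one JSON object keyed by component (e.g. "CHANNEL.ch1"), typically fetched from http://<agent-host>:34545/metrics. A sketch of computing the channel fill level from such a response; the sample values below are illustrative, not real measurements:

```python
import json

# Illustrative response body from http://<agent-host>:34545/metrics
sample = json.loads("""
{
  "CHANNEL.ch1": {
    "Type": "CHANNEL",
    "ChannelCapacity": "1000000",
    "ChannelSize": "250000",
    "EventPutSuccessCount": "9000000",
    "EventTakeSuccessCount": "8750000"
  }
}
""")

def channel_fill(metrics: dict, name: str) -> float:
    """Fraction of the channel's capacity currently in use; a value creeping
    toward 1.0 means the sink cannot keep up with the source."""
    ch = metrics[f"CHANNEL.{name}"]
    return int(ch["ChannelSize"]) / int(ch["ChannelCapacity"])

print(f"ch1 fill: {channel_fill(sample, 'ch1'):.0%}")  # ch1 fill: 25%
```

A check like this is easy to wire into a cron job or alerting script as a first line of defense before setting up Ganglia.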
- Flume memory tuning
Set JAVA_OPTS (e.g. in conf/flume-env.sh):
JAVA_OPTS="-server -Xms1024m -Xmx4096m -Dcom.sun.management.jmxremote -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:ParallelGCThreads=4 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:/data/flume/logs/gc-ad.log"
This makes it easy to adjust the GC policy. Real-time log collection with Flume generally values latency and fast response, so the goal is to minimize the time business threads are suspended by GC pauses; hence GC is set to the ParNew+CMS combination. Writing out the GC log makes it much easier to observe the JVM's detailed GC behavior while the Flume Agent runs, and to optimize the service based on that diagnosis.
- Tuning message reception at the downstream L2 layer
a1.sources.s1.type = avro
a1.sources.s1.bind = 10.10.1.121
a1.sources.s1.port = 44446
a1.sources.s1.threads = 8
By default the threads parameter above is 1; increase it, otherwise log records will back up at the L1 layer and, in severe cases, data may be lost.
- Constraints on business logic inside Flume
Flume's extensibility lets us implement components tailored to our own business, but before mixing real business logic into Flume, ask: does it truly have to be done here? And will doing so hurt the speed and efficiency of Flume's real-time log transport?
Flume is a lightweight log collection tool. In my view, relatively complex business logic (especially anything that must interact with storage systems such as MySQL or Redis) is best pushed downstream, into the Storm cluster or a processing cluster of your own, leaving Flume to do what it is good at: routing messages.
Of course, some business scenarios may force work into the collection layer: for example, when raw, unstructured messages cannot otherwise be routed to different destinations by type, a simple parse or formatting step may be needed there; effectively this is simple log dispatching done at the Flume layer. Either way, if you insert business logic into the Flume layer, avoid processing so complex that it slows the overall log transport; if there is a real-time recommendation system downstream, heavily delayed events will undermine personalized recommendation.
Original article: http://shiyanjun.cn/archives/1497.html
Author: 时延军