你好,游客 登录
背景:
阅读新闻

flume搜集日志:如何解决实时不断追加的日志文件及不断增加的文件个数问题

[日期:2017-09-06] 来源:aboutyun  作者:pig2 [字体: ]

问题导读
1.对于不断追加的文件可以使用flume哪个属性?
2.对于不断追加的文件及变化的文件个数,可是使用flume哪个属性?
3.该如何配置能够搜集网站日志的flume?




上篇文章
flume与kafka整合高可靠教程2:flume与kafka整合安装
http://www.aboutyun.com/forum.php?mod=viewthread&tid=22173



本文的背景:
在搜集日志的过程中,日志文件的个数及日志文件需要不断的追加。flume1.6中,可以使用tail -f可以解决不断追加的文件,但是由于日志文件的个数是变化的,不可能只产生一个文件。所以tail -f就已经不能解决这个搜集日志的问题。

需求:
需要能够监控文件,并且追加这个,同时文件个数也是不断变化的。

解决办法:
这时候flume1.7就产生了,很好的通过 TAILDIRl解决了这个问题。TAILDIRl可以监控一个目录下的文件。

官网地址:http://flume.apache.org/FlumeUserGuide.html

官网文档截图:




上面加粗为常用属性。

这里我们只使用了下面两个属性
a1.sources.source1.filegroups.f1 = /data/aboutyunlog/.*log.*
a1.sources.source1.type = TAILDIR

flume1.7安装包
链接:http://pan.baidu.com/s/1c1Pzo9i 密码:fxa4



一、Flume安装

1. 压缩安装包

 
 
tar -zxvf ~/jar/apache-flume-1.7.0-bin.tar.gz -C /data
mv /data/apache-flume-1.7.0-bin/ /data/flume-1.7.0 # 重命名



2. 配置环境变量

 
 
echo -e "export FLUME_HOME=/data/flume-1.7.0\nexport PATH=\$FLUME_HOME/bin:\$PATH" >> ~/.bashrc
source ~/.bashrc



3. 配置flume

 
 
cp flume-env.sh.template flume-env.sh修改JAVA_HOME
export JAVA_HOME= /data/jdk1.8.0_111


4. 验证安装

 
flume-ng version





二、Flume使用

一个agent由source、channel、sink组成。这儿我们使用Spooling Directory Source、File Channel、Kafka Sink。

1. 单节点的agent
1) 增加配置文件

 
 
cd $FLUME_HOME/conf
vim single_agent.conf


将以下内容拷贝进去

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# agent的名称为a1
a1.sources = source1
a1.channels = channel1
a1.sinks = sink1
 
# set source
#a1.sources.source1.type = spooldir
a1.sources.source1.type = TAILDIR
a1.sources.source1.filegroups = f1
a1.sources.source1.filegroups.f1 = /data/aboutyunlog/.*log.*
#a1.sources.source1.spoolDir=/data/aboutyunlog
a1sources.source1.fileHeader = flase
 
# set sink
a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
#a1.sinks.sink1.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092
a1.sinks.sink1.brokerList= master:9092,slave1:9092,slave2:9092
a1.sinks.sink1.topic= aboutyunlog
a1.sinks.sink1.kafka.flumeBatchSize = 20
a1.sinks.sink1.kafka.producer.acks = 1
a1.sinks.sink1.kafka.producer.linger.ms = 1
a1.sinks.sink1.kafka.producer.compression.type = snappy
 
# set channel
a1.channels.channel1.type = file
a1.channels.channel1.checkpointDir = /data/flume_data/checkpoint
a1.channels.channel1.dataDirs= /data/flume_data/data
 
# bind
a1.sources.source1.channels = channel1
a1.sinks.sink1.channel = channel1

2. 创建所需文件

 
 
 
mkdir -p /data/aboutyunlog
mkdir -p /data/flume_data/checkpoint
mkdir -p /data/flume_data/data


3. 查看kafka现有的topic

 
kafka-topics.sh --zookeeper master:2181,slave1:2181,slave2:2181 --list


4. 在kafka上创建名为aboutyunlog的topic

 
kafka-topics.sh --zookeeper master:2181,slave1:2181,slave2:2181 --create --topic aboutyunlog --replication-factor 1 --partitions 3


5. 启动flume

 
flume-ng agent --conf-file /data/flume-1.6.0/conf/single_agent.conf --name a1 -Dflume.root.logger=INFO,console


启动过程中控制台会输出很多日志。

6. 创建一个kafka的consumer

 
kafka-console-consumer.sh --zookeeper master:2181,slave1:2181,slave2:2181  --topic aboutyunlog --from-beginning


这条命令的意思是说创建aboutyunlog这个topic下的消费者,消费时从最开始的一条信息开始消费。

上图说明该消费者创建成功,由于本地/data/aboutyunlog目录下没有新文件加入,造成aboutyunlog这个topic没有信息输入,所以消费者没有得到一条信息。

7.  添加文件到flume source目录

 
 
echo -e "this is a test file! \nhttp://www.aboutyun.com20170820"
mv log.1 /data/aboutyunlog/


为:echo -e "this is a test file! \nhttp://www.aboutyun.com20170820">log.1
再次执行

 
echo -e "this is a test file! \nhttp://www.aboutyun.com20170820">log.2




然后我们看到

master上


注意:需要通过xshell链接两个master。也就是打开两个master界面

8. 再次查看kafka consumer


切换到创建kafka consumer的shell界面,会看到我们log.1\log.2中文件的内容被打印在屏幕上。


上图说明我们已经成功使用flume监控/data/aboutyunlog目录,并将监控目录中的内容发送到kafka的aboutyunlog主题中。

注意:如果使用flume1.6会找不到类。

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
17/08/17 19:21:08 ERROR node.PollingPropertiesFileConfigurationProvider: Failed to load configuration data. Exception follows.
org.apache.flume.FlumeException: Unable to load source type: TAILDIR, class: TAILDIR
        at org.apache.flume.source.DefaultSourceFactory.getClass(DefaultSourceFactory.java:69)
        at org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:42)
        at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:322)
        at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:97)
        at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: TAILDIR
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:264)
        at org.apache.flume.source.DefaultSourceFactory.getClass(DefaultSourceFactory.java:67)
        ... 11 more

所以需更换flume1.7

收藏 推荐 打印 | 阅读:
相关新闻       kafka  flume 
本文评论   查看全部评论 (0)
表情: 表情 姓名: 字数
点评:
       
评论声明
  • 尊重网上道德,遵守中华人民共和国的各项有关法律法规
  • 承担一切因您的行为而直接或间接导致的民事或刑事法律责任
  • 本站管理人员有权保留或删除其管辖留言中的任意内容
  • 本站有权在网站内转载或引用您的评论
  • 参与本评论即表明您已经阅读并接受上述条款