flume-dirtail-source
用途:
对一个目录的日志文件进行tail作为flume的source,可以根据文件名进行筛选(正则)
原理:
source基于exec source 进行了简化
对exec source的newSingleThreadExecutor改为多线程并行的,每几个需要监控的文件对应一个execrunnable实例
tail -F 实现对文件的持续读,log的daily rolling也是没有问题的
使用apache的vfs进行目录的监控,当文件有增删改时,都有得到一个eventlistener的回调,根据回调的事件对execrunnable进行维护,新增或者删除。
config:
agent.sources.originallog.type = org.apache.flume.source.dirtail.DirTailSource
agent.sources.originallog.dirPath = /home/peiliping/dev/logs/
agent.sources.originallog.file-pattern = ^(.*)(\.log)$
agent.sources.originallog.charset = UTF-8 (default)
agent.sources.originallog.batchTimeout = 3000 (default)
agent.sources.originallog.bufferCount = 20 (default)
agent.sources.originallog.topicByFileName = false (default)
agent.sources.originallog.restart = false (default)
agent.sources.originallog.restartThrottle = 10000 (default)
TopicInterceptor
用途:
对每一个event添加一个header,表示信息的topic
如果用文件的名字,可能不方便管理,容易冲突,这里通过远程加载一个property文件来进行文件名与topic的映射
group字段用于表示不同的配置文件列别,便于管理和维护。
config:
agent.sources.originallog.interceptors = topicinterceptor
agent.sources.originallog.interceptors.topicinterceptor.type = org.apache.flume.interceptor.TopicInterceptor$Builder
agent.sources.originallog.interceptors.topicinterceptor.configUri = http://127.0.0.1/
agent.sources.originallog.interceptors.topicinterceptor.group = das.tmc
agent.sources.originallog.interceptors.topicinterceptor.checkPeriod = 600000 (default)