欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

Flume

时间:2023-05-14
Flume安装地址

·Flume官网地址:http://flume.apache.org/·文档查看地址:http://flume.apache.org/FlumeUserGuide.html·下载地址:http://archive.apache.org/dist/flume/

Flume概述

·Flume是一个高可用的,分布式的海量日志采集、聚合和传输的系统:基于流式架构,简单灵活·Flume可以实时的监控本地磁盘的数据,实时读取服务器本地磁盘,聚合在一起,上传到HDFS等

Flume基础架构

·Agent是一个JVM进程,主要组成的是source、channel、sink·source·source是负责拉取数据到Flume agent的组件。source组件可以处理各种类型、各种格式的日志数据·channel(管道)·channel是位于source和sink之间的缓冲区;channel允许source和sink运作速率不同。线程安全的,可以同时处理多个source的写入和sink的读取操作·Flume自带两种channel:Memroy channel、File channel·sink(将数据传送到HDFS或者存储系统或另一个Flume Agent)·不断地轮询Channel中的event且批量删除,并将这些event批量写入存储体系等·Event·Flume传输基本单元,以Event形式将数据从源头送至目的地·Event有Header和Body组成·Header用来存放Event的一些属性,为K-V结构;拦截器就是利用Header来的·Body用来存储该条数据,为字节数组

Flume安装部署

·解压flume的压缩包·将flume/lib下的guava-11.0.2.jar删除以兼容Hadoop 3.1.3·修改/conf下的log4j.properties日志打印位置#console表示同时将日志输出到控制台flume.root.logger=INFO,LOGFILE,console#固定日志输出的位置flume.log.dir=/opt/module/flume/logs#日志文件的名称flume.log.file=flume.log

Flume入门案例

需求:使用Flume监听一个端口,收集该端口数据,并打印到控制台步骤:·通过nc命令查看是否存在netcat,如果没有先安装netcat工具(网络测试工具,可以将两台机器进行tcp/udp连接)·sudo yum install -y nc·查看端口是否被占用·sudo netstat -nlp | grep 端口·在flume下创建一个job目录,也可以直接使用conf目录·创建flume agent配置文件nc-flume-console.conf·添加以下内容添加内容如下:# Name the components on this agenta1.sources = r1a1.sinks = k1a1.channels = c1# Describe/configure the source 从netcat中拿取数据a1.sources.r1.type = netcata1.sources.r1.bind = localhosta1.sources.r1.port = 44444# Describe the sinka1.sinks.k1.type = logger# Use a channel which buffers events in memorya1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel 一个sink只能接一个channel,一个channels可以接多个source和sinka1.sources.r1.channels = c1a1.sinks.k1.channel = c1·先开启flume监听端口·flume-ng agent -c /flume/conf -f /job/nc-flume-console.conf -n a1或·flume-ng agent --conf conf/ --name a1 --conf-file conf/nc-flume-log.conf -Dflume.root.logger=INFO,console

实时监控目录下的多个追加文件

需求:使用flume监听整个目录的实时追加文件,并上传到hdfs·source的TAILDIR类型·Taildir:观察指定的文件,并在检测到附加到每个文件的新行后几乎实时地跟踪它们。具有断点续传的功能·filegroups:文件组,可以监控多个文件夹,需要给文件夹起个名字·配置文件# Name the components on this agent a1为agent的别名a1.sources = r1a1.sinks = k1a1.channels = c1# Describe/configure the source TAILDIRa1.sources.r1.type = TAILDIR#filegroups:文件组,可以监控多个文件夹,需要给文件夹起个名字a1.sources.r1.filegroups = f1 f2#文件组的绝对路径,必须到精确到文件,可以写匹配表达式匹配多个文件a1.sources.r1.filegroups.f1 = /opt/module/flume-1.9.0/files1/.*file.*a1.sources.r1.filegroups.f2 = /opt/module/flume-1.9.0/files2/.*file.*#JSON格式的文件,记录每个尾号文件的inode、绝对路径和最后位置。a1.sources.r1.positionFile = /opt/module/flume-1.9.0/taildir_position.json# Describe the sinka1.sinks.k1.type = hdfs#HDFS路径,主机名:端口号可写可不写,会自动识别a1.sinks.k1.hdfs.path = /flume/%Y%m%d/%H#可设置前缀、后缀a1.sinks.k1.hdfs.filePrefix = log-#为解决小文件问题,先存为.tmp文件,达到一定要求转为正式文件#业内经验rollInterval=1h,rollSize=128M,rollCount=0a1.sinks.k1.hdfs.rollInterval = 10a1.sinks.k1.hdfs.rollSize = 134217700a1.sinks.k1.hdfs.rollCount = 0#是否使用本地时间戳,以上占位符会使用的到a1.sinks.k1.hdfs.useLocalTimeStamp = true#设置文件类型,DataStrem不带序列化a1.sinks.k1.hdfs.fileType = DataStream# Use a channel which buffers events in memorya1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channela1.sources.r1.channels = c1a1.sinks.k1.channel = c1

Flume进阶

·Put事务·source和channel之间的事务·Take事务·channel和sink之间的事务

Flume内部原理

·ChannelSelector·ChannelSelector的作用是选出Event将要被发往哪个Channel。共两种类型:Replicating(复制)和Multiplexing(多路复用)·ReplicatingSelector会将同一个Event发往所有的Channel,Multiplexing会根据相应的原则,将不同的Event发往不同的Channel。·SinkProcessor·共三种类型:DefaultSinkProcessor(默认1对1)、LoadBalancingSinkProcessor(负载均衡)和FailoverSinkProcessor(故障转移)·DefaultSinkProcessor对应的是单个的Sink;LoadBalancingSinkProcessor和FailoverSinkProcessor对应的是Sink Group,LoadBalancingSinkProcessor可以实现负载均衡的功能,FailoverSinkProcessor可以错误恢复的功能。

Replication(复制)案例

·Agent-01# Name the components on this agent a1为agent的别名a1.sources = r1a1.sources = r1a1.sinks = k1 k2a1.channels = c1 c2# Describe/configure the source TAILDIRa1.sources.r1.type = TAILDIRa1.sources.r1.filegroups = f1#文件组的绝对路径,必须到精确到文件,可以写匹配表达式匹配多个文件a1.sources.r1.filegroups.f1 = /opt/module/flume-1.9.0/files1/.*#实现断点续传的文件存放位置 不改有默认位置也能实现断点续传。a1.sources.r1.positionFile = /opt/module/flume-1.9.0/taildir_position.json# 将数据流复制给所有channel 默认参数可以不写a1.sources.r1.selector.type = replicating# Describe the sink Avro的source和sink是一组的,能形成通信a1.sinks.k1.type = avroa1.sinks.k1.hostname = hadoop101a1.sinks.k1.port = 4141a1.sinks.k2.type = avroa1.sinks.k2.hostname = hadoop101a1.sinks.k2.port = 4142#Use a channel which buffers events in memorya1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100a1.channels.c2.type = memorya1.channels.c2.capacity = 1000a1.channels.c2.transactionCapacity = 100# Bind the source and sink to the channela1.sources.r1.channels = c1 c2a1.sinks.k1.channel = c1a1.sinks.k2.channel = c2·Agent-02(上传到hdfs)# Name the components on this agent a1为agent的别名a1.sources = r1a1.sinks = k1a1.channels = c1# Describe/configure the source avroa1.sources.r1.type = avroa1.sources.r1.bind = hadoop101a1.sources.r1.port = 4141# Describe the sinka1.sinks.k1.type = hdfs#HDFS路径,主机名:端口号可写可不写,会自动识别a1.sinks.k1.hdfs.path = /flumeHDFS/%Y%m%d/%H#可设置前缀、后缀a1.sinks.k1.hdfs.filePrefix = log-#为解决小文件问题,先存为.tmp文件,达到一定要求转为正式文件#业内经验rollInterval=1h,rollSize=128M,rollCount=0a1.sinks.k1.hdfs.rollInterval = 10a1.sinks.k1.hdfs.rollSize = 134217700a1.sinks.k1.hdfs.rollCount = 0#是否使用本地时间戳,以上占位符会使用的到a1.sinks.k1.hdfs.useLocalTimeStamp = true#设置文件类型a1.sinks.k1.hdfs.fileType = DataStream#Use a channel which buffers events in memorya1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channela1.sources.r1.channels = c1a1.sinks.k1.channel = c1·Agent-03(传到本地)# Name the components on this agent a1为agent的别名a1.sources = r1a1.sinks = k1a1.channels = c1# Describe/configure the source TAILDIRa1.sources.r1.type = avroa1.sources.r1.bind = hadoop101a1.sources.r1.port = 4142# Describe the sink 输出到本地磁盘: file_rolla1.sinks.k1.type = file_roll#输出的本地目录必须是已经存在的目录,如果该目录不存在,并不会创建新的目录。a1.sinks.k1.sink.directory = /opt/module/flume-1.9.0/flumeFile_roll_datas#Use a channel which buffers events in memorya1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channela1.sources.r1.channels = c1a1.sinks.k1.channel = c1

Multiplexing多路复用

需求:使用Flume采集服务器本地日志,按照日志类型的不同,将不同种类的日志发送到不同分析系统模拟:在该案例中,我们以端口数据模拟日志,以数字(单个)和字母(单个)模拟不同类型的日志,我们需要自定义interceptor区分数字和字母,将其分别发往不同的分析系统(Channel)。分析:在实际的开发中,一台服务器产生的日志类型可能有很多种,不同类型的日志可能需要发送到不同的分析系统。此时会用到Flume的channel selecter中的Multiplexing结构,Multiplexing的原理是,根据event中Header的某个key的值,将不同的event发送到不同的Channel中,所以我们需要自定义一个Interceptor,为不同类型的event的Header中的key赋予不同的值。·编写拦截器APIpublic class MyInterceptor implements Interceptor { //初始化,程序开始走以便 @Override public void initialize() { } //处理单个Event,可以嵌套到下面的方法,解耦增加复用性 @Override public Event intercept(Event event) { //需求:在event的header中添加标记 //提供给channel selector 选择发送不同的channel Map header = event.getHeaders(); byte[] body = event.getBody(); String info = new String(body); //判断info的开头的第一个字符,数字发送给channel,字母发送给channel2 char c = info.charAt(0); if (c >= '0' && c <='9'){ header.put("type","number"); }else if((c >= 'a'&&c <='z')||(c >= 'A'&&c <='Z')){ header.put("type","letter"); } return event; } //处理多个Event,系统调用的是此方法 @Override public List intercept(List events) { for (Event event : events) { intercept(event); } return events; } //关闭 @Override public void close() { } public static class Builder implements Interceptor.Builder{ //通过此方法创建拦截器对象 @Override public Interceptor build() { return new MyInterceptor(); } //配置文件 @Override public void configure(Context context) { } }}·打的jar包要放到/flume/lib下·flume-1# Name the components on this agenta1.sources = r1a1.sinks = k1 k2a1.channels = c1 c2# Describe/configure the source 从netcat中拿取数据a1.sources.r1.type = netcata1.sources.r1.bind = localhosta1.sources.r1.port = 44444# 多路复用a1.sources.r1.selector.type = multiplexing#使用header中的哪些参数,对应header中的keya1.sources.r1.selector.header = type#number/letter对应valuea1.sources.r1.selector.mapping.number = c1a1.sources.r1.selector.mapping.letter = c2#匹配不上走的channela1.sources.r1.selector.default = c1#注册拦截器a1.sources.r1.interceptors = i1a1.sources.r1.interceptors.i1.type = com.flume.interceptors.MyInterceptor$builder# Describe the sinka1.sinks.k1.type = avroa1.sinks.k1.hostname = hadoop101a1.sinks.k1.port = 4141a1.sinks.k2.type = avroa1.sinks.k2.hostname = hadoop101a1.sinks.k2.port = 4142# Use a channel which buffers events in memorya1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100a1.channels.c2.type = memorya1.channels.c2.capacity = 1000a1.channels.c2.transactionCapacity = 100# Bind the source and sink to the channel 一个sink只能接一个channel,一个channels可以接多个source和sinka1.sources.r1.channels = c1 c2a1.sinks.k1.channel = c1a1.sinks.k2.channel = c2

聚合

需求: 监听一台服务器的一个本地文件夹,监听另一台服务器的一个端口,前两台服务器的变化最终输出到第三台服务器的控制台上·第一服务器# Name the components on this agent a1为agent的别名a1.sources = r1a1.sinks = k1a1.channels = c1# Describe/configure the source TAILDIRa1.sources.r1.type = TAILDIRa1.sources.r1.filegroups = f1#文件组的绝对路径,必须到精确到文件,可以写匹配表达式匹配多个文件a1.sources.r1.filegroups.f1 = /opt/module/flume-1.9.0/files1/.*file.*#实现断点续传的文件存放位置 不改有默认位置也能实现断点续传。a1.sources.r1.positionFile = /opt/module/flume-1.9.0/taildir_position.json# Describe the sinka1.sinks.k1.type = avroa1.sinks.k1.hostname = hadoop103a1.sinks.k1.port = 4141#Use a channel which buffers events in memorya1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channela1.sources.r1.channels = c1a1.sinks.k1.channel = c1·第二台服务器 # Name the components on this agent a1为agent的别名a1.sources = r1a1.sinks = k1a1.channels = c1# Describe/configure the sourcea1.sources.r1.type = netcata1.sources.r1.bind = localhosta1.sources.r1.port = 44444# Describe the sinka1.sinks.k1.type = avroa1.sinks.k1.hostname = hadoop103a1.sinks.k1.port = 4142# Use a channel which buffers events in memorya1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channela1.sources.r1.channels = c1a1.sinks.k1.channel = c1·第三台服务器# Name the components on this agent a1为agent的别名a1.sources = r1 r2a1.sinks = k1a1.channels = c1# Describe/configure the source TAILDIRa1.sources.r1.type = avroa1.sources.r1.bind = hadoop103a1.sources.r1.port = 4141a1.sources.r2.type = avroa1.sources.r2.bind = hadoop103a1.sources.r2.port = 4142# Describe the sinka1.sinks.k1.type = logger#Use a channel which buffers events in memorya1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channela1.sources.r1.channels = c1a1.sources.r2.channels = c1a1.sinks.k1.channel = c1

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。