Flume 入门–几种不同的Sinks

Flume 入门–几种不同的Sinks主要介绍几种常见Flume的Sink汇聚点1.LoggerSink 记录INFO级别的日志,一般用于调试。前面介绍Source时候用到的Sink都是这个类型的Sink必须配置的属性:属性说明:         !channel   –             !type   –   Thecomponenttypename,needstobelogger  …

主要介绍几种常见Flume的Sink–汇聚点

1.Logger Sink 

记录INFO级别的日志,一般用于调试。前面介绍Source时候用到的Sink都是这个类型的Sink

必须配置的属性:

属性说明:
            !channel    –     
            !type    –    The component type name, needs to be logger
            maxBytesToLog    16    Maximum number of bytes of the Event body to log

            要求必须在 –conf 参数指定的目录下有 log4j的配置文件
            可以通过-Dflume.root.logger=INFO,console在命令启动时手动指定log4j参数

案例:前面的例子都是这种类型的Sink

2.File Roll Sink

在本地文件系统中存储事件。每隔指定时长生成文件保存这段时间内收集到的日志信息。

属性说明:
            !channel    –     
            !type    –    类型,必须是”file_roll”
            !sink.directory    –    文件被存储的目录
            sink.rollInterval    30    滚动文件每隔30秒(应该是每隔30秒钟单独切割数据到一个文件的意思)。如果设置为0,则禁止滚动,从而导致所有数据被写入到一个文件。
            sink.serializer    TEXT    Other possible options include avro_event or the FQCN of an implementation of EventSerializer.Builder interface.
            batchSize    100    

案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
编写配置文件:
  
#命名Agent a1的组件
  
a1.sources  =  r1
  
a1.sinks  =  k1
  
a1.channels  =  c1
 
  
#描述/配置Source
  
a1.sources.r1.
type  
= http
  
a1.sources.r1.port  = 6666
 
  
#描述Sink
  
a1.sinks.k1.
type  
= file_roll
  
a1.sinks.k1.sink.directory = 
/home/park/work/apache-flume-1
.6.0-bin
/mysink
  
#描述内存Channel
  
a1.channels.c1.
type  
=  memory
  
a1.channels.c1.capacity  =  1000
  
a1.channels.c1.transactionCapacity  =  100
 
  
#为Channle绑定Source和Sink
  
a1.sources.r1.channels  =  c1
  
a1.sinks.k1.channel  =  c1

 启动flume:

1
.
/flume-ng 
agent --conf ..
/conf 
--conf-
file 
..
/conf/template7
.conf --name a1 -Dflume.root.logger=INFO,console

测试:

通过curl命令向目标主机发送请求,就会发现在指定的文件夹下出现记录收集日志的文件

3.Avro Sink

是实现多级流动 和 扇出流(1到多) 扇入流(多到1) 的基础。非常重要 但是需要多台机器

必要属性说明:
            !channel    –     
            !type    –    The component type name, needs to be avro.
            !hostname    –    The hostname or IP address to bind to.
            !port    –    The port # to listen on.

案例1.多级流动  h1流动到h2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
h2:
                
配置配置文件:
                    
#命名Agent组件
                    
a1.sources=r1
                    
a1.sinks=k1
                    
a1.channels=c1
 
                    
#描述/配置Source
                    
a1.sources.r1.
type
=avro
                    
a1.sources.r1.bind=0.0.0.0
                    
a1.sources.r1.port=9988
                    
#描述Sink
                    
a1.sinks.k1.
type
=logger
                    
#描述内存Channel
                    
a1.channels.c1.
type
=memory
                    
a1.channels.c1.capacity=1000
                    
a1.channels.c1.transactionCapacity=1000
                    
#为Channel绑定Source和Sink
                    
a1.sources.r1.channels=c1
                    
a1.sinks.k1.channel=c1
                
启动flume:
                    
.
/flume-ng 
agent --conf ..
/conf 
--conf-
file 
..
/conf/template8
.conf --name a1 -Dflume.root.logger=INFO,console
 
                 
            
h1:
                
配置配置文件
                    
#命名Agent组件
                    
a1.sources=r1
                    
a1.sinks=k1
                    
a1.channels=c1
 
                    
#描述/配置Source
                    
a1.sources.r1.
type
=http
                    
a1.sources.r1.port=8888
                    
#描述Sink
                    
a1.sinks.k1.
type
=avro
                    
a1.sinks.k1.
hostname
=192.168.242.138
                    
a1.sinks.k1.port=9988
                    
#描述内存Channel
                    
a1.channels.c1.
type
=memory
                    
a1.channels.c1.capacity=1000
                    
a1.channels.c1.transactionCapacity=1000
                    
#为Channel绑定Source和Sink
                    
a1.sources.r1.chafile:
///C
:
/Users/park/Desktop/Day01_Flume/
%E6%96%87%E6%A1%A3
/Flume
%201.6.0%20User%20Guide%20%E2%80%94%20Apache%20Flume.htm
#irc-sinknnels=c1
                    
a1.sinks.k1.channel=c1

 启动flume

发送http请求到h1:

1
curl -X POST -d 
'[{ "headers" :{"a" : "a1","b" : "b1"},"body" : "hello~http~flume~"}]' 
http:
//192
.168.242.133:8888

 稍等几秒后,发现h2最终收到了这条消息

案例2:扇出流(h1扇出到h2,h3)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
h2 h3:
                
配置配置文件:
                    
#命名Agent组件
                    
a1.sources=r1
                    
a1.sinks=k1
                    
a1.channels=c1
 
                    
#描述/配置Source
                    
a1.sources.r1.
type
=avro
                    
a1.sources.r1.bind=0.0.0.0
                    
a1.sources.r1.port=9988
                    
#描述Sink
                    
a1.sinks.k1.
type
=logger
                    
#描述内存Channel
                    
a1.channels.c1.
type
=memory
                    
a1.channels.c1.capacity=1000
                    
a1.channels.c1.transactionCapacity=1000
                    
#为Channel绑定Source和Sink
                    
a1.sources.r1.channels=c1
                    
a1.sinks.k1.channel=c1
                
启动flume:
                    
.
/flume-ng 
agent --conf ..
/conf 
--conf-
file 
..
/conf/template8
.conf --name a1 -Dflume.root.logger=INFO,console
 
            
h1:
                
配置配置文件
                    
#命名Agent组件
                    
a1.sources=r1
                    
a1.sinks=k1 k2
                    
a1.channels=c1 c2
 
                    
#描述/配置Source
                    
a1.sources.r1.
type
=http
                    
a1.sources.r1.port=8888
                    
#描述Sink
                    
a1.sinks.k1.
type
=avro
                    
a1.sinks.k1.
hostname
=192.168.242.138
                    
a1.sinks.k1.port=9988
                    
a1.sinks.k2.
type
=avro
                    
a1.sinks.k2.
hostname
=192.168.242.135
                    
a1.sinks.k2.port=9988
                    
#描述内存Channel
                    
a1.channels.c1.
type
=memory
                    
a1.channels.c1.capacity=1000
                    
a1.channels.c1.transactionCapacity=1000
                    
a1.channels.c2.
type
=memory
                    
a1.channels.c2.capacity=1000
                    
a1.channels.c2.transactionCapacity=1000
                    
#为Channel绑定Source和Sink
                    
a1.sources.r1.channels=c1 c2
                    
a1.sinks.k1.channel=c1 
                    
a1.sinks.k2.channel=c2 

案例3:扇入流()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
m3:
                
编写配置文件:
                    
#命名Agent组件
                    
a1.sources=r1
                    
a1.sinks=k1
                    
a1.channels=c1
                    
#描述/配置Source
                    
a1.sources.r1.
type
=avro
                    
a1.sources.r1.bind=0.0.0.0
                    
a1.sources.r1.port=4141
                    
#描述Sink
                    
a1.sinks.k1.
type
=logger
                    
#描述内存Channel
                    
a1.channels.c1.
type
=memory
                    
a1.channels.c1.capacity=1000
                    
a1.channels.c1.transactionCapacity=1000
                    
#为Channel绑定Source和Sink
                    
a1.sources.r1.channels=c1
                    
a1.sinks.k1.channel=c1
                
启动flume:
                    
.
/flume-ng 
agent --conf ..
/conf 
--conf-
file 
..
/conf/template
.conf --name a1 -Dflume.root.logger=INFO,console
             
            
m1、m2:
                
编写配置文件:
                    
#命名Agent组件
                    
a1.sources=r1
                    
a1.sinks=k1
                    
a1.channels=c1
 
                    
#描述/配置Source
                    
a1.sources.r1.
type
=http
                    
a1.sources.r1.port=8888
                    
#描述Sink
                    
a1.sinks.k1.
type
=avro
                    
a1.sinks.k1.
hostname
=192.168.242.135
                    
a1.sinks.k1.port=4141
                    
#描述内存Channel
                    
a1.channels.c1.
type
=memory
                    
a1.channels.c1.capacity=1000
                    
a1.channels.c1.transactionCapacity=1000
                    
#为Channel绑定Source和Sink
                    
a1.sources.r1.channels=c1
                    
a1.sinks.k1.channel=c1
                
启动flume:
                    
.
/flume-ng 
agent --conf ..
/conf 
--conf-
file 
..
/conf/template9
.conf --name a1 -Dflume.root.logger=INFO,console
                
m1通过curl发送一条http请求,由于默认使用的是jsonHandler,数据格式必须是指定的json格式:
                    
[root@localhost conf]
# curl -X POST -d '[{ "headers" :{"flag" : "c"},"body" : "idoall.org_body"}]' http://0.0.0.0:8888
                
m2通过curl发送一条http请求,由于默认使用的是jsonHandler,数据格式必须是指定的json格式:
                    
[root@localhost conf]
# curl -X POST -d '[{ "headers" :{"flag" : "c"},"body" : "idoall.org_body"}]' http://0.0.0.0:8888
                
发现m3均能正确收到消息

 

4、HDFS Sink

此Sink将事件写入到Hadoop分布式文件系统HDFS中。
            目前它支持创建文本文件和序列化文件。对这两种格式都支持压缩。 这些文件可以分卷,按照指定的时间或数据量或事件的数量为基础。
            它还通过类似时间戳或机器属性对数据进行 buckets/partitions 操作   
            HDFS的目录路径可以包含将要由HDFS替换格式的转移序列用以生成存储事件的目录/文件名。
            使用这个Sink要求hadoop必须已经安装好,以便Flume可以通过hadoop提供的jar包与HDFS进行通信。
            注意,此版本hadoop必须支持sync()调用。

必要属性说明:
                !channel    –     
                !type    –    类型名称,必须是“HDFS”
                !hdfs.path    –    HDFS 目录路径 (eg hdfs://namenode/flume/webdata/)
                hdfs.filePrefix    FlumeData    Flume在目录下创建文件的名称前缀
                hdfs.fileSuffix    –    追加到文件的名称后缀 (eg .avro – 注: 日期时间不会自动添加)
                hdfs.inUsePrefix    –    Flume正在处理的文件所加的前缀
                hdfs.inUseSuffix    .tmp    Flume正在处理的文件所加的后缀

案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#命名Agent组件
    
a1.sources=r1
    
a1.sinks=k1
    
a1.channels=c1
 
    
#描述/配置Source
    
a1.sources.r1.
type
=http
    
a1.sources.r1.port=8888
    
#描述Sink
    
a1.sinks.k1.
type
=hdfs
    
a1.sinks.k1.hdfs.path=hdfs:
//0
.0.0.0:9000
/ppp
    
#描述内存Channel
    
a1.channels.c1.
type
=memory
    
a1.channels.c1.capacity=1000
    
a1.channels.c1.transactionCapacity=1000
    
#为Channel绑定Source和Sink
    
a1.sources.r1.channels=c1
    
a1.sinks.k1.channel=c1
           

 启动flume:

1
.
/flume-ng 
agent --conf ..
/conf 
--conf-
file 
..
/conf/template9
.conf --name a1 -Dflume.root.logger=INFO,console

 测试:通过利用curl给目的主机发送命令,会发现在HDFS中会生成相应的记录文件。

今天的文章Flume 入门–几种不同的Sinks分享到此就结束了,感谢您的阅读。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/4784.html

(0)
编程小号编程小号

相关推荐

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注