Welcome to Vigges Developer Community - Open, Learning, Share
Welcome To Ask or Share your Answers For Others


How can I dynamically scale out Flume's Avro sink?

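I have several service instances with Flume deployed on them for log collection. These Flume agents send the logs through an Avro sink to flume-master nodes, which distribute them further. There are currently two flume-master instances.
The problem is that as request volume has grown, the two flume-master instances can no longer keep up, and a backlog builds up. I want to scale flume-master horizontally.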

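Right now I use the load-balancing setup below to sink the logs to the two master instances. However, every time I add a master node I have to edit the Flume configuration by hand and add another sink, which is not a convenient way to scale.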

agent.sources = source
agent.channels = fileChannel
agent.sinks = avroSink1 avroSink2

agent.sources.source.type = TAILDIR
agent.sources.source.filegroups = group1
agent.sources.source.filegroups.group1 = /home/xx/logs/xx/xx.log
agent.sources.source.positionFile = /home/xx/data/flume-xx/taildir/position.json
agent.sources.source.skipToEnd = true
agent.sources.source.channels = fileChannel


agent.sinks.avroSink1.type = avro
agent.sinks.avroSink1.hostname = HOST_1
agent.sinks.avroSink1.port = 4143
agent.sinks.avroSink1.channel = fileChannel

agent.sinks.avroSink2.type = avro
agent.sinks.avroSink2.hostname = HOST_2
agent.sinks.avroSink2.port = 4143
agent.sinks.avroSink2.channel = fileChannel

agent.sinkgroups = avroGroup
agent.sinkgroups.avroGroup.sinks = avroSink1 avroSink2
agent.sinkgroups.avroGroup.processor.type = load_balance
agent.sinkgroups.avroGroup.processor.backoff = true
agent.sinkgroups.avroGroup.processor.selector = round_robin


agent.channels.fileChannel.type = file
agent.channels.fileChannel.checkpointDir = /home/acs/data/flume-xx/checkpoint
agent.channels.fileChannel.dataDirs = /home/xx/data/flume-xx/data
agent.channels.fileChannel.checkpointInterval = 2
agent.channels.fileChannel.capacity = 400000

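So I wondered whether a single proxy HOST could sit in front of all the flume master nodes, so that scaling out the masters would not require changing the Flume configuration on the services. Testing showed that this approach does not work.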
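The reason is as follows:
When the sink sends its first event, it connects through the proxy address to one real backend and keeps that long-lived connection open, so the sink sends all events to that single Avro endpoint until the endpoint becomes unavailable.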
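I then noticed the Avro sink's reset-connection-interval parameter, which sets the interval after which the Avro sink forcibly resets its connection. It looks as though it could achieve load balancing, but in practice it still does not, and it has clear drawbacks: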

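  • The Avro sink is designed around a long-lived connection. Repeatedly tearing it down and re-creating it just to spread the load would hurt performance considerably.
  • The connection keeps being created, destroyed, and re-created, but only one connection exists at any moment, so during each interval that single connection carries all the traffic.

Configuration used for the proxy approach: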
agent.sources = source
agent.channels = fileChannel
agent.sinks = avroSink

agent.sources.source.type = TAILDIR
agent.sources.source.filegroups = group1
agent.sources.source.filegroups.group1 = /home/xx/logs/xx/xx.log
agent.sources.source.positionFile = /home/xx/data/flume-xx/taildir/position.json
agent.sources.source.skipToEnd = true
agent.sources.source.channels = fileChannel


agent.sinks.avroSink.type = avro
# HOST is the load balancer address that proxies all of the master nodes
agent.sinks.avroSink.hostname = HOST
agent.sinks.avroSink.port = 4143
agent.sinks.avroSink.channel = fileChannel


agent.channels.fileChannel.type = file
agent.channels.fileChannel.checkpointDir = /home/acs/data/flume-xx/checkpoint
agent.channels.fileChannel.dataDirs = /home/xx/data/flume-xx/data
agent.channels.fileChannel.checkpointInterval = 2
agent.channels.fileChannel.capacity = 400000
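
For reference, the reset-connection-interval attempt described above adds a single line to the avroSink in this configuration. A minimal sketch, assuming a 60-second interval (the value is only illustrative; the Flume user guide gives the unit as seconds):

```properties
# Periodically close and re-open the connection to HOST so that the load
# balancer can route the next connection to a different (possibly new) master.
# The value 60 is an illustrative assumption, not a recommendation.
agent.sinks.avroSink.reset-connection-interval = 60
```

This only rotates which master receives the traffic over time; between resets a single connection still carries everything, which is the second drawback listed above.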

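These were my attempts at dynamically scaling the Avro sink. The end goal is painless horizontal scaling as traffic grows, so if I do not restrict myself to the Avro sink, I could introduce Kafka as middleware to absorb traffic spikes: the Kafka sink's kafka.bootstrap.servers is only a broker discovery address, which gives an effect similar to load balancing.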
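
A minimal sketch of that Kafka variant, assuming Flume 1.7 or later (where the Kafka sink and source take kafka.-prefixed properties). The broker addresses BROKER_1/BROKER_2, the topic xx-logs, the agent name master, the consumer group flume-master and the channel masterChannel are all hypothetical placeholders. On the service side the avroSink lines are replaced by a Kafka sink; on the master side every flume-master runs a Kafka source in the same consumer group, so adding a master means starting another agent with the same configuration rather than editing the services.

```properties
# Service-side agent: Kafka sink instead of the Avro sink (placeholders assumed)
agent.sinks = kafkaSink
agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
# Bootstrap list only; the producer discovers the remaining brokers itself
agent.sinks.kafkaSink.kafka.bootstrap.servers = BROKER_1:9092,BROKER_2:9092
agent.sinks.kafkaSink.kafka.topic = xx-logs
agent.sinks.kafkaSink.channel = fileChannel

# flume-master agent (hypothetical name "master"): consume from the same topic
master.sources = kafkaSource
master.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource
master.sources.kafkaSource.kafka.bootstrap.servers = BROKER_1:9092,BROKER_2:9092
master.sources.kafkaSource.kafka.topics = xx-logs
# All masters share one consumer group, so Kafka splits the partitions among them
master.sources.kafkaSource.kafka.consumer.group.id = flume-master
# Channel and downstream sinks of the master are omitted from this fragment
master.sources.kafkaSource.channels = masterChannel
```

Within one consumer group each partition is read by at most one member, so the topic needs at least as many partitions as the number of flume-master instances you expect to run; masters beyond the partition count would sit idle.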


He who fights dragons too long becomes a dragon himself; gaze long into the abyss, and the abyss gazes back into you…

1 Answer

Waiting for an expert to answer.
