Setup:
nginx -> flume -> kafka -> flume -> kafka (a Flume hop was added between the two Kafka clusters because of a cross-datacenter link, which is a pain...)
Symptom:
In the second layer, the topic written to Kafka ends up being the same as the topic read from Kafka; the sink topic set by hand does not take effect.
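For reference, the second-layer agent boils down to something like the following configuration (a sketch reconstructed from the parameters that show up in the debug log below; the agent name and the memory channel type are assumptions, and the ZooKeeper/broker addresses are placeholders):

agent.sources = kafka1
agent.channels = bigdata_api_ele_me_access-channel4
agent.sinks = web-sink2

agent.sources.kafka1.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.kafka1.zookeeperConnect = xxx
agent.sources.kafka1.topic = bigdata_api_ele_me_access
agent.sources.kafka1.groupId = nginx
agent.sources.kafka1.batchSize = 2000
agent.sources.kafka1.batchDurationMillis = 5000
agent.sources.kafka1.channels = bigdata_api_ele_me_access-channel4

agent.channels.bigdata_api_ele_me_access-channel4.type = memory

agent.sinks.web-sink2.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.web-sink2.brokerList = xxx
agent.sinks.web-sink2.topic = nginx-access
agent.sinks.web-sink2.requiredAcks = 1
agent.sinks.web-sink2.batchSize = 2000
agent.sinks.web-sink2.channel = bigdata_api_ele_me_access-channel4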
With debug logging turned on:
Source instantiation:
21 Apr 2015 19:24:03,146 INFO [conf-file-poller-0] (org.apache.flume.source.DefaultSourceFactory.create:41) - Creating instance of source kafka1, type org.apache.flume.source.kafka.KafkaSource
21 Apr 2015 19:24:03,146 DEBUG [conf-file-poller-0] (org.apache.flume.source.DefaultSourceFactory.getClass:61) - Source type org.apache.flume.source.kafka.KafkaSource is a custom type
21 Apr 2015 19:24:03,152 INFO [conf-file-poller-0] (org.apache.flume.source.kafka.KafkaSourceUtil.getKafkaProperties:37) - context={ parameters:{topic=bigdata_api_ele_me_access, batchDurationMillis=5000, groupId=nginx, zookeeperConnect=xxx, channels=bigdata_api_ele_me_access-channel4, batchSize=2000, type=org.apache.flume.source.kafka.KafkaSource} }
Sink instantiation:
21 Apr 2015 19:24:03,185 INFO [conf-file-poller-0] (org.apache.flume.sink.DefaultSinkFactory.create:42) - Creating instance of sink: web-sink2, type: org.apache.flume.sink.kafka.KafkaSink
21 Apr 2015 19:24:03,185 DEBUG [conf-file-poller-0] (org.apache.flume.sink.DefaultSinkFactory.getClass:63) - Sink type org.apache.flume.sink.kafka.KafkaSink is a custom type
21 Apr 2015 19:24:03,190 DEBUG [conf-file-poller-0] (org.apache.flume.sink.kafka.KafkaSink.configure:220) - Using batch size: 2000
21 Apr 2015 19:24:03,190 INFO [conf-file-poller-0] (org.apache.flume.sink.kafka.KafkaSink.configure:229) - Using the static topic: nginx-access this may be over-ridden by event headers
21 Apr 2015 19:24:03,191 INFO [conf-file-poller-0] (org.apache.flume.sink.kafka.KafkaSinkUtil.getKafkaProperties:34) - context={ parameters:{topic=nginx-access, brokerList=1xxx, requiredAcks=1, batchSize=2000, type=org.apache.flume.sink.kafka.KafkaSink, channel=bigdata_api_ele_me_access-channel4} }
21 Apr 2015 19:24:03,191 DEBUG [conf-file-poller-0] (org.apache.flume.sink.kafka.KafkaSink.configure:236) - Kafka producer properties: {metadata.broker.list=192.168.101.43:9092,192.168.101.44:9092,192.168.101.45:9092, request.required.acks=1, key.serializer.class=kafka.serializer.StringEncoder, serializer.class=kafka.serializer.DefaultEncoder}
The configuration context shows that both the source and the sink are instantiated with the topics exactly as configured, yet the log also contains the following line:
Using the static topic: nginx-access this may be over-ridden by event headers
Digging into the KafkaSink source, in the org.apache.flume.sink.kafka.KafkaSink.process method:
public static final String KEY_HDR = "key";
public static final String TOPIC_HDR = "topic";
...
if ((eventTopic = headers.get(TOPIC_HDR)) == null) {
    eventTopic = topic;
}
// eventTopic is taken from the event header; the configured topic is only used
// when the header does not carry one
...
eventKey = headers.get(KEY_HDR);
...
KeyedMessage<String, byte[]> data =
    new KeyedMessage<String, byte[]>(eventTopic, eventKey, eventBody);
messageList.add(data);
The value of topic is assigned in configure:
topic = context.getString(KafkaSinkConstants.TOPIC,
    KafkaSinkConstants.DEFAULT_TOPIC);
// the topic comes from the Flume configuration; if it is not set, the default
// default-flume-topic is used
if (topic.equals(KafkaSinkConstants.DEFAULT_TOPIC)) {
  logger.warn("The Property 'topic' is not set. "
      + "Using the default topic name: "
      + KafkaSinkConstants.DEFAULT_TOPIC);
} else {
  logger.info("Using the static topic: " + topic
      + " this may be over-ridden by event headers");
  // this is the hint that the topic may be overridden by event headers
}
Where the header comes from:
1) Kafka messages themselves have no notion of a header.
2) Flume events, on the other hand, are split into a header and a body.
In this setup the data enters Flume through the KafkaSource, which attaches header information, and then flows on to the KafkaSink.
The KafkaSource adds these headers in the org.apache.flume.source.kafka.KafkaSource.process method:
if (iterStatus) {
    // get next message
    MessageAndMetadata<byte[], byte[]> messageAndMetadata = it.next();
    kafkaMessage = messageAndMetadata.message();
    kafkaKey = messageAndMetadata.key();

    // Add headers to event (topic, timestamp, and key)
    headers = new HashMap<String, String>();
    headers.put(KafkaSourceConstants.TIMESTAMP,
        String.valueOf(System.currentTimeMillis()));
    headers.put(KafkaSourceConstants.TOPIC, topic);
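Putting the two pieces together: the event that reaches the sink already carries the source-side topic in its header, so the header wins over the configured sink topic. A minimal sketch of such an event (the body and header values here are illustrative only):

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;

public class HeaderDemo {
    public static void main(String[] args) {
        // Roughly what KafkaSource hands to the channel: the Kafka message body
        // becomes the Flume event body, and topic/timestamp become headers.
        Map<String, String> headers = new HashMap<String, String>();
        headers.put("timestamp", String.valueOf(System.currentTimeMillis()));
        headers.put("topic", "bigdata_api_ele_me_access"); // the source-side topic

        Event event = EventBuilder.withBody(
                "sample nginx access line".getBytes(StandardCharsets.UTF_8), headers);

        // KafkaSink.process() looks this header up first, so the event goes back
        // to the source topic instead of the configured sink topic.
        System.out.println("header topic = " + event.getHeaders().get("topic"));
    }
}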
Since Kafka itself has no use for the header, it is enough to comment out the following lines in org.apache.flume.sink.kafka.KafkaSink.process:
/*
if ((eventTopic = headers.get(TOPIC_HDR)) == null) {
    eventTopic = topic;
}
*/
eventTopic = topic;
// add this line, otherwise eventTopic stays null and an NPE is thrown
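An alternative that avoids patching and rebuilding the sink would be to overwrite the topic header on the source side with Flume's built-in static interceptor (preserveExisting = false replaces the header that KafkaSource sets). This is only a sketch under that assumption and was not tried in this setup:

agent.sources.kafka1.interceptors = i1
agent.sources.kafka1.interceptors.i1.type = static
agent.sources.kafka1.interceptors.i1.key = topic
agent.sources.kafka1.interceptors.i1.value = nginx-access
agent.sources.kafka1.interceptors.i1.preserveExisting = false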
This post is reproduced from 菜菜光's 51CTO blog. Original link: http://blog.51cto.com/caiguangguang/1638342. Please contact the original author before reposting.