flume写kafka topic覆盖问题fix

简介:

2000元阿里云代金券免费领取,2核4G云服务器仅664元/3年,新老用户都有优惠,立即抢购>>>


阿里云采购季(云主机223元/3年)活动入口:请点击进入>>>,


阿里云学生服务器(9.5元/月)购买入口:请点击进入>>>,

结构:

nginx-flume->kafka->flume->kafka(因为牵扯到跨机房问题,在两个kafka之间加了个flume,蛋疼。。)


现象:

在第二层,写入kafka的topic和读取的kafka的topic相同,手动设定的sink topic不生效


打开debug日志:

source实例化:

1
2
3
21 Apr 2015 19:24:03,146 INFO [conf-file-poller-0] (org.apache.flume.source.DefaultSourceFactory.create:41) - Creating instance of source kafka1, type org.apache.flume.source.kafka.KafkaSource
21 Apr 2015 19:24:03,146 DEBUG [conf-file-poller-0] (org.apache.flume.source.DefaultSourceFactory.getClass:61)  - Source type org.apache.flume.source.kafka.KafkaSource is a custom type
21 Apr 2015 19:24:03,152 INFO  [conf-file-poller-0] (org.apache.flume.source.kafka.KafkaSourceUtil.getKafkaProperties:37)  - context={ parameters:{topic=bigdata_api_ele_me_access, batchDurationMillis=5000, groupId=nginx, zookeeperConnect=xxx, channels=bigdata_api_ele_me_access-channel4, batchSize=2000, type=org.apache.flume.source.kafka.KafkaSource} }

sink实例化:

1
2
3
4
5
6
21 Apr 2015 19:24:03,185 INFO  [conf-file-poller-0] (org.apache.flume.sink.DefaultSinkFactory.create:42)  - Creating instance of sink: web-sink2, type: org.apache.flume.sink.kafka.KafkaSink
21 Apr 2015 19:24:03,185 DEBUG [conf-file-poller-0] (org.apache.flume.sink.DefaultSinkFactory.getClass:63)  - Sink type org.apache.flume.sink.kafka.KafkaSink is a custom type
21 Apr 2015 19:24:03,190 DEBUG [conf-file-poller-0] (org.apache.flume.sink.kafka.KafkaSink.configure:220)  - Using batch size: 2000
21 Apr 2015 19:24:03,190 INFO  [conf-file-poller-0] (org.apache.flume.sink.kafka.KafkaSink.configure:229)  - Using the static topic: nginx-access this may be over-ridden by event headers
21 Apr 2015 19:24:03,191 INFO  [conf-file-poller-0] (org.apache.flume.sink.kafka.KafkaSinkUtil.getKafkaProperties:34)  - context={ parameters:{topic=nginx-access, brokerList=1xxx, requiredAcks=1, batchSize=2000, type=org.apache.flume.sink.kafka.KafkaSink, channel=bigdata_api_ele_me_access-channel4} }
21 Apr 2015 19:24:03,191 DEBUG [conf-file-poller-0] (org.apache.flume.sink.kafka.KafkaSink.configure:236)  - Kafka producer properties: {metadata.broker.list=192.168.101.43:9092,192.168.101.44:9092,192.168.101.45:9092, request.required.acks=1, key.serializer.class=kafka.serializer.StringEncoder, serializer.class=kafka.serializer.DefaultEncoder}

可以看到创建sink和source实例的时候配置上下文context中topic是按设置的来的,但是看到日志中有下面一段:

1
Using the static topic: nginx-access this may be over-ridden by event headers

分析KafkaSink源码:

org.apache.flume.sink.kafka.KafkaSink.process方法中:

1
2
3
4
5
6
7
8
9
10
11
12
   public  static  final  String KEY_HDR =  "key" ;
   public  static  final  String TOPIC_HDR =  "topic" ;
   ...
         if  ((eventTopic = headers.get(TOPIC_HDR)) ==  null ) {
           eventTopic = topic;
         //eventTopic的取值,会从header中获取,如果header中没有才会使用配置的topic
         ...
         eventKey = headers.get(KEY_HDR);
         ...
         KeyedMessage<String,  byte []> data =  new  KeyedMessage<String,  byte []>
           (eventTopic, eventKey, eventBody);
         messageList.add(data);

其中topic的取值在configure中:

1
2
3
4
5
6
7
8
9
10
     topic = context.getString(KafkaSinkConstants.TOPIC,
       KafkaSinkConstants.DEFAULT_TOPIC);  //通过flume的配置获取topic,如果没有设置topic按默认default-flume-topic处理
     if  (topic.equals(KafkaSinkConstants.DEFAULT_TOPIC)) {
       logger.warn( "The Property 'topic' is not set. "  +
         "Using the default topic name: "  +
         KafkaSinkConstants.DEFAULT_TOPIC);
     else  {
       logger.info( "Using the static topic: "  + topic +
         " this may be over-ridden by event headers" );  //这里提示可能会被header覆盖
     }

header的来源:

1)kafka中的数据是没有header的概念的

2)flume中的消息分header/body概念

这种结构下,数据由kafkasource进入flume,添加了header信息,然后流入到kafkasink

kafkasource中header的添加处理在org.apache.flume.source.kafka.KafkaSource.process方法中:

1
2
3
4
5
6
7
8
9
10
         if  (iterStatus) {
           // get next message
           MessageAndMetadata< byte [],  byte []> messageAndMetadata = it.next();
           kafkaMessage = messageAndMetadata.message();
           kafkaKey = messageAndMetadata.key();
           // Add headers to event (topic, timestamp, and key)
           headers =  new  HashMap<String, String>();
           headers.put(KafkaSourceConstants.TIMESTAMP,
                   String.valueOf(System.currentTimeMillis()));
           headers.put(KafkaSourceConstants.TOPIC, topic);

因为kafka中不需要header,注释掉org.apache.flume.sink.kafka.KafkaSink.process中这几段代码即可:

1
2
3
4
5
6
         /*
         if ((eventTopic = headers.get(TOPIC_HDR)) == null) {
           eventTopic = topic;
         }
         */
         eventTopic = topic;  //增加这一段,否则会有npe错误


本文转自菜菜光 51CTO博客,原文链接:http://blog.51cto.com/caiguangguang/1638342,如需转载请自行联系原作者
相关文章
|
5天前
|
消息中间件 存储 监控
Flume+Kafka整合案例实现
Flume+Kafka整合案例实现
41 1
|
5天前
|
消息中间件 Kafka 流计算
Flink的分区表订阅功能是通过Kafka的topic分区来实现的
Flink的分区表订阅功能是通过Kafka的topic分区来实现的【1月更文挑战第6天】【1月更文挑战第26篇】
100 1
|
5天前
|
消息中间件 大数据 Kafka
记录一下Kafka报错:timeout expired while fetching topic metadata
记录一下Kafka报错:timeout expired while fetching topic metadata
189 0
|
5天前
|
消息中间件 存储 Java
Kafka的Topic CRUD演示
Kafka的Topic CRUD演示
26 0
|
5天前
|
消息中间件 Kafka API
kafka topic 管理api
kafka topic 管理api
39 0
|
5天前
|
数据可视化 JavaScript 关系型数据库
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(五)FineBI可视化
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(五)FineBI可视化
51 0
|
5天前
|
SQL 消息中间件 关系型数据库
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(四)实时计算需求及技术方案
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(四)实时计算需求及技术方案
86 0
|
5天前
|
SQL 消息中间件 分布式数据库
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(三)离线分析
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(三)离线分析
69 0
|
5天前
|
消息中间件 存储 数据采集
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(二)数据源
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(二)数据源
59 0
|
5天前
|
存储 消息中间件 分布式数据库
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(一)案例需求
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(一)案例需求
60 0

热门文章

最新文章

http://www.vxiaotou.com