How to collect Kafka protobuf data with Flume


I have some protobuf data in Kafka, stored as byte arrays. I want to use Flume to collect the Kafka data into HDFS and then analyze the HDFS data with Spark.

The Flume configuration I am using is:

agent.sources = source_from_kafka
agent.channels = mem_channel
agent.sinks = hdfs_sink


agent.sources.source_from_kafka.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.source_from_kafka.batchSize = 100
agent.sources.source_from_kafka.kafka.bootstrap.servers = localhost:9092
agent.sources.source_from_kafka.kafka.topics = mytopic


agent.sources.source_from_kafka.kafka.serializer = io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer

agent.sources.source_from_kafka.kafka.consumer.group.id = mytopic_consumer


agent.sinks.hdfs_sink.type = hdfs
agent.sinks.hdfs_sink.hdfs.path = hdfs://XXXX/%{topic}/%Y%m%d/%Y%m%d%H

agent.sinks.hdfs_sink.hdfs.rollSize = 130000000
agent.sinks.hdfs_sink.hdfs.rollCount = 0
agent.sinks.hdfs_sink.hdfs.rollInterval = 3600
agent.sinks.hdfs_sink.hdfs.batchSize = 100
agent.sinks.hdfs_sink.hdfs.threadsPoolSize = 30
agent.sinks.hdfs_sink.hdfs.callTimeout = 60000
agent.sinks.hdfs_sink.hdfs.idleTimeout=300
agent.sinks.hdfs_sink.hdfs.fileType = DataStream
agent.sinks.hdfs_sink.hdfs.writeFormat = Text
agent.sinks.hdfs_sink.hdfs.filePrefix = events

agent.channels.mem_channel.type = memory
agent.channels.mem_channel.capacity = 100
agent.channels.mem_channel.transactionCapacity = 100
agent.channels.mem_channel.byteCapacity = 800000
agent.channels.mem_channel.byteCapacityBufferPercentage = 20
agent.channels.mem_channel.keep-alive = 60

agent.sources.source_from_kafka.channels = mem_channel
agent.sinks.hdfs_sink.channel = mem_channel

I can see the data in HDFS. But when I use

sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat](...).map(_._2.copyBytes)

to read the data and parse it as protobuf, I get this error:

msg: {���ֈ1� *��/2ext{}
com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero)

Does anyone know why this happens and how to resolve it?

I would like this pipeline to work correctly.
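
One possible explanation, offered as an assumption rather than a confirmed diagnosis: with hdfs.fileType = DataStream and hdfs.writeFormat = Text, the sink writes event bodies separated by newlines, and TextInputFormat splits its input on newline bytes. A binary protobuf payload can itself contain the byte 0x0A, so a record may be cut in the middle before it reaches the parser. A minimal sketch of a sink configuration that stores each event body as a length-delimited binary value instead, reusing the agent and sink names from the configuration above:

```properties
# Sketch only: write each Flume event as a SequenceFile record
# instead of newline-delimited text.
agent.sinks.hdfs_sink.hdfs.fileType = SequenceFile
# Writable keeps the raw event body as a BytesWritable value
# (the record key is a LongWritable).
agent.sinks.hdfs_sink.hdfs.writeFormat = Writable
```

Files written this way would be read in Spark as (LongWritable, BytesWritable) pairs, for example with sc.sequenceFile, rather than with TextInputFormat. Separately, if the producer used the Confluent KafkaProtobufSerializer, each message starts with a framing header whose first byte is zero (a magic byte followed by schema metadata). That header would need to be stripped before parsing and could by itself produce an "invalid tag (zero)" error. Whether that applies here is not stated in the question.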
