I have protobuf data in Kafka; each message is a raw protobuf byte array. I want to use Flume to collect the Kafka data into HDFS and then analyze the HDFS data with Spark. I am using the Flume configuration below:
agent.sources = source_from_kafka
agent.channels = mem_channel
agent.sinks = hdfs_sink
agent.sources.source_from_kafka.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.source_from_kafka.batchSize = 100
agent.sources.source_from_kafka.kafka.bootstrap.servers = localhost:9092
agent.sources.source_from_kafka.kafka.topics = mytopic
agent.sources.source_from_kafka.kafka.serializer = io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer
agent.sources.source_from_kafka.kafka.consumer.group.id = mytopic_consumer
agent.sinks.hdfs_sink.type = hdfs
agent.sinks.hdfs_sink.hdfs.path = hdfs://XXXX/%{topic}/%Y%m%d/%Y%m%d%H
agent.sinks.hdfs_sink.hdfs.rollSize = 130000000
agent.sinks.hdfs_sink.hdfs.rollCount = 0
agent.sinks.hdfs_sink.hdfs.rollInterval = 3600
agent.sinks.hdfs_sink.hdfs.batchSize = 100
agent.sinks.hdfs_sink.hdfs.threadsPoolSize = 30
agent.sinks.hdfs_sink.hdfs.callTimeout = 60000
agent.sinks.hdfs_sink.hdfs.idleTimeout = 300
agent.sinks.hdfs_sink.hdfs.fileType = DataStream
agent.sinks.hdfs_sink.hdfs.writeFormat = Text
agent.sinks.hdfs_sink.hdfs.filePrefix = events
agent.channels.mem_channel.type = memory
agent.channels.mem_channel.capacity = 100
agent.channels.mem_channel.transactionCapacity = 100
agent.channels.mem_channel.byteCapacity = 800000
agent.channels.mem_channel.byteCapacityBufferPercentage = 20
agent.channels.mem_channel.keep-alive = 60
agent.sources.source_from_kafka.channels = mem_channel
agent.sinks.hdfs_sink.channel = mem_channel
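
For context, the producer side just writes the raw protobuf bytes to the topic, roughly like this (a minimal sketch; MyMessage stands in for my actual protobuf-generated class):

    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
    import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}

    object ProduceProtobuf {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put("bootstrap.servers", "localhost:9092")
        props.put("key.serializer", classOf[StringSerializer].getName)
        // Values are plain byte arrays, i.e. the serialized protobuf message.
        props.put("value.serializer", classOf[ByteArraySerializer].getName)

        val producer = new KafkaProducer[String, Array[Byte]](props)
        // MyMessage is a placeholder for the generated protobuf class;
        // build it per your schema, then serialize it to bytes.
        val payload: Array[Byte] = MyMessage.newBuilder().build().toByteArray
        producer.send(new ProducerRecord[String, Array[Byte]]("mytopic", payload))
        producer.close()
      }
    }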
I can see the data files in HDFS. But when I read them back with sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat](path).map(_._2.copyBytes()) and try to parse each record as protobuf, it fails with:

msg: {���ֈ1� *��/2ext{}
com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero)
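
For reference, here is roughly the full read-and-parse code (a minimal sketch; MyMessage again stands in for my generated protobuf class, and the path is a placeholder for one of the hourly directories Flume creates):

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
    import org.apache.spark.{SparkConf, SparkContext}

    object ReadProtobufFromHdfs {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("read-protobuf-from-hdfs"))

        // TextInputFormat splits the Flume-written files on newlines,
        // so each value here is one "line" of the file.
        val records = sc
          .newAPIHadoopFile[LongWritable, Text, TextInputFormat](
            "hdfs://XXXX/mytopic/20200101/2020010100")
          .map(_._2.copyBytes())

        // Attempt to parse each record back into a protobuf message.
        // This is where the InvalidProtocolBufferException is thrown.
        val parsed = records.map(bytes => MyMessage.parseFrom(bytes))
        parsed.take(5).foreach(println)
      }
    }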
Does anyone know why this happens and how to fix it so the whole pipeline works end to end?