How to write Avro objects to Parquet with partitions in Java? How to append data to the same Parquet file?


I am using Confluent's KafkaAvroDeserializer to deserialize Avro objects sent over Kafka. I want to write the received data to a Parquet file. I want to be able to append data to the same Parquet file and to create Parquet files with partitions.
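
For reference, the consumer setup looks roughly like this (a sketch; the broker address, group id, topic, and Schema Registry URL are placeholders):

import java.util.Collections;
import java.util.Properties;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

// Configure a consumer whose record values are decoded by KafkaAvroDeserializer
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "avro-to-parquet");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
props.put("schema.registry.url", "http://schema-registry.example.com:8081");

// Each record value arrives as an Avro GenericRecord
KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("test_hdfs"));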

I managed to create a Parquet file with AvroParquetWriter, but I didn't find how to add partitions or append to the same file:
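
Roughly, this is what the writer looks like (a sketch; the output path and the schema/records arguments are placeholders, and note that AvroParquetWriter's builder only creates new files - there is no append option):

import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

// Writes a batch of deserialized Avro records to a single new Parquet file
static void writeParquet(Schema schema, Iterable<GenericRecord> records) throws IOException {
    Path file = new Path("hdfs://hdfs-namenode.example.com:9000/data/part-00000.parquet");
    try (ParquetWriter<GenericRecord> writer = AvroParquetWriter
            .<GenericRecord>builder(file)
            .withSchema(schema)
            .withCompressionCodec(CompressionCodecName.SNAPPY)
            .build()) {
        for (GenericRecord record : records) {
            writer.write(record);
        }
    }
}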

Before using Avro, I used Spark to write the Parquet files. With Spark, writing Parquet with partitions and using append mode was trivial. Should I try creating RDDs from my Avro objects and use Spark to create the Parquet files?

1 Answer

"I want to write the Parquets to HDFS"

Personally, I would not use Spark for this.

Rather, I would use the Kafka Connect HDFS Sink Connector. Here is a config file that can get you started:

name=hdfs-sink
# List of topics to read
topics=test_hdfs

connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
# increase to be the sum of the partitions for all connected topics
tasks.max=1 

# the folder where core-site.xml and hdfs-site.xml exist
hadoop.conf.dir=/etc/hadoop
# the namenode url, defined as fs.defaultFS in the core-site.xml
hdfs.url=hdfs://hdfs-namenode.example.com:9000

# number of messages per file
flush.size=10 
# The format to write the message values
format.class=io.confluent.connect.hdfs.parquet.ParquetFormat

# Set up the Avro converter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry.example.com:8081
value.converter.schemas.enable=true
schema.compatibility=BACKWARD
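
To run it, you'd pass this file to a Connect worker, for example in standalone mode (the worker properties path below is a placeholder for your environment):

connect-standalone /etc/kafka/connect-standalone.properties hdfs-sink.properties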

If you want HDFS partitions based on a record field rather than the literal Kafka partition number, refer to the configuration docs on the FieldPartitioner (a sketch follows below). If you want automatic Hive integration, see the docs on that as well.
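
For instance, field-based partitioning would look something like this (some_field is a placeholder for a field in your record schema):

# Partition output directories by a record field instead of the Kafka partition number
partitioner.class=io.confluent.connect.hdfs.partitioner.FieldPartitioner
partition.field.name=some_field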


Let's say you did want to use Spark, though: you could try AbsaOSS/ABRiS to read the Avro records into a DataFrame, then you should be able to do something like df.write.format("parquet").save("/some/path") (not exact code, because I have not tried it).
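
For the write side, here is a rough Java sketch, assuming df is a Dataset<Row> that has already been decoded from the Avro payloads (the partition column and output path are placeholders):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

// Writes an already-decoded DataFrame out as partitioned Parquet, appending
// to whatever data already exists at the target path
static void writePartitionedParquet(Dataset<Row> df) {
    df.write()
      .mode(SaveMode.Append)            // append instead of overwrite
      .partitionBy("some_field")        // placeholder partition column
      .parquet("hdfs://hdfs-namenode.example.com:9000/some/path");
}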