Reading a Parquet file using Java works on the local machine but fails in a Docker container


I have a requirement to read Parquet files and publish them to Kafka from a standalone Java application. I use the code below to read a Parquet file that was generated by a Spark Scala application.

import java.util.List;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.io.ColumnIOFactory;
import org.apache.parquet.io.MessageColumnIO;
import org.apache.parquet.io.RecordReader;
import org.apache.parquet.schema.MessageType;

public void readTest(Path path) {
    try {
        // Read the footer first: it holds the row-group metadata and the schema.
        ParquetMetadata readFooter = ParquetFileReader.readFooter(conf, path, ParquetMetadataConverter.NO_FILTER);
        List<BlockMetaData> blocks = readFooter.getBlocks();
        System.out.println("Blocks size: " + blocks.size());
        Map<String, String> keyValueMetaData = readFooter.getFileMetaData().getKeyValueMetaData();
        System.out.println("K-v metadata: " + keyValueMetaData);
        System.out.println("Created by: " + readFooter.getFileMetaData().getCreatedBy());

        MessageType schema = readFooter.getFileMetaData().getSchema();
        System.out.println("Schema: " + schema);
        ParquetFileReader r = new ParquetFileReader(conf, path, readFooter);

        PageReadStore pages = null;
        try {
            // Iterate over the row groups; each one is decompressed (Snappy here) as it is read.
            while (null != (pages = r.readNextRowGroup())) {
                final long rows = pages.getRowCount();
                System.out.println("Number of rows: " + rows);

                final MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
                final RecordReader<Group> recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(schema));
                for (int i = 0; i < rows; i++) {
                    final Group g = recordReader.read();
                    System.out.println("group: " + g);
                    printGroup(g);
                }
            }
        } finally {
            r.close();
        }
    } catch (Exception e) {
        System.out.println("Error reading parquet file.");
        e.printStackTrace();
    }
}
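
For context, readTest references a conf field and a printGroup helper that are defined elsewhere in the class. A minimal sketch of that assumed setup (illustrative, not the exact original code):

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.schema.GroupType;

// Hadoop configuration used by readTest above (assumed field).
private final Configuration conf = new Configuration();

// Hypothetical stand-in for the printGroup helper called above.
private void printGroup(Group g) {
    GroupType type = g.getType();
    for (int field = 0; field < type.getFieldCount(); field++) {
        // Optional fields may be absent; print only values that are present.
        if (g.getFieldRepetitionCount(field) > 0) {
            System.out.println(type.getFieldName(field) + " = " + g.getValueToString(field, 0));
        }
    }
}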

The Java app is a non-web Spring Boot application. This code works fine when run locally or from the IntelliJ IDE, but I get the error below when the same file and code run inside a Docker container.

............Processing File name............: part-00000-2b69fe41-592a-485b-85e8-8971c5842155-c000.snappy.parquet
07:45:50.770 [main] WARN  o.a.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2021-08-28 07:45:50 DEBUG Tracer:106 - sampler.classes = ; loaded no samplers
2021-08-28 07:45:51 DEBUG Tracer:128 - span.receiver.classes = ; loaded no span receivers
Blocks size: 1
K-v metadata: {org.apache.spark.sql.parquet.row.metadata={"type":"struct","fields":[{"name":"first_name","type":"string","nullable":true,"metadata":{}},{"name":"last_name","type":"string","nullable":true,"metadata":{}},{"name":"email","type":"string","nullable":true,"metadata":{}},{"name":"gender","type":"string","nullable":true,"metadata":{}}]}}
Created by: parquet-mr version 1.8.2 (build c6522788629e590a53eb79874b95f6c3ff11f16c)
Schema: message spark_schema {
  optional binary first_name (STRING);
  optional binary last_name (STRING);
  optional binary email (STRING);
  optional binary gender (STRING);
}

07:45:52.073 [main] INFO  o.a.hadoop.io.compress.CodecPool - Got brand-new decompressor [.snappy]
Number of rows: 1000
Error reading parquet file.
java.lang.IllegalArgumentException
    at java.nio.Buffer.limit(Buffer.java:275)
    at org.xerial.snappy.Snappy.uncompress(Snappy.java:553)
    at org.apache.parquet.hadoop.codec.SnappyDecompressor.decompress(SnappyDecompressor.java:71)
    at org.apache.parquet.hadoop.codec.NonBlockedDecompressorStream.read(NonBlockedDecompressorStream.java:51)
    at java.io.DataInputStream.readFully(DataInputStream.java:195)
    at java.io.DataInputStream.readFully(DataInputStream.java:169)
    at org.apache.parquet.bytes.BytesInput$StreamBytesInput.toByteArray(BytesInput.java:286)
    at org.apache.parquet.bytes.BytesInput.toByteBuffer(BytesInput.java:237)
    at org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainBinaryDictionary.<init>(PlainValuesDictionary.java:91)
    at org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainBinaryDictionary.<init>(PlainValuesDictionary.java:74)
    at org.apache.parquet.column.Encoding$1.initDictionary(Encoding.java:90)
    at org.apache.parquet.column.Encoding$5.initDictionary(Encoding.java:163)
    at org.apache.parquet.column.impl.ColumnReaderBase.<init>(ColumnReaderBase.java:413)
    at org.apache.parquet.column.impl.ColumnReaderImpl.<init>(ColumnReaderImpl.java:46)
    at org.apache.parquet.column.impl.ColumnReadStoreImpl.getColumnReader(ColumnReadStoreImpl.java:82)
    at org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:271)
    at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:147)
    at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:109)
    at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:177)
    at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:109)
    at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:80)
    at com.csp.cdp.CPRemediationIntegrationPub.readers.NFSStorageReader.readTest(NFSStorageReader.java:131)
    at com.csp.cdp.CPRemediationIntegrationPub.readers.NFSStorageReader.read(NFSStorageReader.java:59)
    at com.csp.cdp.CPRemediationIntegrationPub.readers.NFSStorageReader.read(NFSStorageReader.java:38)
    at com.csp.cdp.CPRemediationIntegrationPub.executors.AppExecutor.execute(AppExecutor.java:29)
    at com.csp.cdp.CPRemediationIntegrationPub.CpRemediationIntegrationPubApplication.main(CpRemediationIntegrationPubApplication.java:47)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:49) 
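
Since the failure is inside org.xerial.snappy.Snappy.uncompress, a minimal snappy-java round trip inside the container can isolate the problem from Parquet. A diagnostic sketch (assuming snappy-java is on the classpath; the class name is made up):

import org.xerial.snappy.Snappy;

public class SnappyCheck {
    public static void main(String[] args) throws Exception {
        // Forces snappy-java to initialize; on some platforms it may fall back to pure Java.
        byte[] compressed = Snappy.compress("hello snappy".getBytes("UTF-8"));
        System.out.println("Native library version: " + Snappy.getNativeLibraryVersion());
        System.out.println("Round trip: " + new String(Snappy.uncompress(compressed), "UTF-8"));
    }
}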

My Dockerfile, for reference:

FROM openjdk:8-jdk-alpine
RUN mkdir /app
RUN mkdir /app/files/
COPY testDir /app/files/
COPY normal /app/files/
WORKDIR /app
COPY target/appname-0.0.1-SNAPSHOT.jar /app/app.jar
ENTRYPOINT ["java","-jar","app.jar"]

Please help with this.

There are 3 answers below.

Answer 1:

I changed the base image to

FROM openjdk:8

and it worked like a charm. I'm not sure exactly what was wrong with the earlier base image, but openjdk:8-jdk-alpine is Alpine-based and uses musl instead of glibc, so the native Snappy library bundled with snappy-java apparently could not be loaded there.

Answer 2:

There are two issues here:

  1. snappy-java didn't find a native library and fell back to its pure-Java implementation.
  2. The pure-Java implementation is broken; this is fixed upstream but not released yet.

Solution: install a native library and use it.

I wrote a small post about it: https://thomasdecaux.medium.com/use-snappy-compression-native-when-running-spark-on-alpine-5fee0ce28ed7

  1. Install it: apk add java-snappy-native
  2. Use it: for Spark, set spark.executor.extraJavaOptions: -Dorg.xerial.snappy.use.systemlib=true -Dorg.xerial.snappy.lib.path=/usr/lib/libsnappyjava.so; for a standalone JVM like the asker's app, pass the same -D flags directly on the java command line, as in the Dockerfile sketch below.
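
Applied to the Dockerfile from the question, a sketch of the fix (the apk package name and library path are taken from the steps above; verify them against your Alpine release):

FROM openjdk:8-jdk-alpine
# Install a musl-compatible native Snappy JNI library instead of relying on
# the glibc build bundled inside the snappy-java jar.
RUN apk add --no-cache java-snappy-native
RUN mkdir -p /app/files/
COPY testDir /app/files/
COPY normal /app/files/
WORKDIR /app
COPY target/appname-0.0.1-SNAPSHOT.jar /app/app.jar
# Point snappy-java at the system library.
ENTRYPOINT ["java", \
    "-Dorg.xerial.snappy.use.systemlib=true", \
    "-Dorg.xerial.snappy.lib.path=/usr/lib/libsnappyjava.so", \
    "-jar", "app.jar"]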
Answer 3:

For those who are using JDK 17, use the base image

FROM openjdk:17

The image amazoncorretto:17.0.8-alpine1.x.y will not work, as the native library is not present in Alpine-based images.