Read a specific file from multiple .gz files in Spark


I am trying to read a file with a specific name which exists in multiple .gz files within a folder.
For example:

D:/sample_datasets/gzfiles
|-my_file_1.tar.gz
  |-my_file_1.tar
    |-file1.csv
    |-file2.csv
    |-file3.csv
|-my_file_2.tar.gz
  |-my_file_2.tar
    |-file1.csv
    |-file2.csv
    |-file3.csv

I am only interested in reading contents of file1.csv which has the same schema across all the .gz files.

I am passing the path D:/sample_datasets/gzfiles to the wholeTextFiles() method in JavaSparkContext. However, it returns the contents of all the files within the tar, viz. file1.csv, file2.csv and file3.csv.

Is there a way to read only the contents of file1.csv into a Dataset or an RDD? Thanks in advance!


There are 2 best solutions below

0
On BEST ANSWER

I was able to do this with the following snippet, which I pieced together from multiple answers on SO:


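import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.input.PortableDataStream;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;

import com.univocity.parsers.tsv.TsvParser;
import com.univocity.parsers.tsv.TsvParserSettings;

import scala.Tuple2;

// Each element is (archive path, lazily opened stream over the archive bytes).
JavaPairRDD<String, PortableDataStream> tarData = sparkContext.binaryFiles("D:/sample_datasets/gzfiles/*.tar.gz");
JavaRDD<Row> tarRecords = tarData.flatMap(new FlatMapFunction<Tuple2<String, PortableDataStream>, Row>() {
    private static final long serialVersionUID = 1L;

    @Override
    public Iterator<Row> call(Tuple2<String, PortableDataStream> t) throws Exception {
        // TsvParser splits on tabs; univocity's CsvParser is the comma-delimited counterpart.
        TsvParserSettings settings = new TsvParserSettings();
        TsvParser parser = new TsvParser(settings);

        List<Row> records = new ArrayList<>();
        // Gunzip first, then walk the tar entries; try-with-resources closes the streams.
        try (TarArchiveInputStream tarInput = new TarArchiveInputStream(new GzipCompressorInputStream(t._2.open()))) {
            TarArchiveEntry entry;
            while ((entry = tarInput.getNextTarEntry()) != null) {
                // Parse only the wanted entry; every other entry is skipped.
                if (entry.getName().equals("file1.csv")) {
                    BufferedReader reader = new BufferedReader(new InputStreamReader(tarInput));
                    String line;

                    while ((line = reader.readLine()) != null) {
                        String[] parsedLine = parser.parseLine(line);
                        // The cast makes each column a separate Row field.
                        records.add(RowFactory.create((Object[]) parsedLine));
                    }
                    break;
                }
            }
        }
        return records.iterator();
    }

});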
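
To see what the entry filter in the snippet above boils down to, here is a rough sketch that does the same thing with only the JDK, no Spark and no commons-compress. The names TarEntryFilter, readEntryLines and tarGz are made up for this illustration, and the reader only understands plain tar headers (name in the first 100 bytes, octal size at offset 124), so treat it as a way to test the idea locally rather than a replacement for TarArchiveInputStream.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical helper class, not part of Spark or commons-compress.
class TarEntryFilter {
    private static final int BLOCK = 512; // tar stores headers and data in 512-byte blocks

    /** Returns the lines of the first entry named wantedName, or an empty list if there is none. */
    public static List<String> readEntryLines(InputStream gzippedTar, String wantedName) throws IOException {
        List<String> lines = new ArrayList<>();
        try (DataInputStream in = new DataInputStream(new GZIPInputStream(gzippedTar))) {
            byte[] header = new byte[BLOCK];
            while (true) {
                try {
                    in.readFully(header);
                } catch (EOFException e) {
                    break; // stream ended without an end-of-archive marker
                }
                if (header[0] == 0) {
                    break; // an all-zero block marks the end of the archive
                }
                String name = field(header, 0, 100);
                int size = Integer.parseInt(field(header, 124, 12).trim(), 8);
                // Entry data is padded up to the next block boundary.
                byte[] body = new byte[(size + BLOCK - 1) / BLOCK * BLOCK];
                in.readFully(body);
                if (name.equals(wantedName)) {
                    BufferedReader reader = new BufferedReader(new InputStreamReader(
                            new ByteArrayInputStream(body, 0, size), StandardCharsets.UTF_8));
                    String line;
                    while ((line = reader.readLine()) != null) {
                        lines.add(line);
                    }
                    break; // same early exit as in the Spark snippet
                }
            }
        }
        return lines;
    }

    /** Reads a NUL-terminated text field from a tar header. */
    private static String field(byte[] header, int offset, int length) {
        int end = offset;
        while (end < offset + length && header[end] != 0) {
            end++;
        }
        return new String(header, offset, end - offset, StandardCharsets.UTF_8);
    }

    /** Test fixture: builds a gzipped archive whose headers carry only name and size (no checksum). */
    public static byte[] tarGz(String[][] files) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(bytes)) {
            for (String[] file : files) {
                byte[] data = file[1].getBytes(StandardCharsets.UTF_8);
                byte[] header = new byte[BLOCK];
                byte[] name = file[0].getBytes(StandardCharsets.UTF_8);
                System.arraycopy(name, 0, header, 0, name.length);
                byte[] size = String.format("%011o", data.length).getBytes(StandardCharsets.US_ASCII);
                System.arraycopy(size, 0, header, 124, size.length);
                out.write(header);
                out.write(data);
                out.write(new byte[(BLOCK - data.length % BLOCK) % BLOCK]); // pad to block boundary
            }
            out.write(new byte[2 * BLOCK]); // end-of-archive marker
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] archive = tarGz(new String[][] {
            {"file1.csv", "id,name\n1,a\n"},
            {"file2.csv", "x,y\n"}
        });
        System.out.println(readEntryLines(new ByteArrayInputStream(archive), "file1.csv"));
    }
}
```

The sketch buffers every entry in memory before deciding whether to keep it, which is fine for small test archives; the streaming approach in the Spark snippet is the better fit for real data.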
1
On

Use *.gz at the end of the path.

Hope this helps!