Using Java Spark's Encoders.bean to create a typed subset of a Dataset<Row>


I read a parquet file and I get a Dataset containing 57 columns.

Dataset<Row> ds = spark.read().parquet(locations);

I would like to use a custom type instead of Row. I have defined a java bean such as

import lombok.Getter;
import lombok.Setter;

import java.io.Serializable;

@Getter
@Setter
public class MyBean implements Serializable {
    MyBean() { }
    private String col1;
    private String col2;
    private String col3;
}

The field names perfectly match the column names in the parquet file (which is annoying, because they are in snake_case and I would like camelCase in my POJO, but that's not the main issue here).

To convert Dataset<Row> to Dataset<MyBean> I use ds.as(Encoders.bean(MyBean.class)).

However if I try to show the Dataset<MyBean> produced it has all 57 columns. I was expecting 3 columns corresponding to MyBean, hoping it would only read the pieces of the parquet that are of interest to me.

At first I thought as was a transformation and not an action, but show is definitely an action, and I also tried .cache().count() beforehand just in case.

What am I missing about Encoders here?

2 Answers

Chris (BEST ANSWER)

You didn't misunderstand or miss anything about Encoders, except perhaps that they, like Datasets, are applied lazily. If you write the setters manually and put breakpoints in them, they won't be called by a simple as(...).show().

In fact you can just use read.as(beanEncoder).map(identity, beanEncoder).show() and see that only the bean's columns are used.
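For example, a rough sketch in Java, reusing the spark session, locations and MyBean from the question:

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;

Encoder<MyBean> beanEncoder = Encoders.bean(MyBean.class);

Dataset<MyBean> beans = spark.read().parquet(locations)
        .as(beanEncoder)
        // the identity map forces the plan to re-shape to the bean's fields
        .map((MapFunction<MyBean, MyBean>) b -> b, beanEncoder);

beans.show();    // only col1, col2, col3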

Similar to Oli's point, if you call .explain() on the result of just as (with no select to reduce the fields), you'll see all the fields mentioned in the LocalRelation or Scan results. Add the map, explain again, and you'll only see the fields of the bean.

Using a Dataset, even after using .as, will still let you select fields that aren't in your bean as well; it only really becomes just the bean after something forces the read structure to change.
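To illustrate (the column name other_col is hypothetical, standing in for any of the other 54 columns):

Dataset<MyBean> typed = ds.as(Encoders.bean(MyBean.class));

// still works: .as alone keeps the full underlying schema
typed.select("other_col").show();

// after the identity map only the bean's fields remain in the schema,
// so the same select would now fail to resolve the column
Dataset<MyBean> narrowed =
        typed.map((MapFunction<MyBean, MyBean>) b -> b, Encoders.bean(MyBean.class));
// narrowed.select("other_col").show();  // AnalysisException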

Oli

If you use select after spark.read.parquet, only the selected columns will be read, not the others.

Therefore, although it is a bit more verbose,

ds.select("col1", "col2", "col3").as(Encoders.bean(MyBean.class))

should achieve what you want.

Feel free to try ds.select("col1", "col2", "col3").explain() and check that the ReadSchema only contains the selected columns. Note that the same goes for filters: if you filter just after reading a parquet file, the filter will be pushed down and only the matching rows will be read.
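For illustration, a sketch of both points (the filter value 'someValue' is just a placeholder):

// projection pruning: ReadSchema should only list the three selected columns
spark.read().parquet(locations)
        .select("col1", "col2", "col3")
        .explain();

// predicate pushdown: the filter should appear under PushedFilters in the
// parquet scan node, so non-matching data can be skipped at read time
spark.read().parquet(locations)
        .filter("col1 = 'someValue'")
        .select("col1", "col2", "col3")
        .explain();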