Is there a limit on the number of side outputs in Google Cloud Dataflow?

We have a Cloud Dataflow job that reads a BigQuery table, transforms it, and then writes each record to a different table depending on the month and year of that record's timestamp. So when we run the job over a table with 12 months of data, there should be 12 output tables: the first month is the main output and the other 11 months are side outputs.
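Judging from the DoFn further down, the month key is the first seven characters of `time_local` with the hyphen replaced by an underscore (e.g. `2015_06`). The following is a stdlib-only sketch of that key derivation, assuming `time_local` starts with a `yyyy-MM` prefix; `MonthKeys` and `monthKey` are hypothetical names, and this version returns `null` for malformed values instead of throwing.

```java
import java.time.YearMonth;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

// Hypothetical helper; assumes time_local begins with "yyyy-MM".
public class MonthKeys {
    private static final DateTimeFormatter IN = DateTimeFormatter.ofPattern("uuuu-MM");
    private static final DateTimeFormatter OUT = DateTimeFormatter.ofPattern("uuuu_MM");

    // Returns e.g. "2015_06" for "2015-06-14 23:58:06", or null when the value
    // is missing, shorter than 7 characters, or not a valid year-month prefix.
    public static String monthKey(String timeLocal) {
        if (timeLocal == null || timeLocal.length() < 7) {
            return null;
        }
        try {
            return YearMonth.parse(timeLocal.substring(0, 7), IN).format(OUT);
        } catch (DateTimeParseException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(monthKey("2015-06-14 23:58:06")); // prints 2015_06
    }
}
```

Parsing through `YearMonth` rather than only slicing the string means values such as `2015-13` are rejected instead of silently producing a bogus output key.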

We have found that the job fails whenever we run it over 10 or more months (9 or more side outputs).
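The relationship between months and outputs (one main output for the earliest month, one side output for each additional month) can be checked with a small stdlib-only sketch. `MonthRouting`, `countByMonth`, and `sideOutputCount` are hypothetical names, and the sketch assumes month keys in the `yyyy_MM` form, which sort chronologically as plain strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class MonthRouting {
    // Counts records per month key. A TreeMap keeps "yyyy_MM" keys in
    // chronological order, so firstKey() is the earliest month (the main output).
    public static TreeMap<String, Integer> countByMonth(List<String> monthKeys) {
        TreeMap<String, Integer> counts = new TreeMap<>();
        for (String key : monthKeys) {
            counts.merge(key, 1, Integer::sum);
        }
        return counts;
    }

    // One distinct month goes to the main output; every other month needs a side output.
    public static int sideOutputCount(Map<String, Integer> counts) {
        return Math.max(0, counts.size() - 1);
    }

    public static void main(String[] args) {
        List<String> keys = new ArrayList<>();
        for (int month = 1; month <= 10; month++) {
            keys.add(String.format("2014_%02d", month));
        }
        TreeMap<String, Integer> counts = countByMonth(keys);
        // prints main=2014_01 sideOutputs=9
        System.out.println("main=" + counts.firstKey() + " sideOutputs=" + sideOutputCount(counts));
    }
}
```

With 10 months of input this yields 9 side outputs, which is the first configuration reported to fail.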

Is this a limit on Cloud Dataflow or is it a bug?

While a job with more than 8 side outputs was running, I noticed in the execution graph that some of the outputs showed "running" but did not appear to be writing any records.

Here are some of our job IDs:

2015-06-14_23_58_06-14457541029573485807 (8 side outputs - passed)

2015-06-14_23_48_43-15277609445992188388 (9 side outputs - failed)

2015-06-14_23_11_46-10500077558949649888 (7 side outputs - passed)

2015-06-14_22_38_48-1428211312699949403 (3 side outputs - passed)

2015-06-14_21_44_27-16273252623089185131 (11 side outputs - failed)

This is the code that processes the data. No records are cached; TressOutputManager only caches the TupleTag<TableRow> instances.

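```java
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import java.util.Set;
// Assumes JSR-330 (javax.inject) annotations; the post does not say which were used.
import javax.inject.Inject;
import javax.inject.Named;

public class TressDenormalizationDoFn extends DoFn<TableRow, TableRow> {
    @Inject
    @Named("tress.mappers")
    private Set<CPTMapper> mappers;
    @Inject
    private TressOutputManager tuples;

    @Override
    public void processElement(ProcessContext c) throws Exception {
        // Work on a copy; the input element itself should not be modified.
        TableRow row = c.element().clone();
        // Apply every mapper to the "event" field and store the non-null results.
        for (CPTMapper mapper : mappers) {
            String mapped = mapper.map((String) row.get("event"));
            if (mapped != null) {
                row.set(mapper.getId(), mapped);
            }
        }
        // Place the record in the output for its month, e.g. "2015-06-14 ..." -> "2015_06".
        // The length check skips short values instead of throwing StringIndexOutOfBoundsException.
        String timeStamp = (String) row.get("time_local");
        if (timeStamp != null && timeStamp.length() >= 7) {
            String monthKey = timeStamp.substring(0, 7).replace('-', '_');

            if (tuples.isMainOutput(monthKey)) {
                c.output(row);
            } else {
                c.sideOutput(tuples.getTuple(monthKey), row);
            }
        }
    }
}
```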
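The post does not show TressOutputManager, only that it caches tags and exposes `isMainOutput` and `getTuple`. A hypothetical stdlib-only stand-in for that kind of cache could look like the following; `OutputTagCache` is an invented name, and the tag type is left generic so the sketch runs without the Dataflow SDK (in the real job it would be a `TupleTag<TableRow>`).

```java
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical stand-in for TressOutputManager: one tag object per month key.
public class OutputTagCache<T> {
    private final String mainMonth;
    private final Function<String, T> tagFactory;
    private final ConcurrentHashMap<String, T> tags = new ConcurrentHashMap<>();

    public OutputTagCache(String mainMonth, Function<String, T> tagFactory) {
        this.mainMonth = Objects.requireNonNull(mainMonth);
        this.tagFactory = Objects.requireNonNull(tagFactory);
    }

    // True when records for this month go to the main output.
    public boolean isMainOutput(String monthKey) {
        return mainMonth.equals(monthKey);
    }

    // Creates the tag on first use and returns the same instance on every later call.
    public T getTuple(String monthKey) {
        return tags.computeIfAbsent(monthKey, tagFactory);
    }
}
```

Returning the same instance for a given month keeps the tag passed to `sideOutput` identical to the one the pipeline would have registered for that month, rather than creating a new tag per record.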