Map Only MapReduce Job with BigQuery


We have a MapReduce job that injects data into BigQuery. Our job does very little filtering, so we'd like to make it a map-only job to make it faster and more efficient.

However, the Java class com.google.gson.JsonObject accepted by BigQuery doesn't implement the Writable interface required by the Hadoop Mapper interface. JsonObject is also final, so we cannot extend it ...

Any suggestions on how we get around the issue?

Thanks,


There are 2 best solutions below


You should be able to use the BigQuery connector for Hadoop (see https://cloud.google.com/hadoop/bigquery-connector), which provides an implementation of the Hadoop OutputFormat class. Since a map-only job (zero reduce tasks) hands mapper output straight to the OutputFormat's RecordWriter rather than through the shuffle, the output value class doesn't have to implement Writable, so JsonObject can be used directly.
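Roughly, the output side is wired in as follows. This is only an illustrative sketch (the helper class name is made up, and the table ID and schema are passed in as placeholders); the full worked example in the next answer shows the complete flow:

import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration;
import com.google.cloud.hadoop.io.bigquery.BigQueryOutputFormat;
import com.google.gson.JsonObject;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;

import java.io.IOException;

/** Illustrative helper: points an existing map-only Job at a BigQuery table. */
public class BigQueryOutputSetup {
  public static void configureOutput(Job job, String outputTableId, String outputTableSchema)
      throws IOException {
    // The mapper emits com.google.gson.JsonObject values; no Writable is needed
    // because the connector's RecordWriter consumes the objects directly.
    job.setOutputFormatClass(BigQueryOutputFormat.class);
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(JsonObject.class);
    job.setNumReduceTasks(0); // map-only: mapper output goes straight to the OutputFormat
    BigQueryConfiguration.configureBigQueryOutput(
        job.getConfiguration(), outputTableId, outputTableSchema);
  }
}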


To add on to William's response: I wanted to test this myself, so I created a new cluster with the BigQuery connector installed and then ran the following map-only job:

import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration;
import com.google.cloud.hadoop.io.bigquery.BigQueryOutputFormat;
import com.google.common.base.Splitter;
import com.google.gson.JsonObject;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;
import java.util.regex.Pattern;

/**
 * An example MapOnlyJob with BigQuery output
 */
public class MapOnlyJob {
  public static class MapOnlyMapper extends Mapper<LongWritable, Text, LongWritable, JsonObject> {
    private static final LongWritable KEY_OUT = new LongWritable(0L);
    // This requires that a newer version of Guava be included via a shaded /
    // repackaged libjar (see the shade-plugin sketch after the dependencies below).
    private static final Splitter SPLITTER =
        Splitter.on(Pattern.compile("\\s+"))
            .trimResults()
            .omitEmptyStrings();
    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      String line = value.toString();
      for (String word : SPLITTER.split(line)) {
        JsonObject json = new JsonObject();
        json.addProperty("word", word);
        json.addProperty("mapKey", key.get());
        context.write(KEY_OUT, json);
      }
    }
  }

  /**
   * Configures and runs the main Hadoop job.
   */
  public static void main(String[] args)
      throws IOException, InterruptedException, ClassNotFoundException {

    GenericOptionsParser parser = new GenericOptionsParser(args);
    args = parser.getRemainingArgs();

    if (args.length != 3) {
      System.out.println("Usage: hadoop MapOnlyJob "
          + "[projectId] [input_file] [fullyQualifiedOutputTableId]");
      String indent = "    ";
      System.out.println(indent
          + "projectId - Project under which to issue the BigQuery operations. "
          + "Also serves as the default project for table IDs which don't explicitly specify a "
          + "project for the table.");
      System.out.println(indent
          + "input_file - Input file pattern of the form "
          + "gs://foo/bar*.txt or hdfs:///foo/bar*.txt or foo*.txt");
      System.out.println(indent
          + "fullyQualifiedOutputTableId - Output table ID of the form "
          + "<optional projectId>:<datasetId>.<tableId>");
      System.exit(1);
    }

    // Global parameters from args.
    String projectId = args[0];

    // Set InputFormat parameters from args.
    String inputPattern = args[1];

    // Set OutputFormat parameters from args.
    String fullyQualifiedOutputTableId = args[2];

    // Default OutputFormat parameters for this sample.
    String outputTableSchema =
        "[{'name': 'word','type': 'STRING'},{'name': 'mapKey','type': 'INTEGER'}]";

    Configuration conf = parser.getConfiguration();
    Job job = Job.getInstance(conf);
    // Set the job-level projectId on the Job's Configuration (setting it on conf here
    // would have no effect, since Job.getInstance copied conf above).
    job.getConfiguration().set(BigQueryConfiguration.PROJECT_ID_KEY, projectId);
    // Set classes and configure them:
    job.setOutputFormatClass(BigQueryOutputFormat.class);
    BigQueryConfiguration.configureBigQueryOutput(
        job.getConfiguration() /* Required as Job made a new Configuration object */,
        fullyQualifiedOutputTableId,
        outputTableSchema);
    // Configure file-based input:
    FileInputFormat.setInputPaths(job, inputPattern);

    job.setJarByClass(MapOnlyMapper.class);
    job.setMapperClass(MapOnlyMapper.class);
    // The key will be discarded by BigQueryOutputFormat.
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(JsonObject.class);
    // Make map-only
    job.setNumReduceTasks(0);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
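
For completeness, once the job is packaged, launching it should look roughly like this (the jar name, project, bucket, and table IDs below are placeholders; the -libjars argument carries the repackaged Guava jar mentioned in the mapper comment):

hadoop jar maponlyjob.jar MapOnlyJob \
    -libjars repackaged-guava.jar \
    my-project \
    gs://my-bucket/input/*.txt \
    my-project:my_dataset.words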

And I have the following dependencies:

<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-core</artifactId>
  <version>1.2.1</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>com.google.cloud.bigdataoss</groupId>
  <artifactId>bigquery-connector</artifactId>
  <version>0.7.0-hadoop1</version>
</dependency>
<dependency>
  <groupId>com.google.guava</groupId>
  <artifactId>guava</artifactId>
  <version>18.0</version>
</dependency>
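
Regarding the Guava note in the mapper: the newer Guava used for Splitter has to be shipped in a shaded/repackaged jar so it doesn't clash with the older Guava already on the cluster's classpath. Something along these lines with the maven-shade-plugin should work (untested sketch; the relocated package name is arbitrary):

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>2.3</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <relocations>
          <relocation>
            <pattern>com.google.common</pattern>
            <shadedPattern>repackaged.com.google.common</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>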