mongo-hadoop package upsert with Spark doesn't seem to be working


I am attempting to use the MongoDB Connector for Hadoop with Spark to query one collection in MongoDB and upsert every document retrieved into another collection. The MongoUpdateWritable class is used as the value type of the output RDD in order to update a collection in MongoDB, and it has an upsert flag. Unfortunately, the upsert flag seems to have no effect on the execution: the code runs without errors, but behaves as if the flag were set to false.
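For reference, the behavior I expect from the upsert flag is equivalent to a replaceOne with upsert(true) in the plain Java driver. This is just a minimal sketch to illustrate that expectation (the collection and field names are placeholders), not part of the Spark job:

import com.mongodb.MongoClient
import com.mongodb.client.model.{Filters, UpdateOptions}
import org.bson.Document

// insert the document when no _id matches, replace the existing one when it does
val coll = new MongoClient("localhost", 27017).getDatabase("test").getCollection("target")
val replacement = new Document("_id", "1").append("added", "this data will be added")
coll.replaceOne(Filters.eq("_id", "1"), replacement, new UpdateOptions().upsert(true))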

The (Scala) code below connects to a local mongod process, writes some data using the mongo client, then attempts to read that data back with Spark and write it to another collection in the same database. After that write fails to appear, the code inserts a document with the same _id into the target collection through the mongo client and runs the same Spark job again to show that the update half of upsert works correctly.

spark version: 1.6.0-cdh5.7.0

hadoop version: 2.6.0-cdh5.4.7

mongo version: 3.2.0

mongo-hadoop-core version: 2.0.2

import com.mongodb.client.{FindIterable, MongoCollection, MongoDatabase}
import com.mongodb.{BasicDBObject, DBCollection, MongoClient}
import com.mongodb.hadoop.io.MongoUpdateWritable
import org.apache.hadoop.conf.Configuration
import org.bson.{BSONObject, BasicBSONObject, Document}
import com.mongodb.hadoop.{MongoInputFormat, MongoOutputFormat}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object sparkTest extends App {

  //setting up mongo
  val mongo: MongoDatabase = new MongoClient("localhost",27017).getDatabase("test")
  var source: MongoCollection[Document] = mongo.getCollection("source")
  val target: MongoCollection[Document] = mongo.getCollection("target")
  source.drop()
  target.drop()
  //inserting document
  val sourceDoc = new Document()
  sourceDoc.put("unchanged","this field should not be changed")
  sourceDoc.put("_id","1")
  source.insertOne(sourceDoc)

  //setting up spark
  val conf = new SparkConf().setAppName("test mongo with spark").setMaster("local")
  val mongoConfig = new Configuration()
  val sc = new SparkContext(conf)
  mongoConfig.set("mongo.input.uri",
    "mongodb://localhost:27017/test.source")
  mongoConfig.set("mongo.output.uri",
    "mongodb://localhost:27017/test.target")

  //setting up read
  val documents = sc.newAPIHadoopRDD(
    mongoConfig,                // Configuration
    classOf[MongoInputFormat],  // InputFormat
    classOf[Object],            // Key type
    classOf[BSONObject])        // Value type
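
  // optional sanity check: documents.count() should be 1 here, since one document was inserted into source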

  //building updates with no document matching the query in the target collection
  val upsert_insert_rdd: RDD[(Object, MongoUpdateWritable)] = documents.mapValues(
    (value: BSONObject) => {

      val query = new BasicBSONObject
      query.append("_id", value.get("_id").toString)

      val update = new BasicBSONObject(value.asInstanceOf[BasicBSONObject])
      update.append("added", "this data will be added")

      println("val:" + value.toString)
      println("query:" + query.toString)
      println("update:" + update.toString)

      new MongoUpdateWritable(
        query,  // Query
        update, // Update
        true,   // Upsert flag
        false,  // Update multiple documents flag
        true    // Replace flag
      )
    }
  )
  //saving updates
  upsert_insert_rdd.saveAsNewAPIHadoopFile(
    "",
    classOf[Object],
    classOf[MongoUpdateWritable],
    classOf[MongoOutputFormat[Object, MongoUpdateWritable]],
    mongoConfig)

  // At this point, there should be a new document in the target database, but there is not.
  val count = target.count()
  println("count after insert: "+count+", expected: 1")

  //adding a doc to demonstrate that the update half of upsert works. insertOne will throw an exception if a
  //document with a matching _id already exists in the collection, so if this line breaks, the upsert worked!
  val targetDoc = new Document()
  targetDoc.put("overwritten","this field should not be changed")
  targetDoc.put("_id","1")
  target.insertOne(targetDoc)

  //building updates when a document matching the query exists in the target collection
  val upsert_update_rdd: RDD[(Object, MongoUpdateWritable)] = documents.mapValues(
    (value: BSONObject) => {

      val query = new BasicBSONObject
      query.append("_id", value.get("_id").toString)

      val update = new BasicBSONObject(value.asInstanceOf[BasicBSONObject])
      update.append("added","this data will be added")

      println("val:"+value.toString)
      println("query:"+query.toString)
      println("update:"+update.toString)

      new MongoUpdateWritable(
        query,  // Query
        update,  // Update
        true,  // Upsert flag
        false,   // Update multiple documents flag
        true  // Replace flag
      )}
  )
  //saving updates
  upsert_update_rdd.saveAsNewAPIHadoopFile(
    "",
    classOf[Object],
    classOf[MongoUpdateWritable],
    classOf[MongoOutputFormat[Object, MongoUpdateWritable]],
    mongoConfig)

  //checking that the update succeeded. should print:
  //contains new field:true, contains overwritten field:false
  val ret = target.find().first
  if (ret != null)
    println("contains new field:"+ret.containsKey("added")+", contains overwritten field:"+ret.containsKey("overwritten"))
  else
    println("no documents found in target")
}

Any insight into what I am missing would be helpful. I have tried changing the output formats to MongoUpdateWritable, but that had no effect on the behavior. I know this is probably a configuration problem, but it looks like a bug in the mongo-hadoop adapter, since reading and writing documents through its input and output formats with the MongoUpdateWritable class does work; only the insert half of the upsert is missing.
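For comparison, here is a minimal sketch of the plain write path I am referring to: it saves the BSONObject values straight back out instead of wrapping them in MongoUpdateWritable, and it does insert documents into test.target, just without any update/upsert semantics:

  //plain insert: write the values read from test.source directly into test.target
  documents.saveAsNewAPIHadoopFile(
    "",
    classOf[Object],
    classOf[BSONObject],
    classOf[MongoOutputFormat[Object, BSONObject]],
    mongoConfig)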

POM for convenience:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>test</groupId>
    <artifactId>spark_mongo_upsert_test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <spark.version>1.6.0-cdh5.7.0</spark.version>
        <mongo.version>3.2.0</mongo.version>
        <mongo.hadoop.version>2.0.2</mongo.hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.mongodb.mongo-hadoop</groupId>
            <artifactId>mongo-hadoop-core</artifactId>
            <version>2.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>mongo-java-driver</artifactId>
            <version>${mongo.version}</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <!-- Plugin to compile Scala code -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.1</version>
            </plugin>
        </plugins>
    </build>

</project>
1 Answer

Saving Datasets that contain an _id field to MongoDB will upsert them: existing documents with the same _id are replaced, and documents without a match are inserted.
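A minimal sketch of what that looks like, assuming the separate MongoDB Spark Connector (org.mongodb.spark:mongo-spark-connector_2.10) rather than mongo-hadoop, and using placeholder column names taken from the question:

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("local")
  .setAppName("mongo-spark-connector upsert")
  .set("spark.mongodb.output.uri", "mongodb://localhost:27017/test.target")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

// Each row carries an _id column, so the connector replaces any existing document with that _id
// and inserts the rest (an upsert) instead of failing on a duplicate key.
val df = Seq(("1", "this data will be added")).toDF("_id", "added")
df.write.format("com.mongodb.spark.sql").mode("append").save()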