I am attempting to use the MongoDB Connector for Hadoop with Spark to query one collection in MongoDB and upsert all of the retrieved documents into another collection. The MongoUpdateWritable class is used as the value type of the RDD to update a collection in MongoDB, and it has an upsert flag. Unfortunately, the upsert flag seems to have no effect: the code executes without errors, but behaves as if the flag were set to false.
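(For reference, the behavior I expect from an upsert is what the plain Mongo Java driver (3.2) gives with replaceOne and UpdateOptions().upsert(true). The snippet below is only an illustration of that expectation, outside of Spark and mongo-hadoop.)

import com.mongodb.MongoClient
import com.mongodb.client.model.{Filters, UpdateOptions}
import org.bson.Document

val target = new MongoClient("localhost", 27017).getDatabase("test").getCollection("target")
val doc = new Document("_id", "1").append("added", "this data will be added")
// upsert(true): insert the document when nothing matches the _id, replace the existing document when something does
target.replaceOne(Filters.eq("_id", "1"), doc, new UpdateOptions().upsert(true))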
The Scala code below connects to a local mongod process, writes some data with the Mongo client, then reads that data back and attempts to write it to another collection in the same database through Spark. After that write fails to show up, the code inserts a document with the same _id into the target collection through the Mongo client and runs the same Spark job again, to show that the update half of the upsert works correctly.
spark version: 1.6.0-cdh5.7.0
hadoop version: 2.6.0-cdh5.4.7
mongo version: 3.2.0
mongo-hadoop-core version: 2.0.2
import com.mongodb.client.{FindIterable, MongoCollection, MongoDatabase}
import com.mongodb.{BasicDBObject, DBCollection, MongoClient}
import com.mongodb.hadoop.io.MongoUpdateWritable
import org.apache.hadoop.conf.Configuration
import org.bson.{BSONObject, BasicBSONObject, Document}
import com.mongodb.hadoop.{MongoInputFormat, MongoOutputFormat}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object sparkTest extends App {
//setting up mongo
val mongo: MongoDatabase = new MongoClient("localhost", 27017).getDatabase("test")
val source: MongoCollection[Document] = mongo.getCollection("source")
val target: MongoCollection[Document] = mongo.getCollection("target")
source.drop()
target.drop()
//inserting document
val sourceDoc = new Document()
sourceDoc.put("unchanged","this field should not be changed")
sourceDoc.put("_id","1")
source.insertOne(sourceDoc)
//setting up spark
val conf = new SparkConf().setAppName("test mongo with spark").setMaster("local")
val mongoConfig = new Configuration()
val sc = new SparkContext(conf)
mongoConfig.set("mongo.input.uri",
"mongodb://localhost:27017/test.source")
mongoConfig.set("mongo.output.uri",
"mongodb://localhost:27017/test.target")
//setting up read
val documents = sc.newAPIHadoopRDD(
  mongoConfig,               // Configuration
  classOf[MongoInputFormat], // InputFormat
  classOf[Object],           // Key type
  classOf[BSONObject])       // Value type
//building updates with no document matching the query in the target collection
val upsert_insert_rdd: RDD[(Object, MongoUpdateWritable)] = documents.mapValues(
  (value: BSONObject) => {
    val query = new BasicBSONObject
    query.append("_id", value.get("_id").toString)
    val update = new BasicBSONObject(value.asInstanceOf[BasicBSONObject])
    update.append("added", "this data will be added")
    println("val:" + value.toString)
    println("query:" + query.toString)
    println("update:" + update.toString)
    new MongoUpdateWritable(
      query,  // Query
      update, // Update
      true,   // Upsert flag
      false,  // Update multiple documents flag
      true    // Replace flag
    )
  }
)
//saving updates
upsert_insert_rdd.saveAsNewAPIHadoopFile(
  "",
  classOf[Object],
  classOf[MongoUpdateWritable],
  classOf[MongoOutputFormat[Object, MongoUpdateWritable]],
  mongoConfig)
// At this point, there should be a new document in the target database, but there is not.
val count = target.count()
println("count after insert: "+count+", expected: 1")
//inserting a doc to show that the update path works. insertOne throws an exception if a
//document with a matching _id is already in the collection, so if this line breaks, the upsert above worked!
val targetDoc = new Document()
targetDoc.put("overwritten", "this field should be overwritten")
targetDoc.put("_id", "1")
target.insertOne(targetDoc)
//building updates when a document matching the query exists in the target collection
val upsert_update_rdd: RDD[(Object, MongoUpdateWritable)] = documents.mapValues(
  (value: BSONObject) => {
    val query = new BasicBSONObject
    query.append("_id", value.get("_id").toString)
    val update = new BasicBSONObject(value.asInstanceOf[BasicBSONObject])
    update.append("added", "this data will be added")
    println("val:" + value.toString)
    println("query:" + query.toString)
    println("update:" + update.toString)
    new MongoUpdateWritable(
      query,  // Query
      update, // Update
      true,   // Upsert flag
      false,  // Update multiple documents flag
      true    // Replace flag
    )
  }
)
//saving updates
upsert_update_rdd.saveAsNewAPIHadoopFile(
  "",
  classOf[Object],
  classOf[MongoUpdateWritable],
  classOf[MongoOutputFormat[Object, MongoUpdateWritable]],
  mongoConfig)
//checking that the update succeeded. should print:
//contains new field:true, contains overwritten field:false
val ret = target.find().first
if (ret != null)
  println("contains new field:" + ret.containsKey("added") + ", contains overwritten field:" + ret.containsKey("overwritten"))
else
  println("no documents found in target")
}
Any insight into what I am missing would be helpful. I have tried changing the output format to MongoUpdateWritable, but that had no effect on the behavior. I know this is probably a configuration problem, but it seems like a bug in the mongo-hadoop adapter, since reading and writing documents through its input and output formats with the MongoUpdateWritable class otherwise works.
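As a workaround (this is not the mongo-hadoop API, just the fallback I can use to move the data), the upserts can be issued directly through the Java driver from foreachPartition, bypassing MongoOutputFormat entirely:

import com.mongodb.client.model.{Filters, UpdateOptions}
// (MongoClient and Document are already imported in the listing above)

documents.foreachPartition { partition =>
  // One client per partition; driver clients and collection handles are not serializable.
  val client = new MongoClient("localhost", 27017)
  val coll = client.getDatabase("test").getCollection("target")
  partition.foreach { case (_, value) =>
    // Copy the source document and add the new field, mirroring the update built above.
    val doc = new Document(value.toMap.asInstanceOf[java.util.Map[String, Object]])
    doc.put("added", "this data will be added")
    // replaceOne with upsert(true) inserts when no document matches the _id.
    coll.replaceOne(Filters.eq("_id", value.get("_id").toString), doc, new UpdateOptions().upsert(true))
  }
  client.close()
}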
POM for convenience:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>test</groupId>
    <artifactId>spark_mongo_upsert_test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <spark.version>1.6.0-cdh5.7.0</spark.version>
        <mongo.version>3.2.0</mongo.version>
        <mongo.hadoop.version>2.0.2</mongo.hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.mongodb.mongo-hadoop</groupId>
            <artifactId>mongo-hadoop-core</artifactId>
            <version>${mongo.hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>mongo-java-driver</artifactId>
            <version>${mongo.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Plugin to compile Scala code -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.1</version>
            </plugin>
        </plugins>
    </build>
</project>
Saving Datasets that contain an _id field to MongoDB will replace and upsert any existing documents.
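If that note refers to the separate MongoDB Spark Connector (mongo-spark-connector) rather than mongo-hadoop, the equivalent of the job above would look roughly like the sketch below. This assumes mongo-spark-connector_2.10 1.1.0 (the line built against Spark 1.6) with its documented MongoSpark.load/MongoSpark.save API and spark.mongodb.*.uri settings; it is a sketch, not verified against the CDH versions listed above.

import com.mongodb.spark.MongoSpark
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("upsert via mongo-spark")
  .setMaster("local")
  .set("spark.mongodb.input.uri", "mongodb://localhost:27017/test.source")
  .set("spark.mongodb.output.uri", "mongodb://localhost:27017/test.target")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// Read test.source as a DataFrame; the inferred schema keeps the _id field.
val df = MongoSpark.load(sqlContext)

// Because the rows carry _id, saving replaces/upserts matching documents in test.target
// instead of inserting duplicates.
MongoSpark.save(df.write.mode("append"))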