I am using Hadoop MapReduce to process an XML file, and I store the resulting JSON data directly in MongoDB.
How can I make sure that only non-duplicate records are stored in the database before executing the BulkWriteOperation?
Duplicates are determined by product image and product name. I do not want to add a Morphia layer, where indexes can be assigned to class members.
Here is my reducer class:
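    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Map.Entry;
    import org.apache.hadoop.io.MapWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.log4j.Logger;
    import com.mongodb.BasicDBObject;
    import com.mongodb.BulkWriteOperation;
    import com.mongodb.DB;
    import com.mongodb.MongoClient;

    public class XMLReducer extends Reducer<Text, MapWritable, Text, NullWritable> {
        private static final Logger LOGGER = Logger.getLogger(XMLReducer.class);

        @Override
        protected void reduce(Text key, Iterable<MapWritable> values, Context ctx)
                throws IOException, InterruptedException {
            LOGGER.info("reduce()------Start for key>" + key);
            MongoClient mongoClient = new MongoClient("localhost", 27017);
            try {
                DB db = mongoClient.getDB("test");
                BulkWriteOperation operation = db.getCollection("product").initializeOrderedBulkOperation();
                for (MapWritable entry : values) {
                    // Fresh map per record, so fields of the previous record do not leak into this one
                    Map<String, String> insertProductInfo = new HashMap<String, String>();
                    for (Entry<Writable, Writable> extractProductInfo : entry.entrySet()) {
                        insertProductInfo.put(extractProductInfo.getKey().toString(),
                                extractProductInfo.getValue().toString());
                    }
                    if (!insertProductInfo.isEmpty()) {
                        BasicDBObject basicDBObject = new BasicDBObject(insertProductInfo);
                        operation.insert(basicDBObject);
                    }
                }
                // How can I check for duplicates before executing the bulk operation?
                operation.execute();
                LOGGER.info("reduce------end for key" + key);
            } catch (Exception e) {
                LOGGER.error("General Exception in XMLReducer", e);
            } finally {
                mongoClient.close(); // release the connections opened for this reduce() call
            }
        }
    }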
EDIT: Following the suggested answer, I have added:
    BasicDBObject query = new BasicDBObject("product_image", basicDBObject.get("product_image"))
            .append("product_name", basicDBObject.get("product_name"));
    operation.find(query).upsert().updateOne(new BasicDBObject("$setOnInsert", basicDBObject));
    operation.insert(basicDBObject);
I am getting this error: com.mongodb.MongoInternalException: no mapping found for index 0
Any help would be appreciated. Thanks.
How you handle this depends on what you actually want to do with the "duplicates".
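For one, you can use .initializeUnorderedBulkOperation() instead of the ordered variant. An ordered bulk stops at the first error, but an unordered one keeps going when your unique index (which you need in order to stop duplicates at all) rejects a document with a duplicate-key error. The failures are reported once the whole batch has run: in the Java driver, .execute() then throws a BulkWriteException, whose getWriteErrors() lists the rejected operations and whose getWriteResult() returns the BulkWriteResult for the writes that succeeded.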
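A minimal sketch of the unordered approach, assuming the legacy MongoDB Java driver (2.12 or later, the same DBCollection/BulkWriteOperation API used in the question) and the field names product_image and product_name from the question's edit. The unique index is created through the driver itself, so no Morphia mapping is needed. The class and method names are hypothetical.

```java
import com.mongodb.BasicDBObject;
import com.mongodb.BulkWriteError;
import com.mongodb.BulkWriteException;
import com.mongodb.BulkWriteOperation;
import com.mongodb.BulkWriteResult;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import java.util.List;

// Hypothetical helper: unique index plus unordered bulk insert.
public class UnorderedDedupInsert {

    // MongoDB's error code for a duplicate-key violation.
    static final int DUPLICATE_KEY = 11000;

    /** Compound key on the two fields that define a "duplicate" product. */
    static BasicDBObject productKey() {
        return new BasicDBObject("product_image", 1).append("product_name", 1);
    }

    /** Run once (e.g. in the job driver); MongoDB then rejects duplicate inserts itself. */
    static void ensureUniqueIndex(DBCollection products) {
        products.createIndex(productKey(), new BasicDBObject("unique", true));
    }

    /**
     * Queues every document in an unordered bulk. Duplicates are rejected by the
     * unique index while the other inserts still run. Returns the inserted count.
     */
    static int insertSkippingDuplicates(DBCollection products, List<? extends DBObject> docs) {
        if (docs.isEmpty()) {
            return 0; // execute() throws IllegalStateException on an empty bulk
        }
        BulkWriteOperation bulk = products.initializeUnorderedBulkOperation();
        for (DBObject doc : docs) {
            bulk.insert(doc);
        }
        try {
            BulkWriteResult result = bulk.execute();
            return result.getInsertedCount();
        } catch (BulkWriteException e) {
            for (BulkWriteError error : e.getWriteErrors()) {
                if (error.getCode() != DUPLICATE_KEY) {
                    throw e; // a genuine failure, not just a skipped duplicate
                }
            }
            // Only duplicates were rejected; report what did get inserted.
            return e.getWriteResult().getInsertedCount();
        }
    }
}
```

In the reducer, ensureUniqueIndex would normally be called once rather than per reduce() call; repeating it is harmless because creating an index that already exists is a no-op.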
Alternatively, you can use "upserts" together with an operator such as $setOnInsert, so that changes are only made where no duplicate exists. You look up the fields that define the "key" of a duplicate with a query, and data is only written when that key is not found, in which case a new document is "inserted".
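A minimal sketch of the upsert approach, again assuming the legacy Java driver's bulk API and the field names from the question's edit; the class and method names are hypothetical. Note that the edit in the question still calls operation.insert(basicDBObject) after the upsert, which inserts the document unconditionally and brings the duplicates back; the upsert should be the only write queued for each document.

```java
import com.mongodb.BasicDBObject;
import com.mongodb.BulkWriteOperation;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import java.util.List;

// Hypothetical helper: one $setOnInsert upsert per product, no plain insert.
public class SetOnInsertUpsert {

    /** Query on the fields that identify a duplicate product. */
    static BasicDBObject duplicateQuery(DBObject doc) {
        return new BasicDBObject("product_image", doc.get("product_image"))
                .append("product_name", doc.get("product_name"));
    }

    /** Update that only writes the fields when the upsert creates a new document. */
    static BasicDBObject insertOnlyUpdate(DBObject doc) {
        return new BasicDBObject("$setOnInsert", doc);
    }

    /** Queues one upsert per document; there is deliberately no bulk.insert() call. */
    static void queueUpserts(BulkWriteOperation bulk, List<? extends DBObject> docs) {
        for (DBObject doc : docs) {
            bulk.find(duplicateQuery(doc)).upsert().updateOne(insertOnlyUpdate(doc));
        }
    }

    /** Runs the upserts in order and returns how many new documents were created. */
    static int upsertAll(DBCollection products, List<? extends DBObject> docs) {
        if (docs.isEmpty()) {
            return 0; // execute() throws IllegalStateException on an empty bulk
        }
        BulkWriteOperation bulk = products.initializeOrderedBulkOperation();
        queueUpserts(bulk, docs);
        // Newly created documents are reported as upserts, separately from matches.
        return bulk.execute().getUpserts().size();
    }
}
```

Because the bulk is ordered, a second document with the same key later in the same batch simply matches the first one and changes nothing. Without a unique index, however, two reducers running at the same time can still both insert the same key, so the index is worth keeping even with upserts.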
In either case, the default behaviour is to insert the first document for each unique "key" and ignore all later occurrences. If you instead want to "overwrite" or "increment" values when the same key is found, the .update() "upsert" approach is the one you want, using other update operators for those actions.