Check for duplicate records during a BulkWriteOperation into MongoDB from a Hadoop reducer


I am using Hadoop MapReduce to process an XML file, and I store the resulting JSON data directly into MongoDB.
How can I ensure that only non-duplicate records are stored in the database when executing the BulkWriteOperation?

The duplicate-record criteria are based on product image and product name. I do not want to use a Morphia layer, where indexes can be assigned to the class members.
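
(For reference: a unique compound index on those two fields can be created straight from the Java driver, without any Morphia mapping. A minimal sketch, with the field names product_image and product_name matching the ones used in the edit further down:)

DB db = new MongoClient("localhost", 27017).getDB("test");
// Sketch only: a unique compound index so the server itself rejects documents
// that repeat the same product_image + product_name pair (run once at setup).
db.getCollection("product").createIndex(
        new BasicDBObject("product_image", 1).append("product_name", 1),
        new BasicDBObject("unique", true));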

Here is my reducer class:

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Reducer;

import org.apache.log4j.Logger;

import com.mongodb.BasicDBObject;
import com.mongodb.BulkWriteOperation;
import com.mongodb.DB;
import com.mongodb.MongoClient;

public class XMLReducer extends Reducer<Text, MapWritable, Text, NullWritable> {

    private static final Logger LOGGER = Logger.getLogger(XMLReducer.class);

    @Override
    protected void reduce(Text key, Iterable<MapWritable> values, Context ctx) throws IOException, InterruptedException {
        LOGGER.info("reduce()------Start for key>" + key);
        Map<String, String> insertProductInfo = new HashMap<String, String>();
        try {
            MongoClient mongoClient = new MongoClient("localhost", 27017);
            DB db = mongoClient.getDB("test");
            BulkWriteOperation operation = db.getCollection("product").initializeOrderedBulkOperation();
            for (MapWritable entry : values) {
                // Collect this record's fields into a plain String map (cleared per record).
                insertProductInfo.clear();
                for (Entry<Writable, Writable> extractProductInfo : entry.entrySet()) {
                    insertProductInfo.put(extractProductInfo.getKey().toString(), extractProductInfo.getValue().toString());
                }
                if (!insertProductInfo.isEmpty()) {
                    // Queue one insert per record in the bulk operation.
                    BasicDBObject basicDBObject = new BasicDBObject(insertProductInfo);
                    operation.insert(basicDBObject);
                }
            }
            // How can I check for duplicates before executing the bulk operation?
            operation.execute();
            mongoClient.close();
            LOGGER.info("reduce------end for key" + key);
        } catch (Exception e) {
            LOGGER.error("General Exception in XMLReducer", e);
        }
    }
}

EDIT: Following the suggested answer, I have added:

BasicDBObject query = new BasicDBObject("product_image", basicDBObject.get("product_image"))
        .append("product_name", basicDBObject.get("product_name"));
operation.find(query).upsert().updateOne(new BasicDBObject("$setOnInsert", basicDBObject));
operation.insert(basicDBObject);

I am getting an error like: com.mongodb.MongoInternalException: no mapping found for index 0

Any help will be appreciated. Thanks.

Best answer:

I suppose it all depends on what you really want to do with the "duplicates" here as to how you handle it.

For one, you can always use .initializeUnorderedBulkOperation(), which won't abort on a duplicate key from your index (which you need in order to stop duplicates) but will still report any such errors when you call .execute() — in the Java driver they come back alongside the BulkWriteResult for the writes that did succeed:

BulkWriteResult result = operation.execute();
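
For instance, assuming a unique compound index on product_image and product_name is in place, the reducer's bulk insert could be wrapped like this (a sketch only; LOGGER is the logger from the question, and in the legacy Java driver the rejected duplicates arrive as a BulkWriteException that still carries the result of the successful writes):

BulkWriteOperation operation = db.getCollection("product").initializeUnorderedBulkOperation();
// ... queue operation.insert(...) calls exactly as in the question's reducer ...
try {
    BulkWriteResult result = operation.execute();
    LOGGER.info("Inserted " + result.getInsertedCount() + " new products");
} catch (BulkWriteException bwe) {
    // Duplicate-key failures land here; all non-duplicate documents were still written.
    LOGGER.info("Inserted " + bwe.getWriteResult().getInsertedCount()
            + " products, skipped " + bwe.getWriteErrors().size() + " duplicates");
}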

On the other hand, you can just use "upserts" instead and use operators such as $setOnInsert to only make changes where no duplicate existed:

BasicDBObject basicdbobject = new BasicDBObject(insertProductInfo);
BasicDBObject query = new BasicDBObject("key", basicdbobject.get("key"));

operation.find(query).upsert().updateOne(new BasicDBObject("$setOnInsert", basicdbobject));

So you basically look up the value of the field that holds the "key" with a query to determine a duplicate, and only actually change any data where that "key" was not found, i.e. where a new document gets "inserted".

In either case the default behaviour here will be to "insert" the first unique "key" value and then ignore all other occurrences. If you want to do other things like "overwrite" or "increment" values where the same key is found, then the .update() "upsert" approach is the one you want, but you will use other update operators for those actions.
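
For example, reusing the query and basicdbobject from above, overwriting or counting duplicates would look like this (the "times_seen" counter field is purely illustrative and not part of the original data):

// Overwrite the stored fields whenever the same key shows up again:
operation.find(query).upsert().updateOne(new BasicDBObject("$set", basicdbobject));

// Or keep the first copy but count how often the key was seen
// ("times_seen" is a made-up field name for illustration):
operation.find(query).upsert().updateOne(
        new BasicDBObject("$setOnInsert", basicdbobject)
                .append("$inc", new BasicDBObject("times_seen", 1)));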