So I have two collections, let's say a new collection and another collection. I want to take the data from the other collection, process it into a new form, and store the result in the new collection, all in one go. The problem is the asynchronous behavior of the Scala MongoDB driver. I want the new collection to use auto-incrementing IDs, because I need each document's ID after it has been inserted into the database.
So the obvious steps would be:
- Subscribe to the other collection's findAll Observable and get the emitted data
- Count the documents in the new collection to get the current total, then increment it by one to get the ID for the emitted document
- Process the data (mapping, mutating it, etc.)
- Insert the new data into the new collection
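The steps above can be sketched as a chain of Futures in which each document waits for the previous insert to complete before counting. This is a minimal, stdlib-only sketch, not the driver's API: `countNew`, `insertNew`, `process` and the `TrieMap` standing in for the new collection are all hypothetical placeholders for the real asynchronous MongoDB calls.

```scala
import scala.collection.concurrent.TrieMap
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object SequentialPipeline {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical in-memory stand-in for the "new collection": id -> document.
  val newCollection = TrieMap.empty[Long, String]

  // Hypothetical async operations mimicking the driver's non-blocking calls.
  def countNew(): Future[Long] = Future(newCollection.size.toLong)

  def insertNew(id: Long, doc: String): Future[Unit] = Future {
    // putIfAbsent returns Some(existing) when the key is taken: a "duplicate ID".
    if (newCollection.putIfAbsent(id, doc).isDefined)
      throw new IllegalStateException(s"duplicate id $id")
  }

  // Step 3: turn a source document into its new form (placeholder mapping).
  def process(doc: String): String = doc.toUpperCase

  // Steps 2-4, chained so each document starts only after the previous insert.
  def copyAll(source: Seq[String]): Future[Unit] =
    source.foldLeft(Future.unit) { (prev, doc) =>
      for {
        _     <- prev                               // wait for the previous insert
        count <- countNew()                         // step 2: count the new collection
        _     <- insertNew(count + 1, process(doc)) // step 4: insert with count + 1
      } yield ()
    }

  def main(args: Array[String]): Unit = {
    Await.result(copyAll(Seq("a", "b", "c")), 5.seconds)
    println(newCollection.toSeq.sortBy(_._1))
  }
}
```

Serializing the chain this way only avoids duplicates as long as nothing else writes to the new collection at the same time; a second process running the same code could still race on the count.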
Now the problem is that there is some latency between steps 2 and 4, so the insert sometimes throws an exception saying the ID is a duplicate, presumably because another document was counted and inserted in that window. I couldn't find any way to block in the documentation.
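The duplicate IDs come from the gap between reading the count and using it. An alternative to blocking is to make "get the next ID" a single atomic step. In MongoDB this is commonly done with a separate counters document updated atomically with `$inc` (for example through `findOneAndUpdate`); the sketch below only models that idea with a JVM `AtomicLong`, and the names `counter`, `newCollection` and `insertWithNextId` are hypothetical.

```scala
import java.util.concurrent.atomic.AtomicLong
import scala.collection.concurrent.TrieMap
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object AtomicCounterIds {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Stand-in for a counters document; incrementAndGet plays the role of an
  // atomic "$inc and return the new value" performed on the database side.
  val counter = new AtomicLong(0L)
  val newCollection = TrieMap.empty[Long, String]

  def insertWithNextId(doc: String): Future[Long] = Future {
    val id = counter.incrementAndGet() // unique even when called concurrently
    if (newCollection.putIfAbsent(id, doc).isDefined)
      throw new IllegalStateException(s"duplicate id $id")
    id
  }

  def main(args: Array[String]): Unit = {
    // All inserts run concurrently; no ordering between them is required.
    val ids = Await.result(
      Future.traverse((1 to 100).toList)(i => insertWithNextId(s"doc-$i")),
      10.seconds)
    println(s"${ids.distinct.size} unique ids, max = ${ids.max}")
  }
}
```

Because reading and incrementing the counter happen in one atomic operation, no two inserts can receive the same ID, so the Observable never has to be blocked.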
How am I supposed to block the Observable so that it only starts processing after the other threads have finished?
Or is there a better way to do what I want? I am open to suggestions.
Any references for further reading are also welcome. Thanks!
Edit: After doing a little more searching, I found a more Scala-like way of solving this problem:
This does exactly what it looks like: runs the query, waits for it to finish, and returns the result.
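The "run the query and wait for it" approach can be sketched with the standard library's `Await.result`. With the MongoDB Scala driver the Future would presumably come from calling `toFuture()` on the query's Observable; here `findAll` is a hypothetical stand-in that completes on another thread after a simulated delay, so the sketch runs without a database.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object AwaitQuery {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical stand-in for an asynchronous query converted to a Future.
  def findAll(): Future[Seq[String]] = Future {
    Thread.sleep(50) // simulated network latency
    Seq("x", "y", "z")
  }

  // Blocks the calling thread until the query finishes; if the timeout expires
  // first, Await.result throws a java.util.concurrent.TimeoutException.
  def findAllBlocking(): Seq[String] = Await.result(findAll(), 5.seconds)

  def main(args: Array[String]): Unit =
    println(findAllBlocking().mkString(","))
}
```

Using a finite timeout rather than `Duration.Inf` means a hung query surfaces as an exception instead of a thread that never wakes up.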
I wrote this bit of code to fix a very similar problem. It's a bit of a hack, but it works.
Acquiring the semaphore sets its permit count to zero, which causes the next acquire to block. The thread then sleeps until the observer either completes or receives an error and releases the semaphore, achieving functionally the same result as if the subscribe had blocked.
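The semaphore technique described above can be sketched without a database. `SimpleObserver` and `subscribe` are hypothetical stand-ins that loosely mirror the `onNext`/`onError`/`onComplete` callbacks of a reactive Observer; they are not the driver's real types. The source emits on its own thread, and the caller blocks on the second `acquire()` until a terminal callback releases the permit.

```scala
import java.util.concurrent.Semaphore
import scala.collection.mutable.ListBuffer

object SemaphoreBlocking {
  // Hypothetical minimal observer (not the driver's Observer trait).
  trait SimpleObserver[T] {
    def onNext(item: T): Unit
    def onError(e: Throwable): Unit
    def onComplete(): Unit
  }

  // Hypothetical asynchronous source: emits its items on a separate thread.
  def subscribe[T](items: Seq[T], observer: SimpleObserver[T]): Unit =
    new Thread(() => {
      try { items.foreach(observer.onNext); observer.onComplete() }
      catch { case e: Throwable => observer.onError(e) }
    }).start()

  def collectBlocking[T](items: Seq[T]): List[T] = {
    val semaphore = new Semaphore(1)
    val received = ListBuffer.empty[T]
    var failure: Throwable = null

    semaphore.acquire() // permits: 1 -> 0
    subscribe(items, new SimpleObserver[T] {
      def onNext(item: T): Unit = received.synchronized { received += item }
      def onError(e: Throwable): Unit = { failure = e; semaphore.release() }
      def onComplete(): Unit = semaphore.release()
    })
    semaphore.acquire() // blocks until onComplete or onError releases a permit
    if (failure != null) throw failure // surface the observer's error
    received.synchronized { received.toList }
  }

  def main(args: Array[String]): Unit =
    println(collectBlocking(Seq(1, 2, 3)))
}
```

The release in the callback happens-before the caller's acquire returns, so the collected items and any recorded error are visible to the waiting thread. A `java.util.concurrent.CountDownLatch` with a count of 1 expresses the same "wait for one signal" idea a little more directly.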