I am using JUnit 5 dynamic tests. My intention is to create a stream of elements from a MongoDB collection and pass it on to the tests in JUnit 5. However, with the code below I am only able to process 1000 records. How do I make this work as a seamless, non-blocking stream?
MongoCollection<Document> collection = mydatabase.getCollection("mycoll");
final List<Document> cache = Collections.synchronizedList(new ArrayList<Document>());
final AtomicReference<Throwable> th = new AtomicReference<>();
final CountDownLatch latch = new CountDownLatch(1);
FindIterable<Document> f = collection.find().batchSize(1000);
f.batchCursor(new SingleResultCallback<AsyncBatchCursor<Document>>() {
    @Override
    public void onResult(AsyncBatchCursor<Document> cursor, Throwable thrwbl) {
        cursor.next(new SingleResultCallback<List<Document>>() {
            @Override
            public void onResult(List<Document> batch, Throwable thrwbl) {
                if (thrwbl != null) {
                    th.set(thrwbl);
                }
                cache.addAll(batch); // next() is only ever called once, so only the first batch (1000 docs) lands here
                latch.countDown();
            }
        });
    }
});
latch.await();
return cache.stream().map(doc -> process(doc));
Updated Code
@ParameterizedTest
@MethodSource("setUp")
void cacheTest(MyClazz myclass) throws Exception {
    assertTrue(doTest(myclass));
}

public static MongoClient getMongoClient() {
    // get client here
}

private static Stream<MyClazz> setUp() throws Exception {
    MongoDatabase mydatabase = getMongoClient().getDatabase("test");
    List<Throwable> failures = Collections.synchronizedList(new ArrayList<>());
    CountDownLatch latch = new CountDownLatch(1);
    List<MyClazz> list = Collections.synchronizedList(new ArrayList<>());
    mydatabase.getCollection("testcollection").find()
            .toObservable().subscribe(
                    document -> list.add(process(document)),
                    throwable -> failures.add(throwable),
                    () -> latch.countDown());
    latch.await();
    return list.stream();
}

public boolean doTest(MyClazz myclass) {
    // processing goes here
}

private static MyClazz process(Document doc) {
    // doc gets converted to MyClazz and returned here
}
Even now I see that all the data is loaded before any unit testing happens. I think this is because of latch.await(); however, if I remove it, there is a chance that no tests run at all, since the collection could still be loading.
My use case is: I have a million records in Mongo and I am running a sort of integration test against them. It would not be feasible to load all of them into memory, hence this attempt at a streaming solution.
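For illustration, the kind of lazy bridge I am after could look like the following stdlib-only sketch: a bounded BlockingQueue connects the driver's async callbacks (simulated here by a producer thread) to a Stream that @MethodSource can consume on demand. LazyStreamBridge, the POISON sentinel, and the capacity of 16 are my own illustrative choices, not part of any driver API:

```java
import java.util.Iterator;
import java.util.Spliterators;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class LazyStreamBridge {
    static final String POISON = "__END__"; // sentinel marking end of stream

    // Turns an asynchronous producer into a lazily evaluated Stream.
    // The bounded queue applies backpressure: the producer blocks once the
    // buffer is full, so the whole collection is never in memory at once.
    public static Stream<String> toLazyStream(BlockingQueue<String> queue) {
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(new Iterator<String>() {
                    private String next;

                    @Override
                    public boolean hasNext() {
                        if (next != null) return true;
                        try {
                            next = queue.take(); // blocks until a doc (or the sentinel) arrives
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            return false;
                        }
                        return !POISON.equals(next);
                    }

                    @Override
                    public String next() {
                        String n = next;
                        next = null;
                        return n;
                    }
                }, 0), false);
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(16); // small buffer = bounded memory
        // Simulated async driver callback: pushes "documents" batch by batch.
        Thread producer = new Thread(() -> {
            try {
                for (int batch = 0; batch < 3; batch++) {
                    for (int i = 0; i < 4; i++) queue.put("doc-" + (batch * 4 + i));
                }
                queue.put(POISON); // signal completion
            } catch (InterruptedException ignored) { }
        });
        producer.start();
        System.out.println(toLazyStream(queue).count()); // prints 12
    }
}
```

With this shape, setUp() could return the bridged stream immediately instead of awaiting a latch, and the producer side would be fed from the driver's callbacks.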
I don't think I fully understand your use case, but given that your question is tagged with java and mongo-async-driver, this requirement is certainly achievable. The approach in my answer:

- creates an Observable from that collection
- subscribes to that Observable, processing each document as it is emitted
- marks completion once the Observable is exhausted

Notes:

- the code uses the com.mongodb.rx.client namespace (i.e. the org.mongodb::mongodb-driver-rx Maven artifact), which in turn pulls in io.reactivex::rxjava
- your call to collection.find().batchSize() clearly indicates that you are not currently using the Rx driver (since batchSize cannot be a Rx friendly concept :)
Update 1: based on the change to your question (which follows my original answer), you have asked: "I see that all the data is loaded after which the unit testing happens. I think this is because of latch.await()". I think you are populating a list from the observable stream and only after the observable is exhausted do you start invoking doTest(). That approach involves (1) streaming results from MongoDB; (2) storing those results in memory; (3) running doTest() for each stored result. If you really want to stream all the way then you should call doTest() from within your observable's subscription: invoke doTest() as each document is received from MongoDB, and decrement the latch in the completion handler so that your code finishes only once the entire observable is exhausted.
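The pattern of running the test inside the subscription, rather than after collecting everything, can be sketched with the JDK's built-in reactive types (java.util.concurrent.Flow) standing in for the Mongo Rx driver. runStreamingTests, process, and doTest below are hypothetical stand-ins for the question's methods, and the SubmissionPublisher simulates the documents the driver would emit:

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicInteger;

public class StreamingTestRunner {
    // Hypothetical stand-ins for the question's process()/doTest().
    static String process(String doc) { return doc.toUpperCase(); }
    static boolean doTest(String converted) { return converted.startsWith("DOC"); }

    // Runs doTest() on each element as it is emitted; nothing is buffered in a list.
    static int runStreamingTests(Iterable<String> docs) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        AtomicInteger passed = new AtomicInteger();

        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<String>() {
                private Flow.Subscription subscription;

                @Override public void onSubscribe(Flow.Subscription s) {
                    subscription = s;
                    s.request(1); // pull one document at a time
                }
                @Override public void onNext(String doc) {
                    if (doTest(process(doc))) passed.incrementAndGet(); // test while streaming
                    subscription.request(1);
                }
                @Override public void onError(Throwable t) { latch.countDown(); }
                @Override public void onComplete() { latch.countDown(); }
            });
            for (String doc : docs) publisher.submit(doc); // stands in for Mongo emitting documents
        } // close() signals onComplete to the subscriber

        latch.await(); // completes only when the whole stream is exhausted
        return passed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runStreamingTests(List.of("doc-1", "doc-2", "doc-3"))); // prints 3
    }
}
```

The same shape applies with the Rx driver: move the doTest() call into the onNext handler of the subscription, and keep the latch only to hold the test harness open until completion.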