Creating a stream of elements from a collection


I am using JUnit 5 dynamic tests. My intention is to create a stream of elements from the collection to pass it on to test in JUnit5. However, with this code I am able to run only 1000 records. How do I make this work seamlessly non-blocking?

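    MongoCollection<Document> collection = mydatabase.getCollection("mycoll");
    final List<Document> cache = Collections.synchronizedList(new ArrayList<Document>());
    final AtomicReference<Throwable> th = new AtomicReference<>();
    final CountDownLatch latch = new CountDownLatch(1);

    FindIterable<Document> f = collection.find().batchSize(1000);
    f.batchCursor(new SingleResultCallback<AsyncBatchCursor<Document>>() {

        @Override
        public void onResult(AsyncBatchCursor<Document> cursor, Throwable thrwbl) {
            if (thrwbl != null) {               // opening the cursor failed, cursor is null
                th.set(thrwbl);
                latch.countDown();
                return;
            }
            // next() delivers a single batch (up to batchSize documents) and is
            // called only once here
            cursor.next(new SingleResultCallback<List<Document>>() {

                @Override
                public void onResult(List<Document> batch, Throwable thrwbl) {
                    if (thrwbl != null) {
                        th.set(thrwbl);
                    } else if (batch != null) {
                        cache.addAll(batch);
                    }
                    latch.countDown();
                }
            });
        }
    });
    latch.await();
    return cache.stream().map(doc -> process(doc));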
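The reason only the first 1000 records arrive is that AsyncBatchCursor.next() is called once, and each call delivers a single batch of up to batchSize documents. Below is a stdlib-only sketch of the usual remedy: request the next batch from inside the callback until the cursor reports that it is exhausted. BatchSource, drain() and fakeSource() are hypothetical stand-ins rather than driver API, and the sketch assumes exhaustion is signalled by a null batch; check how your driver version reports it (for example via isClosed()).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;

public class DrainAllBatches {

    /** Hypothetical stand-in for AsyncBatchCursor: one batch per call, a null batch once exhausted. */
    interface BatchSource<T> {
        void next(BiConsumer<List<T>, Throwable> callback);
    }

    /** Keeps requesting batches until the source reports exhaustion (null) or an error. */
    static <T> void drain(BatchSource<T> source, List<T> sink,
                          AtomicReference<Throwable> failure, CountDownLatch done) {
        source.next((batch, error) -> {
            if (error != null) {
                failure.set(error);
                done.countDown();
            } else if (batch == null) {
                done.countDown();                    // no more batches
            } else {
                sink.addAll(batch);
                drain(source, sink, failure, done);  // ask for the next batch
            }
        });
    }

    /** Fake source serving 0..total-1 in batches; it calls back synchronously, a real driver usually does not. */
    static BatchSource<Integer> fakeSource(int total, int batchSize) {
        int[] position = {0};
        return callback -> {
            if (position[0] >= total) {
                callback.accept(null, null);
                return;
            }
            List<Integer> batch = new ArrayList<>();
            while (batch.size() < batchSize && position[0] < total) {
                batch.add(position[0]++);
            }
            callback.accept(batch, null);
        };
    }

    static List<Integer> collectAll(int total, int batchSize) {
        List<Integer> sink = Collections.synchronizedList(new ArrayList<>());
        AtomicReference<Throwable> failure = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        drain(fakeSource(total, batchSize), sink, failure, done);
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        if (failure.get() != null) {
            throw new IllegalStateException(failure.get());
        }
        return sink;
    }

    public static void main(String[] args) {
        System.out.println(collectAll(2500, 1000).size()); // 2500
    }
}
```

This still accumulates every document in sink, so it fixes the 1000-record cut-off but not the memory concern.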

Updated Code

    @ParameterizedTest
    @MethodSource("setUp")
    void cacheTest(MyClazz myclass) throws Exception {
        assertTrue(doTest(myclass));
    }

    public static MongoClient getMongoClient() {
        // get client here
    }

    private static Stream<MyClazz> setUp() throws Exception {
        MongoDatabase mydatabase = getMongoClient().getDatabase("test");
        List<Throwable> failures = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch latch = new CountDownLatch(1);
        List<MyClazz> list = Collections.synchronizedList(new ArrayList<>());
        mydatabase.getCollection("testcollection").find()
                .toObservable().subscribe(
                document -> {
                    list.add(process(document));
                },
                throwable -> {
                    failures.add(throwable);
                    latch.countDown(); // onError is terminal, so onCompleted will not be called
                },
                () -> {
                    latch.countDown();
                });
        latch.await();
        if (!failures.isEmpty()) {
            throw new IllegalStateException("query failed", failures.get(0));
        }
        return list.stream();
    }

    public boolean doTest(MyClazz myclass) {
        // processing goes here
    }

    // static, because it is called from the static setUp()
    public static MyClazz process(Document doc) {
        MyClazz converted = null; // doc gets converted to MyClazz here
        return converted;
    }

Even now, I see that all the data is loaded after which the unit testing happens. I think this is because of latch.await(). However, if I remove it, there is a chance that no test cases run, because the collection may still be loading when setUp() returns.

My use case is: I have a million records in Mongo and am running a sort of integration test with them. It wouldn't be feasible to load all of them into memory, hence I am attempting a streaming solution.
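For the goal described here (feeding records to the tests without holding a million of them in memory, and without returning before any data has arrived), one general technique is a bounded hand-off queue between the driver's callbacks and a lazily consumed java.util.stream.Stream. The sketch below is stdlib-only; PushToLazyStream and its onNext/onError/onCompleted methods are hypothetical names chosen to mirror the Rx callbacks, not driver API. It assumes JUnit 5 pulls elements from a @MethodSource (or @TestFactory) stream as it executes them, which is worth confirming for your JUnit version.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/** Bridges push-style callbacks (onNext/onError/onCompleted) to a lazily consumed Stream. */
public class PushToLazyStream<T> {

    private static final Object END = new Object();          // marks normal completion

    private static final class Failure {                     // wraps an error from the producer
        final Throwable cause;
        Failure(Throwable cause) { this.cause = cause; }
    }

    private final BlockingQueue<Object> queue;

    public PushToLazyStream(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);     // bounds how many items sit in memory
    }

    // Producer side: call from the driver's callbacks. put() blocks while the queue is full.
    public void onNext(T item) { put(item); }                // item must be non-null
    public void onError(Throwable t) { put(new Failure(t)); }
    public void onCompleted() { put(END); }

    // Consumer side: each element is taken from the queue only when the stream asks for it.
    public Stream<T> stream() {
        Iterator<T> iterator = new Iterator<T>() {
            private Object pending;                          // taken from the queue, not yet returned

            @Override
            public boolean hasNext() {
                if (pending == null) {
                    pending = take();                        // blocks until data, an error or END arrives
                }
                if (pending instanceof Failure) {
                    throw new IllegalStateException(((Failure) pending).cause);
                }
                return pending != END;
            }

            @Override
            @SuppressWarnings("unchecked")
            public T next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                T item = (T) pending;
                pending = null;
                return item;
            }
        };
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false);
    }

    private void put(Object element) {
        try {
            queue.put(element);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    private Object take() {
        try {
            return queue.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        PushToLazyStream<Integer> bridge = new PushToLazyStream<>(16);
        Thread producer = new Thread(() -> {                 // stands in for the driver's callback thread
            for (int i = 1; i <= 10_000; i++) {
                bridge.onNext(i);
            }
            bridge.onCompleted();
        });
        producer.start();
        System.out.println(bridge.stream().mapToLong(Integer::longValue).sum()); // 50005000
    }
}
```

In setUp(), the three subscribe() lambdas would call bridge.onNext(process(document)), bridge.onError(throwable) and bridge.onCompleted(), and setUp() would return bridge.stream() without a latch. Two caveats: put() blocks the driver's callback thread while the queue is full, and if the consumer stops early the producer stays blocked. With RxJava 1 it may be cleaner to rely on its own backpressure, for example via toBlocking().getIterator() (not verified here).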


I don't think I fully understand your use case, but given that your question is tagged with java and mongo-async-driver, this requirement is certainly achievable:

"create a stream of elements from the collection to pass it on to test ... make this work seamlessly non-blocking"

The following code:

  • Uses the MongoDB RxJava driver to query a collection
  • Creates an Rx Observable from the query results
  • Subscribes to that Observable
  • Records exceptions
  • Marks completion

    CountDownLatch latch = new CountDownLatch(1);
    List<Throwable> failures = Collections.synchronizedList(new ArrayList<>());
    collection.find()
            .toObservable().subscribe(
            // on next, this is invoked for each document returned by your find call
            document -> {
                // presumably you'll want to do something here to meet this requirement: "pass it on to test in JUnit5"
                System.out.println(document);
            },
            // on error
            throwable -> {
                failures.add(throwable);
                // onError is terminal, so onCompleted will not follow; release the latch here too
                latch.countDown();
            },
            // on completion
            () -> {
                latch.countDown();
            });
    // await the completion (or error) event
    latch.await();
    

Notes:

  • This requires the MongoDB RxJava driver (i.e. classes in the com.mongodb.rx.client package, from the org.mongodb::mongodb-driver-rx Maven artifact)
  • In your question you are invoking collection.find().batchSize() and then batchCursor() with a SingleResultCallback, which indicates that you are currently using the callback-based async driver rather than the Rx driver
  • The above code is verified with v1.4.0 of the MongoDB RxJava driver and v1.1.10 of io.reactivex::rxjava
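Under the Rx contract a subscription ends with exactly one of onError or onCompleted, so a latch that is only counted down in the completion handler waits forever after a failure. A stdlib-only sketch that models this with a hypothetical emit() source running on its own thread: both terminal handlers release the latch, and await(timeout, unit) turns a silent hang into a test failure. The 10-second timeout is an arbitrary choice for the sketch.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class AwaitTerminalEvent {

    /** Hypothetical push source obeying the Rx rule: onNext*, then exactly one of onError / onCompleted. */
    static void emit(int count, boolean fail,
                     Consumer<Integer> onNext, Consumer<Throwable> onError, Runnable onCompleted) {
        new Thread(() -> {
            for (int i = 0; i < count; i++) {
                onNext.accept(i);
            }
            if (fail) {
                onError.accept(new RuntimeException("simulated query failure"));
            } else {
                onCompleted.run();
            }
        }).start();
    }

    /** Subscribes, waits for either terminal event (with a timeout) and returns how many items arrived. */
    static int consume(int count, boolean fail) {
        CountDownLatch latch = new CountDownLatch(1);
        List<Throwable> failures = new CopyOnWriteArrayList<>();
        AtomicInteger seen = new AtomicInteger();
        emit(count, fail,
                item -> seen.incrementAndGet(),
                throwable -> {
                    failures.add(throwable);
                    latch.countDown();                // without this, a failure leaves await() stuck
                },
                latch::countDown);
        try {
            if (!latch.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("no terminal event within 10 seconds");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        if (!failures.isEmpty()) {
            throw new IllegalStateException("source failed", failures.get(0));
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(consume(5, false)); // 5
    }
}
```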

Update 1: Based on the change to your question (made after my original answer), you asked why "I see that all the data is loaded after which the unit testing happens. I think this is because of latch.await()". I think you are populating a list from the observable stream, and only after the observable is exhausted do you start invoking doTest(). This approach involves (1) streaming results from MongoDB; (2) storing those results in memory; (3) running doTest() for each stored result. If you really want to stream all the way, then call doTest() from within your observable's subscription. For example:

    mydatabase.getCollection("testcollection").find()
            .toObservable().subscribe(
            document -> {
                doTest(process(document));
            },
            throwable -> {
                failures.add(throwable);
                latch.countDown(); // onError is terminal, so onCompleted will not be called
            },
            () -> {
                latch.countDown();
            });
    latch.await();

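The above code invokes doTest() as it receives each document from MongoDB; when the entire observable is exhausted, the latch is counted down and your code completes. Note that if this runs inside a single test method, JUnit reports all documents as one test rather than one test per document.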
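In that snippet the boolean returned by doTest() is discarded, and, as far as I know, in RxJava 1 an exception escaping onNext is routed to onError, which ends the subscription. A stdlib-only sketch of one way around both: check each item inside the callback, record failures instead of throwing, and assert once after latch.await(). FailureCollector is a hypothetical helper, not part of any driver; the Function and Predicate play the roles of process() and doTest().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

public class CollectFailures {

    /** onNext-style consumer that runs a check per item and records failures instead of throwing. */
    static final class FailureCollector<D, M> implements Consumer<D> {
        private final Function<D, M> process;   // plays the role of process(Document)
        private final Predicate<M> check;       // plays the role of doTest(MyClazz)
        private final List<String> failures = new ArrayList<>();

        FailureCollector(Function<D, M> process, Predicate<M> check) {
            this.process = process;
            this.check = check;
        }

        @Override
        public synchronized void accept(D item) {
            try {
                if (!check.test(process.apply(item))) {
                    failures.add("check failed for " + item);
                }
            } catch (RuntimeException e) {
                failures.add("exception for " + item + ": " + e);  // keep going with the next item
            }
        }

        synchronized List<String> failures() {
            return new ArrayList<>(failures);
        }
    }

    public static void main(String[] args) {
        FailureCollector<Integer, Integer> collector =
                new FailureCollector<>(x -> x * 2, m -> m % 3 != 0);
        for (int i = 1; i <= 10; i++) {
            collector.accept(i);                  // in the real test this would be the onNext lambda
        }
        // after latch.await(), a single assertion can report every failing item at once
        System.out.println(collector.failures()); // [check failed for 3, check failed for 6, check failed for 9]
    }
}
```

In the test itself, the onNext lambda would become collector (or document -> collector.accept(document)), and after latch.await() something like JUnit 5's assertTrue(collector.failures().isEmpty(), collector.failures().toString()) reports every failure together.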