I have written below code for executing bulk operations (upserting the data) to cosmos db (NoSQL API).
I have written a job to store 100 000 data using Spring Schedular in batches of 300. The average Request Units taken by single data is 60-70. The container has 10 000 Request Unit (AutoScale).
AtomicInteger retry = new AtomicInteger(retryCount);
Flux<Data> gtinContainerFlux = Flux.fromIterable(dataset);
Flux<CosmosItemOperation> cosmosItemOperationFlux = gtinContainerFlux.map(data -> CosmosBulkOperations.getUpsertItemOperation(data, new PartitionKey(data.getKey())));
cosmosAsyncContainer.executeBulkOperations(cosmosItemOperationFlux, new CosmosBulkExecutionOptions())
.subscribe(cosmosBulkOperationResponse -> {
CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse();
CosmosItemOperation cosmosItemOperation = cosmosBulkOperationResponse.getOperation();
if (cosmosBulkOperationResponse.getException() != null) {
log.warn("ERROR : {}", cosmosItemOperation.<Data>getItem().getId());
log.error("Bulk operation failed", cosmosBulkOperationResponse.getException());
} else if (cosmosBulkItemResponse == null || !cosmosBulkOperationResponse.getResponse().isSuccessStatusCode()) {
log.error("The operation for Item ID: [{}] Item PartitionKey Value: [{}] did not complete successfully with " + "a" + " {} response code.", cosmosItemOperation.<Data>getItem().getId(), cosmosItemOperation.<Data>getItem().getGtinKey(), cosmosBulkItemResponse != null ? cosmosBulkItemResponse.getStatusCode() : "n/a");
errorEvents.add(cosmosItemOperation.<Data>getItem());
} else {
log.debug("Data posted successfully with RUs used : " + cosmosBulkItemResponse.getRequestCharge() + " completed in " + cosmosBulkItemResponse.getCosmosDiagnostics().getDuration().getNano());
}
});
When saving data in batches of 300 in async manner, it start giving me below issue
Request rate is large. More Request Units may be needed, so no changes were made. Please retry this request later
StackTrace:
at com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdRequestManager.messageReceiv
ed(RntbdRequestManager.java:1121)
2024-01-15T11:09:54.963941386Z at com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdRequestManager.channelRead(RntbdRequestManager.java:214)
2024-01-15T11:09:54.963946986Z at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
2024-01-15T11:09:54.963972387Z at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
2024-01-15T11:09:54.963978787Z at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
2024-01-15T11:09:54.963992287Z at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
2024-01-15T11:09:54.963997587Z at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
2024-01-15T11:09:54.964002887Z at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
2024-01-15T11:09:54.964007787Z at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
2024-01-15T11:09:54.964012587Z at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
2024-01-15T11:09:54.964018187Z at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
2024-01-15T11:09:54.964022987Z at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)
2024-01-15T11:09:54.964028088Z at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
2024-01-15T11:09:54.964033288Z at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
2024-01-15T11:09:54.964038288Z at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
2024-01-15T11:09:54.964043688Z at io.netty.handler.timeou
t.IdleStateHandler.channelRead(IdleStateHandler.java:286)
2024-01-15T11:09:54.964048788Z at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
2024-01-15T11:09:54.964053688Z at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
2024-01-15T11:09:54.964058488Z at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
2024-01-15T11:09:54.964063888Z at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1475)
2024-01-15T11:09:54.964081088Z at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1338)
2024-01-15T11:09:54.964086788Z at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1387)
2024-01-15T11:09:54.964091689Z at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:529)
2024-01-15T11:09:54.964096589Z at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:468)
2024-01-15T11:09:54.964101589Z at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
2024-01-15T11:09:54.964106389Z at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
2024-01-15T11:09:54.964111589Z at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
2024-01-15T11:09:54.964116489Z at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
2024-01-15T11:09:54.964121489Z at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
2024-01-15T11:09:54.964126289Z at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
2024-01-15T11:09:54.964135789Z at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
2024-01-15T11:09:54.964140789Z at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800)
2024-01-15T11:09:54.964145589Z at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:509)
2024-01-15T11:09:54.964155690Z at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:407)
2024-01-15T11:09:54.964160990Z at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
2024-01-15T11:09:54.964166190Z at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
2024-01-15T11:09:54.964171090Z at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
2024-01-15T11:09:54.964176090Z at java.base/java.lang.Thread.run(Thread.java:833)
Any log which are present in osmosBulkOperationResponse.getException() != null If block is not printed and I am not able to handle the error properly.
Can anyone give the some inputs or code changes or setting/options I need to set for handling error gracefully?