Error when publishing with the Pub/Sub Lite library from Spark

I am getting an error while publishing messages to GCP Pub/Sub Lite from Spark Structured Streaming. I cannot use writeStream because I need to publish from within a foreachBatch sink, so I call foreachPartition and foreach on the batch DataFrame and publish one message per row inside foreach. Some messages are published successfully, but for others I see the exception below:

2022-06-07 10:08:17 WARN  PartitionCountWatcherImpl:101 - Failed to refresh partition count
com.google.api.gax.rpc.ApiException: 
    at com.google.cloud.pubsublite.internal.CheckedApiException.<init>(CheckedApiException.java:51)
    at com.google.cloud.pubsublite.internal.CheckedApiException.<init>(CheckedApiException.java:55)
    at com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical(ExtractStatus.java:49)
    at com.google.cloud.pubsublite.internal.wire.PartitionCountWatcherImpl.pollTopicConfig(PartitionCountWatcherImpl.java:92)
    at com.google.cloud.pubsublite.internal.wire.PartitionCountWatcherImpl.onAlarm(PartitionCountWatcherImpl.java:71)
    at com.google.cloud.pubsublite.internal.AlarmFactory.lambda$null$0(AlarmFactory.java:41)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.InterruptedException
    at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:456)
    at com.google.common.util.concurrent.FluentFuture$TrustedFuture.get(FluentFuture.java:100)
    at com.google.common.util.concurrent.ForwardingFuture.get(ForwardingFuture.java:73)
    at com.google.cloud.pubsublite.internal.wire.PartitionCountWatcherImpl.pollTopicConfig(PartitionCountWatcherImpl.java:81)
    ... 9 more
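
For reference, the publishing pattern described above (one publisher per partition, one message per row) can be sketched in plain Python. This is only an illustration under stated assumptions: `FakePublisher` and `publish_partition` are hypothetical names, and `FakePublisher` is a stand-in, not the real Pub/Sub Lite client. The sketch waits for every publish future before closing the publisher; whether publisher lifetime is related to the warning above is not established here.

```python
from concurrent.futures import Future
from typing import Callable, Iterable, List


class FakePublisher:
    """Hypothetical stand-in for a real publisher client (not the Pub/Sub Lite API)."""

    def __init__(self) -> None:
        self.sent: List[bytes] = []
        self.closed = False

    def publish(self, data: bytes) -> Future:
        # Refuse to publish once the client has been shut down.
        if self.closed:
            raise RuntimeError("publish called after close")
        self.sent.append(data)
        fut: Future = Future()
        fut.set_result(f"msg-{len(self.sent)}")  # pretend the server acknowledged
        return fut

    def close(self) -> None:
        self.closed = True


def publish_partition(
    rows: Iterable[bytes],
    make_publisher: Callable[[], FakePublisher],
) -> List[str]:
    """Publish every row of one partition with a single publisher.

    The publisher is created once per partition, every publish future is
    awaited, and only then is the publisher closed.
    """
    publisher = make_publisher()
    try:
        futures = [publisher.publish(row) for row in rows]
        # Block until each message is acknowledged (or 30 s elapse per message).
        return [f.result(timeout=30) for f in futures]
    finally:
        publisher.close()
```

In a Spark job, a function shaped like `publish_partition` would be the callable handed to `foreachPartition` on the batch DataFrame inside the `foreachBatch` function, with the real client constructed in place of `FakePublisher`.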