Spark Streaming using Kinesis doesn't work if shard checkpoints exist in DynamoDB


This was cross-posted as SPARK-22685.

TL;DR – If shard checkpoints don't exist in DynamoDB (i.e. a completely fresh start), the Spark Streaming application reading from Kinesis works flawlessly. However, if the checkpoints do exist (e.g. after an app restart), it fails most of the time.


The app uses Spark Streaming 2.2.0 and spark-streaming-kinesis-asl_2.11. Whenever the app starts with shard checkpoint data already present in DynamoDB (written there by the KCL), the failures below show up after a few successful batches (the exact number varies). For context, the receiver setup is sketched first, then the log excerpts.
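
A minimal sketch of the kind of setup in use, assuming the KinesisInputDStream builder API from spark-streaming-kinesis-asl 2.2.0; the stream name, region, endpoint, app name, and intervals are placeholders rather than the real values:

    import java.nio.charset.StandardCharsets

    import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kinesis.KinesisInputDStream

    val batchInterval = Seconds(10)
    val ssc = new StreamingContext(new SparkConf().setAppName("kinesis-demo"), batchInterval)

    // checkpointAppName is also the name of the DynamoDB table that the KCL
    // uses for shard leases and checkpoints.
    val stream = KinesisInputDStream.builder
      .streamingContext(ssc)
      .streamName("my-stream")                                 // placeholder
      .endpointUrl("https://kinesis.us-east-1.amazonaws.com")  // placeholder
      .regionName("us-east-1")                                 // placeholder
      .initialPositionInStream(InitialPositionInStream.LATEST)
      .checkpointAppName("my-kinesis-app")                     // placeholder
      .checkpointInterval(batchInterval)
      .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
      .build()

    stream.map(bytes => new String(bytes, StandardCharsets.UTF_8)).print()
    ssc.start()
    ssc.awaitTermination()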

First, leases are lost:

17/12/01 05:16:50 INFO LeaseRenewer: Worker 10.0.182.119:9781acd5-6cb3-4a39-a235-46f1254eb885 lost lease with key shardId-000000000515

Then, in random order, "Can't update checkpoint - instance doesn't hold the lease for this shard" and "com.amazonaws.SdkClientException: Unable to execute HTTP request: The target server failed to respond" errors follow, bringing the whole app down within a few batches:

17/12/01 05:17:10 ERROR ProcessTask: ShardId shardId-000000000394: Caught exception:
com.amazonaws.SdkClientException: Unable to execute HTTP request: The target server failed to respond
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1069)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1035)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:742)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:716)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
        at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
        at com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:1948)
        at com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:1924)
        at com.amazonaws.services.kinesis.AmazonKinesisClient.executeGetRecords(AmazonKinesisClient.java:969)
        at com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:945)
        at com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy.get(KinesisProxy.java:156)
        at com.amazonaws.services.kinesis.clientlibrary.proxies.MetricsCollectingKinesisProxyDecorator.get(MetricsCollectingKinesisProxyDecorator.java:74)
        at com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisDataFetcher.getRecords(KinesisDataFetcher.java:68)
        at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.getRecordsResultAndRecordMillisBehindLatest(ProcessTask.java:291)
        at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.getRecordsResult(ProcessTask.java:256)
        at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.call(ProcessTask.java:127)
        at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:49)
        at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:24)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
  Caused by: org.apache.http.NoHttpResponseException: The target server failed to respond
        at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:143)
        at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:57)
        at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:261)
        at org.apache.http.impl.DefaultBHttpClientConnection.receiveResponseHeader(DefaultBHttpClientConnection.java:165)
        at org.apache.http.impl.conn.CPoolProxy.receiveResponseHeader(CPoolProxy.java:167)
        at org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:272)
        at com.amazonaws.http.protocol.SdkHttpRequestExecutor.doReceiveResponse(SdkHttpRequestExecutor.java:82)
        at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:124)
        at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:271)
        at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:184)
        at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:184)
        at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
        at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:55)
        at com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1190)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1030)
        ... 22 more

and

17/12/01 05:20:59 ERROR KinesisRecordProcessor: ShutdownException:  Caught shutdown exception, skipping checkpoint.
com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException: Can't update checkpoint - instance doesn't hold the lease for this shard
        at com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibLeaseCoordinator.setCheckpoint(KinesisClientLibLeaseCoordinator.java:173)
        at com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer.advancePosition(RecordProcessorCheckpointer.java:216)
        at com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer.checkpoint(RecordProcessorCheckpointer.java:137)
        at com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer.checkpoint(RecordProcessorCheckpointer.java:103)
        at org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$checkpoint$1$$anonfun$apply$1.apply$mcV$sp(KinesisCheckpointer.scala:94)
        at org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$checkpoint$1$$anonfun$apply$1.apply(KinesisCheckpointer.scala:94)
        at org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$checkpoint$1$$anonfun$apply$1.apply(KinesisCheckpointer.scala:94)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.streaming.kinesis.KinesisRecordProcessor$.retryRandom(KinesisRecordProcessor.scala:158)
        at org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$checkpoint$1.apply(KinesisCheckpointer.scala:94)
        at org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$checkpoint$1.apply(KinesisCheckpointer.scala:88)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.streaming.kinesis.KinesisCheckpointer.checkpoint(KinesisCheckpointer.scala:88)
        at org.apache.spark.streaming.kinesis.KinesisCheckpointer.org$apache$spark$streaming$kinesis$KinesisCheckpointer$$checkpointAll(KinesisCheckpointer.scala:116)
        at org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$1.apply$mcVJ$sp(KinesisCheckpointer.scala:130)
        at org.apache.spark.streaming.util.RecurringTimer.triggerActionForNextInterval(RecurringTimer.scala:94)
        at org.apache.spark.streaming.util.RecurringTimer.org$apache$spark$streaming$util$RecurringTimer$$loop(RecurringTimer.scala:106)
        at org.apache.spark.streaming.util.RecurringTimer$$anon$1.run(RecurringTimer.scala:29)

Right now, the workaround is to delete all of the shard checkpoint data from DynamoDB so that the app starts from InitialPositionInStream.LATEST; see the sketch below. The obvious downside is that the checkpoint information is not used at all, and whatever arrived while the app was down is lost.
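
The cleanup itself is just dropping the KCL lease table. A minimal sketch using the AWS Java SDK (which the KCL already pulls in); the region and table name are placeholders, and the table name has to match the checkpoint application name:

    import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder

    // The KCL keeps shard leases and checkpoints in a DynamoDB table named
    // after the Kinesis checkpoint application name. Dropping that table makes
    // the next run fall back to InitialPositionInStream.LATEST.
    val dynamo = AmazonDynamoDBClientBuilder.standard()
      .withRegion("us-east-1")             // placeholder region
      .build()

    dynamo.deleteTable("my-kinesis-app")   // placeholder: the checkpointAppName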

I may have missed something obvious, so any help would be appreciated.
