Apache Spark Kinesis Sample not working


I am trying to run the JavaKinesisWordCountASL example.

The example seems to connect to my Kinesis stream and gets data from it (as shown in the log below). However, Spark never invokes the call function passed to the unionStreams.flatMap method in the example, and it does not print any word counts.

I have tried running it with both Java 8 and Java 7. I am running it on an Ubuntu instance. The same example works on my MacBook.

    14/11/15 01:59:42 INFO scheduler.ReceiverTracker: Stream 1 received 0 blocks
    14/11/15 01:59:42 INFO storage.MemoryStore: ensureFreeSpace(264) called with curMem=3512, maxMem=938244833
    14/11/15 01:59:42 INFO storage.MemoryStore: Block input-0-1416016781800 stored as values in memory (estimated size 264.0 B, free 894.8 MB)
    14/11/15 01:59:42 INFO storage.BlockManagerInfo: Added input-0-1416016781800 in memory on ip-10-80-91-13.ec2.internal:39149 (size: 264.0 B, free: 894.8 MB)
    14/11/15 01:59:42 INFO storage.BlockManagerMaster: Updated info of block input-0-1416016781800
    14/11/15 01:59:42 INFO scheduler.JobScheduler: Added jobs for time 1416016782000 ms
    14/11/15 01:59:42 INFO network.SendingConnection: Initiating connection to [ip-10-80-91-13.ec2.internal/10.80.91.13:39149]
    14/11/15 01:59:42 INFO network.SendingConnection: Connected to [ip-10-80-91-13.ec2.internal/10.80.91.13:39149], 1 messages pending
    14/11/15 01:59:42 INFO network.ConnectionManager: Accepted connection from [ip-10-80-91-13.ec2.internal/10.80.91.13:56700]
    14/11/15 01:59:42 WARN storage.BlockManager: Block input-0-1416016781800 already exists on this machine; not re-adding it
    14/11/15 01:59:42 INFO receiver.BlockGenerator: Pushed block input-0-1416016781800
    14/11/15 01:59:43 INFO storage.MemoryStore: ensureFreeSpace(256) called with curMem=3776, maxMem=938244833
    14/11/15 01:59:43 INFO storage.MemoryStore: Block input-0-1416016782800 stored as values in memory (estimated size 256.0 B, free 894.8 MB)
    14/11/15 01:59:43 INFO storage.BlockManagerInfo: Added input-0-1416016782800 in memory on ip-10-80-91-13.ec2.internal:39149 (size: 256.0 B, free: 894.8 MB)
    14/11/15 01:59:43 INFO storage.BlockManagerMaster: Updated info of block input-0-1416016782800
    14/11/15 01:59:43 WARN storage.BlockManager: Block input-0-1416016782800 already exists on this machine; not re-adding it
    14/11/15 01:59:43 INFO receiver.BlockGenerator: Pushed block input-0-1416016782800
    14/11/15 01:59:44 INFO scheduler.ReceiverTracker: Stream 0 received 2 blocks
    14/11/15 01:59:44 INFO scheduler.ReceiverTracker: Stream 1 received 0 blocks
    14/11/15 01:59:44 INFO scheduler.JobScheduler: Added jobs for time 1416016784000 ms
    14/11/15 01:59:46 INFO scheduler.ReceiverTracker: Stream 0 received 0 blocks
    14/11/15 01:59:46 INFO scheduler.ReceiverTracker: Stream 1 received 0 blocks
    14/11/15 01:59:46 INFO scheduler.JobScheduler: Added jobs for time 1416016786000 ms
    14/11/15 01:59:46 INFO impl.CWPublisherRunnable: Successfully published 17 datums.
    14/11/15 01:59:46 INFO storage.MemoryStore: ensureFreeSpace(248) called with curMem=4032, maxMem=938244833
    14/11/15 01:59:46 INFO storage.MemoryStore: Block input-1-1416016786000 stored as values in memory (estimated size 248.0 B, free 894.8 MB)
    14/11/15 01:59:46 INFO storage.BlockManagerInfo: Added input-1-1416016786000 in memory on ip-10-80-91-13.ec2.internal:39149 (size: 248.0 B, free: 894.8 MB)
    14/11/15 01:59:46 INFO storage.BlockManagerMaster: Updated info of block input-1-1416016786000
    14/11/15 01:59:46 WARN storage.BlockManager: Block input-1-1416016786000 already exists on this machine; not re-adding it
    14/11/15 01:59:46 INFO receiver.BlockGenerator: Pushed block input-1-1416016786000
    14/11/15 01:59:46 INFO impl.CWPublisherRunnable: Successfully published 14 datums.
    14/11/15 01:59:48 INFO scheduler.ReceiverTracker: Stream 0 received 0 blocks
    14/11/15 01:59:48 INFO storage.MemoryStore: ensureFreeSpace(264) called with curMem=4280, maxMem=938244833
    14/11/15 01:59:48 INFO scheduler.ReceiverTracker: Stream 1 received 1 blocks
    14/11/15 01:59:48 INFO storage.MemoryStore: Block input-0-1416016787800 stored as values in memory (estimated size 264.0 B, free 894.8 MB)
    14/11/15 01:59:48 INFO storage.BlockManagerInfo: Added input-0-1416016787800 in memory on ip-10-80-91-13.ec2.internal:39149 (size: 264.0 B, free: 894.8 MB)
    14/11/15 01:59:48 INFO storage.BlockManagerMaster: Updated info of block input-0-1416016787800
    14/11/15 01:59:48 INFO scheduler.JobScheduler: Added jobs for time 1416016788000 ms
    14/11/15 01:59:48 WARN storage.BlockManager: Block input-0-1416016787800 already exists on this machine; not re-adding it
    14/11/15 01:59:48 INFO receiver.BlockGenerator: Pushed block input-0-1416016787800
    14/11/15 01:59:50 INFO scheduler.ReceiverTracker: Stream 0 received 1 blocks
    14/11/15 01:59:50 INFO scheduler.ReceiverTracker: Stream 1 received 0 blocks
    14/11/15 01:59:50 INFO scheduler.JobScheduler: Added jobs for time 1416016790000 ms
    14/11/15 01:59:51 INFO storage.MemoryStore: ensureFreeSpace(264) called with curMem=4544, maxMem=938244833
    14/11/15 01:59:51 INFO storage.MemoryStore: Block input-0-1416016790800 stored as values in memory (estimated size 264.0 B, free 894.8 MB)
    14/11/15 01:59:51 INFO storage.BlockManagerInfo: Added input-0-1416016790800 in memory on ip-10-80-91-13.ec2.internal:39149 (size: 264.0 B, free: 894.8 MB)
    14/11/15 01:59:51 INFO storage.BlockManagerMaster: Updated info of block input-0-1416016790800
    14/11/15 01:59:51 WARN storage.BlockManager: Block input-0-1416016790800 already exists on this machine; not re-adding it
    14/11/15 01:59:51 INFO receiver.BlockGenerator: Pushed block input-0-1416016790800


Answer 1 (best answer)

This might have something to do with how many worker threads you have. I had the same problem when I ran the app with --master local[2]. I spent many hours searching for an answer and found nothing. Out of curiosity, I changed it to --master local[4] and it worked. I do not know the root cause; maybe somebody more familiar with Spark can enlighten us.

Note: in my case, my Kinesis stream had two shards. So the app created two input streams, one for each shard.
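If each receiver permanently occupies one local thread, the smallest local master for a stream with N shards is local[N+1]. The following is a minimal sketch of that arithmetic, assuming one receiver per shard as described in this answer; the class and method names are hypothetical and are not part of Spark or the Kinesis sample.

```java
// Hypothetical helper (not a Spark API): computes the smallest local master URL
// that leaves at least one thread free for processing, assuming one Kinesis
// receiver (input DStream) per shard.
final class LocalMasterSizing {

    static String masterForShards(int numShards) {
        if (numShards < 1) {
            throw new IllegalArgumentException("numShards must be >= 1");
        }
        int receivers = numShards;   // assumption: one receiver per shard
        int cores = receivers + 1;   // plus at least one thread to run tasks
        return "local[" + cores + "]";
    }

    public static void main(String[] args) {
        // Two shards, as in this answer: local[2] is one thread short.
        System.out.println(masterForShards(2)); // prints local[3]
    }
}
```

The resulting string could be passed to spark-submit via --master, or to SparkConf.setMaster(...) in the driver. Treat N+1 as a floor rather than a target: local[4], which worked here, leaves two threads for processing instead of one.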

Answer 2

Thanks to the hint from @user3594557.

There are two important notes in https://spark.apache.org/docs/1.1.0/streaming-programming-guide.html#input-dstreams:

If the number of cores allocated to the application is less than or equal to the number of input DStreams / receivers, then the system will receive data, but not be able to process them.

When running locally, if your master URL is set to “local”, then there is only one core to run tasks. That is insufficient for programs with even one input DStream (file streams are okay) as the receiver will occupy that core and there will be no core left to process the data.
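Both notes reduce to one inequality: the application needs more cores than receivers. Below is a rough, hypothetical check of a local master URL against that inequality. It is not a Spark API, it only understands "local", "local[N]" and "local[*]", and it ignores cluster masters and other local forms such as local[N,F].

```java
// Hypothetical sanity check (not a Spark API) mirroring the rule quoted above:
// data is only processed if cores > number of receivers.
final class ReceiverCoreCheck {

    // Number of task threads a local master URL provides.
    // Supports only "local", "local[N]" and "local[*]".
    static int localCores(String master) {
        if (master.equals("local")) {
            return 1;
        }
        if (master.startsWith("local[") && master.endsWith("]")) {
            String inner = master.substring("local[".length(), master.length() - 1);
            if (inner.equals("*")) {
                return Runtime.getRuntime().availableProcessors();
            }
            return Integer.parseInt(inner); // throws NumberFormatException otherwise
        }
        throw new IllegalArgumentException("unsupported master URL: " + master);
    }

    // True only if at least one core remains after every receiver takes one.
    static boolean canProcess(String master, int numReceivers) {
        return localCores(master) > numReceivers;
    }

    public static void main(String[] args) {
        int receivers = 2; // e.g. one receiver per shard of a two-shard stream
        for (String m : new String[] {"local", "local[2]", "local[4]"}) {
            System.out.println(m + " -> " + canProcess(m, receivers));
        }
    }
}
```

With two receivers this prints false for local and local[2] and true for local[4], which matches what the first answer observed.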