I'm playing around with Kinesis and have tried a very simple example. Steps I have performed: I ran a producer to put some records, which works successfully.
I'm facing a problem on the consumer end: getRecords returns no records. I have tried every shard iterator type the API offers: 'AT_SEQUENCE_NUMBER | AFTER_SEQUENCE_NUMBER | TRIM_HORIZON | LATEST'.
Response:
Aws\Result Object
(
    [data:Aws\Result:private] => Array
        (
            [Records] => Array
                (
                )
            [NextShardIterator] => AAAAAAAAAA.....
            [MillisBehindLatest] => 0
            [@metadata] => Array
Source Code:
$streamName = 'test';
$numberOfRecordsPerBatch = 10000;
require_once 'vendor/autoload.php';
$sdk = new \Aws\Sdk();
$kinesisClient = $sdk->createKinesis([
    'region' => '{REGION}',
    'version' => '2013-12-02',
    'credentials' => [
        'key' => '{API_KEY}',
        'secret' => '{API_SECRET}'
    ]
]);
// get all shard ids
$res = $kinesisClient->describeStream([ 'StreamName' => $streamName ]);
$shardIds = $res->search('StreamDescription.Shards[].ShardId');
$count = 0;
$startTime = microtime(true);
foreach ($shardIds as $shardId) {
    echo "ShardId: $shardId\n";
    // get initial shard iterator
    $res = $kinesisClient->getShardIterator([
        'ShardId' => $shardId,
        'ShardIteratorType' => 'LATEST', // 'AT_SEQUENCE_NUMBER | AFTER_SEQUENCE_NUMBER | TRIM_HORIZON | LATEST'
        // 'StartingSequenceNumber' => '<string>',
        'StreamName' => $streamName,
    ]);
    $shardIterator = $res->get('ShardIterator');
    do {
        echo "Get Records\n";
        $res = $kinesisClient->getRecords([
            'Limit' => $numberOfRecordsPerBatch,
            'ShardIterator' => $shardIterator
        ]);
        $shardIterator = $res->get('NextShardIterator');
        $localCount = 0;
        foreach ($res->search('Records[].[SequenceNumber, Data]') as $data) {
            list($sequenceNumber, $item) = $data;
            echo "- [$sequenceNumber] $item\n";
            $count++;
            $localCount++;
        }
        echo "Processed $localCount records in this batch\n";
        sleep(1);
    } while ($localCount > 0);
}
I have also gone through the AWS documentation, and all the parameters we send appear to be correct, but we still do not receive any records in the response.
Thanks
When you ingest records into a Kinesis data stream, you receive a sequence number for each record. Sequence numbers for the same partition key generally increase over time: the longer the period between write requests, the larger the sequence numbers become.
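Because the PutRecord response contains the ShardId and SequenceNumber of the record that was written, a consumer can point an iterator exactly at that record with AT_SEQUENCE_NUMBER instead of using LATEST. Below is a minimal sketch of that idea; the helper name iteratorParamsForPutResult is hypothetical, and it assumes you pass in the array form of the putRecord result (for example via the SDK's toArray()).

```php
<?php
// Hypothetical helper: build getShardIterator arguments that start at a record
// we just wrote. $putResult is assumed to be the PutRecord response as an array,
// which carries 'ShardId' and 'SequenceNumber'.
function iteratorParamsForPutResult(string $streamName, array $putResult): array
{
    if (!isset($putResult['ShardId'], $putResult['SequenceNumber'])) {
        throw new InvalidArgumentException('PutRecord result must contain ShardId and SequenceNumber');
    }
    return [
        'StreamName'             => $streamName,
        'ShardId'                => $putResult['ShardId'],
        // AT_SEQUENCE_NUMBER includes this record; AFTER_SEQUENCE_NUMBER would skip it.
        'ShardIteratorType'      => 'AT_SEQUENCE_NUMBER',
        'StartingSequenceNumber' => $putResult['SequenceNumber'],
    ];
}

// Possible usage with the client from the question (not executed here):
//   $put = $kinesisClient->putRecord(['StreamName' => 'test', 'PartitionKey' => 'key1', 'Data' => 'hello']);
//   $res = $kinesisClient->getShardIterator(iteratorParamsForPutResult('test', $put->toArray()));
//   $shardIterator = $res->get('ShardIterator');
```

Starting at a known sequence number avoids the case where LATEST positions the iterator after records that were already written.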
When you call GetShardIterator, the iterator points to a particular position (sequence number) in that shard. There is no guarantee that ingested data is available at that position: with LATEST, for example, the iterator starts just after the most recent record, so records written before the call are not returned, and even with TRIM_HORIZON a GetRecords call can come back empty while records sit further along in the shard. So the first GetRecords call may well return no records, and you have to call GetRecords in a loop. Currently your while condition ends the loop as soon as one GetRecords call returns no records. Instead, loop while "NextShardIterator" is not null so that you keep reading from the shard.
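A minimal sketch of that loop, keyed on NextShardIterator rather than on the batch size. To keep it testable without AWS, it assumes a callable that maps an iterator to the GetRecords response as an array; the function name readShard and the $maxCalls safety limit are hypothetical additions, since on an open shard NextShardIterator never becomes null and a real consumer would poll indefinitely.

```php
<?php
// Hypothetical helper: read a shard until NextShardIterator is null or $maxCalls is hit.
// $getRecords takes a shard iterator and returns the GetRecords response as an array, e.g.
//   fn (string $it) => $kinesisClient->getRecords(['ShardIterator' => $it, 'Limit' => 1000])->toArray()
function readShard(callable $getRecords, string $shardIterator, int $maxCalls, int $sleepMs = 0): array
{
    $records = [];
    $calls = 0;
    $iterator = $shardIterator;
    // An empty batch does NOT end the loop; only a null iterator (or the safety limit) does.
    while ($iterator !== null && $calls < $maxCalls) {
        $res = $getRecords($iterator);
        $calls++;
        foreach ($res['Records'] ?? [] as $record) {
            $records[] = [$record['SequenceNumber'], $record['Data']];
        }
        // A null NextShardIterator means the shard is closed and returns no more data.
        $iterator = $res['NextShardIterator'] ?? null;
        if ($sleepMs > 0) {
            usleep($sleepMs * 1000); // throttle between calls, like sleep(1) in the question
        }
    }
    return $records;
}
```

With the question's setup, the call would be something like readShard(fn ($it) => $kinesisClient->getRecords(['ShardIterator' => $it, 'Limit' => $numberOfRecordsPerBatch])->toArray(), $shardIterator, 100, 1000).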
If you want to get records in first GetRecords call then