I have a set of records under some specific shards in a Kinesis stream. I'm using a KCL 2.x consumer to read from the stream, but the consumer fetches records from all the shards available in the stream. Is there any way to specify shards (or their IDs) while configuring the ConfigsBuilder object or the KCL consumer, so that only records from the specified shards are consumed?
Sample code:
configsBuilder = new ConfigsBuilder(
        applicationName,
        streamName,
        kinesisAsyncClient,
        dynamoDbClient,
        cloudWatchClient,
        workerID,
        new RecordProcessorFactory());
// Build the scheduler from the individual configs exposed by the builder.
scheduler = new Scheduler(
        configsBuilder.checkpointConfig(),
        configsBuilder.coordinatorConfig(),
        configsBuilder.leaseManagementConfig(),
        configsBuilder.lifecycleConfig(),
        configsBuilder.metricsConfig(),
        configsBuilder.processorConfig(),
        configsBuilder.retrievalConfig()
);
// Start the Kinesis records consumer on a daemon thread.
schedulerThread = new Thread(scheduler);
schedulerThread.setDaemon(true);
schedulerThread.start();
Thanks in advance!
KCL 2.x provides a ShardPrioritization interface that lets you prioritize or filter the shards a worker processes.
That said, you can provide your own ShardPrioritization implementation that keeps only the shards relevant to you. After that, just set your prioritizer on the coordinator config that you pass to the Scheduler.
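A minimal sketch of such a filter follows. So that it compiles on its own, it uses small stand-in types that mirror the shape of KCL's `ShardInfo` and `ShardPrioritization` (a single `prioritize` method taking and returning a list of `ShardInfo`); in a real project you would drop the stand-ins and import the types from `software.amazon.kinesis.leases` instead. The names `ShardFilterDemo` and `AllowListShardPrioritization` and the shard IDs are hypothetical, chosen only for illustration.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

class ShardFilterDemo {

    // Stand-in for software.amazon.kinesis.leases.ShardInfo (only the shard ID is modelled).
    static final class ShardInfo {
        private final String shardId;

        ShardInfo(String shardId) {
            this.shardId = shardId;
        }

        String shardId() {
            return shardId;
        }
    }

    // Stand-in for software.amazon.kinesis.leases.ShardPrioritization.
    interface ShardPrioritization {
        List<ShardInfo> prioritize(List<ShardInfo> original);
    }

    // Hypothetical implementation: keeps only shards whose ID is in the allow-list,
    // preserving the order in which they were passed in.
    static final class AllowListShardPrioritization implements ShardPrioritization {
        private final Set<String> allowedShardIds;

        AllowListShardPrioritization(Set<String> allowedShardIds) {
            // Defensive copy so later changes to the caller's set have no effect.
            this.allowedShardIds = new HashSet<>(allowedShardIds);
        }

        @Override
        public List<ShardInfo> prioritize(List<ShardInfo> original) {
            return original.stream()
                    .filter(shard -> allowedShardIds.contains(shard.shardId()))
                    .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) {
        List<ShardInfo> assigned = Arrays.asList(
                new ShardInfo("shardId-000000000000"),
                new ShardInfo("shardId-000000000001"),
                new ShardInfo("shardId-000000000002"));

        Set<String> wanted = new HashSet<>(
                Arrays.asList("shardId-000000000000", "shardId-000000000002"));

        List<String> kept = new AllowListShardPrioritization(wanted)
                .prioritize(assigned)
                .stream()
                .map(ShardInfo::shardId)
                .collect(Collectors.toList());

        // Prints: [shardId-000000000000, shardId-000000000002]
        System.out.println(kept);
    }
}
```

With the real KCL types, wiring this in would look roughly like `configsBuilder.coordinatorConfig().shardPrioritization(new AllowListShardPrioritization(wanted))` in place of the plain `configsBuilder.coordinatorConfig()` argument to the Scheduler. This assumes the fluent `shardPrioritization(...)` setter on `CoordinatorConfig`, so check it against the KCL version you are using.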