No need to Dynamodb client while consuming data from Kinesis using kcl

241 Views Asked by At

I am using kcl api version 2 ,and dont want to use Dynamodb Client for storing the records .

private static final Logger LOG = LoggerFactory.getLogger(DisplayConsumerApplication.class);


    public static void main(String... args) {

        KinesisAsyncClient kinesisClient = KinesisAsyncClient.builder().credentialsProvider(ProfileCredentialsProvider.create())
                .region(Region.of("US-EAST-1")).build();

 //DynamoDbAsyncClient dynamoClient =
        //  DynamoDbAsyncClient.builder().credentialsProvider(ProfileCredentialsProvider.
        //  create()) .region(Region.of("US-EAST-1")).build();

        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().credentialsProvider(ProfileCredentialsProvider.create())
                .region(Region.of("US-EAST-1")).build();
        ConfigsBuilder configsBuilder = new ConfigsBuilder("Sample","Sample", kinesisClient,null,
                cloudWatchClient, UUID.randomUUID().toString(), new DisplayConsumerFactory());

        Scheduler scheduler = new Scheduler(configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(), configsBuilder.retrievalConfig());

        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();

        System.out.println("Press enter to shutdown");
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        try {
            reader.readLine();
        } catch (IOException ioex) {
            LOG.error("Caught exception while waiting for confirm.  Shutting down", ioex);
        }

        Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
        LOG.info("Waiting up to 20 seconds for shutdown to complete.");
        try {
            gracefulShutdownFuture.get(20, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            LOG.info("Interrupted while waiting for graceful shutdown. Continuing.");
        } catch (ExecutionException e) {
            LOG.error("Exception while executing graceful shutdown.", e);
        } catch (TimeoutException e) {
            LOG.error("Timeout while waiting for shutdown.  Scheduler may not have exited.");
        }
        LOG.info("Completed, shutting down now.");
    }
}

As you can see I commented initialize of DynamodbClient ,but in that method it is manadatory to pass the object of Dynamoclient .So I passed as null ,but getting null pointer exception ,Could you please share your idea how I can use Scheduler without dynamodb client ?

0

There are 0 best solutions below