Configuring AWS X-Ray with Kinesis KCL 2.0

I am using Kinesis KCL 2.0 and have a consumer set up pretty much as shown in the example: https://docs.aws.amazon.com/streams/latest/dev/kcl2-standard-consumer-java-example.html

    /**
     * Sets up configuration for the KCL, including DynamoDB and CloudWatch dependencies. The final argument, a
     * ShardRecordProcessorFactory, is where the logic for record processing lives, and is located in a private
     * class below.
     */
    DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
    CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
    ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, streamName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory());

    /**
     * The Scheduler (also called Worker in earlier versions of the KCL) is the entry point to the KCL. This
     * instance is configured with defaults provided by the ConfigsBuilder.
     */
    Scheduler scheduler = new Scheduler(
            configsBuilder.checkpointConfig(),
            configsBuilder.coordinatorConfig(),
            configsBuilder.leaseManagementConfig(),
            configsBuilder.lifecycleConfig(),
            configsBuilder.metricsConfig(),
            configsBuilder.processorConfig(),
            configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
    );

    /**
     * Kickoff the Scheduler. Record processing of the stream of dummy data will continue indefinitely
     * until an exit is triggered.
     */
    Thread schedulerThread = new Thread(scheduler);
    schedulerThread.setDaemon(true);
    schedulerThread.start();

I have configured X-Ray:

    @Bean
    public AWSXRayRecorder awsXRayRecorder() {
        AWSXRayRecorderBuilder builder = AWSXRayRecorderBuilder.standard().withPlugin(new ECSPlugin());
        AWSXRayRecorder recorder = builder.build();
        AWSXRay.setGlobalRecorder(recorder);
        return recorder;
    }

And here are the dependencies in pom.xml:

    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-xray-recorder-sdk-core</artifactId>
    </dependency>
    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-xray-recorder-sdk-aws-sdk-v2-instrumentor</artifactId>
    </dependency>

    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-xray-recorder-sdk-aws-sdk-instrumentor</artifactId>
    </dependency>
    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-xray-recorder-sdk-spring</artifactId>
    </dependency>

When I run the app I get the following exceptions:

{"@timestamp":"2024-01-23T15:40:12.737814+01:00","level":"ERROR","thread_name":"prefetch-cache-shardId-000000000000-0000","logger_name":"com.amazonaws.xray.strategy.LogErrorContextMissingStrategy","message":"Suppressing AWS X-Ray context missing exception (SegmentNotFoundException): Failed to begin subsegment named 'Kinesis': segment cannot be found."}

{"@timestamp":"2024-01-23T15:40:12.16158+01:00","level":"ERROR","thread_name":"ShardRecordProcessor-0000","logger_name":"com.amazonaws.xray.strategy.LogErrorContextMissingStrategy","message":"Suppressing AWS X-Ray context missing exception (SegmentNotFoundException): Failed to begin subsegment named 'DynamoDb': segment cannot be found."}

I understand that this is because the code is running outside a Servlet Context so no X-Ray segment is available, as described here.

I tried different ways to manually start a segment, but none of them seem to work:

    Entity segment = recorder.getTraceEntity();
    Thread schedulerThread = new Thread(() -> {
        try {
            Subsegment subsegment = AWSXRay.beginSubsegment("Kinesis Consumer");
            scheduler.run();

        } finally {
            AWSXRay.endSubsegment();
        }
    });

or

    Segment segment = AWSXRay.beginSegment("KinesisProcessingSegment");
    Scheduler scheduler = new Scheduler(
        ...
    );


    Thread schedulerThread = new Thread(() -> {
        try {
            AWSXRay.setTraceEntity(segment);
            scheduler.run();

        } finally {
            AWSXRay.endSegment();
        }
    });

    schedulerThread.setDaemon(true);
    schedulerThread.start();
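
A note on why both attempts may fail: the thread names in the log (`prefetch-cache-shardId-000000000000-0000` and `ShardRecordProcessor-0000`) suggest the instrumented Kinesis and DynamoDB calls run on threads managed by the KCL, not on `schedulerThread`. Assuming the X-Ray recorder keeps the current segment in thread-local storage (which, as far as I know, is its default outside Lambda), a segment set on one thread is not visible on another. The sketch below reproduces that effect with the JDK only; `ThreadLocalContextDemo` and `CURRENT_SEGMENT` are hypothetical stand-ins and not X-Ray or KCL APIs.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadLocalContextDemo {
    // Hypothetical stand-in for the per-thread slot holding the current segment.
    private static final ThreadLocal<String> CURRENT_SEGMENT = new ThreadLocal<>();

    /** Sets the segment on the calling thread, then reads it from a pool thread. */
    static String segmentSeenByWorker(String segmentName) throws Exception {
        CURRENT_SEGMENT.set(segmentName);
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // The pool thread has its own, empty thread-local slot.
            Callable<String> task = () -> CURRENT_SEGMENT.get();
            return pool.submit(task).get();
        } finally {
            pool.shutdown();
            CURRENT_SEGMENT.remove();
        }
    }

    /** Sets and reads the segment inside the task, on the pool thread itself. */
    static String segmentSetInsideWorker(String segmentName) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<String> task = () -> {
                CURRENT_SEGMENT.set(segmentName);
                try {
                    return CURRENT_SEGMENT.get();
                } finally {
                    CURRENT_SEGMENT.remove();
                }
            };
            return pool.submit(task).get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(segmentSeenByWorker("KinesisProcessingSegment"));
        System.out.println(segmentSetInsideWorker("KinesisProcessingSegment"));
    }
}
```

If that assumption holds, the segment would have to be started and ended on the thread that actually makes the instrumented call (for example inside the record processor), rather than around `scheduler.run()`. Whether that also covers the KCL's internal prefetch thread is not something this sketch can show.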