I am using Kinesis KCL 2.0 and have a consumer set up pretty much as shown in the example: https://docs.aws.amazon.com/streams/latest/dev/kcl2-standard-consumer-java-example.html
/**
* Sets up configuration for the KCL, including DynamoDB and CloudWatch dependencies. The final argument, a
* ShardRecordProcessorFactory, is where the logic for record processing lives, and is located in a private
* class below.
*/
DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, streamName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory());
/**
* The Scheduler (also called Worker in earlier versions of the KCL) is the entry point to the KCL. This
* instance is configured with defaults provided by the ConfigsBuilder.
*/
Scheduler scheduler = new Scheduler(
        configsBuilder.checkpointConfig(),
        configsBuilder.coordinatorConfig(),
        configsBuilder.leaseManagementConfig(),
        configsBuilder.lifecycleConfig(),
        configsBuilder.metricsConfig(),
        configsBuilder.processorConfig(),
        configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
);
/**
* Kickoff the Scheduler. Record processing of the stream of dummy data will continue indefinitely
* until an exit is triggered.
*/
Thread schedulerThread = new Thread(scheduler);
schedulerThread.setDaemon(true);
schedulerThread.start();
I have configured X-Ray:
@Bean
public AWSXRayRecorder awsXRayRecorder() {
    AWSXRayRecorderBuilder builder = AWSXRayRecorderBuilder.standard().withPlugin(new ECSPlugin());
    AWSXRayRecorder recorder = builder.build();
    AWSXRay.setGlobalRecorder(recorder);
    return recorder;
}
And here are the dependencies in pom.xml:
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-xray-recorder-sdk-core</artifactId>
</dependency>
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-xray-recorder-sdk-aws-sdk-v2-instrumentor</artifactId>
</dependency>
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-xray-recorder-sdk-aws-sdk-instrumentor</artifactId>
</dependency>
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-xray-recorder-sdk-spring</artifactId>
</dependency>
When I run the app, the following errors are logged:
{"@timestamp":"2024-01-23T15:40:12.737814+01:00","level":"ERROR","thread_name":"prefetch-cache-shardId-000000000000-0000","logger_name":"com.amazonaws.xray.strategy.LogErrorContextMissingStrategy","message":"Suppressing AWS X-Ray context missing exception (SegmentNotFoundException): Failed to begin subsegment named 'Kinesis': segment cannot be found."}
{"@timestamp":"2024-01-23T15:40:12.16158+01:00","level":"ERROR","thread_name":"ShardRecordProcessor-0000","logger_name":"com.amazonaws.xray.strategy.LogErrorContextMissingStrategy","message":"Suppressing AWS X-Ray context missing exception (SegmentNotFoundException): Failed to begin subsegment named 'DynamoDb': segment cannot be found."}
I understand that this happens because the code runs outside a servlet context, so no X-Ray segment is available, as described here.
I tried different ways to start a segment manually, but none of them works:
Entity segment = recorder.getTraceEntity();
Thread schedulerThread = new Thread(() -> {
    try {
        Subsegment subsegment = AWSXRay.beginSubsegment("Kinesis Consumer");
        scheduler.run();
    } finally {
        AWSXRay.endSubsegment();
    }
});
or
Segment segment = AWSXRay.beginSegment("KinesisProcessingSegment");
Scheduler scheduler = new Scheduler(
        ...
);
Thread schedulerThread = new Thread(() -> {
    try {
        AWSXRay.setTraceEntity(segment);
        scheduler.run();
    } finally {
        AWSXRay.endSegment();
    }
});
schedulerThread.setDaemon(true);
schedulerThread.start();
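
To illustrate what I suspect is going on: the thread names in the log lines (prefetch-cache-shardId-000000000000-0000 and ShardRecordProcessor-0000) are not my scheduler thread, so a segment set on the scheduler thread may simply not be visible there. The sketch below uses only JDK classes and is not X-Ray or KCL code. It assumes that the recorder keeps the current trace entity in thread-local storage and that the KCL does its work on its own pool threads; `ThreadLocalContextDemo`, `CURRENT_SEGMENT` and `readOnPoolThread` are hypothetical names made up for this demo.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadLocalContextDemo {

    // Stand-in for the per-thread trace entity (assumption: X-Ray stores it thread-locally).
    static final ThreadLocal<String> CURRENT_SEGMENT = new ThreadLocal<>();

    // Reads the "segment" from a thread owned by the pool, the way a KCL worker thread would.
    static String readOnPoolThread(ExecutorService pool) throws Exception {
        Callable<String> read = CURRENT_SEGMENT::get;
        return pool.submit(read).get();
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for the KCL's internal executor (assumption).
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // Equivalent of setting the trace entity on the scheduler thread.
            CURRENT_SEGMENT.set("KinesisProcessingSegment");

            // prints "scheduler thread sees: KinesisProcessingSegment"
            System.out.println("scheduler thread sees: " + CURRENT_SEGMENT.get());

            // prints "pool thread sees: null"
            System.out.println("pool thread sees: " + readOnPoolThread(pool));
        } finally {
            CURRENT_SEGMENT.remove();
            pool.shutdown();
        }
    }
}
```

If that assumption holds, a segment would presumably have to be begun (or passed in with AWSXRay.setTraceEntity) on the thread that actually makes the AWS call, for example inside the record processor, while calls made from KCL-internal threads such as the prefetch cache would stay out of my code's reach.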