I have a class KafkaRecordInterceptor that implements the interface RecordInterceptor<String, Message<Object>> as follows:
@Component
public class KafkaRecordInterceptor implements RecordInterceptor<String, Message<Object>> {
    public static final ThreadLocal<MessageContext> messageContextThreadLocal = new ThreadLocal<>();
    private final List<MessageContextListener> messageContextListener;
    private final MetricsUtils metricsUtils;
    private final SystemClock systemClock;
...
Then I override the intercept method as below:
    @Override
    public ConsumerRecord intercept(@NonNull ConsumerRecord consumerRecord, @NonNull Consumer consumer) {
        MessageContext messageContext = new MessageContext();
        messageContext.setStartClock(systemClock.now());
        messageContextThreadLocal.set(messageContext);
        Optional<MessagePriority> messagePriorityOptional =
                Objects.requireNonNull(metricsUtils.getMessagePriority(consumerRecord));
        messageContext.setCorrelationId(KafkaUtils.extractCorrelationId(consumerRecord));
        messageContext
                .setOperationType(((GenericMessage<?>) consumerRecord.value()).getPayload().getClass().getSimpleName());
        messageContext.setPriority(messagePriorityOptional.map(MessagePriority::name).orElse(""));
        messageContextListener.forEach(listener -> listener.onStart(messageContext));
        return Objects.requireNonNull(consumerRecord);
    }
I also override afterRecord:
    @Override
    public void afterRecord(ConsumerRecord record, Consumer consumer) {
        MessageContext messageContext = messageContextThreadLocal.get();
        messageContextListener.forEach(listener -> listener.onClear(messageContext));
        messageContextThreadLocal.remove();
    }
My question is: what is the best way to unit test the intercept method and afterRecord?
It looks like you can just create the interceptor with new KafkaRecordInterceptor() in the unit test, feed its method calls with mocks for ConsumerRecord and Consumer, and then verify the expected interactions at the end.
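As a rough illustration of that idea, here is a minimal sketch that runs on the plain JDK. Everything in it is a simplified, hypothetical stand-in for the types in the question: the interceptor takes a correlation ID string instead of a ConsumerRecord, MessageContext has a single field, and a hand-written RecordingListener replaces a mocking framework. In a real test you would more likely pass Mockito mocks for ConsumerRecord, Consumer and the listeners, then use verify on them. The shape stays the same: call intercept, check the ThreadLocal and the onStart call, call afterRecord, check the onClear call and that the ThreadLocal is empty again.

```java
import java.util.ArrayList;
import java.util.List;

// Self-contained sketch: every type below is a simplified, hypothetical
// stand-in for the class of the same name in the question.
public class InterceptorTestSketch {

    static class MessageContext {
        String correlationId;
    }

    interface MessageContextListener {
        void onStart(MessageContext context);
        void onClear(MessageContext context);
    }

    // Stand-in interceptor: only the ThreadLocal and listener handling are kept.
    static class KafkaRecordInterceptor {
        static final ThreadLocal<MessageContext> messageContextThreadLocal = new ThreadLocal<>();
        private final List<MessageContextListener> listeners;

        KafkaRecordInterceptor(List<MessageContextListener> listeners) {
            this.listeners = listeners;
        }

        // Assumption: the correlation ID is passed directly instead of a ConsumerRecord.
        String intercept(String correlationId) {
            MessageContext context = new MessageContext();
            context.correlationId = correlationId;
            messageContextThreadLocal.set(context);
            listeners.forEach(listener -> listener.onStart(context));
            return correlationId;
        }

        void afterRecord() {
            MessageContext context = messageContextThreadLocal.get();
            listeners.forEach(listener -> listener.onClear(context));
            messageContextThreadLocal.remove();
        }
    }

    // Hand-written fake listener that records the calls it receives.
    static class RecordingListener implements MessageContextListener {
        final List<String> events = new ArrayList<>();

        @Override
        public void onStart(MessageContext context) {
            events.add("start:" + context.correlationId);
        }

        @Override
        public void onClear(MessageContext context) {
            events.add("clear:" + context.correlationId);
        }
    }

    // Runs one intercept/afterRecord cycle and returns what was observed.
    public static List<String> runScenario() {
        RecordingListener listener = new RecordingListener();
        KafkaRecordInterceptor interceptor = new KafkaRecordInterceptor(List.of(listener));

        interceptor.intercept("abc-123");
        boolean contextSet = KafkaRecordInterceptor.messageContextThreadLocal.get() != null;

        interceptor.afterRecord();
        boolean contextCleared = KafkaRecordInterceptor.messageContextThreadLocal.get() == null;

        List<String> observed = new ArrayList<>(listener.events);
        observed.add("contextSet=" + contextSet);
        observed.add("contextCleared=" + contextCleared);
        return observed;
    }

    public static void main(String[] args) {
        System.out.println(runScenario());
    }
}
```

Because messageContextThreadLocal is static, it is worth asserting that it is empty after afterRecord (or clearing it in test teardown), so state does not leak between tests that run on the same thread.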