Not able to publish and consume message with spring-cloud-stream-test-binder:4.0.4

I am using spring-cloud-stream-test-binder version 4.0.4, but while publishing/consuming a message I get the error below:

java.lang.NullPointerException: Cannot invoke "org.springframework.messaging.SubscribableChannel.send(org.springframework.messaging.Message)" because the return value of "org.springframework.cloud.stream.binder.test.InputDestination.getChannelByName(String)" is null
    at org.springframework.cloud.stream.binder.test.InputDestination.send(InputDestination.java:89)
    at com.csn.tax.pfd.w2.evt.subscriber.service.ClientDataSubscriberTests.testExceptionOnGenerateW2Data(ClientDataSubscriberTests.java:512)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)

Code snippet of the JUnit test:

@DirtiesContext
@SpringBootTest
@AutoConfigureMockMvc
@TestPropertySource("classpath:test.properties")
@ImportAutoConfiguration(
    classes = TestChannelBinderConfiguration.class,
    exclude = {
            KafkaAutoConfiguration.class,
            KafkaMetricsAutoConfiguration.class,
            DataSourceAutoConfiguration.class,
            TransactionAutoConfiguration.class,
            DataSourceTransactionManagerAutoConfiguration.class})
public class ClientDataSubscriberTests {
    
static final String INPUT_TOPIC_NAME = "tax-datafeed-evt-load-clt-dnr";

static final String OUTPUT_TOPIC_NAME = "tax-statusdata-evt-load-clt-dnr";
@Test
public void testExceptionOnGenerateW2Data() {
    String fileId = "testFileId";
    String loadId = "testLoadId";
    String runId = "testRunId";
    String branchNumber = "0080";
    String clientAccountNumber = "INVALID_CLIENT";
    String quarter = "3";
    String year = "2020";

    // Build Kafka event with INVALID_CLIENT
    PayrollClientData payrollClientData = PayrollClientData.newBuilder()
            .setFileId(fileId)
            .setLoadId(loadId)
            .setRunId(runId)
            .setBranchNumber(branchNumber)
            .setClientAccountId(clientAccountNumber)
            .setQuarter(quarter)
            .setYear(year)
            .build();

    Message<PayrollClientData> clientDataMessage = MessageBuilder.withPayload(payrollClientData).build();

    // Send input message
    inputDestination.send(clientDataMessage, INPUT_TOPIC_NAME);

    // Receive output message
    @SuppressWarnings("unchecked")
    Message<PayrollLoadStatusData> payrollLoadStatusMessage = (Message<PayrollLoadStatusData>) (Object)
            outputDestination.receive(500, OUTPUT_TOPIC_NAME);

    PayrollLoadStatusData receivedPayrollLoadStatusData = payrollLoadStatusMessage.getPayload();

    // Assert that received Kafka event contains the expected data for each field
    assertNotNull(receivedPayrollLoadStatusData);

    // JUnit's assertEquals takes the expected value first, then the actual value
    assertEquals(runId, receivedPayrollLoadStatusData.getRunId().toString());
    assertEquals(loadId, receivedPayrollLoadStatusData.getLoadId().toString());
    assertEquals(fileId, receivedPayrollLoadStatusData.getFileId().toString());
    assertEquals(quarter, receivedPayrollLoadStatusData.getQuarter().toString());
    assertEquals(year, receivedPayrollLoadStatusData.getYear().toString());
    assertEquals(clientAccountNumber, receivedPayrollLoadStatusData.getClientAccountId().toString());
    assertEquals(branchNumber, receivedPayrollLoadStatusData.getBranchNumber().toString());
    assertEquals(PayrollLoadStatusType.FAILED, receivedPayrollLoadStatusData.getStatusType());
    
}
}


**Code snippet of the main class:**

@SpringBootApplication(
    scanBasePackages = {"com.csn.evt.subscriber"},
    exclude = {
            DataSourceAutoConfiguration.class,
            MongoAutoConfiguration.class,
            MongoDataAutoConfiguration.class})
@EnableRetry
@EnableMongoAuditing
@EnableAsync
@EnableScheduling
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

There is 1 best solution below

The code snippet you posted is not helpful, since it does not show anything about how your test is set up. For an example, consider looking at some of our tests that use the Test Binder: https://github.com/spring-cloud/spring-cloud-stream/blob/main/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/ImplicitFunctionBindingTests.java. Also, you can see an isolated sample here
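
One way to read the stack trace in the question: `InputDestination.getChannelByName(String)` returned null, which suggests the test binder had no input binding registered under the destination name passed to `inputDestination.send(...)`. The fragment below is a minimal sketch of what `test.properties` might contain so that the names line up. It assumes the application uses the functional programming model with a single function bean; the bean name `processClientData` is hypothetical and does not come from the question, while the destination names are the two topic constants from the test.

```properties
# Hypothetical function bean name (an assumption); replace with the real bean name.
spring.cloud.function.definition=processClientData

# Input binding: the destination must equal the name passed to inputDestination.send(...)
spring.cloud.stream.bindings.processClientData-in-0.destination=tax-datafeed-evt-load-clt-dnr

# Output binding: the destination must equal the name passed to outputDestination.receive(...)
spring.cloud.stream.bindings.processClientData-out-0.destination=tax-statusdata-evt-load-clt-dnr
```

If no `destination` is configured, the binding name itself (for example `processClientData-in-0`) is normally used as the destination, so the test would have to send to that name instead. Whether this is the actual cause here cannot be confirmed without seeing the function definition and the binding configuration.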