How to use Cassandra sink with TestContainers in Flink

755 Views Asked by At

I'm trying to test the Cassandra sink with Testcontainers in a simple Flink pipeline that uses DataStreamTestBase for its tests:

public class CassandraPojoSinkExampleTest extends DataStreamTestBase {

    /**
     * Cassandra container managed by a JUnit {@code @Rule}; it runs {@code testInit3.cql} on startup
     * and its create-container command is modified to request a CPU count of 2.
     */
    @Rule
    public CassandraContainer cassandra = new CassandraContainer<>()
            .withInitScript("testInit3.cql")
            .withCreateContainerCmdModifier(cmd -> cmd.getHostConfig().withCpuCount(2L));

    /**
     * Tunes the test stream environment once the base class has set it up: event time,
     * one restart after a 1-second delay, and no object reuse between operators.
     */
    @Override
    public void initialize() throws Exception {
        super.initialize();
        // Job-level behaviour: event-time semantics and a single fixed-delay retry.
        testEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        testEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, Time.of(1, TimeUnit.SECONDS)));
        // Serialization behaviour: never hand the same record instance to two operators.
        testEnv.getConfig().disableObjectReuse();
    }

    /**
     * Pushes 20 {@link Message}s ("cassandra-0" .. "cassandra-19") into the Cassandra sink and then
     * tries to read "cassandra-1" back through the DataStax object mapper, printing the result.
     *
     * <p>NOTE(review): as written this fails with NoHostAvailableException. Only
     * {@code cassandra.getHost()} is handed to {@code doIt}, so the sink's driver uses port 9042 on
     * that host (the stack trace shows {@code localhost/127.0.0.1:9042}). The container's CQL port,
     * however, is published on a different host port ({@code cassandra.getMappedPort(9042)}).
     */
    @Test
    public void test() throws Throwable {
       ArrayList<Message> messages = new ArrayList<>(20);

            for (long i = 0; i < 20; i++) {
                messages.add(new Message("cassandra-" + i));
            }


        // Driver connection used only to read the data back.
        // NOTE(review): this Cluster is never closed; only the Session below is.
        Cluster cluster = cassandra.getCluster();

        try (Session session = cluster.connect()) {

            // Only declares the sink on testEnv; nothing in this method executes the job.
            // NOTE(review): host only, no mapped port -> the sink connects to <host>:9042.
            CassandraPojoSinkExample cassandraPojoSinkExample= new CassandraPojoSinkExample();
            cassandraPojoSinkExample.doIt(cassandra.getHost(), testEnv.fromCollection(messages));

            // Looks up a single row by its primary key value.
            // NOTE(review): presumably the row is not there yet, because the job has not run at this point.
            MappingManager manager= new MappingManager(session);
            Mapper<Message> mapper=manager.mapper(Message.class);
            String word ="cassandra-1";
            Message message= mapper.get(word);
            System.out.println(message);
        }
    }

The Flink pipeline looks like this (it is only the sink, in the doIt method):

public class CassandraPojoSinkExample {
    private static final ArrayList<Message> messages = new ArrayList<>(20);

    static {
        for (long i = 0; i < 20; i++) {
            messages.add(new Message("cassandra-" + i));
        }
    }
    /**
     * Runs the example pipeline against a Cassandra node at {@code 127.0.0.1} (driver default port).
     *
     * <p>The main method is not used in tests. {@code doIt} only attaches the sink to the stream;
     * the job does not run until it is submitted with {@code env.execute}.
     *
     * @param args ignored
     * @throws Exception if building or executing the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Message> source = env.fromCollection(messages);

        CassandraPojoSinkExample cassandraPojoSinkExample = new CassandraPojoSinkExample();
        cassandraPojoSinkExample.doIt("127.0.0.1", source);

        // Without this call the job graph is built but never run, so nothing reaches Cassandra.
        env.execute("CassandraPojoSinkExample");
    }
    /** Port used when the caller gives only an address; this is the DataStax driver's default CQL port. */
    private static final int DEFAULT_CQL_PORT = 9042;

    /**
     * Attaches a Cassandra POJO sink to {@code source}, connecting to {@code address} on port 9042.
     *
     * @param address Cassandra contact point
     * @param source  stream of messages to persist
     * @throws Exception if the sink cannot be built
     */
    public void doIt(String address, DataStreamSource<Message> source) throws Exception {
        doIt(address, DEFAULT_CQL_PORT, source);
    }

    /**
     * Attaches a Cassandra POJO sink to {@code source}, connecting to {@code address:port}.
     *
     * <p>Use this overload when Cassandra is not on 9042 of the given host. An example is a
     * Testcontainers instance: pass {@code getHost()} and {@code getMappedPort(9042)}.
     * Null fields of each POJO are written as well ({@code saveNullFields(true)}).
     *
     * @param address Cassandra contact point
     * @param port    CQL native-protocol port on {@code address}
     * @param source  stream of messages to persist
     * @throws Exception if the sink cannot be built
     */
    public void doIt(String address, int port, DataStreamSource<Message> source) throws Exception {
        CassandraSink.addSink(source)
                .setClusterBuilder(new ClusterBuilder() {
                    @Override
                    protected Cluster buildCluster(Builder builder) {
                        // The port must be set explicitly; otherwise the driver always uses 9042.
                        return builder.addContactPoint(address).withPort(port).build();
                    }
                })
                .setMapperOptions(() -> new Mapper.Option[]{Mapper.Option.saveNullFields(true)})
                .build();
    }

When I run the test, I get this error:

16:41:57,675 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask           - Error during disposal of stream operator.
java.lang.NullPointerException
    at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.checkAsyncErrors(CassandraSinkBase.java:162)
    at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.close(CassandraSinkBase.java:96)
    at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:651)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:562)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:480)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
    at java.lang.Thread.run(Thread.java:748)
16:41:57,683 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Collection Source -> Sink: Cassandra Sink (1/1) (721a02e3c1345d640bf03b67eff0aba7) switched from RUNNING to FAILED.
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: localhost/127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [localhost/127.0.0.1] Cannot connect))
    at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
    at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
    at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
    at com.datastax.driver.core.Cluster.init(Cluster.java:162)
    at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
    at com.datastax.driver.core.Cluster.connect(Cluster.java:283)
    at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.createSession(CassandraPojoSink.java:123)
    at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:87)
    at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:106)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:990)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
    at java.lang.Thread.run(Thread.java:748)

What am I doing wrong here?

2

There are 2 best solutions below

0
On BEST ANSWER

From the stack trace above (com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: localhost/127.0.0.1:9042 ...)), it seems that the Cassandra hosts are not available.

I would say you need to expose the ports to the outside:

    /**
     * Same container as in the question, with the CQL port 9042 listed explicitly as exposed.
     * Docker still publishes it on a host port of its own choosing; read that with
     * getMappedPort(9042).
     */
    @Rule
    public CassandraContainer cassandra = new CassandraContainer<>()
            .withInitScript("testInit3.cql")
            .withCreateContainerCmdModifier(cmd -> cmd.getHostConfig().withCpuCount(2L))
            .withExposedPorts(9042);

Also, I would check manually (e.g. with telnet to the corresponding host/port) that the service has started.

0
On

The answer from @Mikalai Lushchytski was good: the problem was the exposed port.

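The port also needs to be passed to .setHost() in the addSink method. This must be the host port that Docker mapped to 9042 (cassandra.getMappedPort(9042)), not 9042 itself.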

Additionally, to be honest, I had already tried this before but didn't notice that it was working, because the problem was in reading the data back from the Cassandra table. With the code provided, I wasn't able to retrieve any row from the database during the test. The problem seems to be with DataStreamTestBase and the order in which its methods are called. I confirmed that the pipeline works by manually connecting to the database and querying the data while the test was running.

The solution for properly reading database records from a Cassandra Testcontainer when using DataStreamTestBase is to override the executeTest() method:

public class CassandraPojoSinkExampleTest extends DataStreamTestBase {
    private static boolean runTestAfterMethod = true;

    @Rule
    public CassandraContainer cassandra = new CassandraContainer<>()
            .withInitScript("testInit3.cql")
            .withExposedPorts(9042);

    @Before
    public void setUp() {
        runTestAfterMethod = true;
    }

    @Override
    public void executeTest() throws Throwable {
        if (runTestAfterMethod) {
            super.executeTest();
        }
    }

    @Override
    public void initialize() throws Exception {
        super.initialize();
        testEnv.getConfig().disableObjectReuse();
        testEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, Time.of(1, TimeUnit.SECONDS)));
    }

    @Test
    public void test() throws Throwable {
        runTestAfterMethod = false;

        ArrayList<Message> messages = new ArrayList<>(20);
        for (long i = 0; i < 20; i++) {
            messages.add(new Message("cassandra-" + i));
        }

        DataStreamSource<Message> source = testEnv.fromCollection(messages);

        CassandraPojoSinkExample cassandraPojoSinkExample = new CassandraPojoSinkExample();
        cassandraPojoSinkExample.doIt(cassandra.getHost(), cassandra.getMappedPort(9042), source);

    try( Session session = cassandra.getCluster().newSession()) {
        testEnv.executeTest();     // Crucial thing

        ResultSet resultSet = session.execute("select * from test.features;");
        List<Row> rows = resultSet.all();
        List<String> actualRecords = new ArrayList<String>();

             if (rows.size() != 0) {
                for (int i = 0; i < 20; i++) {
                    actualRecords.add(rows.get(0).getString(i));
                }
            }

            List<String> expectedRecords = new ArrayList<String>() {
                {
                    for (int i = 0; i < 20; i++) {
                        add("cassandra-" + i);
                    }
                }
            };

            assertTrue("List equality without order",
                    actualRecords.containsAll(expectedRecords)&& actualRecords.containsAll(expectedRecords));
        }
    }
    }

}

This way, we can be sure that resultSet is not empty and that the test steps are executed in the correct order.