Cassandra CSharp driver batch statement failing

284 Views Asked by At

I am trying to use a batch statement instead of a single binded insert statement.
Even though this is a very small change this fails and I am not looking for a good way for the error handling and to find out which part is the issue.
One issue is definetly that the Java API has a getStatements method which is missing in the C# driver.
The pseudo code looks like this:

private BatchStatement batchStatement = new BatchStatement();
private const int blockingFactor = 5;
private int i = 0;
private object locker = new object();
public CassandraBufferHandler()
{
    Cluster = Cluster.Builder().AddContactPoints("localhost").Build();
    Session = Cluster.Connect("my_keyspace");
    InsertStatement = Session.Prepare("Insert into ticks (instrumentcode, timestamp, type, exchange, price, volume) values(?,?,?,?,?,?) if not exists;");
}


public void OnEvent(TickCassandra tickCassandra, long sequence, bool endOfBatch)
{
    try
    {
            lock (locker)
                batchStatement.Add(
                InsertStatement.Bind(tickCassandra.Instrumentcode,
                                     tickCassandra.Timestamp,
                                     tickCassandra.Type,
                                     tickCassandra.Exchange,
                                     tickCassandra.Price,
                                     tickCassandra.Volume));
            if (i++ % blockingFactor == 0)
            {
                BatchStatement tmp;
                lock (locker)
                {
                    tmp = batchStatement;
                    tmp.EnableTracing();
                    batchStatement = new BatchStatement();
                }
                Session.ExecuteAsync(tmp).ContinueWith(t =>
                {
                    if (t.Exception != null)
                    {
                        ErrorCount++;
                        Log.Error(t.Exception.Message + tmp.ToString());
                    }
                    else
                        InsertCount++;
                });
            }                
    }
    catch (Exception ex)
    {
        Log.Error("Exception:" + ex);
        Active = false;
    }
0

There are 0 best solutions below