Converting UnixTimestamp to TIMEUUID for Cassandra


I'm learning all about Apache Cassandra 3.x.x and I'm trying to develop some things to play around with. The problem is that I want to store data in a Cassandra table which contains these columns:

id (UUID - Primary Key) | Message (TEXT) | REQ_Timestamp (TIMEUUID) | Now_Timestamp (TIMEUUID)

REQ_Timestamp holds the time when the message left the client at the frontend level. Now_Timestamp, on the other hand, is the time when the message is finally stored in Cassandra. I need both timestamps because I want to measure how long it takes to handle a request from its origin until the data is safely stored.

Creating the Now_Timestamp is easy: I just use the now() function and it generates the TIMEUUID automatically. The problem arises with REQ_Timestamp. How can I convert that Unix timestamp to a TIMEUUID so Cassandra can store it? Is this even possible?
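
For reference, this is roughly what the insert looks like on the Cassandra side right now, using the DataStax Java driver (the keyspace and table names below are just placeholders); now() covers Now_Timestamp, but I don't know what to bind for REQ_Timestamp:

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;

public class NowTimestampSketch {
    public static void main(String[] args) {
        // Placeholder keyspace ("vcemetery") and table ("memos") names
        Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
        Session session = cluster.connect("vcemetery");

        // now() generates the TIMEUUID for Now_Timestamp on the server side
        session.execute(
                "INSERT INTO memos (id, message, now_timestamp) VALUES (uuid(), ?, now())",
                "some message");

        // req_timestamp, however, arrives as a plain Unix timestamp (a long),
        // and this is where I'm stuck: how do I turn it into a TIMEUUID?
        cluster.close();
    }
}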

The architecture of my backend is this: I get the data as JSON from the frontend to a web service that processes it and stores it in Kafka. Then, a Spark Streaming job takes that Kafka log and puts it into Cassandra.

This is my WebService that puts the data in Kafka.

@Path("/")
public class MemoIn {

    @POST
    @Path("/in")
    @Consumes(MediaType.APPLICATION_JSON)
    @Produces(MediaType.TEXT_PLAIN)
    public Response goInKafka(InputStream incomingData){
        StringBuilder bld = new StringBuilder();
        try {
            BufferedReader in = new BufferedReader(new InputStreamReader(incomingData));
            String line = null;
            while ((line = in.readLine()) != null) {
                bld.append(line);
            }
        } catch (Exception e) {
            System.out.println("Error Parsing: - ");
        }
        System.out.println("Data Received: " + bld.toString());

        JSONObject obj = new JSONObject(bld.toString());
        String line = obj.getString("id_memo") + "|" + obj.getString("id_writer") +
                                 "|" + obj.getString("id_diseased")
                                 + "|" + obj.getString("memo") + "|" + obj.getLong("req_timestamp");

        try {
            KafkaLogWriter.addToLog(line);
        } catch (Exception e) {
            e.printStackTrace();
        }

        return Response.status(200).entity(line).build();
    }


}
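
For reference, the JSON the service receives looks roughly like this (the values are made up, but the field names match the ones read above):

{
  "id_memo": "12345",
  "id_writer": "678",
  "id_diseased": "910",
  "memo": "Some message text",
  "req_timestamp": 1497510000000
}

which is then flattened into the pipe-delimited line that goes to Kafka.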

Here's my Kafka Writer

package main.java.vcemetery.webservice;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import org.apache.kafka.clients.producer.Producer;

public class KafkaLogWriter {

    public static void addToLog(String memo) throws Exception {
        String topicName = "MemosLog";

        /*
        First, we set the properties of the Kafka producer
         */
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // We create the producer
        Producer<String, String> producer = new KafkaProducer<>(props);
        // We send the memo line to the topic
        producer.send(new ProducerRecord<>(topicName, memo));
        // We close the producer
        producer.close();
    }
}

And finally, here's what I have so far for my Spark Streaming job:

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import com.datastax.driver.core.Cluster;

import scala.Tuple2;

public class MemoStream {

    public static void main(String[] args) throws Exception {
        Logger.getLogger("org").setLevel(Level.ERROR);
        Logger.getLogger("akka").setLevel(Level.ERROR);

        // Create the streaming context with a 10 second batch interval
        SparkConf sparkConf = new SparkConf().setAppName("KafkaSparkExample").setMaster("local[2]");
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(10));

        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "localhost:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "group1");
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", false);

        /* Create a collection with the topics to subscribe to; in this case, only one topic */
        Collection<String> topics = Arrays.asList("MemosLog");

        final JavaInputDStream<ConsumerRecord<String, String>> kafkaStream =
                KafkaUtils.createDirectStream(
                        ssc,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
                );

        kafkaStream.mapToPair(record -> new Tuple2<>(record.key(), record.value()));
        // Extract the value of each Kafka record into a stream of memo strings
        JavaDStream<String> stream = kafkaStream.map(record -> (record.value().toString()));
        // Then, we split each stream into lines or memos
        JavaDStream<String> memos = stream.flatMap(x -> Arrays.asList(x.split("\n")).iterator());
        /*
         To split each memo into its id and message fields, the pipe delimiter
         has to be escaped as \\| because | is a regex metacharacter
          */
        JavaDStream<String> sections = memos.flatMap(y -> Arrays.asList(y.split("\\|")).iterator());
        sections.print();
        sections.foreachRDD(rdd -> {
           rdd.foreachPartition(partitionOfRecords -> {
               // We establish the connection with Cassandra once per partition
               Cluster cluster = null;
               try {
                   cluster = Cluster.builder()
                           .withClusterName("VCemeteryMemos") // Cluster name
                           .addContactPoint("127.0.0.1") // Host IP
                           .build();

                   while (partitionOfRecords.hasNext()) {
                       // TODO: consume each record with partitionOfRecords.next()
                       // and write it to Cassandra
                   }
               } finally {
                   // Close the connection only after the whole partition has been processed
                   if (cluster != null) cluster.close();
               }
           });
        });

        ssc.start();
        ssc.awaitTermination();

    }
}

Thank you in advance.

1 Answer

Cassandra has no built-in function that converts a Unix timestamp into a proper TIMEUUID: the CQL minTimeuuid()/maxTimeuuid() functions only produce non-unique "fake" UUIDs intended for range queries (see the reference below). You have to do the conversion on the client side.

Ref: https://docs.datastax.com/en/cql/3.3/cql/cql_reference/timeuuid_functions_r.html
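
For example, with the DataStax Java driver (the Spark job above already uses com.datastax.driver.core), the UUIDs helper class can build a TIMEUUID from a Unix timestamp in milliseconds. A minimal sketch, assuming req_timestamp is in milliseconds (multiply by 1000 first if the frontend sends seconds):

import java.util.UUID;

import com.datastax.driver.core.utils.UUIDs;

public class ReqTimestampConversion {
    public static void main(String[] args) {
        long reqTimestampMillis = 1497510000000L; // example Unix timestamp in milliseconds

        // startOf() returns the smallest TIMEUUID for that millisecond. It is
        // deterministic (not unique), which is fine here because the row key
        // is the separate id column.
        UUID reqTimeuuid = UUIDs.startOf(reqTimestampMillis);

        // Client-side equivalent of CQL's now(), usable for Now_Timestamp as well
        UUID nowTimeuuid = UUIDs.timeBased();

        System.out.println("REQ_Timestamp: " + reqTimeuuid);
        System.out.println("Now_Timestamp: " + nowTimeuuid);

        // unixTimestamp() recovers the milliseconds, handy for computing
        // the latency between the two timestamps later on
        System.out.println(UUIDs.unixTimestamp(nowTimeuuid) - UUIDs.unixTimestamp(reqTimeuuid));
    }
}

The resulting java.util.UUID can then be bound to the REQ_Timestamp timeuuid column, for example through a prepared statement inside foreachPartition in the Spark job.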