Using Kafka Streams to create a new KStream containing multiple aggregations

I am sending JSON messages containing details about a web service request and response to a Kafka topic. I want to process each message as it arrives using Kafka Streams and push the result, as a continuously updated summary (a JSON message), to a WebSocket to which a client is connected.

The client will then parse the JSON and display the various counts/summaries on a web page.

Sample input messages are shown below:

{
  "reqrespid":"048df165-71c2-429c-9466-365ad057eacd",
  "reqDate":"30-Aug-2017",
  "dId":"B198693",
  "resp_UID":"N",
  "resp_errorcode":"T0001",
  "resp_errormsg":"Unable to retrieve id details. DB Procedure error",
  "timeTaken":11,
  "timeTakenStr":"[0 minutes], [0 seconds], [11 milli-seconds]",
  "invocation_result":"T"
}

{
  "reqrespid":"f449af2d-1f8e-46bd-bfda-1fe0feea7140",
  "reqDate":"30-Aug-2017",
  "dId":"G335887",
  "resp_UID":"Y",
  "resp_errorcode":"N/A",
  "resp_errormsg":"N/A",
  "timeTaken":23,
  "timeTakenStr":"[0 minutes], [0 seconds], [23 milli-seconds]",
  "invocation_result":"S"
}

{
  "reqrespid":"e71b802d-e78b-4dcd-b100-fb5f542ea2e2",
  "reqDate":"30-Aug-2017",
  "dId":"X205014",
  "resp_UID":"Y",
  "resp_errorcode":"N/A",
  "resp_errormsg":"N/A",
  "timeTaken":18,
  "timeTakenStr":"[0 minutes], [0 seconds], [18 milli-seconds]",
  "invocation_result":"S"
}

As the stream of messages comes into Kafka, I want to compute the following on the fly:

  • total number of requests, i.e. a count of all messages
  • total number of requests with invocation_result equal to 'S'
  • total number of requests with invocation_result not equal to 'S'
  • total number of requests with invocation_result equal to 'S' and resp_UID equal to 'Y'
  • total number of requests with invocation_result equal to 'S' and resp_UID equal to 'N'
  • minimum time taken, i.e. min(timeTaken)
  • maximum time taken, i.e. max(timeTaken)
  • average time taken, i.e. avg(timeTaken)

and write them out into a KStream whose new key is the reqDate value and whose new value is a JSON message containing the computed results, as shown below for the three sample messages above:

{
  "total_cnt":3, "num_succ":2, "num_fail":1, "num_succ_data":2, 
  "num_succ_nodata":0, "num_fail_biz":0, "num_fail_tech":1,
  "min_timeTaken":11, "max_timeTaken":23, "avg_timeTaken":17.3
}

I am new to Kafka Streams. How do I do the multiple counts, each over a different field, all in one step or as a chain of separate steps? Would Apache Flink or Calcite be more appropriate? My understanding of a KTable suggests that you can only have a key, e.g. 30-AUG-2017, and a single column value, e.g. a count of 3, whereas I need a resulting table structure with one key and multiple computed values.

All help is very much appreciated.

1 Answer

You can just do a complex aggregation step that computes all those at once. I am just sketching the idea:

class AggResult {
    long total_cnt = 0;
    long num_succ = 0;
    // and many more
}

stream.groupBy(...).aggregate(
    new Initializer<AggResult>() {
        public AggResult apply() {
            return new AggResult();
        }
    },
    new Aggregator<KeyType, JSON, AggResult>() {
        public AggResult apply(KeyType key, JSON value, AggResult aggregate) {
            ++aggregate.total_cnt;
            // count successful invocations, based on the invocation_result field of your messages
            if ("S".equals(value.get("invocation_result"))) {
                ++aggregate.num_succ;
            }
            // add more conditions to get all the other aggregate results
            return aggregate;
        }
    },
    // other parameters (state-store name / serdes) omitted for brevity
)
.toStream() // aggregate() returns a KTable; convert its changelog to a stream before writing it out
.to("result-topic");