I have stream of protobuf messages with one type AccountHolder.proto with {name,balance,time} as fields and doing some aggregations on the data content and writing the result as protobuf message of another format ie,pojo of ProcessedStream.proto {count,balance,time}. In StreamOperation.java, at aggregate() its giving class cast exception. Attaching the java classes. I am new to kafka stream, any help would be greatly appreciated.
StreamOperation.java
{
KafkaProtobufSerde<AccountHolderOuterClass.AccountHolder> accountHolderSerde = serde.accHolderSerde(props);
KafkaProtobufSerde<AccountHolderOuterClass.AccountHolder> processedStreamSerde = serde.accHolderSerde(props);
StreamsBuilder builder = new StreamsBuilder();
KStream<String, AccountHolderOuterClass.AccountHolder> stream = builder.stream("streams-test-app-input", Consumed.with(Serdes.String(),accountHolderSerde));
AccountHolderOuterClass.AccountHolder initialValues = AccountHolderOuterClass.AccountHolder.newBuilder()
.setAmount(0).setName(null).setTime(Instant.ofEpochMilli(0L).toString())
.build();
KTable<String, ProcessedStreamOuterClass.ProcessedStream> bankbalance = stream.groupByKey(Grouped.with(Serdes.String(),accountHolderSerde))
.aggregate(() -> initialValues,(key, transaction, balance) -> balanceCalc(transaction, balance), Named.as("balance-agg"),Materialized.with(Serdes.String(),accountHolderSerde));
bankbalance.toStream().to("bank-trans-output",Produced.with(Serdes.String(),processedStreamSerde));
KafkaStreams streams = new KafkaStreams(builder.build(),props);
streams.start();
}
private static ProcessedStreamOuterClass.ProcessedStream balanceCalc(AccountHolderOuterClass.AccountHolder transaction, AccountHolderOuterClass.AccountHolder balance){
Long bal_time = Instant.parse(balance.getTime()).toEpochMilli();
Long trans_time = Instant.parse(transaction.getTime()).toEpochMilli();
Instant max = Instant.ofEpochMilli(Math.max(bal_time,trans_time));
int count = 1;
count++;
ProcessedStreamOuterClass.ProcessedStream totalBalance = ProcessedStreamOuterClass.ProcessedStream.newBuilder()
.setBalance(balance.getAmount()+transaction.getAmount())
.setCount(Integer.toString(count))
.setTime(max.toString())
.build();
return totalBalance;
}
`AccountHolder.proto`
syntax = "proto3";
package holder;
option java_package = "dk.danske.model.accholder";
message AccountHolder{
string name=1;
int32 amount=2;
string time=3;
}
`ProtobufSerde.java`
public KafkaProtobufSerde<AccountHolderOuterClass.AccountHolder> accHolderSerde(Properties envProps) {
final KafkaProtobufSerde<AccountHolderOuterClass.AccountHolder> protobufSerde = new KafkaProtobufSerde<>();
Map<String, String> serdeConfig = new HashMap<>();
serdeConfig.put(SCHEMA_REGISTRY_URL_CONFIG, envProps.getProperty("schema.registry.url"));
protobufSerde.configure(serdeConfig, false);
return protobufSerde;
}
public KafkaProtobufSerde<ProcessedStreamOuterClass.ProcessedStream> processedStreamSerde(Properties envProps) {
final KafkaProtobufSerde<ProcessedStreamOuterClass.ProcessedStream> protobufSerde = new KafkaProtobufSerde<>();
Map<String, String> serdeConfig = new HashMap<>();
serdeConfig.put(SCHEMA_REGISTRY_URL_CONFIG, envProps.getProperty("schema.registry.url"));
protobufSerde.configure(serdeConfig, false);
return protobufSerde;
}
`KafkaProducer.java`
Properties props = new Properties();
props.put("bootstrap.servers", "http://127.0.0.1:9200");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer");
props.put("schema.registry.url", "http://localhost:3200");
KafkaProducer<String, AccountHolderOuterClass.AccountHolder> producer = new KafkaProducer<>(props);
List<String> nameList = new ArrayList<>();
nameList.add("Johny");
nameList.add("Thomas");
nameList.add("Roger");
nameList.add("Micheal");
nameList.add("William");
nameList.add("Jonas");
nameList.add("Mike");
nameList.add("Kim");
for( String name:nameList){
producer.send(createTransaction(name));
Thread.sleep(100);
log.info("proto message sent to kafka producer",createTransaction(name));
}
producer.flush();
producer.close();
}
private static ProducerRecord<String, AccountHolderOuterClass.AccountHolder> createTransaction(String name) {
AccountHolderOuterClass.AccountHolder accountHolder = AccountHolderOuterClass.AccountHolder.newBuilder()
.setAmount(ThreadLocalRandom.current().nextInt(0,9000))
.setName(name)
.setTime(Instant.now().toString())
.build();
return new ProducerRecord<>(TOPIC,name,accountHolder);
}
`pom.xml`
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>3.5.1</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-streams-protobuf-serde</artifactId>
<version>7.5.1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.9</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>2.0.9</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.24.0</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-protobuf-serializer</artifactId>
<version>5.5.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>17</source>
<target>17</target>
</configuration>
</plugin>
<plugin>
<groupId>com.github.os72</groupId>
<artifactId>protoc-jar-maven-plugin</artifactId>
<version>3.1.0.1</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<protocVersion>3.1.0</protocVersion>
<inputDirectories>
<include>src/main/resources</include>
</inputDirectories>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
Expecting: map the content of AccountHolder to ProcessedStream after aggregation of total balance amount in AccountHolder