Kafka Streams with protobuf messages


I have a stream of protobuf messages of one type, AccountHolder.proto, with the fields {name, amount, time}. I do some aggregation on the data and want to write the result as a protobuf message of another format, i.e. the class generated from ProcessedStream.proto with the fields {count, balance, time}. In StreamOperation.java, the aggregate() call throws a ClassCastException. I am attaching the Java classes below. I am new to Kafka Streams, so any help would be greatly appreciated.

`StreamOperation.java`

        KafkaProtobufSerde<AccountHolderOuterClass.AccountHolder> accountHolderSerde = serde.accHolderSerde(props);
        KafkaProtobufSerde<AccountHolderOuterClass.AccountHolder> processedStreamSerde = serde.accHolderSerde(props);
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, AccountHolderOuterClass.AccountHolder> stream = builder.stream("streams-test-app-input", Consumed.with(Serdes.String(), accountHolderSerde));
        AccountHolderOuterClass.AccountHolder initialValues = AccountHolderOuterClass.AccountHolder.newBuilder()
                .setAmount(0).setName(null).setTime(Instant.ofEpochMilli(0L).toString())
                .build();

        KTable<String, ProcessedStreamOuterClass.ProcessedStream> bankbalance = stream.groupByKey(Grouped.with(Serdes.String(), accountHolderSerde))
                .aggregate(() -> initialValues,
                        (key, transaction, balance) -> balanceCalc(transaction, balance),
                        Named.as("balance-agg"),
                        Materialized.with(Serdes.String(), accountHolderSerde));

        bankbalance.toStream().to("bank-trans-output", Produced.with(Serdes.String(), processedStreamSerde));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
private static ProcessedStreamOuterClass.ProcessedStream balanceCalc(AccountHolderOuterClass.AccountHolder transaction, AccountHolderOuterClass.AccountHolder balance){

        Long bal_time = Instant.parse(balance.getTime()).toEpochMilli();
        Long trans_time = Instant.parse(transaction.getTime()).toEpochMilli();
        Instant max = Instant.ofEpochMilli(Math.max(bal_time,trans_time));

        int count = 1;
        count++;
        ProcessedStreamOuterClass.ProcessedStream totalBalance = ProcessedStreamOuterClass.ProcessedStream.newBuilder()
                .setBalance(balance.getAmount()+transaction.getAmount())
                .setCount(Integer.toString(count))
                .setTime(max.toString())
                .build();

        return totalBalance;

    }
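
Not shown above is the `props` object that both the serde helper and `KafkaStreams` receive. It is built roughly like this (the application id below is just a placeholder; the bootstrap and schema registry addresses are the same ones my producer uses):

        Properties props = new Properties();
        // placeholder id for this streams app
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-test-app");
        // same endpoints as in KafkaProducer.java below
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "http://127.0.0.1:9200");
        // read by accHolderSerde(props) / processedStreamSerde(props)
        props.put("schema.registry.url", "http://localhost:3200");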

`AccountHolder.proto`
syntax = "proto3";
package holder;
option java_package = "dk.danske.model.accholder";

message AccountHolder{
  string name=1;
  int32 amount=2;
  string time=3;

}
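
I did not paste `ProcessedStream.proto`, but based on the fields used in `balanceCalc()` it is essentially the following (the field numbers and `java_package` here are written from memory, so treat them as assumptions):

`ProcessedStream.proto`
syntax = "proto3";
package holder;
option java_package = "dk.danske.model.processedstream";

message ProcessedStream{
  string count=1;
  int32 balance=2;
  string time=3;
}
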
`ProtobufSerde.java`

 public KafkaProtobufSerde<AccountHolderOuterClass.AccountHolder> accHolderSerde(Properties envProps) {
        final KafkaProtobufSerde<AccountHolderOuterClass.AccountHolder> protobufSerde = new KafkaProtobufSerde<>();
        Map<String, String> serdeConfig = new HashMap<>();
        serdeConfig.put(SCHEMA_REGISTRY_URL_CONFIG, envProps.getProperty("schema.registry.url"));
        protobufSerde.configure(serdeConfig, false);
        return protobufSerde;
    }
public KafkaProtobufSerde<ProcessedStreamOuterClass.ProcessedStream> processedStreamSerde(Properties envProps) {
        final KafkaProtobufSerde<ProcessedStreamOuterClass.ProcessedStream> protobufSerde = new KafkaProtobufSerde<>();
        Map<String, String> serdeConfig = new HashMap<>();
        serdeConfig.put(SCHEMA_REGISTRY_URL_CONFIG, envProps.getProperty("schema.registry.url"));
        protobufSerde.configure(serdeConfig, false);
        return protobufSerde;
    }
`KafkaProducer.java`


 Properties props = new Properties();
        props.put("bootstrap.servers", "http://127.0.0.1:9200");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer");

        props.put("schema.registry.url", "http://localhost:3200");

        KafkaProducer<String, AccountHolderOuterClass.AccountHolder> producer = new KafkaProducer<>(props);

        List<String> nameList = new ArrayList<>();
        nameList.add("Johny");
        nameList.add("Thomas");
        nameList.add("Roger");
        nameList.add("Micheal");
        nameList.add("William");
        nameList.add("Jonas");
        nameList.add("Mike");
        nameList.add("Kim");

        for (String name : nameList) {
            ProducerRecord<String, AccountHolderOuterClass.AccountHolder> record = createTransaction(name);
            producer.send(record);
            Thread.sleep(100);
            log.info("proto message sent to kafka producer: {}", record);
        }
        producer.flush();
        producer.close();
  }

    private static ProducerRecord<String, AccountHolderOuterClass.AccountHolder> createTransaction(String name) {

        AccountHolderOuterClass.AccountHolder accountHolder = AccountHolderOuterClass.AccountHolder.newBuilder()
                .setAmount(ThreadLocalRandom.current().nextInt(0, 9000))
                .setName(name)
                .setTime(Instant.now().toString())
                .build();

        return new ProducerRecord<>(TOPIC, name, accountHolder);
    }
`pom.xml`

 <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>3.5.1</version>
        </dependency>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-streams-protobuf-serde</artifactId>
            <version>7.5.1</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>2.0.9</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>2.0.9</version>
        </dependency>
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>3.24.0</version>
        </dependency>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-protobuf-serializer</artifactId>
            <version>5.5.1</version>
        </dependency>
 </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>17</source>
                    <target>17</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>com.github.os72</groupId>
                <artifactId>protoc-jar-maven-plugin</artifactId>
                <version>3.1.0.1</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>run</goal>
                        </goals>
                        <configuration>
                            <protocVersion>3.1.0</protocVersion>
                            <inputDirectories>
                                <include>src/main/resources</include>
                            </inputDirectories>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

Expecting: the content of AccountHolder mapped into a ProcessedStream message after aggregating the total balance amount per account holder, and written to the output topic.
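
To make that concrete: if two AccountHolder records arrive for the key "Johny" with amounts 100 and 250, the value I expect on bank-trans-output is the one this sketch builds (the timestamp is a made-up example, and I intend count to be the number of transactions seen for that key):

        // hypothetical expected output for key "Johny" after two transactions of 100 and 250
        ProcessedStreamOuterClass.ProcessedStream expected = ProcessedStreamOuterClass.ProcessedStream.newBuilder()
                .setCount("2")                   // two transactions aggregated so far
                .setBalance(350)                 // 100 + 250
                .setTime("2023-11-05T10:15:31Z") // the later of the two event times
                .build();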
