Imported Protobuf Message Type Support in Apache Flink

357 Views Asked by At

I am experiencing an issue when trying to using the "com.google.protobuf.Any" message type in my protobuf definitions along with the official "protobuf" deserializer provided by Flink in version 1.16.

It seems that the following message definition

syntax = "proto2";

option java_package = "com.company.model.proto";

import "google/protobuf/any.proto";

message MessageWrapper {
  optional google.protobuf.Any message = 1;
}

with the Flink table definition

tableEnv.executeSql("CREATE TABLE trade_records (" +
        "  kk_trade_id STRING," +
        "  message ROW<`value` BYTES>, " +
        "  PRIMARY KEY (kk_trade_id) NOT ENFORCED" +
        ") WITH (" +
        "  'connector' = 'upsert-kafka'," +
        "  'value.protobuf.message-class-name' = 'com.company.model.proto.Common$MessageWrapper'," +
        "  'key.format' = 'raw'," +
        "  'key.fields-prefix' = 'kk_'," +
        "  'value.fields-include' = 'EXCEPT_KEY'," +
        "  'value.format' = 'protobuf'" +
        ...
        ");"
);

triggers this error:

Caused by: org.apache.flink.api.common.InvalidProgramException: Program cannot be compiled. This is a bug. Please file an issue.
    at org.apache.flink.formats.protobuf.util.PbCodegenUtils.compileClass(PbCodegenUtils.java:265)
    at org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter.<init>(ProtoToRowConverter.java:118)
    ... 14 more
Caused by: org.codehaus.commons.compiler.CompileException: Line 20, Column 36: "com.company.model.proto.Common" declares no member type "Any"
    at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:13014)
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6873)

I have narrowed this down to an issue in deriving the correct type when Flink generates the source code to feed to the Janino compiler:

public class GeneratedProtoToRow_07c0170e88dd4e5ba9f3e2c33217562a{
...
com.company.model.proto.Common.Any message22 = message12.getMessage();

I believe this type should read com.google.protobuf.Any instead. Is there a known compiler option or some other workaround for this problem?

0

There are 0 best solutions below