How to Manually Create a KafkaRecord<String, GenericRecord> in Apache Beam for Unit Tests


I'm building an Apache Beam pipeline that reads data from a Kafka stream with KafkaIO. After the read, a few PTransforms process the input data, and I need to unit test the first PTransform, which takes a KafkaRecord<String, GenericRecord> as its input type.

How can I manually create a KafkaRecord<String, GenericRecord> for unit tests? If that is not possible, suggestions for workarounds are also welcome.
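A minimal sketch of one way to do this, assuming the public eight-argument KafkaRecord constructor (topic, partition, offset, timestamp, timestamp type, headers, key, value) and KafkaRecordCoder from org.apache.beam.sdk.io.kafka are available in your Beam version. It builds a GenericRecord from a schema, wraps it in a KafkaRecord, and feeds it to a pipeline with Create.of(...).withCoder(...). The schema, field names, topic name, timestamp, and the names KafkaRecordTestFixtures, territory, kafkaRecord, coder, roundTrip and input are hypothetical placeholders. AvroCoder is imported from org.apache.beam.sdk.coders here; newer Beam releases move it to org.apache.beam.sdk.extensions.avro.coders. RecordHeaders comes from kafka-clients.

```java
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
// Older Beam releases; newer ones: org.apache.beam.sdk.extensions.avro.coders.AvroCoder
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.KafkaRecordCoder;
import org.apache.beam.sdk.io.kafka.KafkaTimestampType;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.header.internals.RecordHeaders;

/** Hypothetical test fixtures for KafkaRecord<String, GenericRecord> inputs. */
public class KafkaRecordTestFixtures {

  // Hypothetical value schema; replace it with the schema of your topic.
  static final Schema VALUE_SCHEMA =
      SchemaBuilder.record("Territory").namespace("example").fields()
          .requiredString("accountId")
          .requiredString("territoryId")
          .endRecord();

  /** Builds a GenericRecord that conforms to VALUE_SCHEMA. */
  static GenericRecord territory(String accountId, String territoryId) {
    GenericRecord record = new GenericData.Record(VALUE_SCHEMA);
    record.put("accountId", accountId);
    record.put("territoryId", territoryId);
    return record;
  }

  /** Wraps a key and value in a KafkaRecord with placeholder Kafka metadata. */
  static KafkaRecord<String, GenericRecord> kafkaRecord(
      String key, GenericRecord value, long offset) {
    return new KafkaRecord<>(
        "test-topic",                    // topic (placeholder)
        0,                               // partition
        offset,                          // offset within the partition
        1_674_777_600_000L,              // record timestamp in epoch millis (arbitrary)
        KafkaTimestampType.CREATE_TIME,  // how the timestamp was assigned
        new RecordHeaders(),             // no Kafka headers
        key,
        value);
  }

  /** Coder for the records: UTF-8 keys and Avro-encoded GenericRecord values. */
  static KafkaRecordCoder<String, GenericRecord> coder() {
    return KafkaRecordCoder.of(StringUtf8Coder.of(), AvroCoder.of(VALUE_SCHEMA));
  }

  /** Encodes and decodes a record, as the DirectRunner does when it clones elements. */
  static KafkaRecord<String, GenericRecord> roundTrip(KafkaRecord<String, GenericRecord> rec) {
    try {
      return CoderUtils.clone(coder(), rec);
    } catch (CoderException e) {
      throw new IllegalStateException("Fixture cannot be encoded with coder()", e);
    }
  }

  /** Creates test input for the first transform, with an explicit coder. */
  static PCollection<KafkaRecord<String, GenericRecord>> input(Pipeline pipeline) {
    return pipeline.apply(
        "CreateKafkaRecords",
        Create.of(List.of(kafkaRecord("acct-1", territory("acct-1", "t-1"), 0L)))
            .withCoder(coder()));
  }
}
```

In a test, something like input(testPipeline).apply(ParDo.of(new FirstDoFn())) would then exercise the first transform, where FirstDoFn stands in for your own DoFn. The explicit coder matters because a coder for GenericRecord values cannot be inferred without a schema. As a workaround, if the first transform only needs the key and value, reading with KafkaIO's withoutMetadata() produces KV<String, GenericRecord> elements, which are easier to create in tests.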

Separately from the main question, I have a unit test for another PTransform that validates a PCollection<KV<GenericRecord, GenericRecord>> with PAssert. When I pass the expected values to containsInAnyOrder(KV.of(.., ..)), it throws the following error:

Jan 27, 2023 12:03:21 AM org.junit.platform.launcher.core.EngineDiscoveryOrchestrator lambda$logTestDescriptorExclusionReasons$7
INFO: 0 containers and 2 tests were Method or class mismatch

Forbidden IOException when reading from InputStream
java.lang.IllegalArgumentException: Forbidden IOException when reading from InputStream
    at org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:145)
    at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:102)
    at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:96)
    at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:168)
    at org.apache.beam.runners.direct.CloningBundleFactory$CloningBundle.add(CloningBundleFactory.java:87)
    at org.apache.beam.runners.direct.GroupAlsoByWindowEvaluatorFactory$OutputWindowedValueToBundle.outputWindowedValue(GroupAlsoByWindowEvaluatorFactory.java:257)
    at org.apache.beam.runners.direct.GroupAlsoByWindowEvaluatorFactory$OutputWindowedValueToBundle.outputWindowedValue(GroupAlsoByWindowEvaluatorFactory.java:243)
    at org.apache.beam.repackaged.direct_java.runners.core.ReduceFnRunner.lambda$onTrigger$1(ReduceFnRunner.java:1058)
    at org.apache.beam.repackaged.direct_java.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output(ReduceFnContextFactory.java:445)
    at org.apache.beam.repackaged.direct_java.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:130)
    at org.apache.beam.repackaged.direct_java.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1061)
    at org.apache.beam.repackaged.direct_java.runners.core.ReduceFnRunner.onTimers(ReduceFnRunner.java:771)
    at org.apache.beam.runners.direct.GroupAlsoByWindowEvaluatorFactory$GroupAlsoByWindowEvaluator.processElement(GroupAlsoByWindowEvaluatorFactory.java:190)
    at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:165)
    at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:129)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.EOFException
    at org.apache.beam.sdk.util.VarInt.decodeLong(VarInt.java:73)
    at org.apache.beam.sdk.coders.IterableLikeCoder.decode(IterableLikeCoder.java:150)
    at org.apache.beam.sdk.coders.IterableLikeCoder.decode(IterableLikeCoder.java:59)
    at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
    at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:84)
    at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:37)
    at org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:142)

This is the test that throws the error above:

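import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTagList;
import org.junit.jupiter.api.Test;
// EnterpriseModel, BuildGenericKafkaMessage, GenericRecordCoder, generateGenericRecord,
// testPipeline and the schema fields come from the project and are not shown here.

@Test
void processEvent() throws InvocationTargetException, IllegalAccessException {
    // One EnterpriseModel input element, encoded with Java serialization
    Create.Values<EnterpriseModel> collection = Create.of(Arrays.asList(accountTerritory))
            .withCoder(SerializableCoder.of(EnterpriseModel.class));

    // Multi-output ParDo: territoryTag is the main output, the other two tags are additional outputs
    PCollectionTuple results = testPipeline
            .apply(collection)
            .apply("Test Build Message", ParDo.of(new BuildGenericKafkaMessage())
                    .withOutputTags(BuildGenericKafkaMessage.territoryTag,
                            TupleTagList.of(Arrays.asList(
                                    BuildGenericKafkaMessage.userTerritoryTag,
                                    BuildGenericKafkaMessage.accountTerritoryTag))));

    // Account-territory output, with an explicit coder for both the key and value records
    PCollection<KV<GenericRecord, GenericRecord>> kvpCollection1 =
            results.get(BuildGenericKafkaMessage.accountTerritoryTag)
                    .setCoder(KvCoder.of(GenericRecordCoder.of(), GenericRecordCoder.of()));

    // Expect exactly one KV built from the same input with the key and value schemas
    PAssert.that(kvpCollection1).containsInAnyOrder(
            KV.of(generateGenericRecord(this.accountKeySchema, this.accountTerritory),
                    generateGenericRecord(this.accountSchema, this.accountTerritory)));
    testPipeline.run();
}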

I have no clear idea why this happens and would appreciate help with this as well. Thanks!
Note: I use the Direct Runner for development and Flink in production, with Java 11, IntelliJ, and org.apache.avro.generic.GenericRecord for the generic records.
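The stack trace above fails inside CoderUtils.clone while the DirectRunner's GroupAlsoByWindow step decodes an iterable inside a KvCoder, presumably the GroupByKey that PAssert performs internally, since the pipeline shown has no other grouping. That means each KV<GenericRecord, GenericRecord> is decoded as one element of a longer stream rather than on its own. A hedged way to check a coder outside a pipeline is to clone two KVs through ListCoder.of(KvCoder.of(...)) with the same CoderUtils.clone. Two elements are used because a coder that reads past its own bytes only fails when something follows it in the stream. KvCoderNestingCheck and cloneAsNestedList are hypothetical names, and the check does not prove that this is the cause of the error.

```java
import java.util.List;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.KV;

/** Hypothetical helper: checks that a KV coder survives being nested inside a list. */
public class KvCoderNestingCheck {

  /**
   * Encodes two KVs back to back with ListCoder(KvCoder(keyCoder, valueCoder)) and decodes them.
   * ListCoder, like the iterable in the stack trace, is an IterableLikeCoder, so the first
   * key and value are decoded from the middle of a longer byte stream.
   */
  static <K, V> List<KV<K, V>> cloneAsNestedList(
      Coder<K> keyCoder, Coder<V> valueCoder, KV<K, V> first, KV<K, V> second) {
    Coder<List<KV<K, V>>> listCoder = ListCoder.of(KvCoder.of(keyCoder, valueCoder));
    try {
      return CoderUtils.clone(listCoder, List.of(first, second));
    } catch (CoderException e) {
      // clone() declares CoderException. An EOFException while decoding is instead
      // rethrown as IllegalArgumentException, which is what the question's trace shows.
      throw new IllegalStateException("KV coder failed to round-trip inside a list", e);
    }
  }
}
```

To test the coder from the question, pass GenericRecordCoder.of() as both keyCoder and valueCoder, assuming it is a Coder<GenericRecord>, and compare the result with a run that uses Beam's AvroCoder.of(schema) for each side. If only the GenericRecordCoder run fails, setting KvCoder.of(AvroCoder.of(keySchema), AvroCoder.of(valueSchema)) on the PCollection is one Beam-provided alternative to try, where keySchema and valueSchema stand for the Avro schemas presumably held in accountKeySchema and accountSchema.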
