Issue with Flink Job Failure when Using Custom Class as DataStreamSource Type

37 Views Asked by At

I'm relatively new to Flink and encountering an issue while working with Flink 1.18 in IntelliJ IDEA. I'm using openjdk@11. I've noticed that when I utilize built-in data types (such as Tuple2, Tuple3, Integer, etc.) as the type for my DataStreamSource, my program runs correctly.

public class DataStreamJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        DataStreamSource<Tuple3<Integer, String, Integer>> source2 = env.fromElements(
                Tuple3.of(1, "aa1", 1),
                Tuple3.of(1, "aa2", 2),
                Tuple3.of(2, "bb", 1),
                Tuple3.of(3, "cc", 1)
        );
        source2.print();

        env.execute();
    }
}

enter image description here

However, when I change the data type of the DataStreamSource to a custom class (let's call it MyClass), the job execution fails.

class Ttest2 extends Tuple2<Integer, String> implements Serializable {
    public Ttest2(Integer first, String second) {
        super(first, second);
    }

    @Override
    public String toString() {
        return "QueryInfo{" +
                "queryId=" + f0 +
                ", queryValue=" + f1 +
                '}';
    }
}

public class DataStreamJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        DataStreamSource<Ttest2> source1 = env.fromElements(
                new Ttest2(1, "a1"),
                new Ttest2(1, "a2"),
                new Ttest2(2, "b"),
                new Ttest2(3, "c")
        );
        source1.print();
        DataStreamSource<Tuple3<Integer, String, Integer>> source2 = env.fromElements(
                Tuple3.of(1, "aa1", 1),
                Tuple3.of(1, "aa2", 2),
                Tuple3.of(2, "bb", 1),
                Tuple3.of(3, "cc", 1)
        );
        source2.print();

        env.execute();
    }
}

This is the error message when i run it:

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: `Job execution failed.`
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
    at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
    at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2079)
    at org.apache.flink.runtime.rpc.pekko.PekkoInvocationHandler.lambda$invokeRpc$1(PekkoInvocationHandler.java:268)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2079)
    at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1267)
    at org.apache.flink.runtime.concurrent.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
    at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
    at org.apache.flink.runtime.concurrent.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2079)
    at org.apache.flink.runtime.concurrent.pekko.ScalaFutureUtils$1.onComplete(ScalaFutureUtils.java:47)
    at org.apache.pekko.dispatch.OnComplete.internal(Future.scala:310)
    at org.apache.pekko.dispatch.OnComplete.internal(Future.scala:307)
    at org.apache.pekko.dispatch.japi$CallbackBridge.apply(Future.scala:234)
    at org.apache.pekko.dispatch.japi$CallbackBridge.apply(Future.scala:231)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
    at org.apache.flink.runtime.concurrent.pekko.ScalaFutureUtils$DirectExecutionContext.execute(ScalaFutureUtils.java:65)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:72)
    at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:288)
    at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:288)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:288)
    at org.apache.pekko.pattern.PromiseActorRef.$bang(AskSupport.scala:629)
    at org.apache.pekko.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:34)
    at org.apache.pekko.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:33)
    at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:536)
    at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
    at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
    at org.apache.pekko.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:73)
    at org.apache.pekko.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:110)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:85)
    at org.apache.pekko.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:110)
    at org.apache.pekko.dispatch.TaskInvocation.run(AbstractDispatcher.scala:59)
    at org.apache.pekko.dispatch.ForkJoinExecutorConfigurator$PekkoForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:57)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
    at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:176)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:107)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:285)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:276)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:269)
    at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:764)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:741)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:488)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309)
    at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307)
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222)
    at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
    at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
    at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
    at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
    at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
    at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
    at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
    at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
    at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
    at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
    at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
    at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
    ... 5 more
Caused by: java.io.IOException: Failed to deserialize an element from the source. If you are using user-defined serialization (Value and Writable types), check the serialization functions.
Serializer is org.apache.flink.api.java.typeutils.runtime.TupleSerializer@2fd14fc9
    at org.apache.flink.streaming.api.functions.source.FromElementsFunction.run(FromElementsFunction.java:226)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:114)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:71)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:338)
Caused by: java.lang.RuntimeException: Cannot instantiate tuple.
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.instantiateRaw(TupleSerializer.java:168)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:142)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:31)
    at org.apache.flink.streaming.api.functions.source.FromElementsFunction.run(FromElementsFunction.java:224)
    ... 3 more
Caused by: java.lang.InstantiationException: org.example.Ttest2
    at java.base/java.lang.Class.newInstance(Class.java:571)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.instantiateRaw(TupleSerializer.java:166)
    ... 6 more
Caused by: java.lang.NoSuchMethodException: org.example.Ttest2.<init>()
    at java.base/java.lang.Class.getConstructor0(Class.java:3349)
    at java.base/java.lang.Class.newInstance(Class.java:556)
    ... 7 more

The project is built using Maven. Here is my dependency file pom.xml. I suspect that the issue might be due to some missing crucial dependencies.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>FlinkTestJava11</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <flink.version>1.18.0</flink.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- Apache Flink dependencies -->
        <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

</project>

It's worth mentioning that I'm only running test code locally, yet the issue persists. I've encountered this problem and I'm unsure about the root cause or how to resolve it. Could someone guide me on how to tackle this issue effectively?

Thank you in advance.

1

There are 1 best solutions below

0
kkrugler On

I've never tried extending a Tuple class, which might have its own issues (usually it's safer to wrap a class, via delegation).

But in any case your Ttest2 class needs a no-args constructor.