Hazelcast Jet - how to use a non-static method in a Jet pipeline

I have a basic pipeline, shown below. In one of its steps I want to transform an object by calling a method on a service. Jet throws an error saying that the mapFn is not serializable. What should I do here? The same pipeline works fine when I call a static method instead.

p.readFrom(source)
     .map(r -> dataTransformer.transformRecord(r)) // dataTransformer is a service
     .writeTo(Sinks.filesBuilder(userHome).build());
Best answer:

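Use mapUsingService and create the service with a ServiceFactory, so that the service is instantiated on the cluster members instead of being serialized together with the lambda: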

p.readFrom(source)
 .mapUsingService(
    ServiceFactories.sharedService(pctx -> new DataTransformer()),
    (dataTransformer, r) -> dataTransformer.transformRecord(r))
 ...

Alternatively, if your service is serializable and stateless, you can copy it to a local variable, so that the lambda captures only the service itself and not the object that holds it:

DataTransformer dataTransformerLocal = dataTransformer;
p.readFrom(source)
     .map(r -> dataTransformerLocal.transformRecord(r))
     .writeTo(Sinks.filesBuilder(userHome).build());
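
The local-variable trick most likely works because dataTransformer is a field in the question: a lambda that reads a field implicitly captures `this`, so Jet then has to serialize the whole enclosing object. Below is a minimal plain-JDK sketch of that effect, with no Hazelcast dependency. `SerFn` is a hypothetical stand-in for Jet's serializable function type, and `PipelineOwner` and the body of `DataTransformer` are made up for illustration; none of them come from the original post.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

public class CaptureDemo {

    // Hypothetical stand-in for a serializable function type such as Jet's FunctionEx.
    interface SerFn<T, R> extends Function<T, R>, Serializable {}

    // Hypothetical service: serializable and stateless.
    static class DataTransformer implements Serializable {
        String transformRecord(String r) {
            return r.toUpperCase();
        }
    }

    // Assumed shape of the class that builds the pipeline; note it is NOT Serializable.
    static class PipelineOwner {
        private final DataTransformer dataTransformer = new DataTransformer();

        SerFn<String, String> viaField() {
            // Reads this.dataTransformer, so the lambda captures `this` (the PipelineOwner).
            return r -> dataTransformer.transformRecord(r);
        }

        SerFn<String, String> viaLocal() {
            // The lambda captures only the local variable, i.e. the DataTransformer.
            DataTransformer dataTransformerLocal = dataTransformer;
            return r -> dataTransformerLocal.transformRecord(r);
        }
    }

    // Returns true if Java serialization accepts the object, false otherwise.
    public static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) { // NotSerializableException is an IOException
            return false;
        }
    }

    public static void main(String[] args) {
        PipelineOwner owner = new PipelineOwner();
        System.out.println("field capture serializable: " + canSerialize(owner.viaField()));
        System.out.println("local capture serializable: " + canSerialize(owner.viaLocal()));
    }
}
```

If the service itself is not serializable, or holds state such as connections, the local-variable copy does not help and the mapUsingService approach is the safer choice.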