Java Stage-based Processing Implementation


There's some domain knowledge/business logic baked into the problem I'm trying to solve, but I'll try to boil it down to the basics as much as possible.

Say I have an interface defined as follows:

public interface Stage<I, O> {
    StageResult<O> process(StageResult<I> input) throws StageException;
}
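The interface depends on StageResult and StageException, neither of which is shown here. The sketch below shows how a concrete stage might report failures through the declared throws clause. It assumes StageException is a checked exception and StageResult is a plain value wrapper; ParseIntStage is a hypothetical stage used only for illustration.

```java
// Simple value wrapper (assumed; mirrors the StageResult class shown in the first answer).
class StageResult<T> {
    private final T value;

    StageResult(T value) {
        this.value = value;
    }

    T get() {
        return value;
    }
}

// Assumption: a checked exception, so every caller must handle stage failures.
class StageException extends Exception {
    StageException(String message, Throwable cause) {
        super(message, cause);
    }
}

interface Stage<I, O> {
    StageResult<O> process(StageResult<I> input) throws StageException;
}

// Hypothetical stage: parses text into an Integer and reports bad input
// through StageException instead of leaking NumberFormatException.
class ParseIntStage implements Stage<String, Integer> {
    @Override
    public StageResult<Integer> process(StageResult<String> input) throws StageException {
        try {
            return new StageResult<>(Integer.parseInt(input.get().trim()));
        } catch (NumberFormatException e) {
            throw new StageException("Not an integer: " + input.get(), e);
        }
    }
}
```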

This represents a stage in a multi-stage data processing pipeline. My idea is to break the processing into sequential (non-branching), independent steps, such as reading from a file, parsing network headers, parsing message payloads, converting the format, and writing to a file, each represented by its own Stage implementation. Ideally I'd implement a FileInputStage, a NetworkHeaderParseStage, a ParseMessageStage, a FormatStage, and a FileOutputStage, and then have some sort of

<A, B, C> Stage<A, C> compose(Stage<A, B> stage1, Stage<B, C> stage2);

method such that I can eventually compose a bunch of stages into a final stage that looks like FileInput -> FileOutput.
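Java's type system can express this compose method directly. Declaring A, B and C as type parameters of a generic method makes the compiler check that the first stage's output type matches the second stage's input type. The following is a minimal sketch that assumes the Stage interface above plus simple StageResult and StageException types. The Stages holder class is a hypothetical name.

```java
// Simple value wrapper (assumed).
class StageResult<T> {
    private final T value;

    StageResult(T value) {
        this.value = value;
    }

    T get() {
        return value;
    }
}

// Assumed checked exception, as declared by the interface.
class StageException extends Exception {
    StageException(String message) {
        super(message);
    }
}

// The interface from the question; it has one abstract method, so lambdas can implement it.
interface Stage<I, O> {
    StageResult<O> process(StageResult<I> input) throws StageException;
}

// Hypothetical holder class for the compose helper.
final class Stages {
    private Stages() {
    }

    // B links stage1's output to stage2's input, so passing stages whose
    // types do not line up is rejected by the compiler.
    static <A, B, C> Stage<A, C> compose(Stage<A, B> stage1, Stage<B, C> stage2) {
        // The lambda may throw StageException because Stage.process declares it.
        return input -> stage2.process(stage1.process(input));
    }
}
```

Because Stage has a single abstract method, a lambda can implement it, so a dedicated composite class is optional rather than required.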

Is this something (specifically the compose method, or a similar mechanism for aggregating many stages into one stage) even supported by the Java type system? I'm hacking away at it now and I'm ending up in a very ugly place involving reflection and lots of unchecked generic types.
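To aggregate many stages, a varargs or List-based compose would have to erase the intermediate types, which is typically where unchecked casts creep in. One alternative keeps every link checked: a small builder whose then method changes the output type parameter at each step. The sketch below uses the same assumptions as above, and Pipeline, start and then are hypothetical names.

```java
// Simple value wrapper (assumed).
class StageResult<T> {
    private final T value;

    StageResult(T value) {
        this.value = value;
    }

    T get() {
        return value;
    }
}

// Assumed checked exception.
class StageException extends Exception {
    StageException(String message) {
        super(message);
    }
}

interface Stage<I, O> {
    StageResult<O> process(StageResult<I> input) throws StageException;
}

// Hypothetical builder: each then(...) call returns a pipeline whose output
// type is the new stage's output type, so every link is checked at compile time.
final class Pipeline<I, O> implements Stage<I, O> {
    private final Stage<I, O> chain;

    private Pipeline(Stage<I, O> chain) {
        this.chain = chain;
    }

    // Starts a pipeline from its first stage.
    static <A, B> Pipeline<A, B> start(Stage<A, B> first) {
        return new Pipeline<A, B>(first);
    }

    // Appends a stage whose input type must equal this pipeline's output type.
    <N> Pipeline<I, N> then(Stage<O, N> next) {
        Stage<I, O> before = chain;
        return new Pipeline<I, N>(input -> next.process(before.process(input)));
    }

    @Override
    public StageResult<O> process(StageResult<I> input) throws StageException {
        return chain.process(input);
    }
}
```

A chain like Pipeline.start(s1).then(s2).then(s3) is itself a Stage from s1's input type to s3's output type, and appending a stage whose input type does not match the current output type fails to compile.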

Am I heading off in the wrong direction or is this even a reasonable thing to try to do in Java? Thanks so much in advance!


There are 2 best solutions below


You didn't post enough implementation details to show where the type-safety issues are, but here is my take on how you could address the problem:

First, don't make the whole thing too generic: make your stages specific about their input and output types.

Then create a composite stage that implements Stage and combines two stages into one.

Here is a very simple implementation:

public class StageComposit<A, B, C> implements Stage<A, C> {

    final Stage<A, B> stage1;
    final Stage<B, C> stage2;

    public StageComposit(Stage<A, B> stage1, Stage<B, C> stage2) {
        this.stage1 = stage1;
        this.stage2 = stage2;
    }

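    @Override
    public StageResult<C> process(StageResult<A> input) throws StageException {
        // stage1's output type B is exactly stage2's input type, so no casts are needed
        return stage2.process(stage1.process(input));
    }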
}

The StageResult class:

public class StageResult<O> {
    final O result;

    public StageResult(O result) {
        this.result = result;
    }

    public O get() {
        return result;
    }
}

Example stages with specific input and output types:

import java.util.Date;

public class EpochInputStage implements Stage<Long, Date> {

    @Override
    public StageResult<Date> process(StageResult<Long> input) {
        return new StageResult<Date>(new Date(input.get()));
    }
}

import java.text.SimpleDateFormat;
import java.util.Date;

public class DateFormatStage implements Stage<Date, String> {

    @Override
    public StageResult<String> process(StageResult<Date> input) {
        return new StageResult<String>(
                new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
                    .format(input.get()));
    }
}


import java.util.Arrays;
import java.util.List;

public class InputSplitStage implements Stage<String, List<String>> {

    @Override
    public StageResult<List<String>> process(StageResult<String> input) {
        return new StageResult<List<String>>(
                Arrays.asList(input.get().split("[-:\\s]")));
    }
}

And finally, a small test demonstrating how to combine them all:

import java.util.Date;
import java.util.List;

import org.junit.Test;

public class StageTest {

    @Test
    public void process() throws StageException {

        EpochInputStage efis = new EpochInputStage();
        DateFormatStage dfs = new DateFormatStage();
        InputSplitStage iss = new InputSplitStage();

        Stage<Long, String> sc1 = 
                new StageComposit<Long, Date, String>(efis, dfs);

        Stage<Long, List<String>> sc2 = 
                new StageComposit<Long, String, List<String>>(sc1, iss);

        StageResult<List<String>> result = 
                sc2.process(new StageResult<Long>(System.currentTimeMillis()));

        System.out.print(result.get());
    }
}

For the current time, the output is a list of strings such as:

[2015, 06, 24, 16, 27, 55]

As you can see, there are no type-safety issues and no casts. When you need to handle other input and output types, or convert them to suit the next stage, just write a new Stage and hook it into your processing chain.
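The following sketch shows how a conversion stage can be hooked in. A hypothetical ToIntegersStage turns the tokens produced by InputSplitStage into integers. The supporting types are repeated so that the snippet compiles on its own. StageComposit declares throws StageException here, on the assumption that StageException is a checked exception.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class StageResult<O> {
    final O result;

    StageResult(O result) {
        this.result = result;
    }

    O get() {
        return result;
    }
}

// Assumed to be a checked exception.
class StageException extends Exception {
}

interface Stage<I, O> {
    StageResult<O> process(StageResult<I> input) throws StageException;
}

// Same as StageComposit above, declaring throws StageException.
class StageComposit<A, B, C> implements Stage<A, C> {
    final Stage<A, B> stage1;
    final Stage<B, C> stage2;

    StageComposit(Stage<A, B> stage1, Stage<B, C> stage2) {
        this.stage1 = stage1;
        this.stage2 = stage2;
    }

    @Override
    public StageResult<C> process(StageResult<A> input) throws StageException {
        return stage2.process(stage1.process(input));
    }
}

// Same as InputSplitStage above.
class InputSplitStage implements Stage<String, List<String>> {
    @Override
    public StageResult<List<String>> process(StageResult<String> input) {
        return new StageResult<List<String>>(Arrays.asList(input.get().split("[-:\\s]")));
    }
}

// Hypothetical adapter stage: converts the split tokens into integers
// so a following stage can work with numbers instead of text.
class ToIntegersStage implements Stage<List<String>, List<Integer>> {
    @Override
    public StageResult<List<Integer>> process(StageResult<List<String>> input) {
        List<Integer> numbers = new ArrayList<>();
        for (String token : input.get()) {
            numbers.add(Integer.parseInt(token)); // e.g. "06" becomes 6
        }
        return new StageResult<List<Integer>>(numbers);
    }
}
```

Hooking it in is one more composition: new StageComposit<String, List<String>, List<Integer>>(new InputSplitStage(), new ToIntegersStage()) maps "2015-06-24 16:27:55" to [2015, 6, 24, 16, 27, 55].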


You may want to consider using the composite pattern or the decorator pattern. With the decorator, each stage wraps (decorates) the previous stage. To do this, have each stage implement the interface as you are doing now, and allow a stage to contain another stage.

Decorator Pattern

The process() method does not need to accept a StageResult parameter anymore since it can call the contained Stage's process() method itself, get the StageResult and perform its own processing, returning another StageResult.

One advantage is that you can restructure your pipeline at run time.

Each Stage that may contain another can extend ComposableStage, and each stage that is an end point of the process can extend LeafStage. Note that I just used those terms to name the classes by their function; you can come up with more imaginative names.
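Here is a minimal sketch of this decorator variant. DecoratedStage is a hypothetical name for the argument-free interface. ComposableStage and LeafStage follow the roles described above, but their members are assumptions, and ConstantStage and LengthStage are hypothetical examples. The setInner method illustrates how a wrapped stage could be swapped at run time.

```java
// Simple value wrapper (assumed).
class StageResult<T> {
    private final T value;
    StageResult(T value) { this.value = value; }
    T get() { return value; }
}

// Assumed checked exception.
class StageException extends Exception {
}

// Hypothetical decorator-style interface: process() takes no argument
// because each stage pulls its input from the stage it wraps.
interface DecoratedStage<O> {
    StageResult<O> process() throws StageException;
}

// End point of the chain: produces a value without wrapping another stage.
abstract class LeafStage<O> implements DecoratedStage<O> {
    protected abstract O produce() throws StageException;

    @Override
    public final StageResult<O> process() throws StageException {
        return new StageResult<>(produce());
    }
}

// Wraps (decorates) another stage and transforms that stage's result.
abstract class ComposableStage<I, O> implements DecoratedStage<O> {
    private DecoratedStage<I> inner;

    protected ComposableStage(DecoratedStage<I> inner) {
        this.inner = inner;
    }

    // Replacing the wrapped stage restructures the pipeline at run time.
    void setInner(DecoratedStage<I> inner) {
        this.inner = inner;
    }

    protected abstract O transform(I value) throws StageException;

    @Override
    public final StageResult<O> process() throws StageException {
        return new StageResult<>(transform(inner.process().get()));
    }
}

// Hypothetical leaf that always yields the same text.
class ConstantStage extends LeafStage<String> {
    private final String text;
    ConstantStage(String text) { this.text = text; }

    @Override
    protected String produce() { return text; }
}

// Hypothetical decorator that measures the wrapped stage's text.
class LengthStage extends ComposableStage<String, Integer> {
    LengthStage(DecoratedStage<String> inner) { super(inner); }

    @Override
    protected Integer transform(String value) { return value.length(); }
}
```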