Collecting a GPars loop to a Map


I need to iterate over a List, run a time-consuming operation for every item, and then collect the results into a Map, something like this:

List<String> strings = ['foo', 'bar', 'baz']
Map<String, Object> result = strings.collectEntries { key ->
    [key, expensiveOperation(key)]
}

The result should then look something like this:

[foo: <an object>, bar: <another object>, baz: <another object>]

Since the operations I need to run are fairly long and independent of each other, I'd like to use GPars to run the loop in parallel.

However, GPars has a collectParallel method that iterates over a collection in parallel and collects the results into a List, but there is no collectEntriesParallel that collects into a Map. What's the correct way to do this with GPars?

Accepted answer by Szymon Stepniak:

There is no collectEntriesParallel because it would have to produce the same result as:

collectParallel {}.collectEntries {}

as Tim mentioned in the comment. It's hard to reduce a list of values into a map (or any other mutable container) deterministically in parallel, other than by collecting the results into a list in parallel and then turning them into map entries sequentially. Consider the following sequential example:

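import groovyx.gpars.GParsPool

// Simulates a slow, independent computation (1 second per call)
static def expensiveOperation(String key) {
    Thread.sleep(1000)
    return key.reverse()
}

List<String> strings = ['foo', 'bar', 'baz']

GParsPool.withPool {
    // inject() is Groovy's ordinary sequential method, so every step runs on the calling thread
    def result = strings.inject([:]) { seed, key ->
        println "[${Thread.currentThread().name}] (${System.currentTimeMillis()}) seed = ${seed}, key = ${key}"
        // returns a new map containing the previous entries plus the current one
        seed + [(key): expensiveOperation(key.toString())]
    }

    println result
}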

In this example we use Collection.inject(initialValue, closure), which is the equivalent of the good old "fold left" operation. It starts with the initial value [:] and iterates over all values in order. At each step it returns a new map containing the previous entries plus the current key and its computed value. Sequential execution in this case takes approximately 3 seconds, because each expensiveOperation() call sleeps for 1 second.

Console output:

[main] (1519925046610) seed = [:], key = foo
[main] (1519925047773) seed = [foo:oof], key = bar
[main] (1519925048774) seed = [foo:oof, bar:rab], key = baz
[foo:oof, bar:rab, baz:zab]

And this is basically what collectEntries() does: it is a kind of reduction operation whose initial value is an empty map.
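For comparison, here is the same left fold written out in plain Java. This is only a sketch. foldLeft and collectEntries are hypothetical helpers that mirror what Groovy's inject([:]) and collectEntries() do sequentially; they are not GPars or JDK API. The value function just reverses each key, like expensiveOperation() above but without the sleep.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

public class FoldLeftDemo {

    // Hypothetical helper: a left fold, like Groovy's inject(initialValue, closure).
    // Elements are visited strictly in list order and each step's result feeds the next one.
    public static <T, R> R foldLeft(List<T> items, R seed, BiFunction<R, T, R> step) {
        R acc = seed;
        for (T item : items) {
            acc = step.apply(acc, item);
        }
        return acc;
    }

    // Hypothetical helper: collectEntries() expressed as a fold that starts from an empty map.
    // Like "seed + [(key): value]" in the Groovy example, each step copies the map instead of mutating it.
    public static <K, V> Map<K, V> collectEntries(List<K> keys, Function<K, V> valueFn) {
        Map<K, V> empty = new LinkedHashMap<>();
        return foldLeft(keys, empty, (acc, key) -> {
            Map<K, V> next = new LinkedHashMap<>(acc);
            next.put(key, valueFn.apply(key));
            return next;
        });
    }

    public static void main(String[] args) {
        Map<String, String> result =
            collectEntries(List.of("foo", "bar", "baz"), s -> new StringBuilder(s).reverse().toString());
        System.out.println(result); // prints {foo=oof, bar=rab, baz=zab}
    }
}
```

Because each step needs the map produced by the previous step, a fold like this is inherently sequential.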

Now let's see what happens if we try to parallelize it by using the injectParallel method instead of inject:

GParsPool.withPool {
    // injectParallel() is one of the parallel methods that GParsPool.withPool adds to collections
    def result = strings.injectParallel([:]) { seed, key ->
        println "[${Thread.currentThread().name}] (${System.currentTimeMillis()}) seed = ${seed}, key = ${key}"
        seed + [(key): expensiveOperation(key.toString())]
    }

    println result
}

Let's look at the result:

[ForkJoinPool-1-worker-1] (1519925323803) seed = foo, key = bar
[ForkJoinPool-1-worker-2] (1519925323811) seed = baz, key = [:]
[ForkJoinPool-1-worker-1] (1519925324822) seed = foo[bar:rab], key = baz[[:]:]:[]
foo[bar:rab][baz[[:]:]:[]:][:]:]:[[zab]

As you can see, the parallel version of inject does not care about order (which is expected). For example, the first thread received foo as the seed and bar as the key, while the second thread received baz as the seed and the initial [:] as the key. This is what can happen when a reduction into a map (or any mutable object) is performed in parallel without a specific order.
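Judging by the output above, injectParallel applies the same closure to whatever pair of values a worker holds, including a String seed and the [:] map as a key. A correct parallel reduction into a map needs a separate step that merges two partial results. As an illustration in plain Java (not GPars), the three-argument Stream.reduce(identity, accumulator, combiner) makes that split explicit. Here expensiveOperation is a hypothetical stand-in that only reverses the key.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ParallelReduceDemo {

    // Hypothetical stand-in for the slow call; reverses the key like the answer's example
    static String expensiveOperation(String key) {
        return new StringBuilder(key).reverse().toString();
    }

    public static Map<String, String> reduceToMap(List<String> keys) {
        return keys.parallelStream().reduce(
            // identity: an empty map that is never mutated, since several chunks may start from it
            Collections.<String, String>emptyMap(),
            // accumulator: adds one element to a partial map, returning a new map
            (partial, key) -> {
                Map<String, String> next = new LinkedHashMap<>(partial);
                next.put(key, expensiveOperation(key));
                return next;
            },
            // combiner: merges two partial maps; on an ordered stream the left one holds the earlier elements
            (left, right) -> {
                Map<String, String> merged = new LinkedHashMap<>(left);
                merged.putAll(right);
                return merged;
            });
    }

    public static void main(String[] args) {
        System.out.println(reduceToMap(List.of("foo", "bar", "baz"))); // prints {foo=oof, bar=rab, baz=zab}
    }
}
```

The accumulator and combiner copy maps rather than mutating a shared one. Because the combiner always puts the earlier partial map first, an ordered stream keeps the list order. For duplicate keys, the later value wins in both functions, which keeps them consistent with each other.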

Solution

There are two ways to parallelize the process:

1. collectParallel + collectEntries combination

As Tim Yates mentioned in the comment, you can run the expensive operations in parallel and then collect the results into a map sequentially:

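import groovyx.gpars.GParsPool

// Simulates a slow, independent computation (1 second per call)
static def expensiveOperation(String key) {
    Thread.sleep(1000)
    return key.reverse()
}

List<String> strings = ['foo', 'bar', 'baz']

GParsPool.withPool {
    // collectParallel runs expensiveOperation() for all keys concurrently and returns a list of [key, value] pairs;
    // collectEntries then turns those pairs into map entries sequentially
    def result = strings.collectParallel { [it, expensiveOperation(it)] }.collectEntries { [(it[0]): it[1]] }

    println result
}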

This example executes in approximately 1 second and produces the following output:

[foo:oof, bar:rab, baz:zab]
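The same "compute in parallel, collect sequentially" split can be sketched in plain Java without GPars. This is a minimal sketch under stated assumptions. collectEntriesParallel is a hypothetical helper name. It starts one CompletableFuture per key on a fixed-size thread pool, so threads must be at least 1. It relies on the futures list keeping the input order, so the map can be filled in an ordinary sequential loop.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;

public class CollectThenEntries {

    // Hypothetical helper mirroring collectParallel + collectEntries
    public static <K, V> Map<K, V> collectEntriesParallel(List<K> keys, Function<K, V> valueFn, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            // Step 1 (parallel): start one task per key; the futures list keeps the original order
            List<CompletableFuture<V>> futures = keys.stream()
                .map(key -> CompletableFuture.supplyAsync(() -> valueFn.apply(key), pool))
                .collect(Collectors.toList());

            // Step 2 (sequential): wait for each result in list order and insert it into an ordered map
            Map<K, V> result = new LinkedHashMap<>();
            for (int i = 0; i < keys.size(); i++) {
                result.put(keys.get(i), futures.get(i).join());
            }
            return result;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> strings = List.of("foo", "bar", "baz");
        long start = System.nanoTime();
        Map<String, String> result = collectEntriesParallel(strings, s -> {
            sleep(1000); // simulated expensive operation, as in the Groovy example
            return new StringBuilder(s).reverse().toString();
        }, strings.size());
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println(result + " in ~" + elapsedMs + " ms");
    }

    private static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

The LinkedHashMap keeps the entries in list order, matching the Groovy output above. A pool sized to the number of keys suits three slow calls; for large inputs, a smaller fixed pool is the more reasonable assumption.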

2. Java's parallel stream

Alternatively, you can use a Java parallel stream with the Collectors.toMap() collector:

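import java.util.function.Function
import java.util.stream.Collectors

// Simulates a slow, independent computation (1 second per call)
static def expensiveOperation(String key) {
    Thread.sleep(1000)
    return key.reverse()
}

List<String> strings = ['foo', 'bar', 'baz']

// parallelStream() spreads the work over the common ForkJoinPool;
// Collectors.toMap() builds partial maps and merges them into one result
def result = strings.parallelStream()
        .collect(Collectors.toMap(Function.identity(), { str -> expensiveOperation(str) }))

println result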

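This example also executes in approximately 1 second and produces output like this. Note that the entries are no longer in list order, because Collectors.toMap() makes no guarantee about the type of map it returns (in practice it is a HashMap):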

[bar:rab, foo:oof, baz:zab]
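If the original list order matters, the four-argument Collectors.toMap(keyMapper, valueMapper, mergeFunction, mapFactory) overload lets you supply a LinkedHashMap. Here is a plain-Java sketch. It assumes the first value should win for duplicate keys; the two-argument toMap() used above throws an IllegalStateException instead. expensiveOperation is a hypothetical stand-in that only reverses the key.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class OrderedParallelToMap {

    // Hypothetical stand-in for the slow call (the answer's version also sleeps for 1 second)
    static String expensiveOperation(String key) {
        return new StringBuilder(key).reverse().toString();
    }

    public static Map<String, String> toOrderedMap(List<String> keys) {
        return keys.parallelStream()
            .collect(Collectors.toMap(
                Function.identity(),
                OrderedParallelToMap::expensiveOperation,
                // merge function for duplicate keys: keep the first value (an assumption for this sketch)
                (first, second) -> first,
                // map factory: each parallel chunk fills its own LinkedHashMap before the chunks are merged
                LinkedHashMap::new));
    }

    public static void main(String[] args) {
        System.out.println(toOrderedMap(List.of("foo", "bar", "baz"))); // prints {foo=oof, bar=rab, baz=zab}
    }
}
```

Since toMap() is not a concurrent collector, the partial maps are merged in encounter order. The result therefore matches the [foo:oof, bar:rab, baz:zab] order of the sequential version.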

Hope it helps.