Why does my code run so much slower with joblib.Parallel() than without?


I am new to using joblib.Parallel() to speed up some massive numpy.fft calculations.

I followed this example presented on the joblib website.

Using that example, I see the following results on my computer:

Elapsed time computing the average of couple of slices 1.69 s
Elapsed time computing the average of couple of slices 2.64 s
Elapsed time computing the average of couple of slices 0.40 s
Elapsed time computing the average of couple of slices 0.26 s 

These look very good! Then I changed data[s1].mean() to np.fft.fft( data[s1] ); see the following code:

    import time
    import numpy as np

    data = np.random.random((2**24,))      # ~0.13 GB of float64 samples
    window_size = 256
    slices = [slice(start, start + window_size)
              for start in range(0, data.size - window_size, window_size)]
    print(len(slices))                     # 65535 window-slices

    def slow_FFT(data, sl):
        return np.fft.fft(data[sl])

    tic = time.time()
    results = [slow_FFT(data, sl) for sl in slices]
    toc = time.time()
    print('\nElapsed time computing the FFT of all slices {:.2f} s'
          .format(toc - tic))
    print(np.shape(results))               # (65535, 256)


    from joblib import Parallel, delayed

    tic = time.time()
    results2 = Parallel(n_jobs=4)(delayed(slow_FFT)(data, sl) for sl in slices)
    toc = time.time()
    print('\nElapsed time computing the FFT of all slices {:.2f} s'
          .format(toc - tic))


    import os
    from joblib import dump, load

    folder = './joblib5_memmap'
    os.makedirs(folder, exist_ok=True)
    data_filename_memmap = os.path.join(folder, 'data_memmap')
    dump(data, data_filename_memmap)                  # dump the array once ...
    data = load(data_filename_memmap, mmap_mode='r')  # ... and re-load it as a read-only memmap

    tic = time.time()
    results3 = Parallel(n_jobs=4)(delayed(slow_FFT)(data, sl) for sl in slices)
    toc = time.time()
    print('\nElapsed time computing the FFT of all slices {:.2f} s\n'
          .format(toc - tic))

    def slow_FFT_write_output(data, sl, output, idx):
        output[idx, :] = np.fft.fft(data[sl])  # write each result straight into the shared memmap

    output_filename_memmap = os.path.join(folder, 'output_memmap')
    output = np.memmap(output_filename_memmap, dtype=np.cdouble,
                       shape=(len(slices), window_size), mode='w+')
    data = load(data_filename_memmap, mmap_mode='r')

    tic = time.time()
    _ = Parallel(n_jobs=4)(delayed(slow_FFT_write_output)(data, sl, output, idx)
                           for idx, sl in enumerate(slices))
    toc = time.time()
    print('\nElapsed time computing the FFT of all slices {:.2f} s\n'
          .format(toc - tic))

    print(np.allclose(np.array(results), output))

I do not see any speedup with 4 cores, not even in the "Writable memmap for shared memory" variant.

First, we evaluate the sequential computation on our problem:

Elapsed time computing the FFT of all slices 0.62 s

joblib.Parallel() is then used to compute the FFT of all slices in parallel, using 4 workers:

Elapsed time computing the FFT of all slices 4.29 s

Here the parallel processing is much slower than the sequential processing. It should be possible to remove a bit of overhead by dumping the data array to a memmap and passing the memmap to joblib.Parallel():

Elapsed time computing the FFT of all slices 1.94 s

Writable memmap for shared memory:

Elapsed time computing the FFT of all slices 1.46 s
True

Can someone help me understand why? Many thanks in advance!

Q :
" Can someone help me understand why? "

A :
Sure. Your code has acquired IMMENSE add-on overhead costs, and it keeps repeating, 65535 x ( yes, that many times! ), the
SER / xfer / DES add-on costs ( [SPACE]-wise as RAM allocations + [TIME]-wise as CPU + RAM-I/O delays )
of serialising + transferring p2p + deserialising, again and again, the same ~0.13 [GB] block of data, RAM to RAM.

pass;                                        tic = time.time()
#||||||||||||||||||||||||||||||||||||||||| # CRITICAL SECTION 
results3 = Parallel( n_jobs = 4            # 1.spawn 4 process replicas
                     )( delayed( slow_FFT  # + keep
                                 )( data,  #   feeding them with
                                    sl )   #   <_0.13_GB_data_> + <_sl_>-Objects
                            for     sl     #   for each slice
                                 in slices #       from slices
                        )                  #   again and again 65k+ times
#||||||||||||||||||||||||||||||||||||||||| # CRITICAL SECTION ~8.8 [TB] DATA-FLOW RAM-I/O PAIN
pass;                                        toc = time.time()
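
As a back-of-the-envelope check of those add-on costs ( a sketch; the exact volume moved depends on the joblib backend and its batching ):

pass;                                      # a sketch of the data-flow volume
DATA_GB = 2**24 * 8 / 1e9                  # ~0.13 [GB] of float64 per call
N_CALLS = ( 2**24 - 256 ) // 256           # 65535 slices, one call each
print( "~{:.1f} [TB] RAM-to-RAM".format( DATA_GB * N_CALLS / 1e3 ) )  # ~8.8 [TB]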

This "low-cost" SLOC with using an iterator syntax sugar is punished by doing awfully lot unproductive work, thus not doing the only useful one.

Refactor the strategy to pay the SER/xfer/DES add-on costs just once ( during the instantiation of the n_jobs processes, which happens anyway ),
and never pass data that are already "known" inside all the n_jobs replicated Python interpreter processes. Best formulate ad-hoc iterators that let the "remote" workers work autonomously on large blocks, defined via a smart call-signature and called just once
( and not 65535 x )

def smartFFT( aTupleOfStartStopShiftINDEX = ( 0, -FFT_WINDOW_SIZE, 1 ) ):
    global FFT_WINDOW_SIZE
    global DATA_IN
    #------------------------
    # compute all FFT-results at once
    #         for the "known" DATA_IN,
    #         for every window of the span aTupleOfStartStopShiftINDEX[0]
    #                                 till aTupleOfStartStopShiftINDEX[1]
    #                          shifting by aTupleOfStartStopShiftINDEX[2]
    #                              of size FFT_WINDOW_SIZE
    #------------------------prefer powers of Numpy vectorised code
    #------------------------best with using smart striding-tricks
    start, stop, shift = aTupleOfStartStopShiftINDEX
    windows = np.lib.stride_tricks.sliding_window_view( DATA_IN[start:stop],
                                                        FFT_WINDOW_SIZE
                                                        )[::shift]  # zero-copy view
    return np.fft.fft( windows, axis = 1 )  # block_of_RESULTS_at_once
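
For completeness, a minimal sketch of how the iTuples used below could be built, one per worker, so that each worker processes one large, contiguous, window-aligned chunk ( the names N_JOBS, CHUNK and iTuples are illustrative, not part of the original code ):

N_JOBS          = 4
FFT_WINDOW_SIZE = 256
CHUNK   = ( DATA_IN.size // N_JOBS ) // FFT_WINDOW_SIZE * FFT_WINDOW_SIZE
iTuples = [ ( i * CHUNK,                #   start
              ( i + 1 ) * CHUNK,        #   stop
              FFT_WINDOW_SIZE )         #   shift ~ non-overlapping windows
            for i in range( N_JOBS ) ]  #   just n_jobs ~ 4 tuples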

and next just :

pass;                                        tic = time.time()
#||||||||||||||||||||||||||||||||||||||||| # CRITICAL SECTION 
results3 = Parallel( n_jobs = 4            # 1.spawn 4 process replicas
                     )( delayed( smartFFT  # + keep
                                 )( iTup ) #   feeding them with
                            for     iTup   #       just iTup tuple
                                 in iTuples#
                        )                  #   just n_jobs ~ 4 times
#||||||||||||||||||||||||||||||||||||||||| # CRITICAL SECTION +0 [kB] DATA-FLOW
pass;                                        toc = time.time()
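
The four per-worker result blocks can then be stacked back into one array ( a sketch, assuming the iTuples layout above ):

all_FFTs = np.vstack( results3 )   # 4 blocks of ( CHUNK // 256, 256 )
print( all_FFTs.shape )            # ( 65536, 256 ) FFT-results in total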

So WHY?

You simply paid so much in add-on overhead costs that no speed-up is possible at all. This is what Amdahl's Law explains, once we stop closing our eyes to ( or forgetting about ) all the add-on costs accrued by "extending" the code so that it starts working, at least somehow, in parallel ( the atomicity of work being the second important update to the classical formula, so that it is not used against the nature of how work-packages actually flow across real-world devices, be they processors or networks of processors ).
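
In an overhead-strict re-formulation of Amdahl's Law ( a sketch; p is the parallelisable fraction, N the number of workers, and pSO / pTO are illustrative names for the setup and termination add-on overheads, expressed in the same normalised units ):

def amdahl_overhead( p, N, pSO, pTO ):
    # speed-up = 1 / ( serial part + setup costs + parallelised part + termination costs )
    return 1 / ( ( 1 - p ) + pSO + p / N + pTO )

print( amdahl_overhead( p = 0.99, N = 4, pSO = 0.0, pTO = 0.0 ) )  # ~3.9x, the naive promise
print( amdahl_overhead( p = 0.99, N = 4, pSO = 3.0, pTO = 3.0 ) )  # ~0.16x, i.e. a net slowdown

A speed-up above 1 remains possible only while the added pSO + pTO stay well below the p ( 1 - 1/N ) saved by going parallel; here they did not.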

Paying more in add-on costs than you receive back in speed-up: this is "The WHY" part. Never mind, it is a flaw repeated many times; one can just check how often.