Pandas DataFrame Parallel Apply (Swifter, TQDM process_map) Freezes When Called?


I have a dataframe with ~15k paths to audio files on which I want to perform an operation (artificially adding noise). Generally the whole thing works, but it takes a long time even with fewer (16) records. The problem is not the execution time of the function itself but the time until everything is initialized.

start = time.time()
data_augmented = data_augmented.swifter.progress_bar(True, desc="Merge Sounds") \
        .apply(merge_sounds(**settings), axis=1)
print(f"{time.time() - start} - Map Timer")
Merge Sounds: 100%|█████████████████████████████| 16/16 [00:07<00:00,  2.09it/s]
26.973325729370117 - Map Timer

As you can see here, the initialization takes almost four times as long as the runtime of the mapped function (merge_sounds). By initialization time I mean elapsed_time_measured_by_myself - elapsed_time_measured_by_tqdm, so in this case 26.97 - 7 ≈ 19.97 seconds.


    start = time.time()
    lambda_fn = merge_sounds(**settings)  # doesn't work if I inline it in the line below
    data_augmented = process_map(lambda_fn, data_augmented, max_workers=threads,
                                 desc=f"Merge_sounds [{threads} Threads]")
    print(f"{time.time() - start} - Map Timer")

is stuck at:

Merge_sounds [16 Threads]:   0%|                         | 0/16 [00:00<?, ?it/s]

    with Pool(processes=16) as pool:
        data_augmented = pool.map(merge_sounds(**settings), tqdm(data_augmented, desc=f"Merge Sounds: {16} Threads"))

is stuck at:

Merge Sounds: 16 Threads:  38%|██████          | 6/16 [00:00<00:00, 4697.75it/s]
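One thing worth noting about the last variant: `tqdm` here wraps the *input* iterable, so the bar only measures how fast `pool.map` consumes inputs (hence the implausible 4697.75 it/s before it sits still), not how fast tasks actually complete. Also, if `data_augmented` is still a DataFrame at this point, iterating it directly yields column labels, not rows. A minimal sketch (with a toy `work` function standing in for the real task) that tracks actual completions by wrapping the pool's *output* instead:

```python
from multiprocessing import Pool

try:
    from tqdm import tqdm
except ImportError:  # keep the sketch runnable even without tqdm installed
    def tqdm(iterable, **kwargs):
        return iterable

def work(x):
    # top-level stand-in for the real task; must be picklable
    return x * x

def run(n=16, workers=4):
    items = list(range(n))
    with Pool(processes=workers) as pool:
        # imap yields results as workers finish them, so wrapping the
        # *output* in tqdm shows real progress; tqdm around the *input*
        # only shows how fast the pool consumes it
        return list(tqdm(pool.imap(work, items), total=n, desc="Merge Sounds"))

if __name__ == "__main__":
    print(run())
```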

I know that parallelization doesn't make sense for smaller datasets; I'm just puzzled why I can easily parallelize everything everywhere else in the code but can't get ahead here. I will later run this code on a lot of data, so if parallelism worked I would be very happy.

The function used within the map is:

from pathlib import Path
from random import uniform

def merge_sounds(**settings):
    _range = settings.get("snr_range", (0.15, 0.65))
    assert len(_range) == 2, "snr_range -> e.g. (0.15, 0.75)"
    target_sample_rate = settings.get("target_sample_rate", 16000)

    if "target_path" not in settings:
        raise KeyError("please specify target_path in the settings dict")
    target_path = Path(settings["target_path"])
    target_path.mkdir(parents=True, exist_ok=True)

    def __call__(item):
        _target_path = item["path_augmented"]

        snr = round(uniform(_range[0], _range[1]), 4)
        pad_idx = item.name

        yp, _ = IO.load(item["path"], sample_rate=target_sample_rate)
        yn, _ = IO.load(item["path_noise"], sample_rate=target_sample_rate)
        item["snr"] = snr

        y_augmented = Effect.add_noise(yp, yn, snr=snr, pad_idx=pad_idx)
        IO.save_wav(y_augmented, _target_path, target_sample_rate)
        return item

    return __call__
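For reference, `merge_sounds` returns a nested function, and the standard pickler that `multiprocessing` uses to send callables to child processes cannot serialize closures; that is a common reason a pool stalls or dies silently instead of raising. A hedged sketch of a picklable layout using a module-level worker plus `functools.partial` (`merge_one` and its toy body are placeholders, not the real audio code):

```python
from functools import partial
from multiprocessing import Pool

def merge_one(settings, item):
    # Module-level function: picklable, unlike a closure returned by
    # another function. A real version would do the load/mix/save work.
    snr = settings.get("snr", 0.5)  # placeholder option
    return {**item, "snr": snr}

def run():
    settings = {"snr": 0.3}
    items = [{"path": f"clip_{i}.wav"} for i in range(4)]
    with Pool(processes=2) as pool:
        # partial binds the shared settings; each item goes to a worker
        return pool.map(partial(merge_one, settings), items)

if __name__ == "__main__":
    print(run())
```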

Is there anything I'm forgetting in order to parallelize the map function? (One of these variants seems to work everywhere else in my code just as expected.)

Thanks in advance.

1 Answer

I fixed this issue by bypassing tqdm's process_map. I zipped the columns I needed, something like this:

    _paths_in = _df["path_input"]
    _paths_out = _df["path_output"]
    _path_noise = _df["path_noise"]
    job = zip(_paths_in, _path_noise, _paths_out, _filter_jobs)

and then I just passed this to the multiprocessing function:

    jobs = list(enumerate(zip_jobs(df)))

    with Pool(processes=_threads) as pool:
        data_augmented = pool.map(execute_job, tqdm(jobs, desc=f"Audio-Augmentation: {_threads} Threads"))

merge_sounds and the new execute_job are similar; only the parameters the function takes have changed.
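A self-contained sketch of this pattern (the body of `execute_job` and the column names are stand-ins for the real audio code):

```python
from multiprocessing import Pool

def execute_job(job):
    # job is (index, (path_in, path_noise, path_out)); a real version
    # would load both files, mix them, and write the result
    idx, (path_in, path_noise, path_out) = job
    return idx, path_out

def zip_jobs(df):
    # works for a DataFrame or any mapping of equal-length columns
    return zip(df["path_input"], df["path_noise"], df["path_output"])

def run(df, workers=2):
    jobs = list(enumerate(zip_jobs(df)))
    with Pool(processes=workers) as pool:
        # plain tuples and a top-level function pickle without trouble
        return pool.map(execute_job, jobs)

if __name__ == "__main__":
    df = {
        "path_input":  ["a.wav", "b.wav"],
        "path_noise":  ["n1.wav", "n2.wav"],
        "path_output": ["a_out.wav", "b_out.wav"],
    }
    print(run(df))
```

Since each job is a plain tuple of paths and the worker is defined at module level, nothing in the task payload trips over the pickler, which is likely why this variant works where the closure-based ones froze.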