Large Dask Processes Fail When Creating and Storing DataFrame

I'm running a face recognition model over a huge number of image files to generate a Dask DataFrame of facial encodings, the path of the image each face was found in, and the coordinates of each face within that image. Because there are so many photos, I'm using Dask to parallelize the computation. Since I'm using a pre-built model, my code is fairly straightforward:

import argparse
import logging
import multiprocessing as mp
from pathlib import Path
from typing import Optional, Union

import dask
import dask.dataframe as dd
import pandas as pd
from dask.distributed import Client, LocalCluster
from face_recognition import face_encodings, face_locations, load_image_file
from tqdm import tqdm


INDEX_COL = "person_id"
FILENAME_COL = "filename"
FACIAL_COORDS_COLUMNS = [f"facial_coords_{i}" for i in range(4)]
FACIAL_ENCODING_COLUMNS = [f"facial_encoding_{i}" for i in range(128)]

PERSONS_COLUMNS = (
    [INDEX_COL, FILENAME_COL]
    + FACIAL_COORDS_COLUMNS
    + FACIAL_ENCODING_COLUMNS
)

PERSONS_META = {"person_id": "f8", "filename": "object"}
PERSONS_META.update({name: "f8" for name in FACIAL_COORDS_COLUMNS})
PERSONS_META.update({name: "f8" for name in FACIAL_ENCODING_COLUMNS})


def create_fake_df() -> pd.DataFrame:

    fake_row = [111, "/path/to/file"]
    fake_row += [5.0 for _ in range(4)]
    fake_row += [5.0 for _ in range(128)]

    fake_df = pd.DataFrame([fake_row], columns=PERSONS_COLUMNS)
    fake_df.drop(0, inplace=True)
    return fake_df

FAKE_PERSONS_DF = create_fake_df()


logging.basicConfig(
    format="%(asctime)s %(levelname)s:%(name)s: %(message)s",
    level=logging.DEBUG,
    datefmt="%H:%M:%S",
    filename="/face-encodings/logs/ingest.log",
)

logger = logging.getLogger(__name__)
CPUS = mp.cpu_count()


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(
        description="Script to ingest vk photos, encode faces, and find duplicate faces"
    )

    parser.add_argument(
        "img_path", help="Parent folder of all images to be ingested", type=Path
    )
    parser.add_argument(
        "enc_path",
        help="Folder to save all encodings in. Do not supply a path with a filename",
        type=Path,
    )
    parser.add_argument(
        "dist_path",
        help="Folder to save distance matrix to. Do not supply a path with a filename",
        type=Path,
    )
    parser.add_argument(
        "--npartitions",
        help="Number of partitions for saving encodings datafame",
        type=int,
        default=5000,
    )

    return parser.parse_args()

def create_person_id(row: pd.Series) -> int:
    return hash(row["filename"] + str(row[FACIAL_COORDS_COLUMNS].tolist()))


@dask.delayed
def dataframe_from_one_image(image: Path) -> pd.DataFrame:
    image_array = load_image_file(image)
    locations = face_locations(image_array)
    if len(locations) == 0:
        return FAKE_PERSONS_DF
    encodings = face_encodings(image_array, locations)

    locs_df = pd.DataFrame(locations, columns=FACIAL_COORDS_COLUMNS)
    encodings_df = pd.DataFrame(encodings, columns=FACIAL_ENCODING_COLUMNS)
    new_df = pd.concat([locs_df, encodings_df], axis=1)  # one row per detected face
    new_df["filename"] = str(image)
    new_df["person_id"] = new_df.apply(create_person_id, axis=1)
    new_df = new_df[PERSONS_COLUMNS]
    return new_df

def encode_all_images(
    images_path: Union[str, Path],
    save_path: Union[str, Path],
    npartitions: int,
    client: Client,
    num_images: Optional[int] = None,
) -> dd.DataFrame:

    images_path = Path(images_path)

    if not images_path.exists():
        raise ValueError("Your images path does not exist")
    save_path = Path(save_path)

    if not save_path.exists():
        logger.warning(
            f"Save path for encodings did not exist. Created new path at {save_path}"
        )
        save_path.mkdir(parents=True)

    try:
        existing_df = dd.read_parquet(save_path)  # type: Optional[dd.DataFrame]
    except IndexError:
        existing_df = None

    try:
        filenames = set(dd.read_parquet(save_path)["filename"].unique().compute())
    except (OSError, IndexError):
        filenames = set()

    delayed_dfs = [
        dataframe_from_one_image(image_path)
        for image_path in tqdm(images_path.rglob("*.jp*g"))
        if (not image_path.stem.startswith("._") and str(image_path) not in filenames)
    ]
    persons_df: dd.DataFrame = dd.from_delayed(delayed_dfs, meta=FAKE_PERSONS_DF)

    if existing_df is not None:
        persons_df = existing_df.append(persons_df, ignore_index=True)

    persons_df = persons_df.repartition(npartitions=npartitions)

    persons_df.to_parquet(save_path, write_index=False, schema="infer")
    return persons_df


if __name__ == "__main__":

    args = parse_args()
    cluster = LocalCluster(n_workers=CPUS)
    client = Client(cluster)

    persons = encode_all_images(
        args.img_path, args.enc_path, args.npartitions, client=client
    )

However, when I run this code, I get the following slew of errors:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/tornado/ioloop.py", line 905, in _run
    return self.callback()
           └ <tornado.ioloop.PeriodicCallback object at 0x7f954d3aab80>
  File "/usr/local/lib/python3.8/site-packages/distributed/worker.py", line 648, in <lambda>
    lambda: self.batched_stream.send({"op": "keep-alive"}), 60000
            └ <Worker: 'tcp://127.0.0.1:44785', 46, Status.running, stored: 314, running: 1/1, ready: 8283, comm: 0, waiting: 0>
  File "/usr/local/lib/python3.8/site-packages/distributed/batched.py", line 146, in send
    raise CommClosedError
          └ <class 'distributed.comm.core.CommClosedError'>
distributed.comm.core.CommClosedError

[... the same CommClosedError traceback repeats many more times, once for each of several other workers ...]

Traceback (most recent call last):
  File "src/face_encodings/db/ingest.py", line 200, in <module>
    persons = encode_all_images(
  File "src/face_encodings/db/ingest.py", line 156, in encode_all_images
    persons_df.to_parquet(save_path, write_index=False, schema="infer")
    │                     └ PosixPath('/face_encodings/data/encodings')
    └ Dask DataFrame Structure:
                 person_id filename facial_coords_0 facial_coords_1 facial_coords_2 facial_coords_3 fa...
  File "/usr/local/lib/python3.8/site-packages/dask/dataframe/core.py", line 3974, in to_parquet
    return to_parquet(self, path, *args, **kwargs)
           │          │     │      │       └ {'write_index': False, 'schema': 'infer'}
           │          │     │      └ ()
           │          │     └ PosixPath('/face_encodings/data/encodings')
           │          └ Dask DataFrame Structure:
                 person_id filename facial_coords_0 facial_coords_1 facial_coords_2 facial_coords_3 fa...
           └ <function to_parquet at 0x7f158dfab3a0>
  File "/usr/local/lib/python3.8/site-packages/dask/dataframe/io/parquet/core.py", line 508, in to_parquet
    out = out.compute(**compute_kwargs)
    │     │             └ {}
    │     └ Delayed('write_metadata-10406578-f252-494b-81ea-c7449ae7cff2')
    └ Delayed('write_metadata-10406578-f252-494b-81ea-c7449ae7cff2')
  File "/usr/local/lib/python3.8/site-packages/dask/base.py", line 167, in compute
    (result,) = compute(self, traverse=False, **kwargs)
                │       │                       └ {}
                │       └ Delayed('write_metadata-10406578-f252-494b-81ea-c7449ae7cff2')
                └ <function compute at 0x7f163c408a60>
  File "/usr/local/lib/python3.8/site-packages/dask/base.py", line 452, in compute
    results = schedule(dsk, keys, **kwargs)
              │        │    │       └ {}
              │        │    └ [['write_metadata-10406578-f252-494b-81ea-c7449ae7cff2']]
              │        └ {'write_metadata-10406578-f252-494b-81ea-c7449ae7cff2': (<function apply at 0x7f163c6be160>, <function ArrowEngine.write_metadat...
              └ <bound method Client.get of <Client: 'tcp://127.0.0.1:33577' processes=106 threads=106, memory=711.35 GB>>
  File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 2725, in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
              │           │                    │                    └ None
              │           │                    └ None
              │           └ [[<Future: cancelled, key: write_metadata-10406578-f252-494b-81ea-c7449ae7cff2>]]
              └ <Client: 'tcp://127.0.0.1:33577' processes=106 threads=106, memory=711.35 GB>
  File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 1986, in gather
    return self.sync(
  File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 832, in sync
    return sync(
  File "/usr/local/lib/python3.8/site-packages/distributed/utils.py", line 340, in sync
    raise exc.with_traceback(tb)
          │                  └ <traceback object at 0x7f10d144cf00>
          └ KilledWorker("('dataframe_from_one_image-from-delayed-bbaf57f69c90763b1975c5fbd337d279', 266578)", <Worker 'tcp://127.0.0.1:4609...
  File "/usr/local/lib/python3.8/site-packages/distributed/utils.py", line 324, in f
    result[0] = yield future
    │                 └ <coroutine object Client._gather at 0x7f16301095c0>
    └ [None]
  File "/usr/local/lib/python3.8/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
            └ None
  File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 1851, in _gather
    raise exception.with_traceback(traceback)
          │                        └ None
          └ KilledWorker("('dataframe_from_one_image-from-delayed-bbaf57f69c90763b1975c5fbd337d279', 266578)", <Worker 'tcp://127.0.0.1:4609...
distributed.scheduler.KilledWorker: ("('dataframe_from_one_image-from-delayed-bbaf57f69c90763b1975c5fbd337d279', 266578)", <Worker 'tcp://127.0.0.1:46095', name: 81, memory: 0, processing: 8834>)
distributed.worker - WARNING - Heartbeat to scheduler failed
distributed.nanny - WARNING - Worker process still alive after 3 seconds, killing
[... both warnings repeat many more times ...]

The error output is extra verbose because I'm using the Better Exceptions library. I don't understand what's causing the failure, but I'm fairly sure it has something to do with either running inside Docker or with memory usage, because when I test this code on small batches it runs without problems.
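
One thing I'm planning to try next is cutting down the sheer number of tasks and giving each worker an explicit memory cap, since right now I create one delayed call per image, which means hundreds of thousands of tiny tasks. Below is a rough, untested sketch of that idea; the batch size, worker count, and memory limit are guesses, and encode_one_image stands in for a hypothetical version of dataframe_from_one_image without the @dask.delayed decorator:

from pathlib import Path
from typing import List

import dask
import pandas as pd
from dask.distributed import Client, LocalCluster


@dask.delayed
def dataframe_from_image_batch(image_paths: List[Path]) -> pd.DataFrame:
    # encode_one_image is assumed to be dataframe_from_one_image without the
    # @dask.delayed decorator, so one task now covers a whole batch of images.
    frames = [encode_one_image(path) for path in image_paths]
    return pd.concat(frames, ignore_index=True)


def chunked(paths: List[Path], size: int = 64) -> List[List[Path]]:
    # Split the image list into fixed-size batches to shrink the task graph.
    return [paths[i : i + size] for i in range(0, len(paths), size)]


cluster = LocalCluster(
    n_workers=32,             # fewer, fatter workers instead of one per CPU
    threads_per_worker=1,
    memory_limit="16GB",      # explicit per-worker cap (guessed value)
)
client = Client(cluster)

The idea is that fewer, larger tasks should reduce scheduler overhead and make per-worker memory use more predictable, but I don't know yet whether that actually addresses the crashes.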

This is being built and run in a Docker container that is built off a python:3.8 base image. The container has access to 80 CPUs and 700 GB of memory.
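
For what it's worth, my understanding is that LocalCluster splits the detected memory evenly across workers by default, and I suspect mp.cpu_count() inside the container reports the host's CPUs rather than the 80 the container is limited to, which would explain the processes=106 in the scheduler banner. That would leave each worker with only roughly 6.7 GB before the nanny steps in. A quick check along these lines should confirm how much memory each worker actually gets (psutil is already installed as a dependency of distributed):

import multiprocessing as mp

import psutil

total_bytes = psutil.virtual_memory().total
n_workers = mp.cpu_count()  # what my LocalCluster currently uses for n_workers
print(f"{total_bytes / 1e9:.1f} GB of memory visible in the container")
print(f"{n_workers} workers -> ~{total_bytes / (n_workers * 1e9):.1f} GB per worker by default")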

I'm using dask[complete]==2.30.0.
