Why is Dask not respecting the memory limits for LocalCluster?


I'm running the code pasted below on a machine with 16 GB of RAM (deliberately).

import dask.array as da
import dask.delayed
from sklearn.datasets import make_blobs
import numpy as np

from dask_ml.cluster import KMeans
from dask.distributed import Client

client = Client(n_workers=4, threads_per_worker=1, processes=False,
                memory_limit='2GB', scheduler_port=0,
                silence_logs=False, dashboard_address=8787)

n_centers = 12
n_features = 4

X_small, y_small = make_blobs(n_samples=1000, centers=n_centers, n_features=n_features, random_state=0)

centers = np.zeros((n_centers, n_features))

for i in range(n_centers):
    centers[i] = X_small[y_small == i].mean(0)

print(centers)

n_samples_per_block = 450 * 650 * 900
n_blocks = 4

delayeds = [dask.delayed(make_blobs)(n_samples=n_samples_per_block,
                                     centers=centers,
                                     n_features=n_features,
                                     random_state=i)[0]
            for i in range(n_blocks)]
arrays = [da.from_delayed(obj, shape=(n_samples_per_block, n_features), dtype=X_small.dtype)
          for obj in delayeds]
X = da.concatenate(arrays)

print(X)

X = X.rechunk((1000, 4))

clf = KMeans(init_max_iter=3, oversampling_factor=10)

clf.fit(X)

client.close()

Considering I'm creating 4 workers with a 2 GB memory limit each (8 GB in total), I would expect this algorithm not to exceed the amount of memory on that machine. Unfortunately, it uses more than 16 GB and goes into swap.
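
As a sanity check (a sketch only, to be run before client.close(); it assumes nothing beyond the client and X defined above), the per-worker limits and the nominal size of X can be inspected like this:

from dask.utils import format_bytes

# confirm the memory_limit each worker actually received
for addr, w in client.scheduler_info()["workers"].items():
    print(addr, format_bytes(w["memory_limit"]))

# nominal size of X: 4 blocks * (450 * 650 * 900) rows * 4 features * 8 bytes
# (float64) is roughly 33.7 GB, i.e. well above the combined 8 GB of worker memory
print(format_bytes(X.nbytes))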

I really don't know what is wrong with that code, or whether I have misunderstood the concepts of Dask (especially because this code has no complexity in terms of data dependencies).

There are 2 best solutions below

This is not a direct answer to the problem of Dask not respecting the memory constraint (the short answer seems to be that memory_limit is not a hard, binding constraint), but the code can be improved in the following directions:

  • use the make_blobs adapted by dask_ml: it generates a chunked Dask array directly, which avoids the overhead of constructing a Dask array from delayed objects and then rechunking it;
  • use a context manager for creating the client (and cluster): this handles .close more reliably, especially if the code executed on the workers raises errors.
from dask.distributed import Client
from dask_ml.cluster import KMeans
from dask_ml.datasets import make_blobs

# same cluster configuration as in the question
client_params = dict(
    n_workers=4,
    threads_per_worker=1,
    processes=False,
    memory_limit="2GB",
    scheduler_port=0,
    silence_logs=False,
    dashboard_address=8787,
)

n_centers = 12
n_features = 4
n_samples = 1000 * 100
chunks = (1000 * 50, 4)

# dask_ml's make_blobs builds a lazily evaluated, chunked Dask array directly
X, _ = make_blobs(
    n_samples=n_samples,
    centers=n_centers,
    n_features=n_features,
    random_state=0,
    chunks=chunks,
)

clf = KMeans(init_max_iter=3, oversampling_factor=10, n_clusters=n_centers)

# the context manager guarantees the client is closed even if an error occurs
with Client(**client_params) as client:
    result = clf.fit(X)

print(result.cluster_centers_)
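
As a side note, and this is only a sketch of one possible direction rather than something verified against the snippet above: with processes=False the workers run as threads inside a single process, so there is no Nanny process to terminate a worker that overshoots its memory_limit. Running the workers as separate processes and setting the memory thresholds explicitly should make the limit much closer to binding:

import dask
from dask.distributed import Client

# fractions of memory_limit at which each safety mechanism kicks in
dask.config.set({
    "distributed.worker.memory.target": 0.6,      # start spilling to disk
    "distributed.worker.memory.spill": 0.7,       # spill based on process memory
    "distributed.worker.memory.pause": 0.8,       # stop accepting new tasks
    "distributed.worker.memory.terminate": 0.95,  # Nanny restarts the worker
})

# processes=True gives each worker its own process (and a Nanny to police it)
with Client(n_workers=4, threads_per_worker=1, processes=True,
            memory_limit="2GB") as client:
    clf.fit(X)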

This is meant as a complement to @SultanOrazbayev's answer, which was much faster than the original snippet and ran well within the allocated memory.

Using the Dask Dashboard's "Woker's Memory" panel, I also saw the total process memory exceeding the 2GB memory limit and this warning: distributed.worker - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS and Unmanaged memory: 3.57 GiB -- Worker memory limit: 1.86 GiB with the unmanaged memory increasing as the computation continued. Usually in this case the recommendation is to manually trim the memory, however, as explained in a similar question on the Dask Discourse, this is not currently possible with KMeans (here is the open issue). Hopefully this adds some helpful context.