I'm running the code pasted below on a machine with 16 GB of RAM (on purpose).
import dask.array as da
import dask.delayed
from sklearn.datasets import make_blobs
import numpy as np
from dask_ml.cluster import KMeans
from dask.distributed import Client
client = Client(n_workers=4, threads_per_worker=1, processes=False,
                memory_limit='2GB', scheduler_port=0,
                silence_logs=False, dashboard_address=8787)
n_centers = 12
n_features = 4
X_small, y_small = make_blobs(n_samples=1000, centers=n_centers, n_features=n_features, random_state=0)
centers = np.zeros((n_centers, n_features))
for i in range(n_centers):
    centers[i] = X_small[y_small == i].mean(0)
print(centers)
n_samples_per_block = 450 * 650 * 900
n_blocks = 4
delayeds = [dask.delayed(make_blobs)(n_samples=n_samples_per_block,
                                     centers=centers,
                                     n_features=n_features,
                                     random_state=i)[0]
            for i in range(n_blocks)]
arrays = [da.from_delayed(obj, shape=(n_samples_per_block, n_features), dtype=X_small.dtype)
          for obj in delayeds]
X = da.concatenate(arrays)
print(X)
X = X.rechunk((1000, 4))
clf = KMeans(init_max_iter=3, oversampling_factor=10)
clf.fit(X)
client.close()
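For reference on the size of the data involved: each delayed block is 450 * 650 * 900 = 263,250,000 samples of 4 float64 values, i.e. roughly 8.4 GB, so the concatenated X is about 33.7 GB in total.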
Considering I'm creating 4 workers with a 2 GB memory limit each (8 GB in total), I would expect this algorithm not to exceed the memory of that machine. Unfortunately, it uses more than 16 GB and goes into swap.
I really don't know what is wrong with that code, or whether I have misunderstood the concepts of Dask (especially since this code has no complexity in terms of data dependencies).
This is not a direct answer to the problem of dask not respecting the memory constraint (the short answer seems to be that this is not a binding constraint), however the code can be improved along these directions (a minimal sketch combining them follows the list):

- use the make_blobs that was adapted by dask_ml (dask_ml.datasets.make_blobs): this reduces overhead due to the construction of a dask array and the related reshaping;
- make sure the client is closed with .close (for example by using it as a context manager), especially if there are errors in the code executed on the workers.
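A minimal sketch combining both suggestions. It assumes dask_ml.datasets.make_blobs accepts the same centers / n_features / random_state arguments as the scikit-learn version plus a chunks argument; the Client keywords are trimmed down from the question, and the chunk size is an arbitrary choice, not a recommendation:

import numpy as np
from sklearn.datasets import make_blobs as sk_make_blobs
from dask_ml.datasets import make_blobs as dask_make_blobs
from dask_ml.cluster import KMeans
from dask.distributed import Client

n_centers, n_features = 12, 4

# Same trick as in the question: derive fixed centers from a small sample.
X_small, y_small = sk_make_blobs(n_samples=1000, centers=n_centers,
                                 n_features=n_features, random_state=0)
centers = np.array([X_small[y_small == i].mean(0) for i in range(n_centers)])

n_samples = 4 * 450 * 650 * 900  # same total size as the 4 delayed blocks

# Client as a context manager: .close is called even if fit() raises.
with Client(n_workers=4, threads_per_worker=1, processes=False,
            memory_limit='2GB', silence_logs=False):
    # dask_ml's make_blobs builds a chunked dask array directly, so the
    # delayed / from_delayed / concatenate / rechunk steps are not needed.
    X, _ = dask_make_blobs(n_samples=n_samples, centers=centers,
                           n_features=n_features, random_state=0,
                           chunks=n_samples // 100)  # arbitrary chunking
    clf = KMeans(init_max_iter=3, oversampling_factor=10)
    clf.fit(X)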