Reduce dask XGBoost memory consumption


I am writing a simple script to train an XGBoost predictor on my dataset. This is the code I am using:

import dask.dataframe as dd
import dask_ml
from dask.distributed import Client, LocalCluster
import sys
from dask_ml.model_selection import train_test_split
import dask
import xgboost
import dask_xgboost

def start_cluster(n_workers=1, threads_per_worker=2, memory_limit="12GB", processes=False):
    cluster = LocalCluster(
        n_workers=n_workers, threads_per_worker=threads_per_worker, memory_limit=memory_limit, processes=processes
    )
    client = Client(cluster)  # attach a client to the local cluster configured above
    print(client)
    print(client.cluster)
    print("Client infos:", client.scheduler_info())
    return client

client = start_cluster()

dask_df = dd.read_parquet('./sample_dataset', engine='pyarrow')

# drop identifier columns and timestamps that are not used as features
dask_df = dask_df.drop(
    ['mapped_tweet_id',
     'mapped_creator_id',
     'mapped_engager_id',
     'engagement_retweet_timestamp',
     'engagement_comment_timestamp',
     'engagement_reply_timestamp',
     'mapped_tweet_links',
     'mapped_domains',
     'mapped_tweet_hashtags'
    ], axis=1
)

# binary label: whether the tweet received a like
y = dask_df['engagement_like_timestamp'] > 0

dask_df = dask_df.drop(
    [
     'engagement_like_timestamp',
    ], axis=1
)

X_train, X_test, y_train, y_test = train_test_split(dask_df, y, test_size=0.2, shuffle=True)

params = {'objective': 'binary:logistic',
          'max_depth': 4, 'eta': 0.01, 'subsample': 0.5,
          'min_child_weight': 0.5}

bst = dask_xgboost.train(client, params, X_train, y_train, num_boost_round=10)

It runs fine, but I keep getting the usual garbage-collector warning associated with Dask (distributed.utils_perf - WARNING - full garbage collections took 36% CPU time recently (threshold: 10%)).

Looking at the dashboard, I noticed that memory usage keeps increasing until it reaches the 80% limit (I changed the default settings in the .config folder), at which point everything slows down because of the garbage collector.

Here is an example from the dashboard:

[Dashboard screenshot]

It basically keeps going like this until it saturates all the available memory. The dataset I am using is quite big, which is why I am using Dask in the first place. However, it seems that the whole dataset is being loaded into memory (I don't know if this assumption is correct, but the read-parquet tasks suggest so).
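One thing worth checking is how the parquet dataset is partitioned, since a partition is the unit Dask holds in memory at a time. Below is a rough sketch of how to inspect and, if your Dask version supports partition_size, shrink the partitions (the ./sample_dataset path and the 100MB target are placeholders):

import dask.dataframe as dd

ddf = dd.read_parquet('./sample_dataset', engine='pyarrow')  # placeholder path

# how many partitions there are, and how many rows land in each one
print("partitions:", ddf.npartitions)
print(ddf.map_partitions(len).compute())

# if individual partitions are very large, split them into smaller chunks
ddf = ddf.repartition(partition_size="100MB")

Smaller partitions do not change what gets computed, but they reduce how much data a single task needs to hold in memory at once.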

The code is pretty straightforward and it doesn't seem to have any big problems.

The author of the library says the following in this issue (Dask Github Issue ):

It's also worth noting that this error message

distributed.utils_perf - WARNING - full garbage collections took 47% CPU time recently (threshold: 10%)

is most often (but not exclusively) the fault of the code that you're running, and not anything to do with Dask. Dask is just in a nice position to let you know if things like that are going on.

However, as I have already said, the code is really straightforward.

  1. How can I remove this warning? It is really slowing down the performance of my code (a minimal logging sketch follows this list).
  2. I am using Dask to work "chunk-wise" since my dataset is too big to fit in memory. However, it seems that everything is being loaded into memory, which makes the use of Dask kind of useless. How can I "force" it to work as expected (chunk-wise)?
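For question 1, the warning comes from the distributed.utils_perf logger named in the message, so it can at least be silenced with standard Python logging. Note that this only hides the symptom and does nothing about the underlying memory pressure:

import logging

# silence the distributed.utils_perf GC warnings (symptom only, not the cause)
logging.getLogger("distributed.utils_perf").setLevel(logging.ERROR)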
There is 1 solution below.
Can you please try using xgboost.dask? XGBoost now has native Dask support (please read our blog).

Here are some docs: https://xgboost.readthedocs.io/en/latest/tutorials/dask.html

Here is some example code from the site:

import xgboost as xgb
import dask.array as da
import dask.distributed

cluster = dask.distributed.LocalCluster(n_workers=4, threads_per_worker=1)
client = dask.distributed.Client(cluster)

# X and y must be Dask dataframes or arrays
num_obs = 100000  # use an int so the array shape is valid
num_features = 20
X = da.random.random(
    size=(num_obs, num_features),
    chunks=(1000, num_features)
)
y = da.random.random(
    size=(num_obs, 1),
    chunks=(1000, 1)
)

dtrain = xgb.dask.DaskDMatrix(client, X, y)

output = xgb.dask.train(client,
                        {'verbosity': 2,
                         'tree_method': 'hist',
                         'objective': 'reg:squarederror'
                         },
                        dtrain,
                        num_boost_round=4, evals=[(dtrain, 'train')])
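
Since DaskDMatrix accepts Dask dataframes as well as arrays, the code from the question can be adapted roughly like this (a sketch reusing client, X_train, y_train and X_test from the question; the hyperparameters are copied from the question and tree_method='hist' from the example above):

import xgboost as xgb

# build the distributed DMatrix directly from the Dask dataframes
dtrain = xgb.dask.DaskDMatrix(client, X_train, y_train)

output = xgb.dask.train(
    client,
    {'objective': 'binary:logistic', 'tree_method': 'hist',
     'max_depth': 4, 'eta': 0.01, 'subsample': 0.5, 'min_child_weight': 0.5},
    dtrain,
    num_boost_round=10,
    evals=[(dtrain, 'train')],
)

# output holds the trained booster plus the evaluation history
booster = output['booster']
predictions = xgb.dask.predict(client, booster, X_test)

With this API the training data stays distributed across the workers instead of being funneled through the deprecated dask_xgboost path.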

Can you please let us know where you found the previous way of using Dask and XGBoost (dask_xgboost) referenced? If it is in our docs, I'd love to get it corrected!