I'm attempting to use Ray Tune 2.8.0 to optimize the hyperparameters of a PyTorch neural network model, which seems like a fairly standard use case. PyTorch ships with several prepackaged data sets that can be loaded to facilitate practice and learning exercises. In the PyTorch processing model, data loading is subdivided into two main steps: an initial step creates a "handle" object that references a local copy of the data set (downloading it from an external URL if a local copy does not yet exist), and a second step wraps that handle in a DataLoader() object, which is essentially a Python iterator that cycles through the training and test samples in batches. (That batch_size, incidentally, is one of the hyperparameters I would like to tune.)
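In isolation (outside of Ray), that two-step pattern looks roughly like the snippet below; the batch_size of 64 is an arbitrary placeholder, since choosing it is part of what I want to tune.

from torchvision import datasets
from torchvision.transforms import ToTensor
from torch.utils.data import DataLoader

# Step 1: create the data set "handle", downloading the data if necessary
train_data = datasets.FashionMNIST(root="data", train=True,
                                   download=True, transform=ToTensor())
# Step 2: wrap the handle in a batched iterator over the samples
train_loader = DataLoader(train_data, batch_size=64, shuffle=True)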
I've encountered a situation in which the PyTorch data loading model appears to conflict (or at least interact rather poorly) with the Ray Tune model for managing data access across workers and nodes.
The example code below implements the two required items needed to define a Ray Tune Tuner() object: a trivial "trainable" function and a hyperparameter search space. For the two other items mentioned in the Getting Started documentation (a search algorithm and a scheduler), I simply accept the defaults. Additionally, I create a dict() of the PyTorch data set handle objects mentioned above, and I attempt to pass that dict() into the "trainable" function as well, since in any realistic use case the trainable function would need access to the data.
from torchvision import datasets
from torchvision.transforms import ToTensor
from functools import partial
from ray import train, tune
import numpy as np

dthan = dict()
for splitid, trnopt in zip(['train', 'test'], [True, False]):
    # Within the pytorch framework, I think this creates some kind of initial
    # "handle" which can be used to facilitate further data access
    dthan[splitid] = datasets.FashionMNIST(root="data", train=trnopt,
                                           download=True, transform=ToTensor())

# Ray tune "trainable" function, to be passed in to the Tuner()
def dummytrain(config, data_handle):
    # In a real hyperparameter tuning exercise, the data_handle would be
    # used to create a batched pytorch DataLoader(), and this would be
    # subsequently used in batched iterative cycles to train a neural
    # network model, ultimately resulting in some "loss" value at the
    # end of the training schedule. But for simplicity here we ignore
    # all of that and just report an arbitrary random final output value
    # to ray tune.
    train.report({'loss': np.random.uniform()})

# Ray tune hyperparameter search space
config = {
    "lr": tune.loguniform(1e-7, 1e-1),
    "batch_size": tune.choice([2, 4, 8, 16, 32, 64, 128])
}

# According to the ray tune documentation, in addition to a "trainable"
# and a search space, we can also define a search algorithm and a scheduler,
# but to keep things simple, here we'll just accept the defaults
tuner = tune.Tuner(
    # Create a wrapped version of dummytrain with one of the input parameters
    # (data_handle) already pre-defined, so that ray tune only needs to pass
    # in the config value for each finalized instance of this function
    partial(dummytrain, data_handle=dthan),
    # If you comment out the above line and uncomment this one, then ray tune
    # behaves as expected, reporting random numbers for the loss value
    # defined in the trainable function
    #partial(dummytrain, data_handle=None),
    # Take 10 draws from the search space
    tune_config=tune.TuneConfig(num_samples=10),
    # Pass in the search space
    param_space=config
)
# Attempt to tune the hyperparameters
tuner.fit()
After running the code, I obtain a cascade of exceptions, the second one apparently raised while attempting to handle the first:
Python 3.11.4 (v3.11.4:d2340ef257, Jun 6 2023, 19:15:51) [Clang 13.0.0 (clang-1300.0.29.30)]
Type 'copyright', 'credits' or 'license' for more information
IPython 8.14.0 -- An enhanced Interactive Python. Type '?' for help.
In [1]: %run testray.py
2023-11-29 16:38:23,308 INFO worker.py:1673 -- Started a local Ray instance.
2023-11-29 16:38:25,904 INFO tune.py:220 -- Initializing Ray automatically. For cluster usage or custom Ray initialization, call `ray.init(...)` before `Tuner(...)`.
2023-11-29 16:38:25,905 INFO tune.py:595 -- [output] This will use the new output engine with verbosity 1. To disable the new output and use the legacy output engine, set the environment variable RAY_AIR_NEW_OUTPUT=0. For more information, please see https://github.com/ray-project/ray/issues/36949
╭───────────────────────────────────────────────────────────────────╮
│ Configuration for experiment dummytrain_2023-11-29_16-38-21 │
├───────────────────────────────────────────────────────────────────┤
│ Search algorithm BasicVariantGenerator │
│ Scheduler FIFOScheduler │
│ Number of trials 10 │
╰───────────────────────────────────────────────────────────────────╯
View detailed results here: /Users/stachyra/ray_results/dummytrain_2023-11-29_16-38-21
To visualize your results with TensorBoard, run: `tensorboard --logdir /Users/stachyra/ray_results/dummytrain_2023-11-29_16-38-21`
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
File /Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/ray/tune/tune.py:1007, in run(run_or_experiment, name, metric, mode, stop, time_budget_s, config, resources_per_trial, num_samples, storage_path, storage_filesystem, search_alg, scheduler, checkpoint_config, verbose, progress_reporter, log_to_file, trial_name_creator, trial_dirname_creator, sync_config, export_formats, max_failures, fail_fast, restore, server_port, resume, reuse_actors, raise_on_failed_trial, callbacks, max_concurrent_trials, keep_checkpoints_num, checkpoint_score_attr, checkpoint_freq, checkpoint_at_end, chdir_to_trial_dir, local_dir, _experiment_checkpoint_dir, _remote, _remote_string_queue, _entrypoint)
1006 while not runner.is_finished() and not experiment_interrupted_event.is_set():
-> 1007 runner.step()
1008 if has_verbosity(Verbosity.V1_EXPERIMENT):
File /Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/ray/tune/execution/tune_controller.py:731, in TuneController.step(self)
730 # Handle one event
--> 731 if not self._actor_manager.next(timeout=0.1):
732 # If there are no actors running, warn about potentially
733 # insufficient resources
734 if not self._actor_manager.num_live_actors:
File /Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/ray/air/execution/_internal/actor_manager.py:191, in RayActorManager.next(self, timeout)
190 # We always try to start actors as this won't trigger an event callback
--> 191 self._try_start_actors()
193 # If an actor was killed, this was our event, and we return.
File /Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/ray/air/execution/_internal/actor_manager.py:361, in RayActorManager._try_start_actors(self, max_actors)
360 # Start Ray actor
--> 361 actor = remote_actor_cls.remote(**kwargs)
363 # Track
File /Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/ray/actor.py:686, in ActorClass.options.<locals>.ActorOptionWrapper.remote(self, *args, **kwargs)
685 def remote(self, *args, **kwargs):
--> 686 return actor_cls._remote(args=args, kwargs=kwargs, **updated_options)
File /Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/ray/_private/auto_init_hook.py:24, in wrap_auto_init.<locals>.auto_init_wrapper(*args, **kwargs)
23 auto_init_ray()
---> 24 return fn(*args, **kwargs)
File /Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/ray/util/tracing/tracing_helper.py:388, in _tracing_actor_creation.<locals>._invocation_actor_class_remote_span(self, args, kwargs, *_args, **_kwargs)
387 assert "_ray_trace_ctx" not in kwargs
--> 388 return method(self, args, kwargs, *_args, **_kwargs)
390 class_name = self.__ray_metadata__.class_name
File /Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/ray/actor.py:889, in ActorClass._remote(self, args, kwargs, **actor_options)
885 # After serialize / deserialize modified class, the __module__
886 # of modified class will be ray.cloudpickle.cloudpickle.
887 # So, here pass actor_creation_function_descriptor to make
888 # sure export actor class correct.
--> 889 worker.function_actor_manager.export_actor_class(
890 meta.modified_class,
891 meta.actor_creation_function_descriptor,
892 meta.method_meta.methods.keys(),
893 )
895 resources = ray._private.utils.resources_from_ray_options(actor_options)
File /Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/ray/_private/function_manager.py:531, in FunctionActorManager.export_actor_class(self, Class, actor_creation_function_descriptor, actor_method_names)
522 actor_class_info = {
523 "class_name": actor_creation_function_descriptor.class_name.split(".")[-1],
524 "module": actor_creation_function_descriptor.module_name,
(...)
528 "actor_method_names": json.dumps(list(actor_method_names)),
529 }
--> 531 check_oversized_function(
532 actor_class_info["class"],
533 actor_class_info["class_name"],
534 "actor",
535 self._worker,
536 )
538 self._worker.gcs_client.internal_kv_put(
539 key, pickle.dumps(actor_class_info), True, KV_NAMESPACE_FUNCTION_TABLE
540 )
File /Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/ray/_private/utils.py:755, in check_oversized_function(pickled, name, obj_type, worker)
744 error = (
745 "The {} {} is too large ({} MiB > FUNCTION_SIZE_ERROR_THRESHOLD={}"
746 " MiB). Check that its definition is not implicitly capturing a "
(...)
753 ray_constants.FUNCTION_SIZE_ERROR_THRESHOLD // (1024 * 1024),
754 )
--> 755 raise ValueError(error)
ValueError: The actor ImplicitFunc is too large (105 MiB > FUNCTION_SIZE_ERROR_THRESHOLD=95 MiB). Check that its definition is not implicitly capturing a large array or other object in scope. Tip: use ray.put() to put large objects in the Ray object store.
During handling of the above exception, another exception occurred:
ValueError Traceback (most recent call last)
File ~/deep_learning/neuralnetwork_exercises/testray.py:49
34 tuner = tune.Tuner(
35 # Create a wrapped version of dummytrain with my kwargs (data_handle)
36 # already defined, so that ray tune only needs to pass in the config
(...)
46 param_space=config
47 )
48 # Attempt to tune the hyperparameters
---> 49 tuner.fit()
File /Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/ray/tune/tuner.py:364, in Tuner.fit(self)
362 if not self._is_ray_client:
363 try:
--> 364 return self._local_tuner.fit()
365 except TuneError as e:
366 raise TuneError(
367 _TUNER_FAILED_MSG.format(
368 path=self._local_tuner.get_experiment_checkpoint_dir()
369 )
370 ) from e
File /Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/ray/tune/impl/tuner_internal.py:526, in TunerInternal.fit(self)
524 param_space = copy.deepcopy(self.param_space)
525 if not self._is_restored:
--> 526 analysis = self._fit_internal(trainable, param_space)
527 else:
528 analysis = self._fit_resume(trainable, param_space)
File /Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/ray/tune/impl/tuner_internal.py:645, in TunerInternal._fit_internal(self, trainable, param_space)
632 """Fitting for a fresh Tuner."""
633 args = {
634 **self._get_tune_run_arguments(trainable),
635 **dict(
(...)
643 **self._tuner_kwargs,
644 }
--> 645 analysis = run(
646 **args,
647 )
648 self.clear_remote_string_queue()
649 return analysis
File /Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/ray/tune/tune.py:1014, in run(run_or_experiment, name, metric, mode, stop, time_budget_s, config, resources_per_trial, num_samples, storage_path, storage_filesystem, search_alg, scheduler, checkpoint_config, verbose, progress_reporter, log_to_file, trial_name_creator, trial_dirname_creator, sync_config, export_formats, max_failures, fail_fast, restore, server_port, resume, reuse_actors, raise_on_failed_trial, callbacks, max_concurrent_trials, keep_checkpoints_num, checkpoint_score_attr, checkpoint_freq, checkpoint_at_end, chdir_to_trial_dir, local_dir, _experiment_checkpoint_dir, _remote, _remote_string_queue, _entrypoint)
1012 _report_air_progress(runner, air_progress_reporter)
1013 except Exception:
-> 1014 runner.cleanup()
1015 raise
1017 tune_taken = time.time() - tune_start
File /Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/ray/tune/execution/tune_controller.py:2025, in TuneController.cleanup(self)
2023 def cleanup(self):
2024 """Cleanup trials and callbacks."""
-> 2025 self._cleanup_trials()
2026 self.end_experiment_callbacks()
File /Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/ray/tune/execution/tune_controller.py:845, in TuneController._cleanup_trials(self)
840 trial = self._actor_to_trial[tracked_actor]
841 logger.debug(
842 f"Scheduling trial stop at end of experiment (trial {trial}): "
843 f"{tracked_actor}"
844 )
--> 845 self._schedule_trial_stop(trial)
847 # Clean up cached actors now
848 self._cleanup_cached_actors(force_all=True)
File /Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/ray/tune/execution/tune_controller.py:1455, in TuneController._schedule_trial_stop(self, trial, exception)
1451 self._actor_to_trial.pop(tracked_actor)
1453 trial.set_ray_actor(None)
-> 1455 self._remove_actor(tracked_actor=tracked_actor)
File /Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/ray/tune/execution/tune_controller.py:864, in TuneController._remove_actor(self, tracked_actor)
863 def _remove_actor(self, tracked_actor: TrackedActor):
--> 864 stop_future = self._actor_manager.schedule_actor_task(
865 tracked_actor, "stop", _return_future=True
866 )
867 now = time.monotonic()
869 if self._actor_manager.remove_actor(
870 tracked_actor, kill=False, stop_future=stop_future
871 ):
872 # If the actor was previously alive, track
File /Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/ray/air/execution/_internal/actor_manager.py:725, in RayActorManager.schedule_actor_task(self, tracked_actor, method_name, args, kwargs, on_result, on_error, _return_future)
722 if tracked_actor not in self._live_actors_to_ray_actors_resources:
723 # Actor is not started, yet
724 if tracked_actor not in self._pending_actors_to_attrs:
--> 725 raise ValueError(
726 f"Tracked actor is not managed by this event manager: "
727 f"{tracked_actor}"
728 )
730 # Cache tasks for future execution
731 self._pending_actors_to_enqueued_actor_tasks[tracked_actor].append(
732 (tracked_actor_task, method_name, args, kwargs)
733 )
ValueError: Tracked actor is not managed by this event manager: <TrackedActor 327435863944350007128109305967424045307>
Assuming the first exception is the relevant one, its message implies that my technique for passing data access into the trainable function produced a function object that is too large:
ValueError: The actor ImplicitFunc is too large (105 MiB > FUNCTION_SIZE_ERROR_THRESHOLD=95 MiB). Check that its definition is not implicitly capturing a large array or other object in scope. Tip: use ray.put() to put large objects in the Ray object store.
My question: what is the preferred technique for incorporating data access into a Ray Tune trainable function? The error message hints that I should use ray.put() to solve this, but I was unable to find examples in the documentation showing how that would typically work in practice when constructing a Tuner() instance. What are the key "rules" I should bear in mind when passing data into Ray and between its components, so that I can avoid this type of problem in the future?
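For reference, the pattern I'm guessing at, which may well be wrong (hence the question), would be to place the handle dict in the Ray object store once and hand only the resulting ObjectRef to each trial, something like:

import ray

# Hypothetical sketch only -- I don't know whether this is the intended usage.
# Put the (large) data set handles into the Ray object store once...
dthan_ref = ray.put(dthan)

def dummytrain(config, data_handle_ref):
    # ...and retrieve them inside each trial, so that the serialized
    # trainable itself stays small
    data_handle = ray.get(data_handle_ref)
    train.report({'loss': np.random.uniform()})

tuner = tune.Tuner(
    partial(dummytrain, data_handle_ref=dthan_ref),
    tune_config=tune.TuneConfig(num_samples=10),
    param_space=config
)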
As an additional point of caution, I note that the Ray Core documentation contains a section stating that data transferred between Ray workers and nodes must be serializable with Python pickle. Based on the error messages Ray Tune has issued so far, I can't tell yet whether my code example runs afoul of this requirement. However, if there are special techniques for data communication in Ray that would sidestep or work around this requirement (such as having each worker or node load its own copy of the data from disk, along the lines of the sketch below), it would be helpful to highlight them in any answer to this question.
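For example, one workaround I've considered, though I don't know whether it is idiomatic for Ray Tune, is to stop passing the handles in altogether and have each trial build its own handles from the shared root= directory on disk:

def dummytrain(config):
    # Hypothetical workaround: each trial constructs its own data set
    # handles locally, so nothing large is captured in the serialized
    # trainable. The "data" directory would need to be reachable from
    # every worker/node.
    dthan = {
        splitid: datasets.FashionMNIST(root="data", train=trnopt,
                                       download=True, transform=ToTensor())
        for splitid, trnopt in zip(['train', 'test'], [True, False])
    }
    train.report({'loss': np.random.uniform()})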