I'm encountering an error while training a single-agent reinforcement learning (RL) experiment with the following configuration for the FigureEightNetwork from the Flow examples (flow/examples/exp_configs/rl/singleagent/singleagent_figure_eight.py), launched through the train.py runner included at the end of this post.
**Configuration 1 (encounters error):**
- Experiment: singleagent_figure_eight
- Network: FigureEightNetwork
- Environment: AccelEnv
- Acceleration Controller: IDMController (for the 13 human vehicles) and RLController (for the 1 RL vehicle)
- Routing Controller: ContinuousRouter
- Simulator: traci
- Render Mode: False
Encountered Error:
Error: Answered with error to command 0xc4: The vehicle 'human_0' to add already exists.
The full traceback from Ray/RLlib is shown below:
ERROR trial_runner.py:482 -- Error processing event.
Traceback (most recent call last):
File "/home/plexe/anaconda3/envs/flow/lib/python3.7/site-packages/ray/tune/trial_runner.py", line 426, in _process_trial
result = self.trial_executor.fetch_result(trial)
File "/home/plexe/anaconda3/envs/flow/lib/python3.7/site-packages/ray/tune/ray_trial_executor.py", line 378, in fetch_result
result = ray.get(trial_future[0], DEFAULT_GET_TIMEOUT)
File "/home/plexe/anaconda3/envs/flow/lib/python3.7/site-packages/ray/worker.py", line 1457, in get
raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(TraCIException): ray::PPO.train() (pid=5104, ip=127.0.1.1)
File "python/ray/_raylet.pyx", line 636, in ray._raylet.execute_task
File "python/ray/_raylet.pyx", line 619, in ray._raylet.execute_task.function_executor
File "/home/plexe/anaconda3/envs/flow/lib/python3.7/site-packages/ray/rllib/agents/trainer.py", line 444, in train
raise e
File "/home/plexe/anaconda3/envs/flow/lib/python3.7/site-packages/ray/rllib/agents/trainer.py", line 433, in train
result = Trainable.train(self)
File "/home/plexe/anaconda3/envs/flow/lib/python3.7/site-packages/ray/tune/trainable.py", line 176, in train
result = self._train()
File "/home/plexe/anaconda3/envs/flow/lib/python3.7/site-packages/ray/rllib/agents/trainer_template.py", line 129, in _train
fetches = self.optimizer.step()
File "/home/plexe/anaconda3/envs/flow/lib/python3.7/site-packages/ray/rllib/optimizers/multi_gpu_optimizer.py", line 140, in step
self.num_envs_per_worker, self.train_batch_size)
File "/home/plexe/anaconda3/envs/flow/lib/python3.7/site-packages/ray/rllib/optimizers/rollout.py", line 29, in collect_samples
next_sample = ray_get_and_free(fut_sample)
File "/home/plexe/anaconda3/envs/flow/lib/python3.7/site-packages/ray/rllib/utils/memory.py", line 33, in ray_get_and_free
result = ray.get(object_ids)
ray.exceptions.RayTaskError(TraCIException): ray::RolloutWorker.sample() (pid=5106, ip=127.0.1.1)
File "/home/plexe/flow/flow/core/kernel/vehicle/traci.py", line 1120, in add
departSpeed=str(speed))
File "/home/plexe/anaconda3/envs/flow/lib/python3.7/site-packages/traci/_vehicle.py", line 1427, in add
self._connection._sendExact()
File "/home/plexe/anaconda3/envs/flow/lib/python3.7/site-packages/traci/connection.py", line 106, in _sendExact
raise TraCIException(err, prefix[1], _RESULTS[prefix[2]])
traci.exceptions.TraCIException: The vehicle 'human_0' to add already exists.
During handling of the above exception, another exception occurred:
ray::RolloutWorker.sample() (pid=5106, ip=127.0.1.1)
File "python/ray/_raylet.pyx", line 633, in ray._raylet.execute_task
File "python/ray/_raylet.pyx", line 634, in ray._raylet.execute_task
File "python/ray/_raylet.pyx", line 636, in ray._raylet.execute_task
File "python/ray/_raylet.pyx", line 619, in ray._raylet.execute_task.function_executor
File "/home/plexe/anaconda3/envs/flow/lib/python3.7/site-packages/ray/rllib/evaluation/rollout_worker.py", line 471, in sample
batches = [self.input_reader.next()]
File "/home/plexe/anaconda3/envs/flow/lib/python3.7/site-packages/ray/rllib/evaluation/sampler.py", line 56, in next
batches = [self.get_data()]
File "/home/plexe/anaconda3/envs/flow/lib/python3.7/site-packages/ray/rllib/evaluation/sampler.py", line 99, in get_data
item = next(self.rollout_provider)
File "/home/plexe/anaconda3/envs/flow/lib/python3.7/site-packages/ray/rllib/evaluation/sampler.py", line 319, in _env_runner
soft_horizon, no_done_at_end)
File "/home/plexe/anaconda3/envs/flow/lib/python3.7/site-packages/ray/rllib/evaluation/sampler.py", line 480, in _process_observations
resetted_obs = base_env.try_reset(env_id)
File "/home/plexe/anaconda3/envs/flow/lib/python3.7/site-packages/ray/rllib/env/base_env.py", line 336, in try_reset
return {_DUMMY_AGENT_ID: self.vector_env.reset_at(env_id)}
File "/home/plexe/anaconda3/envs/flow/lib/python3.7/site-packages/ray/rllib/env/vector_env.py", line 104, in reset_at
return self.envs[index].reset()
File "/home/plexe/flow/flow/envs/ring/accel.py", line 177, in reset
obs = super().reset()
File "/home/plexe/flow/flow/envs/base.py", line 519, in reset
speed=speed)
File "/home/plexe/flow/flow/core/kernel/vehicle/traci.py", line 1120, in add
departSpeed=str(speed))
File "/home/plexe/anaconda3/envs/flow/lib/python3.7/site-packages/traci/_vehicle.py", line 1427, in add
self._connection._sendExact()
File "/home/plexe/anaconda3/envs/flow/lib/python3.7/site-packages/traci/connection.py", line 106, in _sendExact
raise TraCIException(err, prefix[1], _RESULTS[prefix[2]])
traci.exceptions.TraCIException: The vehicle 'human_0' to add already exists.
2024-03-25 22:54:00,843 INFO trial_runner.py:530 -- Trial PPO_AccelEnv-v0_8b75aec0: Attempting to recover trial state from last checkpoint.
2024-03-25 22:54:00,847 INFO ray_trial_executor.py:121 -- Trial PPO_AccelEnv-v0_8b75aec0: Setting up new remote runner.
(pid=5106) Warning: Teleporting vehicle 'human_7'; collision with vehicle 'human_0', lane=':center_0_0', gap=-1.00, time=9.00 stage=move.
(pid=5106) Warning: Vehicle 'human_7' ends teleporting on edge 'left', time 9.00.
(pid=5106) Error: Answered with error to command 0xc4: The vehicle 'human_0' to add already exists.
(pid=5106) Error: Answered with error to command 0xc4: The vehicle 'human_0' to add already exists.
Additional details:
The error trace shows that the failure happens while vehicles are re-added during the environment reset, which ultimately aborts training; a small probe I am considering for this is sketched below.
However, changing the render mode to **True** lets the simulation run, albeit with occasional hangs; training then makes progress and the iteration details (rewards, iterations, timestamps) are reported.
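To check whether 'human_0' actually survives a reset, I am considering wrapping the environment in a small probe that prints the vehicle IDs Flow's kernel still tracks before each reset. This is only a sketch: the attribute names (env.k.vehicle.get_ids()) are my reading of flow/core/kernel/vehicle/traci.py and may differ between Flow versions.

```python
import gym


class ResetLogger(gym.Wrapper):
    """Print the vehicle IDs Flow's TraCI kernel still tracks before a reset."""

    def reset(self, **kwargs):
        try:
            # env.k is Flow's kernel object; vehicle.get_ids() lists tracked IDs
            print("vehicles known before reset:", self.env.k.vehicle.get_ids())
        except AttributeError:
            # attribute layout may differ across Flow versions; probe only
            pass
        return self.env.reset(**kwargs)
```

The idea is to wrap the env produced by make_create_env (see the isolation sketch after the questions below) and watch whether the human vehicles are actually cleared between rollouts.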
Moreover, training the RingNetwork from the Flow examples (singleagent_ring.py) with the configuration described below proceeds smoothly, without encountering any errors.
**Configuration 2 (works fine and trains successfully without errors):**
- Experiment: singleagent_ring
- Network: RingNetwork
- Environment: WaveAttenuationPOEnv
- Acceleration Controller: IDMController (for 21 human vehicles) and RLController (for 1 RL vehicle)
- Routing Controller: ContinuousRouter
- Simulator: traci
- Render Mode: False
Specific queries:
1. Is the issue related to the environment used (AccelEnv) or to the RL controller?
2. Are there any known issues with the Ray library installation that could cause this?
3. Since this is a TraCI exception, should I debug the libsumo code in the SUMO sources, or focus on the TraCIVehicle class within the flow.core.kernel.vehicle module?
4. How can I identify the source of the error and resolve it? (A minimal isolation sketch I have been considering is shown right after these questions.)
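For question 4, this is the minimal isolation sketch I have been considering: it rebuilds the same AccelEnv from flow_params outside of Ray/RLlib and resets it twice, which mirrors what base_env.try_reset() does in the traceback above. I am assuming it is run from the flow/examples directory so that the exp_configs package is importable, and that env.terminate() is the right call to shut SUMO down afterwards; both are assumptions on my part.

```python
from flow.utils.registry import make_create_env

# same flow_params object as defined in singleagent_figure_eight.py below;
# the import path assumes this script lives in flow/examples
from exp_configs.rl.singleagent.singleagent_figure_eight import flow_params

create_env, env_name = make_create_env(params=flow_params)
env = create_env()

env.reset()
for _ in range(flow_params['env'].horizon):
    obs, reward, done, _ = env.step(env.action_space.sample())
    if done:
        break

# RLlib resets again between rollouts; if the duplicate 'human_0' error is an
# environment-level problem, it should surface here as well, outside of Ray
env.reset()
env.terminate()
```

If this raises the same TraCIException, the problem would seem to lie in the Flow/TraCI reset path rather than in Ray or RLlib; if it does not, I would look more closely at how RLlib's sampler drives the resets.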
The code of singleagent_figure_eight.py is given below:
"""Figure eight example."""
from flow.core.params import SumoParams, EnvParams, InitialConfig, NetParams
from flow.core.params import VehicleParams, SumoCarFollowingParams
from flow.controllers import IDMController, ContinuousRouter, RLController
from flow.networks.figure_eight import ADDITIONAL_NET_PARAMS
from flow.envs import AccelEnv
from flow.networks import FigureEightNetwork
# time horizon of a single rollout
#HORIZON = 1500
HORIZON = 130
# number of rollouts per training iteration
#N_ROLLOUTS = 20
N_ROLLOUTS = 2
# number of parallel workers
N_CPUS = 2
# We place one autonomous vehicle and 13 human-driven vehicles in the network
vehicles = VehicleParams()
vehicles.add(
veh_id='human',
acceleration_controller=(IDMController, {
'noise': 0.2
}),
routing_controller=(ContinuousRouter, {}),
car_following_params=SumoCarFollowingParams(
speed_mode="obey_safe_speed",
decel=1.5,
),
num_vehicles=13)
vehicles.add(
veh_id='rl',
acceleration_controller=(RLController, {}),
routing_controller=(ContinuousRouter, {}),
car_following_params=SumoCarFollowingParams(
speed_mode="obey_safe_speed",
decel=1.5,
),
num_vehicles=1)
flow_params = dict(
# name of the experiment
exp_tag='singleagent_figure_eight',
# name of the flow environment the experiment is running on
env_name=AccelEnv,
# name of the network class the experiment is running on
network=FigureEightNetwork,
# simulator that is used by the experiment
simulator='traci',
# sumo-related parameters (see flow.core.params.SumoParams)
sim=SumoParams(
#sim_step=0.1,
sim_step=1,
render=False,
#render=True,
),
# environment related parameters (see flow.core.params.EnvParams)
env=EnvParams(
horizon=HORIZON,
additional_params={
'target_velocity': 20,
'max_accel': 3,
'max_decel': 3,
'sort_vehicles': False
},
),
# network-related parameters (see flow.core.params.NetParams and the
# network's documentation or ADDITIONAL_NET_PARAMS component)
net=NetParams(
additional_params=ADDITIONAL_NET_PARAMS.copy(),
),
# vehicles to be placed in the network at the start of a rollout (see
# flow.core.params.VehicleParams)
veh=vehicles,
# parameters specifying the positioning of vehicles upon initialization/
# reset (see flow.core.params.InitialConfig)
initial=InitialConfig(),
)
The code of train.py is given below:
"""Runner script for single and multi-agent reinforcement learning experiments.
This script performs an RL experiment using the PPO algorithm. Choice of
hyperparameters can be seen and adjusted from the code below.
Usage
python train.py EXP_CONFIG
"""
import argparse
import json
import os
import sys
from time import strftime
from copy import deepcopy
from flow.core.util import ensure_dir
from flow.utils.registry import env_constructor
from flow.utils.rllib import FlowParamsEncoder, get_flow_params
from flow.utils.registry import make_create_env
def parse_args(args):
"""Parse training options user can specify in command line.
Returns
-------
argparse.Namespace
the output parser object
"""
parser = argparse.ArgumentParser(
formatter_class=argparse.RawDescriptionHelpFormatter,
description="Parse argument used when running a Flow simulation.",
epilog="python train.py EXP_CONFIG")
# required input parameters
parser.add_argument(
'exp_config', type=str,
help='Name of the experiment configuration file, as located in '
'exp_configs/rl/singleagent or exp_configs/rl/multiagent.')
# optional input parameters
parser.add_argument(
'--rl_trainer', type=str, default="rllib",
help='the RL trainer to use. either rllib or Stable-Baselines')
parser.add_argument(
'--num_cpus', type=int, default=1,
help='How many CPUs to use')
parser.add_argument(
'--num_steps', type=int, default=5000,
help='How many total steps to perform learning over')
parser.add_argument(
'--rollout_size', type=int, default=1000,
help='How many steps are in a training batch.')
parser.add_argument(
'--checkpoint_path', type=str, default=None,
help='Directory with checkpoint to restore training from.')
return parser.parse_known_args(args)[0]
def run_model_stablebaseline(flow_params,
num_cpus=1,
rollout_size=50,
num_steps=50):
"""Run the model for num_steps if provided.
Parameters
----------
flow_params : dict
flow-specific parameters
num_cpus : int
number of CPUs used during training
rollout_size : int
length of a single rollout
num_steps : int
total number of training steps
The total rollout length is rollout_size.
Returns
-------
stable_baselines.*
the trained model
"""
from stable_baselines.common.vec_env import DummyVecEnv, SubprocVecEnv
from stable_baselines import PPO2
if num_cpus == 1:
constructor = env_constructor(params=flow_params, version=0)()
# The algorithms require a vectorized environment to run
env = DummyVecEnv([lambda: constructor])
else:
env = SubprocVecEnv([env_constructor(params=flow_params, version=i)
for i in range(num_cpus)])
train_model = PPO2('MlpPolicy', env, verbose=1, n_steps=rollout_size)
train_model.learn(total_timesteps=num_steps)
return train_model
def setup_exps_rllib(flow_params,
n_cpus,
n_rollouts,
policy_graphs=None,
policy_mapping_fn=None,
policies_to_train=None):
"""Return the relevant components of an RLlib experiment.
Parameters
----------
flow_params : dict
flow-specific parameters (see flow/utils/registry.py)
n_cpus : int
number of CPUs to run the experiment over
n_rollouts : int
number of rollouts per training iteration
policy_graphs : dict, optional
TODO
policy_mapping_fn : function, optional
TODO
policies_to_train : list of str, optional
TODO
Returns
-------
str
name of the training algorithm
str
name of the gym environment to be trained
dict
training configuration parameters
"""
from ray import tune
from ray.tune.registry import register_env
try:
from ray.rllib.agents.agent import get_agent_class
except ImportError:
from ray.rllib.agents.registry import get_agent_class
horizon = flow_params['env'].horizon
alg_run = "PPO"
agent_cls = get_agent_class(alg_run)
config = deepcopy(agent_cls._default_config)
config["num_workers"] = n_cpus
config["train_batch_size"] = horizon * n_rollouts
config["gamma"] = 0.999 # discount rate
config["model"].update({"fcnet_hiddens": [32, 32, 32]})
config["use_gae"] = True
config["lambda"] = 0.97
config["kl_target"] = 0.02
config["num_sgd_iter"] = 10
config["horizon"] = horizon
# save the flow params for replay
flow_json = json.dumps(
flow_params, cls=FlowParamsEncoder, sort_keys=True, indent=4)
config['env_config']['flow_params'] = flow_json
config['env_config']['run'] = alg_run
# multiagent configuration
if policy_graphs is not None:
print("policy_graphs", policy_graphs)
config['multiagent'].update({'policies': policy_graphs})
if policy_mapping_fn is not None:
config['multiagent'].update(
{'policy_mapping_fn': tune.function(policy_mapping_fn)})
if policies_to_train is not None:
config['multiagent'].update({'policies_to_train': policies_to_train})
create_env, gym_name = make_create_env(params=flow_params)
# Register as rllib env
register_env(gym_name, create_env)
return alg_run, gym_name, config
def train_rllib(submodule, flags):
"""Train policies using the PPO algorithm in RLlib."""
import ray
from ray.tune import run_experiments
flow_params = submodule.flow_params
n_cpus = submodule.N_CPUS
n_rollouts = submodule.N_ROLLOUTS
policy_graphs = getattr(submodule, "POLICY_GRAPHS", None)
policy_mapping_fn = getattr(submodule, "policy_mapping_fn", None)
policies_to_train = getattr(submodule, "policies_to_train", None)
alg_run, gym_name, config = setup_exps_rllib(
flow_params, n_cpus, n_rollouts,
policy_graphs, policy_mapping_fn, policies_to_train)
ray.init(num_cpus=n_cpus + 1, object_store_memory=200 * 1024 * 1024)
exp_config = {
"run": alg_run,
"env": gym_name,
"config": {
**config
},
"checkpoint_freq": 20,
"checkpoint_at_end": True,
#"max_failures": 999,
"max_failures": 2,
"stop": {
"training_iteration": flags.num_steps,
},
}
if flags.checkpoint_path is not None:
exp_config['restore'] = flags.checkpoint_path
run_experiments({flow_params["exp_tag"]: exp_config})
def train_h_baselines(env_name, args, multiagent):
"""Train policies using SAC and TD3 with h-baselines."""
from hbaselines.algorithms import OffPolicyRLAlgorithm
from hbaselines.utils.train import parse_options, get_hyperparameters
# Get the command-line arguments that are relevant here
args = parse_options(description="", example_usage="", args=args)
# the base directory that the logged data will be stored in
base_dir = "training_data"
for i in range(args.n_training):
# value of the next seed
seed = args.seed + i
# The time when the current experiment started.
now = strftime("%Y-%m-%d-%H:%M:%S")
# Create a save directory folder (if it doesn't exist).
dir_name = os.path.join(base_dir, '{}/{}'.format(args.env_name, now))
ensure_dir(dir_name)
# Get the policy class.
if args.alg == "TD3":
if multiagent:
from hbaselines.multi_fcnet.td3 import MultiFeedForwardPolicy
policy = MultiFeedForwardPolicy
else:
from hbaselines.fcnet.td3 import FeedForwardPolicy
policy = FeedForwardPolicy
elif args.alg == "SAC":
if multiagent:
from hbaselines.multi_fcnet.sac import MultiFeedForwardPolicy
policy = MultiFeedForwardPolicy
else:
from hbaselines.fcnet.sac import FeedForwardPolicy
policy = FeedForwardPolicy
else:
raise ValueError("Unknown algorithm: {}".format(args.alg))
# Get the hyperparameters.
hp = get_hyperparameters(args, policy)
# Add the seed for logging purposes.
params_with_extra = hp.copy()
params_with_extra['seed'] = seed
params_with_extra['env_name'] = args.env_name
params_with_extra['policy_name'] = policy.__name__
params_with_extra['algorithm'] = args.alg
params_with_extra['date/time'] = now
# Add the hyperparameters to the folder.
with open(os.path.join(dir_name, 'hyperparameters.json'), 'w') as f:
json.dump(params_with_extra, f, sort_keys=True, indent=4)
# Create the algorithm object.
alg = OffPolicyRLAlgorithm(
policy=policy,
env="flow:{}".format(env_name),
eval_env="flow:{}".format(env_name) if args.evaluate else None,
**hp
)
# Perform training.
alg.learn(
total_steps=args.total_steps,
log_dir=dir_name,
log_interval=args.log_interval,
eval_interval=args.eval_interval,
save_interval=args.save_interval,
initial_exploration_steps=args.initial_exploration_steps,
seed=seed,
)
def train_stable_baselines(submodule, flags):
"""Train policies using the PPO algorithm in stable-baselines."""
from stable_baselines.common.vec_env import DummyVecEnv
from stable_baselines import PPO2
flow_params = submodule.flow_params
# Path to the saved files
exp_tag = flow_params['exp_tag']
result_name = '{}/{}'.format(exp_tag, strftime("%Y-%m-%d-%H:%M:%S"))
# Perform training.
print('Beginning training.')
model = run_model_stablebaseline(
flow_params, flags.num_cpus, flags.rollout_size, flags.num_steps)
# Save the model to a desired folder and then delete it to demonstrate
# loading.
print('Saving the trained model!')
path = os.path.realpath(os.path.expanduser('~/baseline_results'))
ensure_dir(path)
save_path = os.path.join(path, result_name)
model.save(save_path)
# dump the flow params
with open(os.path.join(path, result_name) + '.json', 'w') as outfile:
json.dump(flow_params, outfile,
cls=FlowParamsEncoder, sort_keys=True, indent=4)
# Replay the result by loading the model
print('Loading the trained model and testing it out!')
model = PPO2.load(save_path)
flow_params = get_flow_params(os.path.join(path, result_name) + '.json')
flow_params['sim'].render = True
env = env_constructor(params=flow_params, version=0)()
# The algorithms require a vectorized environment to run
eval_env = DummyVecEnv([lambda: env])
obs = eval_env.reset()
reward = 0
for _ in range(flow_params['env'].horizon):
action, _states = model.predict(obs)
obs, rewards, dones, info = eval_env.step(action)
reward += rewards
print('the final reward is {}'.format(reward))
def main(args):
"""Perform the training operations."""
# Parse script-level arguments (not including package arguments).
flags = parse_args(args)
# Import relevant information from the exp_config script.
module = __import__(
"exp_configs.rl.singleagent", fromlist=[flags.exp_config])
module_ma = __import__(
"exp_configs.rl.multiagent", fromlist=[flags.exp_config])
# Import the sub-module containing the specified exp_config and determine
# whether the environment is single agent or multi-agent.
if hasattr(module, flags.exp_config):
submodule = getattr(module, flags.exp_config)
multiagent = False
elif hasattr(module_ma, flags.exp_config):
submodule = getattr(module_ma, flags.exp_config)
assert flags.rl_trainer.lower() in ["rllib", "h-baselines"], \
"Currently, multiagent experiments are only supported through "\
"RLlib. Try running this experiment using RLlib: " \
"'python train.py EXP_CONFIG'"
multiagent = True
else:
raise ValueError("Unable to find experiment config.")
# Perform the training operation.
if flags.rl_trainer.lower() == "rllib":
train_rllib(submodule, flags)
elif flags.rl_trainer.lower() == "stable-baselines":
train_stable_baselines(submodule, flags)
elif flags.rl_trainer.lower() == "h-baselines":
train_h_baselines(flags.exp_config, args, multiagent)
else:
raise ValueError("rl_trainer should be either 'rllib', 'h-baselines', "
"or 'stable-baselines'.")
if __name__ == "__main__":
main(sys.argv[1:])