Training error: single-agent RL for "FigureEightNetwork" (with Ray RLlib)


I'm encountering an error while training a single reinforcement learning (RL) agent with the following configuration for the FigureEightNetwork from the Flow examples (flow/examples/exp_configs/rl/singleagent/singleagent_figure_eight.py).

**Configuration 1 (encounters the error):**

  • Experiment: singleagent_figure_eight
  • Network: FigureEightNetwork
  • Environment: AccelEnv
  • Acceleration Controller: IDMController (for the 13 human vehicles) and RLController (for the 1 RL vehicle)
  • Routing Controller: ContinuousRouter
  • Simulator: traci
  • Render Mode: False

Encountered Error:

Error: Answered with error to command 0xc4: The vehicle 'human_0' to add already exists.

The full error trace is shown below:

ERROR trial_runner.py:482 -- Error processing event.
Traceback (most recent call last):
  File "/home/plexe/anaconda3/envs/flow/lib/python3.7/site-packages/ray/tune/trial_runner.py", line 426, in _process_trial
    result = self.trial_executor.fetch_result(trial)
  File "/home/plexe/anaconda3/envs/flow/lib/python3.7/site-packages/ray/tune/ray_trial_executor.py", line 378, in fetch_result
    result = ray.get(trial_future[0], DEFAULT_GET_TIMEOUT)
  File "/home/plexe/anaconda3/envs/flow/lib/python3.7/site-packages/ray/worker.py", line 1457, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(TraCIException): ray::PPO.train() (pid=5104, ip=127.0.1.1)
  File "python/ray/_raylet.pyx", line 636, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 619, in ray._raylet.execute_task.function_executor
  File "/home/plexe/anaconda3/envs/flow/lib/python3.7/site-packages/ray/rllib/agents/trainer.py", line 444, in train
    raise e
  File "/home/plexe/anaconda3/envs/flow/lib/python3.7/site-packages/ray/rllib/agents/trainer.py", line 433, in train
    result = Trainable.train(self)
  File "/home/plexe/anaconda3/envs/flow/lib/python3.7/site-packages/ray/tune/trainable.py", line 176, in train
    result = self._train()
  File "/home/plexe/anaconda3/envs/flow/lib/python3.7/site-packages/ray/rllib/agents/trainer_template.py", line 129, in _train
    fetches = self.optimizer.step()
  File "/home/plexe/anaconda3/envs/flow/lib/python3.7/site-packages/ray/rllib/optimizers/multi_gpu_optimizer.py", line 140, in step
    self.num_envs_per_worker, self.train_batch_size)
  File "/home/plexe/anaconda3/envs/flow/lib/python3.7/site-packages/ray/rllib/optimizers/rollout.py", line 29, in collect_samples
    next_sample = ray_get_and_free(fut_sample)
  File "/home/plexe/anaconda3/envs/flow/lib/python3.7/site-packages/ray/rllib/utils/memory.py", line 33, in ray_get_and_free
    result = ray.get(object_ids)
ray.exceptions.RayTaskError(TraCIException): ray::RolloutWorker.sample() (pid=5106, ip=127.0.1.1)
  File "/home/plexe/flow/flow/core/kernel/vehicle/traci.py", line 1120, in add
    departSpeed=str(speed))
  File "/home/plexe/anaconda3/envs/flow/lib/python3.7/site-packages/traci/_vehicle.py", line 1427, in add
    self._connection._sendExact()
  File "/home/plexe/anaconda3/envs/flow/lib/python3.7/site-packages/traci/connection.py", line 106, in _sendExact
    raise TraCIException(err, prefix[1], _RESULTS[prefix[2]])
traci.exceptions.TraCIException: The vehicle 'human_0' to add already exists.

During handling of the above exception, another exception occurred:

ray::RolloutWorker.sample() (pid=5106, ip=127.0.1.1)
  File "python/ray/_raylet.pyx", line 633, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 634, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 636, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 619, in ray._raylet.execute_task.function_executor
  File "/home/plexe/anaconda3/envs/flow/lib/python3.7/site-packages/ray/rllib/evaluation/rollout_worker.py", line 471, in sample
    batches = [self.input_reader.next()]
  File "/home/plexe/anaconda3/envs/flow/lib/python3.7/site-packages/ray/rllib/evaluation/sampler.py", line 56, in next
    batches = [self.get_data()]
  File "/home/plexe/anaconda3/envs/flow/lib/python3.7/site-packages/ray/rllib/evaluation/sampler.py", line 99, in get_data
    item = next(self.rollout_provider)
  File "/home/plexe/anaconda3/envs/flow/lib/python3.7/site-packages/ray/rllib/evaluation/sampler.py", line 319, in _env_runner
    soft_horizon, no_done_at_end)
  File "/home/plexe/anaconda3/envs/flow/lib/python3.7/site-packages/ray/rllib/evaluation/sampler.py", line 480, in _process_observations
    resetted_obs = base_env.try_reset(env_id)
  File "/home/plexe/anaconda3/envs/flow/lib/python3.7/site-packages/ray/rllib/env/base_env.py", line 336, in try_reset
    return {_DUMMY_AGENT_ID: self.vector_env.reset_at(env_id)}
  File "/home/plexe/anaconda3/envs/flow/lib/python3.7/site-packages/ray/rllib/env/vector_env.py", line 104, in reset_at
    return self.envs[index].reset()
  File "/home/plexe/flow/flow/envs/ring/accel.py", line 177, in reset
    obs = super().reset()
  File "/home/plexe/flow/flow/envs/base.py", line 519, in reset
    speed=speed)
  File "/home/plexe/flow/flow/core/kernel/vehicle/traci.py", line 1120, in add
    departSpeed=str(speed))
  File "/home/plexe/anaconda3/envs/flow/lib/python3.7/site-packages/traci/_vehicle.py", line 1427, in add
    self._connection._sendExact()
  File "/home/plexe/anaconda3/envs/flow/lib/python3.7/site-packages/traci/connection.py", line 106, in _sendExact
    raise TraCIException(err, prefix[1], _RESULTS[prefix[2]])
traci.exceptions.TraCIException: The vehicle 'human_0' to add already exists.
2024-03-25 22:54:00,843 INFO trial_runner.py:530 -- Trial PPO_AccelEnv-v0_8b75aec0: Attempting to recover trial state from last checkpoint.
2024-03-25 22:54:00,847 INFO ray_trial_executor.py:121 -- Trial PPO_AccelEnv-v0_8b75aec0: Setting up new remote runner.
(pid=5106) Warning: Teleporting vehicle 'human_7'; collision with vehicle 'human_0', lane=':center_0_0', gap=-1.00, time=9.00 stage=move.
(pid=5106) Warning: Vehicle 'human_7' ends teleporting on edge 'left', time 9.00.
(pid=5106) Error: Answered with error to command 0xc4: The vehicle 'human_0' to add already exists.
(pid=5106) Error: Answered with error to command 0xc4: The vehicle 'human_0' to add already exists.

Additional Details:

  1. The error trace points to a failure while re-adding vehicles during the environment reset, which causes the training run to fail.

  2. However, changing the **render mode to True** makes the simulation run, albeit with occasional hangs; it eventually makes progress and reports the training iteration details, including rewards, iterations, and timestamps (the exact flag change is illustrated in the snippet after this list).

  3. Moreover, when training the RingNetwork from the Flow examples (singleagent_ring.py) with the configuration described below, the process runs smoothly without any errors.
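
To make item 2 concrete, here is a minimal illustration of the only change between the failing run and the run that hangs (the full config file is reproduced at the end of this post):

```python
# Minimal illustration of the only difference between the two runs described
# above: the render flag of SumoParams (sim_step taken from the config below).
from flow.core.params import SumoParams

sim_failing = SumoParams(sim_step=1, render=False)  # raises the TraCIException above
sim_hanging = SumoParams(sim_step=1, render=True)   # runs, but with occasional hangs
```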

**Configuration 2 (works fine and trains successfully without errors):**

  • Experiment: singleagent_ring
  • Network: RingNetwork
  • Environment: WaveAttenuationPOEnv
  • Acceleration Controller: IDMController (for 21 human vehicles) and RLController (for 1 RL vehicle)
  • Routing Controller: ContinuousRouter
  • Simulator: traci
  • Render Mode: False

Specific Queries:

  1. Is the issue related to the environment used (AccelEnv) or to the RL controller?

  2. Could this be caused by a problem with the Ray library installation?

  3. Since this error is a TraCI exception, should I debug the libsumo code in the SUMO source, or focus on the TraCIVehicle class within the 'flow.core.kernel.vehicle' module?

  4. How can I identify the source of the error and resolve it?
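
Regarding queries 3 and 4, the kind of isolation test I have in mind is sketched below (an untested sketch; it assumes the script is run from flow/examples so that the exp_configs import resolves). The idea is to reset and step the Flow environment directly, without Ray/RLlib: if the same TraCIException appears on reset(), the problem lies in Flow's reset/TraCI path rather than in RLlib.

```python
# Rough isolation sketch (assumption, not something I have run yet): reset the
# Flow environment repeatedly *without* Ray/RLlib. If "The vehicle 'human_0'
# to add already exists" still appears inside reset(), the issue is in Flow's
# TraCI vehicle kernel / reset path rather than in the RLlib rollout loop.
from flow.utils.registry import make_create_env
from exp_configs.rl.singleagent.singleagent_figure_eight import flow_params

create_env, _ = make_create_env(params=flow_params)
env = create_env()

for episode in range(5):
    obs = env.reset()  # the traceback above fails inside this call
    for _ in range(flow_params['env'].horizon):
        # a single zero-acceleration action for the one RL vehicle
        obs, reward, done, _ = env.step([0.0])
        if done:
            break

env.terminate()  # close the SUMO/TraCI connection (if available in this Flow version)
```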

The code of singleagent_figure_eight.py is given below:

"""Figure eight example."""
from flow.core.params import SumoParams, EnvParams, InitialConfig, NetParams
from flow.core.params import VehicleParams, SumoCarFollowingParams
from flow.controllers import IDMController, ContinuousRouter, RLController
from flow.networks.figure_eight import ADDITIONAL_NET_PARAMS
from flow.envs import AccelEnv
from flow.networks import FigureEightNetwork

# time horizon of a single rollout
#HORIZON = 1500
HORIZON = 130
# number of rollouts per training iteration
#N_ROLLOUTS = 20
N_ROLLOUTS = 2
# number of parallel workers
N_CPUS = 2

# We place one autonomous vehicle and 13 human-driven vehicles in the network
vehicles = VehicleParams()
vehicles.add(
    veh_id='human',
    acceleration_controller=(IDMController, {
        'noise': 0.2
    }),
    routing_controller=(ContinuousRouter, {}),
    car_following_params=SumoCarFollowingParams(
        speed_mode="obey_safe_speed",
        decel=1.5,
    ),
    num_vehicles=13)
vehicles.add(
    veh_id='rl',
    acceleration_controller=(RLController, {}),
    routing_controller=(ContinuousRouter, {}),
    car_following_params=SumoCarFollowingParams(
        speed_mode="obey_safe_speed",
        decel=1.5,
    ),
    num_vehicles=1)

flow_params = dict(
    # name of the experiment
    exp_tag='singleagent_figure_eight',

    # name of the flow environment the experiment is running on
    env_name=AccelEnv,

    # name of the network class the experiment is running on
    network=FigureEightNetwork,

    # simulator that is used by the experiment
    simulator='traci',

    # sumo-related parameters (see flow.core.params.SumoParams)
    sim=SumoParams(
        # sim_step=0.1,
        sim_step=1,
        render=False,
        # render=True,
    ),

    # environment related parameters (see flow.core.params.EnvParams)
    env=EnvParams(
        horizon=HORIZON,
        additional_params={
            'target_velocity': 20,
            'max_accel': 3,
            'max_decel': 3,
            'sort_vehicles': False
        },
    ),

    # network-related parameters (see flow.core.params.NetParams and the
    # network's documentation or ADDITIONAL_NET_PARAMS component)
    net=NetParams(
        additional_params=ADDITIONAL_NET_PARAMS.copy(),
    ),

    # vehicles to be placed in the network at the start of a rollout (see
    # flow.core.params.VehicleParams)
    veh=vehicles,

    # parameters specifying the positioning of vehicles upon initialization/
    # reset (see flow.core.params.InitialConfig)
    initial=InitialConfig(),
)
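
Since the exception is raised while base.py re-adds the vehicles during reset(), one variant of the sim block I intend to try (an assumption on my part; it depends on the installed Flow version exposing this flag) is forcing SUMO to be restarted on every reset via restart_instance:

```python
# Hedged variant of the sim block above (assumes this Flow version exposes
# SumoParams(restart_instance=...)): restarting the SUMO instance on every
# env.reset() prevents vehicles from a previous rollout from lingering, which
# should rule stale vehicle state in or out as the cause of the error.
from flow.core.params import SumoParams

sim = SumoParams(
    sim_step=1,
    render=False,
    restart_instance=True,  # restart SUMO on each reset
)
```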

The code of train.py is given below:

"""Runner script for single and multi-agent reinforcement learning experiments.

This script performs an RL experiment using the PPO algorithm. Choice of
hyperparameters can be seen and adjusted from the code below.

Usage
    python train.py EXP_CONFIG
"""
import argparse
import json
import os
import sys
from time import strftime
from copy import deepcopy

from flow.core.util import ensure_dir
from flow.utils.registry import env_constructor
from flow.utils.rllib import FlowParamsEncoder, get_flow_params
from flow.utils.registry import make_create_env


def parse_args(args):
    """Parse training options user can specify in command line.

    Returns
    -------
    argparse.Namespace
        the output parser object
    """
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description="Parse argument used when running a Flow simulation.",
        epilog="python train.py EXP_CONFIG")

    # required input parameters
    parser.add_argument(
        'exp_config', type=str,
        help='Name of the experiment configuration file, as located in '
             'exp_configs/rl/singleagent or exp_configs/rl/multiagent.')

    # optional input parameters
    parser.add_argument(
        '--rl_trainer', type=str, default="rllib",
        help='the RL trainer to use. either rllib or Stable-Baselines')

    parser.add_argument(
        '--num_cpus', type=int, default=1,
        help='How many CPUs to use')
    parser.add_argument(
        '--num_steps', type=int, default=5000,
        help='How many total steps to perform learning over')
    parser.add_argument(
        '--rollout_size', type=int, default=1000,
        help='How many steps are in a training batch.')
    parser.add_argument(
        '--checkpoint_path', type=str, default=None,
        help='Directory with checkpoint to restore training from.')

    return parser.parse_known_args(args)[0]


def run_model_stablebaseline(flow_params,
                             num_cpus=1,
                             rollout_size=50,
                             num_steps=50):
    """Run the model for num_steps if provided.

    Parameters
    ----------
    flow_params : dict
        flow-specific parameters
    num_cpus : int
        number of CPUs used during training
    rollout_size : int
        length of a single rollout
    num_steps : int
        total number of training steps
    The total rollout length is rollout_size.

    Returns
    -------
    stable_baselines.*
        the trained model
    """
    from stable_baselines.common.vec_env import DummyVecEnv, SubprocVecEnv
    from stable_baselines import PPO2

    if num_cpus == 1:
        constructor = env_constructor(params=flow_params, version=0)()
        # The algorithms require a vectorized environment to run
        env = DummyVecEnv([lambda: constructor])
    else:
        env = SubprocVecEnv([env_constructor(params=flow_params, version=i)
                             for i in range(num_cpus)])

    train_model = PPO2('MlpPolicy', env, verbose=1, n_steps=rollout_size)
    train_model.learn(total_timesteps=num_steps)
    return train_model


def setup_exps_rllib(flow_params,
                     n_cpus,
                     n_rollouts,
                     policy_graphs=None,
                     policy_mapping_fn=None,
                     policies_to_train=None):
    """Return the relevant components of an RLlib experiment.

    Parameters
    ----------
    flow_params : dict
        flow-specific parameters (see flow/utils/registry.py)
    n_cpus : int
        number of CPUs to run the experiment over
    n_rollouts : int
        number of rollouts per training iteration
    policy_graphs : dict, optional
        TODO
    policy_mapping_fn : function, optional
        TODO
    policies_to_train : list of str, optional
        TODO

    Returns
    -------
    str
        name of the training algorithm
    str
        name of the gym environment to be trained
    dict
        training configuration parameters
    """
    from ray import tune
    from ray.tune.registry import register_env
    try:
        from ray.rllib.agents.agent import get_agent_class
    except ImportError:
        from ray.rllib.agents.registry import get_agent_class

    horizon = flow_params['env'].horizon

    alg_run = "PPO"

    agent_cls = get_agent_class(alg_run)
    config = deepcopy(agent_cls._default_config)

    config["num_workers"] = n_cpus
    config["train_batch_size"] = horizon * n_rollouts
    config["gamma"] = 0.999  # discount rate
    config["model"].update({"fcnet_hiddens": [32, 32, 32]})
    config["use_gae"] = True
    config["lambda"] = 0.97
    config["kl_target"] = 0.02
    config["num_sgd_iter"] = 10
    config["horizon"] = horizon

    # save the flow params for replay
    flow_json = json.dumps(
        flow_params, cls=FlowParamsEncoder, sort_keys=True, indent=4)
    config['env_config']['flow_params'] = flow_json
    config['env_config']['run'] = alg_run

    # multiagent configuration
    if policy_graphs is not None:
        print("policy_graphs", policy_graphs)
        config['multiagent'].update({'policies': policy_graphs})
    if policy_mapping_fn is not None:
        config['multiagent'].update(
            {'policy_mapping_fn': tune.function(policy_mapping_fn)})
    if policies_to_train is not None:
        config['multiagent'].update({'policies_to_train': policies_to_train})

    create_env, gym_name = make_create_env(params=flow_params)

    # Register as rllib env
    register_env(gym_name, create_env)
    return alg_run, gym_name, config


def train_rllib(submodule, flags):
    """Train policies using the PPO algorithm in RLlib."""
    import ray
    from ray.tune import run_experiments

    flow_params = submodule.flow_params
    n_cpus = submodule.N_CPUS
    n_rollouts = submodule.N_ROLLOUTS
    policy_graphs = getattr(submodule, "POLICY_GRAPHS", None)
    policy_mapping_fn = getattr(submodule, "policy_mapping_fn", None)
    policies_to_train = getattr(submodule, "policies_to_train", None)

    alg_run, gym_name, config = setup_exps_rllib(
        flow_params, n_cpus, n_rollouts,
        policy_graphs, policy_mapping_fn, policies_to_train)

    ray.init(num_cpus=n_cpus + 1, object_store_memory=200 * 1024 * 1024)
    exp_config = {
        "run": alg_run,
        "env": gym_name,
        "config": {
            **config
        },
        "checkpoint_freq": 20,
        "checkpoint_at_end": True,
        #"max_failures": 999,
        "max_failures": 2,
        "stop": {
            "training_iteration": flags.num_steps,
        },
    }

    if flags.checkpoint_path is not None:
        exp_config['restore'] = flags.checkpoint_path
    run_experiments({flow_params["exp_tag"]: exp_config})


def train_h_baselines(env_name, args, multiagent):
    """Train policies using SAC and TD3 with h-baselines."""
    from hbaselines.algorithms import OffPolicyRLAlgorithm
    from hbaselines.utils.train import parse_options, get_hyperparameters

    # Get the command-line arguments that are relevant here
    args = parse_options(description="", example_usage="", args=args)

    # the base directory that the logged data will be stored in
    base_dir = "training_data"

    for i in range(args.n_training):
        # value of the next seed
        seed = args.seed + i

        # The time when the current experiment started.
        now = strftime("%Y-%m-%d-%H:%M:%S")

        # Create a save directory folder (if it doesn't exist).
        dir_name = os.path.join(base_dir, '{}/{}'.format(args.env_name, now))
        ensure_dir(dir_name)

        # Get the policy class.
        if args.alg == "TD3":
            if multiagent:
                from hbaselines.multi_fcnet.td3 import MultiFeedForwardPolicy
                policy = MultiFeedForwardPolicy
            else:
                from hbaselines.fcnet.td3 import FeedForwardPolicy
                policy = FeedForwardPolicy
        elif args.alg == "SAC":
            if multiagent:
                from hbaselines.multi_fcnet.sac import MultiFeedForwardPolicy
                policy = MultiFeedForwardPolicy
            else:
                from hbaselines.fcnet.sac import FeedForwardPolicy
                policy = FeedForwardPolicy
        else:
            raise ValueError("Unknown algorithm: {}".format(args.alg))

        # Get the hyperparameters.
        hp = get_hyperparameters(args, policy)

        # Add the seed for logging purposes.
        params_with_extra = hp.copy()
        params_with_extra['seed'] = seed
        params_with_extra['env_name'] = args.env_name
        params_with_extra['policy_name'] = policy.__name__
        params_with_extra['algorithm'] = args.alg
        params_with_extra['date/time'] = now

        # Add the hyperparameters to the folder.
        with open(os.path.join(dir_name, 'hyperparameters.json'), 'w') as f:
            json.dump(params_with_extra, f, sort_keys=True, indent=4)

        # Create the algorithm object.
        alg = OffPolicyRLAlgorithm(
            policy=policy,
            env="flow:{}".format(env_name),
            eval_env="flow:{}".format(env_name) if args.evaluate else None,
            **hp
        )

        # Perform training.
        alg.learn(
            total_steps=args.total_steps,
            log_dir=dir_name,
            log_interval=args.log_interval,
            eval_interval=args.eval_interval,
            save_interval=args.save_interval,
            initial_exploration_steps=args.initial_exploration_steps,
            seed=seed,
        )


def train_stable_baselines(submodule, flags):
    """Train policies using the PPO algorithm in stable-baselines."""
    from stable_baselines.common.vec_env import DummyVecEnv
    from stable_baselines import PPO2

    flow_params = submodule.flow_params
    # Path to the saved files
    exp_tag = flow_params['exp_tag']
    result_name = '{}/{}'.format(exp_tag, strftime("%Y-%m-%d-%H:%M:%S"))

    # Perform training.
    print('Beginning training.')
    model = run_model_stablebaseline(
        flow_params, flags.num_cpus, flags.rollout_size, flags.num_steps)

    # Save the model to a desired folder and then delete it to demonstrate
    # loading.
    print('Saving the trained model!')
    path = os.path.realpath(os.path.expanduser('~/baseline_results'))
    ensure_dir(path)
    save_path = os.path.join(path, result_name)
    model.save(save_path)

    # dump the flow params
    with open(os.path.join(path, result_name) + '.json', 'w') as outfile:
        json.dump(flow_params, outfile,
                  cls=FlowParamsEncoder, sort_keys=True, indent=4)

    # Replay the result by loading the model
    print('Loading the trained model and testing it out!')
    model = PPO2.load(save_path)
    flow_params = get_flow_params(os.path.join(path, result_name) + '.json')
    flow_params['sim'].render = True
    env = env_constructor(params=flow_params, version=0)()
    # The algorithms require a vectorized environment to run
    eval_env = DummyVecEnv([lambda: env])
    obs = eval_env.reset()
    reward = 0
    for _ in range(flow_params['env'].horizon):
        action, _states = model.predict(obs)
        obs, rewards, dones, info = eval_env.step(action)
        reward += rewards
    print('the final reward is {}'.format(reward))


def main(args):
    """Perform the training operations."""
    # Parse script-level arguments (not including package arguments).
    flags = parse_args(args)

    # Import relevant information from the exp_config script.
    module = __import__(
        "exp_configs.rl.singleagent", fromlist=[flags.exp_config])
    module_ma = __import__(
        "exp_configs.rl.multiagent", fromlist=[flags.exp_config])

    # Import the sub-module containing the specified exp_config and determine
    # whether the environment is single agent or multi-agent.
    if hasattr(module, flags.exp_config):
        submodule = getattr(module, flags.exp_config)
        multiagent = False
    elif hasattr(module_ma, flags.exp_config):
        submodule = getattr(module_ma, flags.exp_config)
        assert flags.rl_trainer.lower() in ["rllib", "h-baselines"], \
            "Currently, multiagent experiments are only supported through "\
            "RLlib. Try running this experiment using RLlib: " \
            "'python train.py EXP_CONFIG'"
        multiagent = True
    else:
        raise ValueError("Unable to find experiment config.")

    # Perform the training operation.
    if flags.rl_trainer.lower() == "rllib":
        train_rllib(submodule, flags)
    elif flags.rl_trainer.lower() == "stable-baselines":
        train_stable_baselines(submodule, flags)
    elif flags.rl_trainer.lower() == "h-baselines":
        train_h_baselines(flags.exp_config, args, multiagent)
    else:
        raise ValueError("rl_trainer should be either 'rllib', 'h-baselines', "
                         "or 'stable-baselines'.")


if __name__ == "__main__":
    main(sys.argv[1:])