No improvement in DQN reinforcement learning with PyTorch

I'm working on a reinforcement learning project using PyTorch, where an agent is trained to play a custom game. However, I've encountered an issue where the agent shows no signs of improvement over time. I've implemented a Deep Q-Network (DQN) model for the agent's learning process.

The game:

  1. The agent selects a move between 1 and 15.
  2. If the move is valid, the agent gets a reward of 0.
  3. If the move is invalid, it gets a reward of -1.
  4. After selecting 45 valid moves, the game ends and the agent receives a final reward based on which valid moves it picked (see the short sketch after this list).
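
To make the scoring concrete, here is a minimal standalone sketch of the reward scheme described above. The names are just for illustration; the real logic is in the Draft class further down:

# Minimal sketch of the reward scheme described in the list above
# (illustrative only; the actual implementation is the Draft class below).
def step_reward(board, move):
    """board: list of 15 values, where 0 means the slot was already picked."""
    if board[move] == 0:
        return -1          # invalid move
    board[move] = 0        # a valid move consumes the slot
    return 0               # no immediate reward for a valid move

# After 45 valid moves (3 boards of 15), the final reward is based on the
# sum of the values that were picked.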

I implemented a DQN model to try to solve this simple game, but after running it for over 80 iterations it shows no improvement.

What I've noticed is that after about 3 iterations the DQN model starts making the exact same actions over and over: it picks 2 or 3 of the 15 actions and only uses those for the rest of training. For example, it might pick action 5 and action 12 and switch back and forth between them. This is a problem because once a move has been picked, that action becomes invalid for the rest of the round, so the agent just picks invalid actions over and over. Here is an example of the actions it is taking:

tensor([6728., 7771., 6208., 3454., 2145., 7752., 8582., 3225., 0., 1219., 5038., 9683., 6314., 8500., 6519.])predicted:[0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] hit
tensor([6728., 7771., 6208., 0., 2145., 7752., 8582., 3225., 0., 1219., 5038., 9683., 6314., 8500., 6519.])predicted:[0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] missed
tensor([6728., 7771., 6208., 0., 2145., 7752., 8582., 3225., 0., 1219., 5038., 9683., 6314., 8500., 6519.])predicted:[0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] missed

Here, "hit" means the action was valid and "missed" means it wasn't.

I think this has something to do with how the rewards are handled, but I have modified the rewards a lot and the behavior doesn't change. I have also made sure that the game works as intended and that the observation space isn't an iteration behind or anything like that. Any suggestions for fixing this problem would be appreciated!

Here's my agent:

import random
from collections import deque

import numpy as np
import torch

# Linear_QNet and QTrainer come from the model file shown further down.


class Agent:

    def __init__(self):
        self.n_games = 0
        self.epsilon = 0  # randomness
        self.gamma = 0.9  # discount rate
        self.memory = deque(maxlen=MAX_MEMORY)  # popleft()
        self.model = Linear_QNet(15, 256, 256, 15)
        self.trainer = QTrainer(self.model, lr=LR, gamma=self.gamma)

    def get_state(self, game):
        return np.array(game.get_state(), dtype=int)

    def remember(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))  # popleft if MAX_MEMORY is reached

    def train_long_memory(self):
        if len(self.memory) > BATCH_SIZE:
            mini_sample = random.sample(self.memory, BATCH_SIZE)  # list of tuples
        else:
            mini_sample = self.memory

        states, actions, rewards, next_states, dones = zip(*mini_sample)
        self.trainer.train_step(states, actions, rewards, next_states, dones)
        # for state, action, reward, next_state, done in mini_sample:
        #     self.trainer.train_step(state, action, reward, next_state, done)

    def train_short_memory(self, state, action, reward, next_state, done):
        self.trainer.train_step(state, action, reward, next_state, done)

    def get_action(self, state):
        # random moves: tradeoff exploration / exploitation
        self.epsilon = 80 - self.n_games
        final_move = [0] * 15
        if random.randint(0, 200) < self.epsilon:
            move = random.randint(0, 14)
            final_move[move] = 1
            print(str(torch.tensor(state, dtype=torch.float)) + "random:" + str(final_move))
        else:
            state0 = torch.tensor(state, dtype=torch.float)
            prediction = self.model(state0)
            move = torch.argmax(prediction).item()
            final_move[move] = 1
            print(str(state0) + "predicted:" + str(final_move))

        return final_move
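
One thing worth noting about get_action: with self.epsilon = 80 - self.n_games and the random.randint(0, 200) check, random exploration starts at roughly 40% and drops to zero after 80 games. A quick standalone sketch of that decay (just for illustration, not part of the training code):

# Rough sketch of the exploration schedule used in get_action above.
# random.randint(0, 200) returns one of 201 equally likely integers,
# so P(random move) in game n is roughly max(0, 80 - n) / 201.
for n_games in (0, 10, 40, 79, 80, 100):
    epsilon = 80 - n_games
    print(f"game {n_games:3d}: P(random move) ~ {max(0, epsilon) / 201:.2f}")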

Here's my training loop:

MAX_MEMORY = 100_000
BATCH_SIZE = 1000
LR = 0.001


def train():
    plot_scores = []
    plot_mean_scores = []
    total_score = 0
    record = 0
    agent = Agent()
    game = Draft()
    while True:
        # get old state
        state_old = agent.get_state(game)

        # get move
        final_move = agent.get_action(state_old)

        # perform move and get new state
        reward, done, score = game.play_step(final_move)
        state_new = agent.get_state(game)

        # train short memory
        agent.train_short_memory(state_old, final_move, reward, state_new, done)

        # remember
        agent.remember(state_old, final_move, reward, state_new, done)

        if done:
            # train long memory, plot result
            game.reset()
            agent.n_games += 1
            agent.train_long_memory()

            if score > record:
                record = score
                agent.model.save()

            print('Game', agent.n_games, 'Score', score, 'Record:', record)

            plot_scores.append(score)
            total_score += score
            mean_score = total_score / agent.n_games
            plot_mean_scores.append(mean_score)
            plot(plot_scores, plot_mean_scores)


if __name__ == '__main__':
    train()

Here is my game:

class Draft:
    def __init__(self):
        self.reset()

    def reset(self):
        self.round = 0
        self.collection = []
        self.state = [random.randint(1000, 9999) for _ in range(15)]

    def play_step(self, action):
        action = action.index(1)
        if all(item == 0 for item in self.state):
            if self.round == 2:
                return (self.calc() / 100), True, self.calc()
            if self.round != 2:
                self.state = [random.randint(1000, 9999) for _ in range(15)]
                self.round += 1
                return 0, False, 0
        else:
            if self.state[action] == 0:
                print("missed")
                return -1, False, 0
            if self.state[action] != 0:
                print("hit")
                self.collection.append(self.state[action])
                self.state[action] = 0
                return 0, False, 0

    def get_state(self):
        return self.state

    def calc(self):
        return sum(self.collection)
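
For reference, the game can be exercised by hand like this (standalone snippet, just to show the hit/miss behaviour; not part of the training code):

# Quick manual check of the game logic (illustrative only).
game = Draft()
move = [0] * 15
move[3] = 1
print(game.play_step(move))  # first pick of slot 3 -> (0, False, 0) and "hit"
print(game.play_step(move))  # same slot again -> (-1, False, 0) and "missed"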

Here is my model:

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import os


class Linear_QNet(nn.Module):
    def __init__(self, input_size, hidden_size1, hidden_size2, output_size):
        super().__init__()
        self.linear1 = nn.Linear(input_size, hidden_size1)
        self.linear2 = nn.Linear(hidden_size1, hidden_size2)
        self.linear3 = nn.Linear(hidden_size2, output_size)

    def forward(self, x):
        x = F.relu(self.linear1(x))
        x = F.relu(self.linear2(x))
        x = self.linear3(x)
        return x

    def save(self, file_name='model.pth'):
        model_folder_path = './model'
        if not os.path.exists(model_folder_path):
            os.makedirs(model_folder_path)

        file_name = os.path.join(model_folder_path, file_name)
        torch.save(self.state_dict(), file_name)


class QTrainer:
    def __init__(self, model, lr, gamma):
        self.lr = lr
        self.gamma = gamma
        self.model = model
        self.optimizer = optim.Adam(model.parameters(), lr=self.lr)
        self.criterion = nn.MSELoss()

    def train_step(self, state, action, reward, next_state, done):
        state = torch.tensor(state, dtype=torch.float)
        next_state = torch.tensor(next_state, dtype=torch.float)
        action = torch.tensor(action, dtype=torch.long)
        reward = torch.tensor(reward, dtype=torch.float)
        # (n, x)

        if len(state.shape) == 1:
            # (1, x)
            state = torch.unsqueeze(state, 0)
            next_state = torch.unsqueeze(next_state, 0)
            action = torch.unsqueeze(action, 0)
            reward = torch.unsqueeze(reward, 0)
            done = (done,)

        # 1: predicted Q values with current state
        pred = self.model(state)

        target = pred.clone()
        for idx in range(len(done)):
            Q_new = reward[idx]
            if not done[idx]:
                Q_new = reward[idx] + self.gamma * torch.max(self.model(next_state[idx]))

            target[idx][torch.argmax(action[idx]).item()] = Q_new

        # 2: Q_new = r + y * max(next_predicted Q value) -> only do this if not done
        # pred.clone()
        # preds[argmax(action)] = Q_new
        self.optimizer.zero_grad()
        loss = self.criterion(target, pred)
        loss.backward()

        self.optimizer.step()
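
For context, the target built in train_step is the standard one-step Q-learning target, Q_new = r + gamma * max_a Q(s', a) for non-terminal transitions. A tiny numeric example of that update with made-up values:

# Tiny worked example of the Bellman target computed above (made-up numbers).
import torch

gamma = 0.9
reward = torch.tensor(-1.0)              # e.g. the penalty for an invalid move
next_q = torch.tensor([0.2, 1.5, -0.3])  # pretend Q(s', a) values for 3 actions
q_new = reward + gamma * torch.max(next_q)
print(q_new)  # tensor(0.3500), i.e. -1 + 0.9 * 1.5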