[Reinforcement Learning] Policy iteration and value iteration taking a while to converge /u/gitgud_x Python Education

Hi, I’m studying reinforcement learning, closely following the textbook by Sutton and Barlow (more so the tutorial videos by Mutual Information here).

I’ve tried to implement the simple example (from here) of an agent in a 4×4 gridworld. The agent can move within the grid (up/down/left/right) and two of the opposite diagonal squares are terminal states. Each step accrues reward -1, so the goal is to reach a terminal state as quickly as possible. My code that gets this done is below.

import numpy as np # 16 different states arranged in a 4x4 square grid # Two opposite corners are terminal states # The reward for every transition is always -1 (we want to minimize the number of steps) class GridWorld: ''' The states of the grid are enumerated as: [[ 0, 1, 2, 3], [ 4, 5, 6, 7], [ 8, 9, 10, 11], [12, 13, 14, 15]] The agent has 4 potential actions: - 0: up - 1: right - 2: down - 3: left unless the action would move the agent off the grid, in which case the agent remains in place. The agent receives a reward of -1 at each time step until it reaches a terminal state. There are two terminal states in the grid - states 0 and 15. ''' ACTIONS_DICT = {0: 'Up', 1: 'Right', 2: 'Down', 3: 'Left'} def __init__(self, size=4): self.size = size self.n_states = size * size self.n_actions = GridWorld.ACTIONS_DICT.__len__() # initialize a random policy - choose uniformly at random from the 4 actions in all states self.policy = np.ones(shape=[self.n_states, self.n_actions]) / self.n_actions # self.policy[s, a] = π(a | s) self.gamma = 1 # discount factor - no discounting def get_state_from_s(self, s: int): ''' Given an enumerated state s, return the coordinates of the state in the grid. ''' return (s // self.size, s % self.size) def get_action_from_a(self, a: int): ''' Given an enumerated action a, return the name of the action as a string. ''' return GridWorld.ACTIONS_DICT.get(a) def state_transition(self, s: int, a: int) -> tuple[int, int]: ''' Given the current state s and action a, return the next state s' and reward r. Samples from p(s', r | s, a). ''' x, y = self.get_state_from_s(s) if s == 0 or s == 15: return s, 0 if a == 0: x = max(0, x - 1) elif a == 1: y = min(self.size - 1, y + 1) elif a == 2: x = min(self.size - 1, x + 1) elif a == 3: y = max(0, y - 1) s_prime = x * self.size + y return s_prime, -1 def environment_model(self, s_new: int, r: int, s: int, a: int): # Deterministic environment # Returns the value of p(s', r | s, a). if self.state_transition(s, a) == (s_new, r): return 1 else: return 0 def policy_evaluation(self, num_sweeps: int = 0) -> np.ndarray: ''' Apply the Bellman equations for V and Q to estimate the value function at the current policy self.policy. ### Arguments #### Optional - `num_sweeps` (int, default = 0): number of iterations to run the policy evaluation for. If 0, run until convergence. ### Returns - `np.ndarray`: estimated value function V, where `V[s]` = V(s) ''' # initialize the state value function randomly self.V = np.random.random(self.n_states) # self.V[s] = V(s), the state value function self.Q = np.random.random(size=[self.n_states, self.n_actions]) # self.Q[s, a] = Q(s, a), the state-action value function self.V[0] = self.V[15] = 0 # set the value of the terminal states to 0 self.Q[0, :] = self.Q[15, :] = 0 # set the value of the terminal states to 0 sweep = 0 V_new = np.zeros(self.n_states) Q_new = np.zeros([self.n_states, self.n_actions]) while True: for s in range(self.n_states): if s == 0 or s == 15: pass # terminal states always have V(s) = 0 else: V_new[s] = sum(self.policy[s, a] *  sum(self.environment_model(s_prime, -1, s, a) * (-1 + self.gamma * self.V[s_prime])  for s_prime in range(self.n_states))  for a in range(self.n_actions)) for a in range(self.n_actions): Q_new[s, a] = sum(self.environment_model(s_prime, -1, s, a) * ( -1 + self.gamma * sum(self.policy[s_prime, a_prime] * self.Q[s_prime, a_prime]  for a_prime in range(self.n_actions)))  for s_prime in range(self.n_states)) sweep += 1 if (np.allclose(self.V, V_new) and np.allclose(self.Q, Q_new)) or sweep == num_sweeps: self.V = V_new self.Q = Q_new break else: self.V = V_new self.Q = Q_new def policy_improvement(self): ''' Update the policy to be greedy with respect to q(s, a). The new policy is deterministic rather than stochastic. ''' new_policy = np.zeros_like(self.policy) for s in range(self.n_states): a_opt = np.argmax(self.Q[s, :]) new_policy[s, a_opt] = 1 self.policy = new_policy def policy_iteration(self): ''' Perform policy iteration to find the optimal policy. ''' i = 0 new_policy = self.policy while True: self.policy_evaluation() # until convergence self.policy_improvement() if (self.policy == new_policy).all(): break else: new_policy = self.policy i += 1 print(f'Converged after {i} iterations.') def value_iteration(self): ''' Perform value iteration to find the optimal policy. ''' i = 0 new_policy = self.policy while True: self.policy_evaluation(num_sweeps=1) self.policy_improvement() if (self.policy == new_policy).all(): break else: new_policy = self.policy i += 1 print(f'Converged after {i} iterations.') 

Both policy iteration and value iteration converge to the optimal solution. However, it takes a long time.

grid = GridWorld() grid.value_iteration() # or use: grid.policy_iteration() print(grid.policy) 

I would think for this very simple situation, it should be a lot faster.

Policy iteration takes about 200 iterations (a few seconds). Value iteration takes about 10,000 iterations (around a minute).

Have I done something to make this code really inefficient? I’m not doing anything fancy here. Thanks for any advice!

submitted by /u/gitgud_x
[link] [comments]

​r/learnpython Hi, I’m studying reinforcement learning, closely following the textbook by Sutton and Barlow (more so the tutorial videos by Mutual Information here). I’ve tried to implement the simple example (from here) of an agent in a 4×4 gridworld. The agent can move within the grid (up/down/left/right) and two of the opposite diagonal squares are terminal states. Each step accrues reward -1, so the goal is to reach a terminal state as quickly as possible. My code that gets this done is below. import numpy as np # 16 different states arranged in a 4×4 square grid # Two opposite corners are terminal states # The reward for every transition is always -1 (we want to minimize the number of steps) class GridWorld: ”’ The states of the grid are enumerated as: [[ 0, 1, 2, 3], [ 4, 5, 6, 7], [ 8, 9, 10, 11], [12, 13, 14, 15]] The agent has 4 potential actions: – 0: up – 1: right – 2: down – 3: left unless the action would move the agent off the grid, in which case the agent remains in place. The agent receives a reward of -1 at each time step until it reaches a terminal state. There are two terminal states in the grid – states 0 and 15. ”’ ACTIONS_DICT = {0: ‘Up’, 1: ‘Right’, 2: ‘Down’, 3: ‘Left’} def __init__(self, size=4): self.size = size self.n_states = size * size self.n_actions = GridWorld.ACTIONS_DICT.__len__() # initialize a random policy – choose uniformly at random from the 4 actions in all states self.policy = np.ones(shape=[self.n_states, self.n_actions]) / self.n_actions # self.policy[s, a] = π(a | s) self.gamma = 1 # discount factor – no discounting def get_state_from_s(self, s: int): ”’ Given an enumerated state s, return the coordinates of the state in the grid. ”’ return (s // self.size, s % self.size) def get_action_from_a(self, a: int): ”’ Given an enumerated action a, return the name of the action as a string. ”’ return GridWorld.ACTIONS_DICT.get(a) def state_transition(self, s: int, a: int) -> tuple[int, int]: ”’ Given the current state s and action a, return the next state s’ and reward r. Samples from p(s’, r | s, a). ”’ x, y = self.get_state_from_s(s) if s == 0 or s == 15: return s, 0 if a == 0: x = max(0, x – 1) elif a == 1: y = min(self.size – 1, y + 1) elif a == 2: x = min(self.size – 1, x + 1) elif a == 3: y = max(0, y – 1) s_prime = x * self.size + y return s_prime, -1 def environment_model(self, s_new: int, r: int, s: int, a: int): # Deterministic environment # Returns the value of p(s’, r | s, a). if self.state_transition(s, a) == (s_new, r): return 1 else: return 0 def policy_evaluation(self, num_sweeps: int = 0) -> np.ndarray: ”’ Apply the Bellman equations for V and Q to estimate the value function at the current policy self.policy. ### Arguments #### Optional – `num_sweeps` (int, default = 0): number of iterations to run the policy evaluation for. If 0, run until convergence. ### Returns – `np.ndarray`: estimated value function V, where `V[s]` = V(s) ”’ # initialize the state value function randomly self.V = np.random.random(self.n_states) # self.V[s] = V(s), the state value function self.Q = np.random.random(size=[self.n_states, self.n_actions]) # self.Q[s, a] = Q(s, a), the state-action value function self.V[0] = self.V[15] = 0 # set the value of the terminal states to 0 self.Q[0, :] = self.Q[15, :] = 0 # set the value of the terminal states to 0 sweep = 0 V_new = np.zeros(self.n_states) Q_new = np.zeros([self.n_states, self.n_actions]) while True: for s in range(self.n_states): if s == 0 or s == 15: pass # terminal states always have V(s) = 0 else: V_new[s] = sum(self.policy[s, a] * sum(self.environment_model(s_prime, -1, s, a) * (-1 + self.gamma * self.V[s_prime]) for s_prime in range(self.n_states)) for a in range(self.n_actions)) for a in range(self.n_actions): Q_new[s, a] = sum(self.environment_model(s_prime, -1, s, a) * ( -1 + self.gamma * sum(self.policy[s_prime, a_prime] * self.Q[s_prime, a_prime] for a_prime in range(self.n_actions))) for s_prime in range(self.n_states)) sweep += 1 if (np.allclose(self.V, V_new) and np.allclose(self.Q, Q_new)) or sweep == num_sweeps: self.V = V_new self.Q = Q_new break else: self.V = V_new self.Q = Q_new def policy_improvement(self): ”’ Update the policy to be greedy with respect to q(s, a). The new policy is deterministic rather than stochastic. ”’ new_policy = np.zeros_like(self.policy) for s in range(self.n_states): a_opt = np.argmax(self.Q[s, :]) new_policy[s, a_opt] = 1 self.policy = new_policy def policy_iteration(self): ”’ Perform policy iteration to find the optimal policy. ”’ i = 0 new_policy = self.policy while True: self.policy_evaluation() # until convergence self.policy_improvement() if (self.policy == new_policy).all(): break else: new_policy = self.policy i += 1 print(f’Converged after {i} iterations.’) def value_iteration(self): ”’ Perform value iteration to find the optimal policy. ”’ i = 0 new_policy = self.policy while True: self.policy_evaluation(num_sweeps=1) self.policy_improvement() if (self.policy == new_policy).all(): break else: new_policy = self.policy i += 1 print(f’Converged after {i} iterations.’) Both policy iteration and value iteration converge to the optimal solution. However, it takes a long time. grid = GridWorld() grid.value_iteration() # or use: grid.policy_iteration() print(grid.policy) I would think for this very simple situation, it should be a lot faster. Policy iteration takes about 200 iterations (a few seconds). Value iteration takes about 10,000 iterations (around a minute). Have I done something to make this code really inefficient? I’m not doing anything fancy here. Thanks for any advice! submitted by /u/gitgud_x [link] [comments] 

Hi, I’m studying reinforcement learning, closely following the textbook by Sutton and Barlow (more so the tutorial videos by Mutual Information here).

I’ve tried to implement the simple example (from here) of an agent in a 4×4 gridworld. The agent can move within the grid (up/down/left/right) and two of the opposite diagonal squares are terminal states. Each step accrues reward -1, so the goal is to reach a terminal state as quickly as possible. My code that gets this done is below.

import numpy as np # 16 different states arranged in a 4x4 square grid # Two opposite corners are terminal states # The reward for every transition is always -1 (we want to minimize the number of steps) class GridWorld: ''' The states of the grid are enumerated as: [[ 0, 1, 2, 3], [ 4, 5, 6, 7], [ 8, 9, 10, 11], [12, 13, 14, 15]] The agent has 4 potential actions: - 0: up - 1: right - 2: down - 3: left unless the action would move the agent off the grid, in which case the agent remains in place. The agent receives a reward of -1 at each time step until it reaches a terminal state. There are two terminal states in the grid - states 0 and 15. ''' ACTIONS_DICT = {0: 'Up', 1: 'Right', 2: 'Down', 3: 'Left'} def __init__(self, size=4): self.size = size self.n_states = size * size self.n_actions = GridWorld.ACTIONS_DICT.__len__() # initialize a random policy - choose uniformly at random from the 4 actions in all states self.policy = np.ones(shape=[self.n_states, self.n_actions]) / self.n_actions # self.policy[s, a] = π(a | s) self.gamma = 1 # discount factor - no discounting def get_state_from_s(self, s: int): ''' Given an enumerated state s, return the coordinates of the state in the grid. ''' return (s // self.size, s % self.size) def get_action_from_a(self, a: int): ''' Given an enumerated action a, return the name of the action as a string. ''' return GridWorld.ACTIONS_DICT.get(a) def state_transition(self, s: int, a: int) -> tuple[int, int]: ''' Given the current state s and action a, return the next state s' and reward r. Samples from p(s', r | s, a). ''' x, y = self.get_state_from_s(s) if s == 0 or s == 15: return s, 0 if a == 0: x = max(0, x - 1) elif a == 1: y = min(self.size - 1, y + 1) elif a == 2: x = min(self.size - 1, x + 1) elif a == 3: y = max(0, y - 1) s_prime = x * self.size + y return s_prime, -1 def environment_model(self, s_new: int, r: int, s: int, a: int): # Deterministic environment # Returns the value of p(s', r | s, a). if self.state_transition(s, a) == (s_new, r): return 1 else: return 0 def policy_evaluation(self, num_sweeps: int = 0) -> np.ndarray: ''' Apply the Bellman equations for V and Q to estimate the value function at the current policy self.policy. ### Arguments #### Optional - `num_sweeps` (int, default = 0): number of iterations to run the policy evaluation for. If 0, run until convergence. ### Returns - `np.ndarray`: estimated value function V, where `V[s]` = V(s) ''' # initialize the state value function randomly self.V = np.random.random(self.n_states) # self.V[s] = V(s), the state value function self.Q = np.random.random(size=[self.n_states, self.n_actions]) # self.Q[s, a] = Q(s, a), the state-action value function self.V[0] = self.V[15] = 0 # set the value of the terminal states to 0 self.Q[0, :] = self.Q[15, :] = 0 # set the value of the terminal states to 0 sweep = 0 V_new = np.zeros(self.n_states) Q_new = np.zeros([self.n_states, self.n_actions]) while True: for s in range(self.n_states): if s == 0 or s == 15: pass # terminal states always have V(s) = 0 else: V_new[s] = sum(self.policy[s, a] *  sum(self.environment_model(s_prime, -1, s, a) * (-1 + self.gamma * self.V[s_prime])  for s_prime in range(self.n_states))  for a in range(self.n_actions)) for a in range(self.n_actions): Q_new[s, a] = sum(self.environment_model(s_prime, -1, s, a) * ( -1 + self.gamma * sum(self.policy[s_prime, a_prime] * self.Q[s_prime, a_prime]  for a_prime in range(self.n_actions)))  for s_prime in range(self.n_states)) sweep += 1 if (np.allclose(self.V, V_new) and np.allclose(self.Q, Q_new)) or sweep == num_sweeps: self.V = V_new self.Q = Q_new break else: self.V = V_new self.Q = Q_new def policy_improvement(self): ''' Update the policy to be greedy with respect to q(s, a). The new policy is deterministic rather than stochastic. ''' new_policy = np.zeros_like(self.policy) for s in range(self.n_states): a_opt = np.argmax(self.Q[s, :]) new_policy[s, a_opt] = 1 self.policy = new_policy def policy_iteration(self): ''' Perform policy iteration to find the optimal policy. ''' i = 0 new_policy = self.policy while True: self.policy_evaluation() # until convergence self.policy_improvement() if (self.policy == new_policy).all(): break else: new_policy = self.policy i += 1 print(f'Converged after {i} iterations.') def value_iteration(self): ''' Perform value iteration to find the optimal policy. ''' i = 0 new_policy = self.policy while True: self.policy_evaluation(num_sweeps=1) self.policy_improvement() if (self.policy == new_policy).all(): break else: new_policy = self.policy i += 1 print(f'Converged after {i} iterations.') 

Both policy iteration and value iteration converge to the optimal solution. However, it takes a long time.

grid = GridWorld() grid.value_iteration() # or use: grid.policy_iteration() print(grid.policy) 

I would think for this very simple situation, it should be a lot faster.

Policy iteration takes about 200 iterations (a few seconds). Value iteration takes about 10,000 iterations (around a minute).

Have I done something to make this code really inefficient? I’m not doing anything fancy here. Thanks for any advice!

submitted by /u/gitgud_x
[link] [comments] 

Leave a Reply

Your email address will not be published. Required fields are marked *