05-04-Planning by Dynamic Programming.pdf
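# Listing: grid-world policy iteration, in two files. The first part is the
# environment/visualization module (its file name, environment.py, is inferred
# from the import in the second listing below); the second part is the agent.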
import tkinter as tk
from tkinter import Button
import time
import numpy as np
from PIL import ImageTk, Image
PhotoImage = ImageTk.PhotoImage
UNIT = 100  # pixels per grid cell
HEIGHT = 5  # grid world height (rows)
WIDTH = 5   # grid world width (columns)
TRANSITION_PROB = 1  # transitions are deterministic
POSSIBLE_ACTIONS = [0, 1, 2, 3]  # up, down, left, right
ACTIONS = [(-1, 0), (1, 0), (0, -1), (0, 1)]  # actions as coordinate offsets
REWARDS = []
class GraphicDisplay(tk.Tk):
    def __init__(self, agent):
        super(GraphicDisplay, self).__init__()
        self.title('Policy Iteration')
        self.geometry('{0}x{1}'.format(HEIGHT * UNIT, HEIGHT * UNIT + 50))
        self.texts = []
        self.arrows = []
        self.env = Env()
        self.agent = agent
        self.evaluation_count = 0
        self.improvement_count = 0
        self.is_moving = 0
        (self.up, self.down, self.left, self.right), self.shapes = self.load_images()
        self.canvas = self._build_canvas()
        self.text_reward(2, 2, "R : 1.0")
        self.text_reward(1, 2, "R : -1.0")
        self.text_reward(2, 1, "R : -1.0")

    def _build_canvas(self):
        canvas = tk.Canvas(self, bg='white',
                           height=HEIGHT * UNIT,
                           width=WIDTH * UNIT)
        # initialize the buttons
        iteration_button = Button(self, text="Evaluate",
                                  command=self.evaluate_policy)
        iteration_button.configure(width=10, activebackground="#33B5E5")
        canvas.create_window(WIDTH * UNIT * 0.13, HEIGHT * UNIT + 10,
                             window=iteration_button)
        policy_button = Button(self, text="Improve",
                               command=self.improve_policy)
        policy_button.configure(width=10, activebackground="#33B5E5")
        canvas.create_window(WIDTH * UNIT * 0.37, HEIGHT * UNIT + 10,
                             window=policy_button)
        policy_button = Button(self, text="move", command=self.move_by_policy)
        policy_button.configure(width=10, activebackground="#33B5E5")
        canvas.create_window(WIDTH * UNIT * 0.62, HEIGHT * UNIT + 10,
                             window=policy_button)
        policy_button = Button(self, text="reset", command=self.reset)
        policy_button.configure(width=10, activebackground="#33B5E5")
        canvas.create_window(WIDTH * UNIT * 0.87, HEIGHT * UNIT + 10,
                             window=policy_button)
        # create the grid lines
        for col in range(0, WIDTH * UNIT, UNIT):  # 0, 100, ..., 400
            x0, y0, x1, y1 = col, 0, col, HEIGHT * UNIT
            canvas.create_line(x0, y0, x1, y1)
        for row in range(0, HEIGHT * UNIT, UNIT):  # 0, 100, ..., 400
            x0, y0, x1, y1 = 0, row, WIDTH * UNIT, row
            canvas.create_line(x0, y0, x1, y1)
        # add the images to the canvas
        self.rectangle = canvas.create_image(50, 50, image=self.shapes[0])
        canvas.create_image(250, 150, image=self.shapes[1])
        canvas.create_image(150, 250, image=self.shapes[1])
        canvas.create_image(250, 250, image=self.shapes[2])
        canvas.pack()
        return canvas

    def load_images(self):
        up = PhotoImage(Image.open("../img/up.png").resize((13, 13)))
        right = PhotoImage(Image.open("../img/right.png").resize((13, 13)))
        left = PhotoImage(Image.open("../img/left.png").resize((13, 13)))
        down = PhotoImage(Image.open("../img/down.png").resize((13, 13)))
        rectangle = PhotoImage(Image.open("../img/rectangle.png").resize((65, 65)))
        triangle = PhotoImage(Image.open("../img/triangle.png").resize((65, 65)))
        circle = PhotoImage(Image.open("../img/circle.png").resize((65, 65)))
        return (up, down, left, right), (rectangle, triangle, circle)

    def reset(self):
        if self.is_moving == 0:
            self.evaluation_count = 0
            self.improvement_count = 0
            for i in self.texts:
                self.canvas.delete(i)
            for i in self.arrows:
                self.canvas.delete(i)
            self.agent.value_table = [[0.0] * WIDTH for _ in range(HEIGHT)]
            self.agent.policy_table = ([[[0.25, 0.25, 0.25, 0.25]] * WIDTH
                                        for _ in range(HEIGHT)])
            self.agent.policy_table[2][2] = []
            x, y = self.canvas.coords(self.rectangle)
            self.canvas.move(self.rectangle, UNIT / 2 - x, UNIT / 2 - y)

    def text_value(self, row, col, contents, font='Helvetica', size=10,
                   style='normal', anchor="nw"):
        origin_x, origin_y = 85, 70
        x, y = origin_y + (UNIT * col), origin_x + (UNIT * row)
        font = (font, str(size), style)
        text = self.canvas.create_text(x, y, fill="black", text=contents,
                                       font=font, anchor=anchor)
        self.texts.append(text)

    def text_reward(self, row, col, contents, font='Helvetica', size=10,
                    style='normal', anchor="nw"):
        origin_x, origin_y = 5, 5
        x, y = origin_y + (UNIT * col), origin_x + (UNIT * row)
        font = (font, str(size), style)
        text = self.canvas.create_text(x, y, fill="black", text=contents,
                                       font=font, anchor=anchor)
        self.texts.append(text)

    def rectangle_move(self, action):
        base_action = np.array([0, 0])
        location = self.find_rectangle()
        self.render()
        if action == 0 and location[0] > 0:  # up
            base_action[1] -= UNIT
        elif action == 1 and location[0] < HEIGHT - 1:  # down
            base_action[1] += UNIT
        elif action == 2 and location[1] > 0:  # left
            base_action[0] -= UNIT
        elif action == 3 and location[1] < WIDTH - 1:  # right
            base_action[0] += UNIT
        # move the agent
        self.canvas.move(self.rectangle, base_action[0], base_action[1])
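
    # convert the rectangle's pixel coordinates into (row, col) grid indices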
    def find_rectangle(self):
        temp = self.canvas.coords(self.rectangle)
        x = (temp[0] / 100) - 0.5
        y = (temp[1] / 100) - 0.5
        return int(y), int(x)

    def move_by_policy(self):
        if self.improvement_count != 0 and self.is_moving != 1:
            self.is_moving = 1
            x, y = self.canvas.coords(self.rectangle)
            self.canvas.move(self.rectangle, UNIT / 2 - x, UNIT / 2 - y)
            x, y = self.find_rectangle()
            # follow the policy until the terminal state (empty policy list)
            while len(self.agent.policy_table[x][y]) != 0:
                self.rectangle_move(self.agent.get_action([x, y]))
                self.after(100)  # pause briefly between steps
                x, y = self.find_rectangle()
            self.is_moving = 0

    def draw_one_arrow(self, col, row, policy):
        if col == 2 and row == 2:
            return
        if policy[0] > 0:  # up
            origin_x, origin_y = 50 + (UNIT * row), 10 + (UNIT * col)
            self.arrows.append(self.canvas.create_image(origin_x, origin_y,
                                                        image=self.up))
        if policy[1] > 0:  # down
            origin_x, origin_y = 50 + (UNIT * row), 90 + (UNIT * col)
            self.arrows.append(self.canvas.create_image(origin_x, origin_y,
                                                        image=self.down))
        if policy[2] > 0:  # left
            origin_x, origin_y = 10 + (UNIT * row), 50 + (UNIT * col)
            self.arrows.append(self.canvas.create_image(origin_x, origin_y,
                                                        image=self.left))
        if policy[3] > 0:  # right
            origin_x, origin_y = 90 + (UNIT * row), 50 + (UNIT * col)
            self.arrows.append(self.canvas.create_image(origin_x, origin_y,
                                                        image=self.right))

    def draw_from_policy(self, policy_table):
        for i in range(HEIGHT):
            for j in range(WIDTH):
                self.draw_one_arrow(i, j, policy_table[i][j])

    def print_value_table(self, value_table):
        for i in range(HEIGHT):
            for j in range(WIDTH):
                self.text_value(i, j, value_table[i][j])

    def render(self):
        time.sleep(0.1)
        self.canvas.tag_raise(self.rectangle)
        self.update()

    def evaluate_policy(self):
        self.evaluation_count += 1
        for i in self.texts:
            self.canvas.delete(i)
        self.agent.policy_evaluation()
        self.print_value_table(self.agent.value_table)

    def improve_policy(self):
        self.improvement_count += 1
        for i in self.arrows:
            self.canvas.delete(i)
        self.agent.policy_improvement()
        self.draw_from_policy(self.agent.policy_table)

class Env:
    def __init__(self):
        self.transition_probability = TRANSITION_PROB
        self.width = WIDTH
        self.height = HEIGHT
        self.reward = [[0] * WIDTH for _ in range(HEIGHT)]
        self.possible_actions = POSSIBLE_ACTIONS
        self.reward[2][2] = 1   # reward 1 at the circle's position (2, 2)
        self.reward[1][2] = -1  # reward -1 at the triangle's position (1, 2)
        self.reward[2][1] = -1  # reward -1 at the triangle's position (2, 1)
        self.all_state = []
        for x in range(WIDTH):
            for y in range(HEIGHT):
                state = [x, y]
                self.all_state.append(state)

    def get_reward(self, state, action):
        next_state = self.state_after_action(state, action)
        return self.reward[next_state[0]][next_state[1]]

    def state_after_action(self, state, action_index):
        action = ACTIONS[action_index]
        return self.check_boundary([state[0] + action[0], state[1] + action[1]])
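
    # clamp a state to the grid: coordinates below 0 or beyond the last
    # row/column are pinned to the boundary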
    @staticmethod
    def check_boundary(state):
        state[0] = (0 if state[0] < 0 else WIDTH - 1
                    if state[0] > WIDTH - 1 else state[0])
        state[1] = (0 if state[1] < 0 else HEIGHT - 1
                    if state[1] > HEIGHT - 1 else state[1])
        return state

    def get_transition_prob(self, state, action):
        return self.transition_probability

    def get_all_states(self):
        return self.all_state
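
# The listing below is a separate file (policy_iteration.py); it imports the
# GraphicDisplay and Env classes defined above from the "environment" module.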
import random
from environment import GraphicDisplay, Env
class PolicyIteration:
    def __init__(self, env):
        # the environment object
        self.env = env
        # initialize the value function as a 2-D list
        self.value_table = [[0.0] * env.width for _ in range(env.height)]
        # initialize the policy with equal probability for up, down, left, right
        self.policy_table = [[[0.25, 0.25, 0.25, 0.25]] * env.width
                             for _ in range(env.height)]
        # set the terminal state
        self.policy_table[2][2] = []
        # discount factor
        self.discount_factor = 0.9

    def policy_evaluation(self):
        # initialize the next value function
        next_value_table = [[0.00] * self.env.width
                            for _ in range(self.env.height)]
        # compute the Bellman expectation equation for every state
        for state in self.env.get_all_states():
            value = 0.0
            # the value of the terminal state is 0
            if state == [2, 2]:
                next_value_table[state[0]][state[1]] = value
                continue
            # Bellman expectation equation
            for action in self.env.possible_actions:
                next_state = self.env.state_after_action(state, action)
                reward = self.env.get_reward(state, action)
                next_value = self.get_value(next_state)
                value += (self.get_policy(state)[action] *
                          (reward + self.discount_factor * next_value))
            next_value_table[state[0]][state[1]] = round(value, 2)
        self.value_table = next_value_table
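
    # Each call above performs one sweep of the Bellman expectation equation:
    #   v_{k+1}(s) = sum_a pi(a|s) * (r(s, a) + gamma * v_k(s'))
    # where s' = state_after_action(s, a) and gamma is the discount factor.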

    # greedy policy improvement with respect to the current value function
    def policy_improvement(self):
        next_policy = self.policy_table
        for state in self.env.get_all_states():
            if state == [2, 2]:
                continue
            value = -99999
            max_index = []
            # initialize the policy to be returned
            result = [0.0, 0.0, 0.0, 0.0]
            # for every action, compute reward + (discount factor * next value)
            for index, action in enumerate(self.env.possible_actions):
                next_state = self.env.state_after_action(state, action)
                reward = self.env.get_reward(state, action)
                next_value = self.get_value(next_state)
                temp = reward + self.discount_factor * next_value
                # collect the indices of the actions with the maximum return
                # (all of them in case of a tie)
                if temp == value:
                    max_index.append(index)
                elif temp > value:
                    value = temp
                    max_index.clear()
                    max_index.append(index)
            # assign equal probability to each maximizing action
            prob = 1 / len(max_index)
            for index in max_index:
                result[index] = prob
            next_policy[state[0]][state[1]] = result
        self.policy_table = next_policy
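
    # Improvement computes, per state:
    #   pi'(s) = argmax_a (r(s, a) + gamma * v(s')), ties split uniformly.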

    # return an action sampled from the policy for a given state
    def get_action(self, state):
        # draw a random value between 0 and 1
        random_pick = random.randrange(100) / 100
        policy = self.get_policy(state)
        policy_sum = 0.0
        # sample one action according to the policy's probabilities
        for index, value in enumerate(policy):
            policy_sum += value
            if random_pick < policy_sum:
                return index
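
    # Note: random.randrange(100) / 100 yields one of {0.00, 0.01, ..., 0.99},
    # which is compared against the cumulative probabilities of the policy.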

    # return the policy for a given state (the terminal state has no policy)
    def get_policy(self, state):
        if state == [2, 2]:
            return 0.0
        return self.policy_table[state[0]][state[1]]

    # return the value of a state
    def get_value(self, state):
        # keep only two decimal places
        return round(self.value_table[state[0]][state[1]], 2)
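
# A minimal headless sketch (an addition, not part of the original listing):
# alternate evaluation and improvement without the GUI, then inspect the
# resulting value table.
#
#   env = Env()
#   agent = PolicyIteration(env)
#   for _ in range(10):
#       agent.policy_evaluation()
#       agent.policy_improvement()
#   print(agent.value_table)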
if __name__ == "__main__":
    env = Env()
    policy_iteration = PolicyIteration(env)
    grid_world = GraphicDisplay(policy_iteration)
    grid_world.mainloop()
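
# Usage: "Evaluate" runs one sweep of policy evaluation, "Improve" performs one
# greedy improvement of the policy, "move" animates the agent following the
# current policy, and "reset" restores the initial value table and policy.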