알고리즘
TensorFlow DQN Agent 구현
WDmil
2024. 6. 28. 03:01
728x90
import tensorflow as tf
import numpy as np
import random
import math
import os
"""# 설정값 지정"""
# 학습에 필요한 설정값들을 선언합니다.
epsilon = 1 # epsilon-Greedy 기법에 사용할 최초의 epsilon값
epsilonMinimumValue = 0.001 # epsilon의 최소값 (이 값 이하로 Decay하지 않습니다)
num_actions = 3 # 에이전트가 취할 수 있는 행동의 개수 - (좌로 움직이기, 가만히 있기, 우로 움직이기)
num_epochs = 2000 # 학습에 사용할 반복횟수
hidden_size = 128 # 히든레이어의 노드 개수
maxMemory = 500 # Replay Memory의 크기
batch_size = 50 # 학습에 사용할 배치 개수
gridSize = 10 # 에이전트가 플레이하는 게임 화면 크기 (10x10 grid)
state_size = gridSize * gridSize # 게임 환경의 현재상태 (10x10 grid)
discount = 0.9 # Discount Factor \gamma
learning_rate = 0.2 # 러닝 레이트
"""# DQN 클래스 정의"""
# s와 e사이의 랜덤한 값을 리턴하는 유틸리티 함수를 정의합니다.
def randf(s, e):
return (float(random.randrange(0, (e - s) * 9999)) / 10000) + s
def truncated_normal_intializer(stddev):
return tf.keras.initializers.TruncatedNormal(mean=0.0, stddev=stddev, seed=None)
# tf.keras.Model을 이용해서 DQN 모델을 정의합니다.
class DQN(tf.keras.Model):
def __init__(self):
super(DQN, self).__init__()
# 100(현재 상태 - 10x10 Grid) -> 128 -> 128 -> 3(예측된 각 행동의 Q값)
self.hidden_layer_1 = tf.keras.layers.Dense(hidden_size,
activation='relu',
kernel_initializer=truncated_normal_intializer(1.0 / math.sqrt(float(state_size))),
bias_initializer=truncated_normal_intializer(0.01))
self.hidden_layer_2 = tf.keras.layers.Dense(hidden_size,
activation='relu',
kernel_initializer=truncated_normal_intializer(1.0 / math.sqrt(float(hidden_size))),
bias_initializer=truncated_normal_intializer(0.01))
self.output_layer = tf.keras.layers.Dense(num_actions,
activation=None,
kernel_initializer=truncated_normal_intializer(1.0 / math.sqrt(float(hidden_size))),
bias_initializer=truncated_normal_intializer(0.01))
def call(self, x):
H1_output = self.hidden_layer_1(x)
H2_output = self.hidden_layer_2(H1_output)
output_layer = self.output_layer(H2_output)
return tf.squeeze(output_layer)
"""# 손실함수 정의"""
# MSE 손실 함수를 정의합니다.
def mse_loss(y_pred, y):
return tf.reduce_sum(tf.square(y-y_pred)) / (2*batch_size) # MSE 손실 함수
# 옵티마이저를 정의합니다.
optimizer = tf.optimizers.SGD(learning_rate)
# 최적화를 위한 function을 정의합니다.
def train_step(model, x, y):
with tf.GradientTape() as tape:
y_pred = model(x)
loss = mse_loss(y_pred, y)
gradients = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
# DQN 모델을 선언합니다.
DQN_model = DQN()
"""# CatchGame Environment 설정"""
# CatchGame을 수행하는 Environment를 구현합니다.
class CatchEnvironment():
# 상태의 초기값을 지정합니다.
def __init__(self, gridSize):
self.gridSize = gridSize
self.state_size = self.gridSize * self.gridSize
self.state = np.empty(3, dtype = np.uint8)
# 관찰 결과를 리턴합니다.
def observe(self):
canvas = self.drawState()
canvas = np.reshape(canvas, (-1,self.state_size))
return canvas.astype('float32')
# 현재 상태(fruit, basket)를 화면에 출력합니다.
def drawState(self):
canvas = np.zeros((self.gridSize, self.gridSize))
# fruit를 화면에 그립니다.
canvas[self.state[0]-1, self.state[1]-1] = 1
# basket을 화면에 그립니다.
canvas[self.gridSize-1, self.state[2] -1 - 1] = 1
canvas[self.gridSize-1, self.state[2] -1] = 1
canvas[self.gridSize-1, self.state[2] -1 + 1] = 1
return canvas
# 게임을 초기 상태로 리셋합니다.
def reset(self):
initialFruitColumn = random.randrange(1, self.gridSize + 1)
initialBucketPosition = random.randrange(2, self.gridSize + 1 - 1)
self.state = np.array([1, initialFruitColumn, initialBucketPosition])
return self.getState()
# 현재 상태를 불러옵니다.
def getState(self):
stateInfo = self.state
fruit_row = stateInfo[0]
fruit_col = stateInfo[1]
basket = stateInfo[2]
return fruit_row, fruit_col, basket
# 에이전트가 취한 행동에 대한 보상을 줍니다.
def getReward(self):
fruitRow, fruitColumn, basket = self.getState()
# 만약 fruit가 바닥에 닿았을 때
if (fruitRow == self.gridSize - 1):
# basket이 fruit을 받아내면 1의 reward를 줍니다.
if (abs(fruitColumn - basket) <= 1):
return 1
# fruit를 받아내지 못하면 -1의 reward를 줍니다.
else:
return -1
# fruit가 바닥에 닿지 않은 중립적인 상태는 0의 reward를 줍니다.
else:
return 0
# 게임이 끝났는지를 체크합니다.(fruit가 바닥에 닿으면 한게임이 종료됩니다.)
def isGameOver(self):
if (self.state[0] == self.gridSize - 1):
return True
else:
return False
# action(좌로 한칸 이동, 제자리, 우로 한칸이동)에 따라 basket의 위치를 업데이트합니다.
def updateState(self, action):
move = 0
if (action == 0):
move = -1
elif (action == 1):
move = 0
elif (action == 2):
move = 1
fruitRow, fruitColumn, basket = self.getState()
newBasket = min(max(2, basket + move), self.gridSize - 1) # min/max는 basket이 grid밖으로 벗어나는것을 방지합니다.
fruitRow = fruitRow + 1 # fruit는 매 행동을 취할때마다 1칸씩 아래로 떨어집니다.
self.state = np.array([fruitRow, fruitColumn, newBasket])
# 행동을 취합니다. 0 : 왼쪽으로 이동, 1 : 가만히 있기, 2 : 오른쪽으로 이동
def act(self, action):
self.updateState(action)
reward = self.getReward()
gameOver = self.isGameOver()
return self.observe(), reward, gameOver, self.getState()
"""# Replay Memory 설정"""
# Replay Memory를 class로 정의합니다.
class ReplayMemory:
def __init__(self, gridSize, maxMemory, discount):
self.maxMemory = maxMemory
self.gridSize = gridSize
self.state_size = self.gridSize * self.gridSize
self.discount = discount
self.inputState = np.empty((self.maxMemory, 100), dtype = np.float32)
self.actions = np.zeros(self.maxMemory, dtype = np.uint8)
self.nextState = np.empty((self.maxMemory, 100), dtype = np.float32)
self.gameOver = np.empty(self.maxMemory, dtype = np.bool)
self.rewards = np.empty(self.maxMemory, dtype = np.int8)
self.count = 0
self.current = 0
# 경험을 Replay Memory에 저장합니다.
def remember(self, currentState, action, reward, nextState, gameOver):
self.actions[self.current] = action
self.rewards[self.current] = reward
self.inputState[self.current, ...] = currentState
self.nextState[self.current, ...] = nextState
self.gameOver[self.current] = gameOver
self.count = max(self.count, self.current + 1)
self.current = (self.current + 1) % self.maxMemory
def getBatch(self, DQN_model, batch_size, num_actions, state_size):
# 취할 수 있는 가장 큰 배치 사이즈를 선택합니다. (학습 초기에는 batch_size만큼의 기억이 없습니다.)
memoryLength = self.count
chosenBatchSize = min(batch_size, memoryLength)
# 인풋 데이터와 타겟데이터를 선언합니다.
inputs = np.zeros((chosenBatchSize, state_size))
targets = np.zeros((chosenBatchSize, num_actions))
# 배치안의 값을 설정합니다.
for i in range(chosenBatchSize):
# 배치에 포함될 기억을 랜덤으로 선택합니다.
randomIndex = random.randrange(0, memoryLength)
# 현재 상태와 Q값을 불러옵니다.
current_inputState = np.reshape(self.inputState[randomIndex], (1, 100))
target = DQN_model(current_inputState).numpy()
# 현재 상태 바로 다음 상태를 불러오고 다음 상태에서 취할수 있는 가장 큰 Q값을 계산합니다.
current_nextState = np.reshape(self.nextState[randomIndex], (1, 100))
nextStateQ = DQN_model(current_nextState).numpy()
nextStateMaxQ = np.amax(nextStateQ)
# 만약 게임오버라면 reward로 Q값을 업데이트하고
if (self.gameOver[randomIndex] == True):
target[self.actions[randomIndex]] = self.rewards[randomIndex]
# 게임오버가 아니라면 타겟 Q값(최적의 Q값)을 아래 수식을 이용해서 계산합니다.
# Q* = reward + discount(gamma) * max_a' Q(s',a')
else:
target[self.actions[randomIndex]] = self.rewards[randomIndex] + self.discount * nextStateMaxQ
# 인풋과 타겟 데이터에 값을 지정합니다.
inputs[i] = current_inputState
targets[i] = target
return inputs.astype('float32'), targets.astype('float32')
print("트레이닝을 시작합니다.")
# 게임 플레이 환경을 선언합니다.
env = CatchEnvironment(gridSize)
# Replay Memory를 선언합니다.
memory = ReplayMemory(gridSize, maxMemory, discount)
winCount = 0
for i in range(num_epochs+1):
# 환경을 초기화합니다.
err = 0
env.reset()
isGameOver = False
# 최초의 상태를 불러옵니다.
currentState = env.observe()
while (isGameOver != True):
action = -9999 # Q값을 초기화합니다.
# epsilon-Greedy 기법에 따라 랜덤한 행동을 할지 최적의 행동을 할지를 결정합니다.
epsilon
if (randf(0, 1) <= epsilon):
# epsilon 확률만큼 랜덤한 행동을 합니다.
action = random.randrange(0, num_actions)
else:
# (1-epsilon) 확률만큼 최적의 행동을 합니다.
# 현재 상태를 DQN의 인풋으로 넣어서 예측된 최적의 Q(s,a)값들을 리턴받습니다.
q = DQN_model(currentState).numpy()
# Q(s,a)가 가장 높은 행동을 선택합니다.
action = q.argmax()
# epsilon값을 0.9999만큼 Decay합니다.
if (epsilon > epsilonMinimumValue):
epsilon = epsilon * 0.999
# 에이전트가 행동을 하고 다음 보상과 다음 상태에 대한 정보를 리턴 받습니다.
nextState, reward, gameOver, stateInfo = env.act(action)
# 만약 과일을 제대로 받아냈다면 승리 횟수를 1 올립니다.
if (reward == 1):
winCount = winCount + 1
# 에이전트가 수집한 정보를 Replay Memory에 저장합니다.
memory.remember(currentState, action, reward, nextState, gameOver)
# 현재 상태를 다음 상태로 업데이트하고 GameOver유무를 체크합니다.
currentState = nextState
isGameOver = gameOver
# Replay Memory로부터 학습에 사용할 Batch 데이터를 불러옵니다.
inputs, targets = memory.getBatch(DQN_model, batch_size, num_actions, state_size)
# 최적화를 수행하고 손실함수를 리턴받습니다.
_, loss_print = train_step(DQN_model, inputs, targets), mse_loss(DQN_model(inputs), targets)
err = err + loss_print
print("반복(Epoch): %d, 에러(err): %.4f, 승리횟수(Win count): %d, 승리비율(Win ratio): %.4f" % (i, err, winCount, float(winCount)/float(i+1)*100))
# 학습이 모두 끝나면 파라미터를 지정된 경로에 저장합니다.
print("트레이닝 완료")
"""# 학습된 에이전트의 플레이결과 시각화하기"""
# Commented out IPython magic to ensure Python compatibility.
# %matplotlib
from IPython import display
import matplotlib.patches as patches
import pylab as pl
import time
import tensorflow as tf
import os
# 설정값들을 정의합니다.
gridSize = 10
maxGames = 30
env = CatchEnvironment(gridSize)
winCount = 0
loseCount = 0
numberOfGames = 0
# 화면을 그리기 위한 설정들을 정의합니다.
ground = 1
plot = pl.figure(figsize=(12,12))
axis = plot.add_subplot(111, aspect='equal')
axis.set_xlim([-1, 12])
axis.set_ylim([0, 12])
# 현재 상태를 그리기 위한 drawState 함수를 정의합니다.
def drawState(fruitRow, fruitColumn, basket, gridSize):
# 과일이 몇번째 세로축에 있는지 정의합니다.
fruitX = fruitColumn
# 과일이 몇번째 가로축에 있는지 정의합니다.
fruitY = (gridSize - fruitRow + 1)
# 승리 횟수, 패배 횟수, 전체 게임 횟수를 화면 상단에 출력합니다.
statusTitle = "Wins: " + str(winCount) + " Losses: " + str(loseCount) + " TotalGame: " + str(numberOfGames)
axis.set_title(statusTitle, fontsize=30)
for p in [
# 배경의 위치를 지정합니다.
patches.Rectangle(
((ground - 1), (ground)), 11, 10,
facecolor="#000000" # Black
),
# 바구니의 위치를 지정합니다.
patches.Rectangle(
(basket - 1, ground), 2, 0.5,
facecolor="#FF0000" # Red
),
# 과일의 위치를 지정합니다.
patches.Rectangle(
(fruitX - 0.5, fruitY - 0.5), 1, 1,
facecolor="#0000FF" # Blue
),
]:
axis.add_patch(p)
display.clear_output(wait=True)
display.display(pl.gcf())
# maxGames 횟수만큼 게임을 플레이합니다.
while (numberOfGames < maxGames):
numberOfGames = numberOfGames + 1
# 최초의 상태를 정의합니다.
isGameOver = False
fruitRow, fruitColumn, basket = env.reset()
currentState = env.observe()
drawState(fruitRow, fruitColumn, basket, gridSize)
while (isGameOver != True):
# 현재 상태를 DQN의 입력값으로 넣고 구한 Q값중 가장 큰 Q값을 갖는 행동을 취합니다.
q = DQN_model(currentState).numpy()
action = q.argmax()
# 행동을 취하고 다음 상태로 넘어갑니다.
nextState, reward, gameOver, stateInfo = env.act(action)
fruitRow = stateInfo[0]
fruitColumn = stateInfo[1]
basket = stateInfo[2]
# 과일을 받아내면 winCount를 1 늘리고 과일을 받아내지 못하면 loseCount를 1 늘립니다.
if (reward == 1):
winCount = winCount + 1
elif (reward == -1):
loseCount = loseCount + 1
currentState = nextState
isGameOver = gameOver
drawState(fruitRow, fruitColumn, basket, gridSize)
# 다음 행동을 취하기 전에 0.05초의 일시정지를 줍니다.
time.sleep(0.05)
# 최종 출력결과 이미지를 하나로 정리합니다.
display.clear_output(wait=True)
https://colab.research.google.com/drive/1m3OxeUCt3XthHJDg3iZ2Rma6PpGXfIAx?usp=sharing
728x90