Kaiのプログラミング本レビュー

主にプログラミングの本のレビューをします

強化学習 Q-learningで迷路探索アルゴリズム

強化学習のQ-learningで迷路探索を行います。
Pythonソースコードはこちら。

import numpy as np
import matplotlib.pyplot as plt
import copy

def main():
	global is_end_episode
	global rewards

	for episode in range(NB_EPISODE):
		episode_reward = []
		while(is_end_episode == False):
			action = act_func()
			state_return, reward, is_end_episode = step_func(action)
			observe_func(state_return, reward)
			episode_reward.append(reward)
		rewards.append(np.sum(episode_reward))
		state_return2 = reset_state_func()
		observe_func(state_return2)
		is_end_episode = False

	# 結果のプロット
	plt.plot(np.arange(NB_EPISODE), rewards)
	plt.xlabel("episode")
	plt.ylabel("reward")
	plt.savefig("result.jpg")
	plt.show()

NB_EPISODE = 100
EPSILON = 0.1
ALPHA = 0.1
GAMMA = 0.9
ACTIONS = np.arange(4)

filed_type = {
	"N": 0,  # 通常
	"G": 1,  # ゴール
	"W": 2,  # 壁
	"D": 3,  # ダミー
}

actions_key = {
	"UP": 0,
	"DOWN": 1,
	"LEFT": 2,
	"RIGHT": 3
}

map_field =	[[3, 2, 0, 1],
			[ 0, 0, 0, 2],
			[ 2, 2, 0, 0],
			[ 0, 0, 0, 2],
			[ 0, 2, 0, 0]]

start_pos = 0, 4
agent_pos = copy.deepcopy(start_pos)

alpha = ALPHA
gamma = GAMMA
epsilon = EPSILON

actions = ACTIONS

previous_state = None
previous_action = None

state = start_pos

rewards = []

is_end_episode = False

def _init_q_values():
	global q_values
	global actions

	q_values = {}
	q_values[state] = np.repeat(0.0, len(actions))
	return q_values

q_values = _init_q_values()

def act_func():
	global epsilon
	global q_values
	global state
	global previous_action

	if np.random.uniform() < epsilon:
		action = np.random.randint(0, len(q_values[state]))
	else:
		action = np.argmax(q_values[state])

	previous_action = action
	return action

def step_func(action):
	global agent_pos
	global actions_key

	to_x, to_y = copy.deepcopy(agent_pos)

	if _is_possible_action(to_x, to_y, action) == False:
		return agent_pos, -1, False

	if action == actions_key["UP"]:
		to_y += -1
	elif action == actions_key["DOWN"]:
		to_y += 1
	elif action == actions_key["LEFT"]:
		to_x += -1
	elif action == actions_key["RIGHT"]:
		to_x += 1

	is_goal = _is_end_episode(to_x, to_y)
	reward = _compute_reward(to_x, to_y)
	agent_pos = to_x, to_y
	return agent_pos, reward, is_goal

def _is_possible_action(x, y, action):
	global actions_key
	global map_field

	to_x = x
	to_y = y

	if action == actions_key["UP"]:
		to_y += -1
	elif action == actions_key["DOWN"]:
		to_y += 1
	elif action == actions_key["LEFT"]:
		to_x += -1
	elif action == actions_key["RIGHT"]:
		to_x += 1

	if len(map_field) <= to_y or 0 > to_y:
		return False
	elif len(map_field[0]) <= to_x or 0 > to_x:
		return False
	elif _is_wall(to_x, to_y):
		return False

	return True

def _is_end_episode(x,y):
	global map_field
	global field_type

	if map_field[y][x] == filed_type["G"]:
		return True
	elif map_field[y][x] == filed_type["D"]:
		return True
	else:
		return False

def _compute_reward(x, y):
	global map_field
	global field_type

	if map_field[y][x] == filed_type["N"]:
		return 0
	elif map_field[y][x] == filed_type["G"]:
		return 100
	elif map_field[y][x] == filed_type["D"]:
		return -100

def _is_wall(x, y):
	global map_field
	global field_type

	if map_field[y][x] == filed_type["W"]:
		return True
	else:
		return False

def observe_func(next_state, reward=None):
	global q_values
	global actions
	global previous_state
	global state

	next_state = str(next_state)
	if next_state not in q_values:
		q_values[next_state] = np.repeat(0.0, len(actions))

	previous_state = copy.deepcopy(state)
	state = next_state

	if reward is not None:
		learn(reward)

def learn(reward):
	global q_values
	global previous_state
	global previous_action
	global state
	global alpha
	global gamma

	q = q_values[previous_state][previous_action]  # Q(s, a)
	max_q = max(q_values[state])  # max Q(s')
	# Q(s, a) = Q(s, a) + alpha*(r+gamma*maxQ(s')-Q(s, a))
	q_values[previous_state][previous_action] = q + (alpha * (reward + (gamma * max_q) - q))

def reset_state_func():
	global agent_pos
	global start_pos

	agent_pos = start_pos
	return start_pos

if __name__ == '__main__':
	main()

今回エージェントが探索する迷路はこちら。

f:id:snow4726:20220329211231j:plain

探索結果がこちら。
横軸が何回迷路をスタートからゴールまで繰り返して探索したか、縦軸が報酬値。
回数を繰り返す毎に多くの報酬が貰えてる事がわかる。

f:id:snow4726:20220329211348j:plain

以下、ソースコードの解説をします。
興味のある方はどうぞ。

def main():
	global is_end_episode
	global rewards

	for episode in range(NB_EPISODE):
		episode_reward = []
		while(is_end_episode == False):
			action = act_func()
			state_return, reward, is_end_episode = step_func(action)
			observe_func(state_return, reward)
			episode_reward.append(reward)
		rewards.append(np.sum(episode_reward))
		state_return2 = reset_state_func()
		observe_func(state_return2)
		is_end_episode = False

	# 結果のプロット
	plt.plot(np.arange(NB_EPISODE), rewards)
	plt.xlabel("episode")
	plt.ylabel("reward")
	plt.savefig("result.jpg")
	plt.show()

メイン関数です。
for文でepisode(今回は100episode)を繰り返します。
まずepisode_reward(1episodeの経路毎の報酬値)を初期化します。
その後while文でスタートからゴールに辿り着くまでを繰り返します。
最初にact_func()関数で上下左右どの方向に決めてその値をaction変数に返します。
次にstep_func()関数でマス目を進めて、次のマス目の座標、次のマス目の報酬値、ゴールしてるかのフラグを返します。
observe_func()関数で現在位置と過去位置の更新を行った後にQ-valueの値を更新します。
最後にepisode_reward配列の末尾にマス目の報酬値を追加してwhile文の一周を終えます。
while文を抜けたら1経路分の報酬値の合計値をrewards配列の末尾に追加します。
reset_state_func()関数でスタート位置を返してobserve_func()関数で現在位置をスタート位置に更新します。
is_end_episodeフラグ(ゴールしたかのフラグ)をFalse(ゴールしてない)に戻します。
これがfor文の繰り返し内容になります。
for文を抜けたらグラフ表示を行い、横軸がepisode数、縦軸が報酬値となります。
ついでにカレントディレクトリにjpgファイルを保存してます。

NB_EPISODE = 100
EPSILON = 0.1
ALPHA = 0.1
GAMMA = 0.9
ACTIONS = np.arange(4)

filed_type = {
	"N": 0,  # 通常
	"G": 1,  # ゴール
	"W": 2,  # 壁
	"D": 3,  # ダミー
}

actions_key = {
	"UP": 0,
	"DOWN": 1,
	"LEFT": 2,
	"RIGHT": 3
}

map_field =	[[3, 2, 0, 1],
			[ 0, 0, 0, 2],
			[ 2, 2, 0, 0],
			[ 0, 0, 0, 2],
			[ 0, 2, 0, 0]]

start_pos = 0, 4
agent_pos = copy.deepcopy(start_pos)

alpha = ALPHA
gamma = GAMMA
epsilon = EPSILON

actions = ACTIONS

previous_state = None
previous_action = None

state = start_pos

rewards = []

is_end_episode = False

NB_EPISODEはepisode数、EPSILONはε-greedy法の行動決定率(後述)、ALPHAは学習率(0~1)(後述)、GAMMAは割引率(0~1)(後述)、ACTIONSは上下左右の要素数(4)、field_typeは迷路の経路, ゴール, 壁, ダミーゴールの判別、actions_keyは上下左右を表す、map_fieldは迷路配列、start_posはスタート位置、agent_posはエージェントの現在位置、alpha,gamma,epsilon,actionsに定数値を代入、previous_stateは過去(一つ前)の座標、previous_actionは過去(一つ前)の移動方向値、stateは移動後座標、rewards配列は報酬値保存用配列、is_end_episodeはゴールしたかどうかのフラグです。

def _init_q_values():
	global q_values
	global actions

	q_values = {}
	q_values[state] = np.repeat(0.0, len(actions))
	return q_values

q_values = _init_q_values()

_init_q_values()関数の処理です。
q_values配列のstate座標(スタート座標)を上下左右の4要素を0.0で初期化しておきます。
q_values変数にあらかじめ代入しておきます。

def act_func():
	global epsilon
	global q_values
	global state
	global previous_action

	if np.random.uniform() < epsilon:
		action = np.random.randint(0, len(q_values[state]))
	else:
		action = np.argmax(q_values[state])

	previous_action = action
	return action

act_func()関数ですε-greedy法に則ってepsilon(ε)を確率とする(0~1までの値)値で上下左右のランダム行動かもしくはQ-value値による上下左右の最大のQ-value値の選択行動か選びます。今回はepsilonを0.1に設定してるので10%の確率でランダム行動、90%でQ-value値のmax値を選択する行動になります。previous_action変数に移動方向を保存してaction変数(移動方向)を返します。

def step_func(action):
	global agent_pos
	global actions_key

	to_x, to_y = copy.deepcopy(agent_pos)

	if _is_possible_action(to_x, to_y, action) == False:
		return agent_pos, -1, False

	if action == actions_key["UP"]:
		to_y += -1
	elif action == actions_key["DOWN"]:
		to_y += 1
	elif action == actions_key["LEFT"]:
		to_x += -1
	elif action == actions_key["RIGHT"]:
		to_x += 1

	is_goal = _is_end_episode(to_x, to_y)
	reward = _compute_reward(to_x, to_y)
	agent_pos = to_x, to_y
	return agent_pos, reward, is_goal

step_func()関数の処理です。
現在のエージェントの位置のx座標、y座標を求めます。
_is_possible_action()関数で壁や迷路の枠を越えているかを判定します。
移動不可能の場合は現在位置を次の位置、報酬を-1、ゴールしてないとフラグを設定してその3つを返戻値として返します。
action変数が上下左右か確認して次の座標位置を定めます。
_is_end_episode()関数でゴールしてるかの判定を行い、_compute_reward()関数で報酬値を得て、エージェントの位置を次の位置にしてその3つを返戻値として返します。

def _is_possible_action(x, y, action):
	global actions_key
	global map_field

	to_x = x
	to_y = y

	if action == actions_key["UP"]:
		to_y += -1
	elif action == actions_key["DOWN"]:
		to_y += 1
	elif action == actions_key["LEFT"]:
		to_x += -1
	elif action == actions_key["RIGHT"]:
		to_x += 1

	if len(map_field) <= to_y or 0 > to_y:
		return False
	elif len(map_field[0]) <= to_x or 0 > to_x:
		return False
	elif _is_wall(to_x, to_y):
		return False

	return True

_is_possible_action()関数の処理です。
次の位置が壁かマップの外かを判定して真偽値を返します。

def _is_end_episode(x,y):
	global map_field
	global field_type

	if map_field[y][x] == filed_type["G"]:
		return True
	elif map_field[y][x] == filed_type["D"]:
		return True
	else:
		return False

_is_end_episode()関数の処理です。
ゴールしてるかの判定を行います。
ダミーゴールでもゴールしたものとみなします。

def _compute_reward(x, y):
	global map_field
	global field_type

	if map_field[y][x] == filed_type["N"]:
		return 0
	elif map_field[y][x] == filed_type["G"]:
		return 100
	elif map_field[y][x] == filed_type["D"]:
		return -100

_compute_reward()関数の処理です。
指定したマス目の報酬値を返します。
通常なら0、ゴールなら100、ダミーゴールなら-100を返します。

def _is_wall(x, y):
	global map_field
	global field_type

	if map_field[y][x] == filed_type["W"]:
		return True
	else:
		return False

_is_wall()関数の処理です。
指定したマス目が壁かどうかの判定を行います。

def observe_func(next_state, reward=None):
	global q_values
	global actions
	global previous_state
	global state

	next_state = str(next_state)
	if next_state not in q_values:
		q_values[next_state] = np.repeat(0.0, len(actions))

	previous_state = copy.deepcopy(state)
	state = next_state

	if reward is not None:
		learn(reward)

observe_func()関数の処理です。
次に進む位置のq_value配列の要素が存在しなければ要素を追加して、エージェントの現在座標、過去座標の更新を行い、報酬値(reward)がNoneじゃなければQ学習によるQ-valueの更新を行います。

def learn(reward):
	global q_values
	global previous_state
	global previous_action
	global state
	global alpha
	global gamma

	q = q_values[previous_state][previous_action]  # Q(s, a)
	max_q = max(q_values[state])  # max Q(s')
	# Q(s, a) = Q(s, a) + alpha*(r+gamma*maxQ(s')-Q(s, a))
	q_values[previous_state][previous_action] = q + (alpha * (reward + (gamma * max_q) - q))

learn()関数の処理です。
公式に従ってQ-valueの更新を行います。
Q-valueの更新の公式は以下です。

f:id:snow4726:20220404163013j:plain

def reset_state_func():
	global agent_pos
	global start_pos

	agent_pos = start_pos
	return start_pos

reset_state_func()関数の処理です。
エージェントの位置を初期位置に戻して返します。

if __name__ == '__main__':
	main()

main()関数を実行します。

以上がQ-learningにおける迷路探索の強化学習アルゴリズムの説明になります。

こちらの書籍もどうぞ。