2023.6.26 Q学習による迷路探索
2023.6.27 修正
fig1で出力するグラフのタテヨコを入れ替えた。
マップを複雑にしてみた。
2023.6.27 バグ取り
Env.stepの戻り値に「移動できたか」のチェックを追加
main.pyのメインループにて、「移動できた」場合のみpathを追加することで、壁に阻まれて足踏みした場合を除外した。
初stepにおいて、start_posをpathに追加することで、スタートからの経路切れをなくした。
使い方
2つのファイルから構成されている。同じフォルダに格納し、main.pyを実行してください。
main.py
myQL.py
分かり易さのために、Q学習のメインルーチンと必要最小限の前/後処理のみmainに置いた。
全ての結果をresultフォルダ以下に保存する。特に、fig1_pathには
障害物を黒い四角
スタートを青丸
ゴールを赤丸
全てのパスを色付きの線
最短経路を青(線+X)
でプロットしている。最短経路については全PATHのlenを調べてその最小値を採用している。つまり最初に発見した最短経路を採用していることに注意。
ファイル名:main.py
code:python
import numpy, os, sys
import matplotlib.pyplot as plt
import myQL
fp1, dirname, filename = myQL.get_outputs()
args_env, args_agent, args_qlearn = myQL.get_args()
env = myQL.Env(args_env)
agent = myQL.Agent(args_agent)
# sys.exit()
print('args_env:', args_env)
print('args_env: ', args_env, file=fp1)
print('args_agent:', args_agent)
print('args_agent:', args_agent, file=fp1)
### Q学習の本体部分
EPISODES = args_qlearn'EPISODES'
STEPS = args_qlearn'STEPS'
mapdata = args_env'MAPDATA'
start_pos = args_env'START_POS'
goal_pos = args_env'GOAL_POS'
is_moved, is_goal = False, False
episode_path = []
episode_step = []
episode_reward_sum = []
for episode in range(EPISODES):
env.reset()
step_path = start_pos
reward_sum = 0
for step in range(STEPS):
action = agent.get_act() # 行動選択
state, reward, is_moved, is_goal = env.step(action) # 行動
if is_moved == True:
step_path.append(env.agent_pos)
reward_sum += reward
agent.observe(state, reward)
if is_goal:
break
is_end_episode = False
episode_path.append(numpy.array(step_path))
episode_step.append(step)
episode_reward_sum.append(reward_sum)
### 計算終了、あとは結果の出力
args_result = {}
args_result'MAPDATA' = mapdata
args_result'EPISODE_PATH' = episode_path
args_result'EPISODE_STEP' = episode_step
args_result'EPISODE_REWARD_SUM' = episode_reward_sum
args_result'GOAL_POS' = goal_pos
args_result'START_POS' = start_pos
args_result'DIRNAME' = dirname
args_result'AGENT' = agent
args_result'FP1' = fp1
args_result'DIRNAME' = dirname
myQL.show_result(args_result)
fp1.close()
ファイル名:myQL.py
code:python
import numpy, datetime
import matplotlib.pyplot as plt
import os, sys
def get_args():
# agent_pos_init, goal_pos, map はリスト型で与える。
# mapは予約語なのでmapdataとした
start_pos = 0, 0
goal_pos = 7, 11
mapdata = [
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 1, 1, 1, 0, 1, 1, 1, 1, 0,
0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0,
0, 0, 1, 1, 0, 1, 1, 1, 1, 0, 1, 1, 1,
0, 0, 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 0,
0, 1, 0, 1, 1, 1, 0, 0, 0, 1, 0, 0, 0,
0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0,
0, 1, 0, 1, 0, 1, 0, 0, 0, 0, 1, 1, 0,
0, 1, 0, 0, 0, 1, 1, 1, 1, 0, 0, 0, 0,
]
args_env = {}
args_env'START_POS' = numpy.array(start_pos)
args_env'GOAL_POS' = numpy.array(goal_pos)
args_env'MAPDATA' = numpy.array(mapdata)
args_env'FILEDS' = {'FLOOR':0, 'GOAL':2, 'WALL':1}
args_env'ACTIONS' = {'NORTH':0, 'SOUTH':1, 'WEST':2, 'EAST':3}
args_env'MOVE' = {0:-1, 0, 1: 1, 0, 2:0, -1, 3:0, 1}
args_env'REWARD_GOAL' = 1000
args_env'REWARD_WALL' = -1000
args_agent = {}
args_agent'ALPHA' = 0.3
args_agent'GAMMA' = 0.9
args_agent'EPSILON' = 0.3
args_agent'ACTIONS' = numpy.array(0, 1, 2, 3)
args_agent'START_POS' = numpy.array(start_pos)
args_qlearn ={}
args_qlearn'EPISODES' = 1000
args_qlearn'STEPS' = 1000
return args_env, args_agent, args_qlearn
#################################################################
class Env:
def __init__(self, args):
self.start_pos = args'START_POS'
self.goal_pos = args'GOAL_POS'
self.fields = args'FILEDS'
self.actions = args'ACTIONS'
self.mapdata = args'MAPDATA'
self.mapdata[self.goal_pos0][self.goal_pos1] = self.fields'GOAL' # Goalの設定
self.agent_pos = self.start_pos
self.move = args'MOVE'
self.reward_goal = args'REWARD_GOAL'
self.reward_wall = args'REWARD_WALL'
for key, data in self.move.items():
self.movekey = numpy.array(data) # dataはlistで受けていると仮定
for i in self.mapdata:
for j in i:
print(j, end=' ')
print()
def step(self, action):
# agent_pos_to :移動先
agent_pos_to = self.agent_pos.copy() + self.moveaction
# if action not in 0, 1, 2, 3:
# print('NASHI!!')
if self._is_possible_action(agent_pos_to) == False:
# 移動できなかった
is_moved, is_goal = False, False
return self.agent_pos, self.reward_wall, is_moved, is_goal
reward = self._get_reward(agent_pos_to) # 移動先の報酬を求める
self.agent_pos = agent_pos_to # 移動する
is_moved = True
is_goal = self._is_reach_goal() # 移動先はゴールかを判定
return self.agent_pos, reward, is_moved, is_goal
# posに移動できるかのチェック
def _is_possible_action(self, pos):
if self._is_inside_map(pos) == False:
return False # マップ外側のため移動「できない」
if self._is_wall(pos) == True:
return False # 壁のため移動「できない」
return True # 移動「できる」
# 指定した位置が壁ならTrue
def _is_wall(self, pos):
y, x = pos
# print(x, y)
if self.mapdatayx == self.fields'WALL':
return True
return False
# 指定した位置が地図内ならTrue
def _is_inside_map(self, pos):
if 0 <= pos0 < self.mapdata.shape0 and 0 <= pos1 < self.mapdata.shape1:
return True
return False
# ゴールに到達したらTrue
def _is_reach_goal(self):
# ndarray同士の比較だとBool, Boolを返しNGなので、タプル化して比較
if tuple(self.agent_pos) == tuple(self.goal_pos):
return True
return False
# 報酬を計算する、ゴールなら1000、床なら0、壁にはめり込まない前提
def _get_reward(self, pos):
if self.mapdata[pos0][pos1] == self.fields'GOAL':
# print('_get_reward: GOAL')
return self.reward_goal
if self.mapdata[pos0][pos1] == self.fields'WALL':
print('_get_reward: WALL')
return 0 # つまり、Floorの場合
def reset(self):
self.agent_pos = self.start_pos
#################################################################
class Agent:
def __init__(self, args):
self.alpha = args'ALPHA' # float
self.gamma = args'GAMMA' # float
self.epsilon = args'EPSILON' # float
self.actions = args'ACTIONS' # ndarray 0,1,2,3
self.state = str(args'START_POS')
# self.state_init = str(args'START_POS')
self.state_prev = None
self.action_prev = None
self.action_n = len(self.actions)
self.q_values = self._init_q_values()
def _init_q_values(self):
q_values = {}
q_valuesself.state = self._set_q_value_random()
return q_values # {''state'', Q0, Q1, Q2, Q3}
def get_act(self):
# epsilon-greedy
if numpy.random.uniform() < self.epsilon:
action = numpy.random.randint(0, self.action_n)
else:
action = numpy.argmax(self.q_valuesself.state)
self.action_prev = action
return action
def _set_q_value_random(self):
return numpy.random.rand(self.action_n)
def observe(self, state_next, reward):
# state_next:ndarray, reward:float
state_next = str(state_next)
# 初めて訪れる状態であればQ値をランダムに設定する
if state_next not in self.q_values:
self.q_valuesstate_next = self._set_q_value_random()
self.state_prev = self.state
self.state = state_next
self.learn(reward)
def learn(self, reward):
q = self.q_valuesself.state_prevself.action_prev
q_max = max(self.q_valuesself.state)
td = self.alpha *(reward + self.gamma*q_max - q)
self.q_valuesself.state_prevself.action_prev = q + td
def show_q_values(self):
print(len(self.q_values))
for state, q_value in self.q_values.items():
print(state, q_value)
#################################################################
def _get_datename():
dt = datetime.datetime.now()
result = str(dt.year)
result = result + str(dt.month).zfill(2)
result = result + str(dt.day).zfill(2) + '_'
result = result + str(dt.hour).zfill(2)
result = result + str(dt.minute).zfill(2)
result = result + str(dt.second).zfill(2)
return result
#################################################################
def get_outputs():
dirname = './result/' + _get_datename()
os.makedirs(dirname)
filename = '/result.txt'
fp = open(dirname + filename, mode='w')
return fp, dirname, filename
#################################################################
def show_result(args):
mapdata = args'MAPDATA'
episode_path = args'EPISODE_PATH'
episode_step = args'EPISODE_STEP'
episode_reward_sum = args'EPISODE_REWARD_SUM'
goal_pos = args'GOAL_POS'
start_pos = args'START_POS'
dirname = args'DIRNAME'
agent = args'AGENT'
fp1 = args'FP1'
dirname = args'DIRNAME'
#
# 全エピソードの経路
for path in episode_path:
plt.plot(path:,1, path:,0, '-', color='gray')
# 壁
for y in range(len(mapdata)):
for x in range(len(mapdata0)):
if mapdatayx == 1:
plt.plot(x, y, 's', color='black')
# 最短経路
path_shortest_index = (numpy.array(len(e) for e in episode_path)).argmin()
path_shortest = numpy.array(episode_pathpath_shortest_index)
plt.plot(path_shortest:,1, path_shortest:,0, 'x-', label='shortest_path', linewidth=2)
# スタートとゴール
plt.plot(start_pos1, start_pos0, 'o', color='blue', label='start')
plt.plot(goal_pos1, goal_pos0, 'o', color='red', label='goal')
plt.grid()
# plt.legend()
plt.savefig(dirname + '/fig1_path.svg')
plt.clf()
# エピソード - ステップ数
plt.plot(episode_step, label='step')
plt.grid()
plt.legend()
plt.savefig(dirname + '/fig2_step.svg')
plt.clf()
# エピソード - 報酬の総和
plt.plot(episode_reward_sum, label='reward_sum')
plt.grid()
plt.legend()
plt.savefig(dirname + '/fig3_reward.svg')
plt.clf()
# Qテーブル
fp_qtable = open(dirname + '/qtable.txt', mode='w')
for state, q_value in agent.q_values.items():
print(state, q_value, file=fp_qtable)
fp_qtable.close()