2023.6.26 Q学習による迷路探索
2023.6.27 修正
fig1で出力するグラフのタテヨコを入れ替えた。
マップを複雑にしてみた。
2023.6.27 バグ取り
Env.stepの戻り値に「移動できたか」のチェックを追加
main.pyのメインループにて、「移動できた」場合のみpathを追加することで、壁に阻まれて足踏みした場合を除外した。
初stepにおいて、start_posをpathに追加することで、スタートからの経路切れをなくした。
使い方
2つのファイルから構成されている。同じフォルダに格納し、main.pyを実行してください。
main.py
myQL.py
分かり易さのために、Q学習のメインルーチンと必要最小限の前/後処理のみmainに置いた。
全ての結果をresultフォルダ以下に保存する。特に、fig1_pathには
障害物を黒い四角
スタートを青丸
ゴールを赤丸
全てのパスを色付きの線
最短経路を青(線+X)
でプロットしている。最短経路については全PATHのlenを調べてその最小値を採用している。つまり最初に発見した最短経路を採用していることに注意。
ファイル名:main.py
code:python
import numpy, os, sys
import matplotlib.pyplot as plt
import myQL
fp1, dirname, filename = myQL.get_outputs()
args_env, args_agent, args_qlearn = myQL.get_args()
env = myQL.Env(args_env)
agent = myQL.Agent(args_agent)
# sys.exit()
print('args_env:', args_env)
print('args_env: ', args_env, file=fp1)
print('args_agent:', args_agent)
print('args_agent:', args_agent, file=fp1)
### Q学習の本体部分
is_moved, is_goal = False, False
episode_path = []
episode_step = []
episode_reward_sum = []
for episode in range(EPISODES):
env.reset()
reward_sum = 0
for step in range(STEPS):
action = agent.get_act() # 行動選択
state, reward, is_moved, is_goal = env.step(action) # 行動
if is_moved == True:
step_path.append(env.agent_pos)
reward_sum += reward
agent.observe(state, reward)
if is_goal:
break
is_end_episode = False
episode_path.append(numpy.array(step_path))
episode_step.append(step)
episode_reward_sum.append(reward_sum)
### 計算終了、あとは結果の出力
args_result = {}
myQL.show_result(args_result)
fp1.close()
ファイル名:myQL.py
code:python
import numpy, datetime
import matplotlib.pyplot as plt
import os, sys
def get_args():
# agent_pos_init, goal_pos, map はリスト型で与える。
# mapは予約語なのでmapdataとした
mapdata = [
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 0, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 1, 1, 0, 1, 1, 1, 1, 0, 1, 1, 1, 0, 0, 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 0, 0, 1, 0, 1, 1, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 0, 1, 0, 1, 0, 0, 0, 0, 1, 1, 0, 0, 1, 0, 0, 0, 1, 1, 1, 1, 0, 0, 0, 0, ]
args_env = {}
args_env'FILEDS' = {'FLOOR':0, 'GOAL':2, 'WALL':1} args_env'ACTIONS' = {'NORTH':0, 'SOUTH':1, 'WEST':2, 'EAST':3} args_agent = {}
args_qlearn ={}
return args_env, args_agent, args_qlearn
#################################################################
class Env:
def __init__(self, args):
self.mapdata[self.goal_pos0][self.goal_pos1] = self.fields'GOAL' # Goalの設定 self.agent_pos = self.start_pos
for key, data in self.move.items():
self.movekey = numpy.array(data) # dataはlistで受けていると仮定 for i in self.mapdata:
for j in i:
print(j, end=' ')
print()
def step(self, action):
# agent_pos_to :移動先
agent_pos_to = self.agent_pos.copy() + self.moveaction # print('NASHI!!')
if self._is_possible_action(agent_pos_to) == False:
# 移動できなかった
is_moved, is_goal = False, False
return self.agent_pos, self.reward_wall, is_moved, is_goal
reward = self._get_reward(agent_pos_to) # 移動先の報酬を求める
self.agent_pos = agent_pos_to # 移動する
is_moved = True
is_goal = self._is_reach_goal() # 移動先はゴールかを判定
return self.agent_pos, reward, is_moved, is_goal
# posに移動できるかのチェック
def _is_possible_action(self, pos):
if self._is_inside_map(pos) == False:
return False # マップ外側のため移動「できない」
if self._is_wall(pos) == True:
return False # 壁のため移動「できない」
return True # 移動「できる」
# 指定した位置が壁ならTrue
def _is_wall(self, pos):
y, x = pos
# print(x, y)
return True
return False
# 指定した位置が地図内ならTrue
def _is_inside_map(self, pos):
if 0 <= pos0 < self.mapdata.shape0 and 0 <= pos1 < self.mapdata.shape1: return True
return False
# ゴールに到達したらTrue
def _is_reach_goal(self):
if tuple(self.agent_pos) == tuple(self.goal_pos):
return True
return False
# 報酬を計算する、ゴールなら1000、床なら0、壁にはめり込まない前提
def _get_reward(self, pos):
if self.mapdata[pos0][pos1] == self.fields'GOAL': # print('_get_reward: GOAL')
return self.reward_goal
if self.mapdata[pos0][pos1] == self.fields'WALL': print('_get_reward: WALL')
return 0 # つまり、Floorの場合
def reset(self):
self.agent_pos = self.start_pos
#################################################################
class Agent:
def __init__(self, args):
self.state_prev = None
self.action_prev = None
self.action_n = len(self.actions)
self.q_values = self._init_q_values()
def _init_q_values(self):
q_values = {}
def get_act(self):
# epsilon-greedy
if numpy.random.uniform() < self.epsilon:
action = numpy.random.randint(0, self.action_n)
else:
self.action_prev = action
return action
def _set_q_value_random(self):
return numpy.random.rand(self.action_n)
def observe(self, state_next, reward):
# state_next:ndarray, reward:float
state_next = str(state_next)
# 初めて訪れる状態であればQ値をランダムに設定する
if state_next not in self.q_values:
self.q_valuesstate_next = self._set_q_value_random() self.state_prev = self.state
self.state = state_next
self.learn(reward)
def learn(self, reward):
td = self.alpha *(reward + self.gamma*q_max - q)
def show_q_values(self):
print(len(self.q_values))
for state, q_value in self.q_values.items():
print(state, q_value)
#################################################################
def _get_datename():
dt = datetime.datetime.now()
result = str(dt.year)
result = result + str(dt.month).zfill(2)
result = result + str(dt.day).zfill(2) + '_'
result = result + str(dt.hour).zfill(2)
result = result + str(dt.minute).zfill(2)
result = result + str(dt.second).zfill(2)
return result
#################################################################
def get_outputs():
dirname = './result/' + _get_datename()
os.makedirs(dirname)
filename = '/result.txt'
fp = open(dirname + filename, mode='w')
return fp, dirname, filename
#################################################################
def show_result(args):
#
# 全エピソードの経路
for path in episode_path:
plt.plot(path:,1, path:,0, '-', color='gray') # 壁
for y in range(len(mapdata)):
for x in range(len(mapdata0)): plt.plot(x, y, 's', color='black')
# 最短経路
plt.plot(path_shortest:,1, path_shortest:,0, 'x-', label='shortest_path', linewidth=2) # スタートとゴール
plt.plot(start_pos1, start_pos0, 'o', color='blue', label='start') plt.plot(goal_pos1, goal_pos0, 'o', color='red', label='goal') plt.grid()
# plt.legend()
plt.savefig(dirname + '/fig1_path.svg')
plt.clf()
# エピソード - ステップ数
plt.plot(episode_step, label='step')
plt.grid()
plt.legend()
plt.savefig(dirname + '/fig2_step.svg')
plt.clf()
# エピソード - 報酬の総和
plt.plot(episode_reward_sum, label='reward_sum')
plt.grid()
plt.legend()
plt.savefig(dirname + '/fig3_reward.svg')
plt.clf()
# Qテーブル
fp_qtable = open(dirname + '/qtable.txt', mode='w')
for state, q_value in agent.q_values.items():
print(state, q_value, file=fp_qtable)
fp_qtable.close()