Q-learning解決懸崖問題
Q-learning是一個經典的強化學習算法,是一種基于價值(Value-based)的算法,通過維護和更新一個價值表格(Q表格)進行學習和預測。
Q-learning是一種off-policy的策略,也就是說,它的行動策略和Q表格的更新策略是不一樣的。
行動時,Q-learning會采用epsilon-greedy的方式嘗試多種可能動作。
更新時,Q-learning會采用潛在收益最大化的動作進行價值更新。
總體來說,Q-learning是一個非常勇敢的策略,在有限動作和有限狀態(tài)情況下,它能夠收斂找到全局最優(yōu)策略。
公眾號算法美食屋后臺回復關鍵詞:torchkeras,獲取本文notebook源代碼~
〇,強化學習基本概念
1, 環(huán)境(env)和智能體(agent)
在第n步,agent處于狀態(tài) state(n)中,然后采取行動action(n),env給agent獎勵reward(n+1),同時agent的狀態(tài)變成 state(n+1)
---reward(n+1), state(n+1)-->
env agent(state)
<------ action(n) ----------
以我們玩一個俄羅斯方塊游戲為例。
環(huán)境env就是這個游戲背后的程序,智能體agent就是玩家。
假設現在是第n步,state(n)就是目前游戲所處的狀態(tài),可以表示為一個矩陣,也就是游戲界面每個格子的明暗狀態(tài)。
我們可以采取某個 action(n) (向左,向右,向下,變形)。
然后我們會獲得一個獎勵reward(n),即得分。獎勵很多時候是稀疏的,即大部分時候為0,操作很多步才有一個不為0的獎勵。
同時游戲界面發(fā)生變化,狀態(tài)由 state(n) 變成 state(n+1)。
2, 馬爾科夫交互鏈
env和agent交互作用若干個步驟,到達結束狀態(tài),通常叫做一個episode(片段)。

在俄羅斯方塊游戲的例子中,一局完整的游戲構成一個馬爾科夫交互鏈,叫做一個episode.
之所以叫做馬爾科夫交互鏈,是因為這個過程滿足馬爾科夫假設。
第n+1步驟的狀態(tài)state(n+1)和獎勵reward(n+1)只和第n步驟的狀態(tài)stage(n)和action(n)有關,而與之前的狀態(tài)和action無關。
馬爾科夫假設要求我們在設計state和action的時候,要考慮到所有相關變量。
并且,只要設計出合理的state變量和action變量,任何游戲都可以表示為這樣一個馬爾科夫交互鏈。
3, 獎勵折現公式
為了衡量每個步驟中action的價值,需要將該步驟之后的獎勵,以及未來的全部獎勵按照類似金融學中的折現算法求和。
在俄羅斯方塊游戲的例子中,一個操作action的價值,不僅跟這個操作完成后立刻獲得的獎勵reward有關,還要考慮到這個操作的長遠影響。
但這種長遠影響不太好精確地計算,因為后面獲得的獎勵,不僅跟當前的action有關,還跟后面的操作有關,所以跟當前操作的相關性是小于1的。
作為簡化起見,我們通過類似金融學中現金流折現的方式將未來的獎勵全部折算到當前步驟。折算因子gamma一般取值在0.9~1.0之間。

4, epsilon-greedy 學習策略
訓練時使用epsilon探索,預測時使用greedy貪心。
訓練階段: 以一定epsilon概率選擇隨機動作,以(1-epsilon)選擇最大化Q(s,a)的動作。
預測階段: 貪心策略,直接選擇最大化Q(s,a)的動作。
為了讓模型去探索更優(yōu)策略,我們在訓練過程中會允許模型以一定的概率去實施隨機動作,以便評估不同動作的價值。
這樣也能夠讓模型對狀態(tài)動作空間進行更分散的采樣,學到的結果也會更加魯棒。
但在測試過程,為了獲得更好的結果,我們應該采用預期價值最大的動作。
5, Q表格軟更新

獎勵折現公式對每個action的價值的計算方法是一種粗糙的估計算法。
不同的step或者不同的episode中,按照獎勵折現公式對相同state下相同action價值的評估的結果可能差異很大。
為了保持學習過程的穩(wěn)定性,讓Q值不會過分受到某次評估的影響,我們采用一種軟更新的方式。
也就是我們在更新Q表格的時候,只讓Q值朝著折現公式計算結果靠近一點點(縮小差值),而不是直接調整為折現公式的計算結果。
這樣,我們最終的Q表格中action的價值結果相當是許多次不同episode不同step下獎勵折現公式計算結果的某種平均值。
一,準備環(huán)境
gym是一個常用的強化學習測試環(huán)境,可以用make創(chuàng)建環(huán)境。
env具有reset,step,render幾個方法。
-
懸崖問題
環(huán)境設計如下:
環(huán)境一共有48個state狀態(tài)。
其中T為目標位置,到達目標位置游戲結束。
10個用C表示的為懸崖,掉入懸崖會拉回到起始位置。
智能體設計如下:
智能體有4種動作action,0表示往上,1往右,2往下,3往左。
reward設計如下:
智能體每走一步都會有-1的reward。
這個問題希望訓練一個能夠盡可能快的從起始位置到達目標位置T的智能體Agent。
import gym
import numpy as np
import time
import matplotlib
import matplotlib.pyplot as plt
from IPython import display
print("gym.__version__=",gym.__version__)
%matplotlib inline
#可視化函數:
def show_state(env, step, info=""):
plt.figure(num=0,dpi=180)
plt.clf()
plt.imshow(env.render())
plt.title("Step: %d %s" % (step, info))
plt.axis('off')
display.clear_output(wait=True)
display.display(plt.gcf())
env = gym.make("CliffWalking-v0",render_mode="rgb_array") # 0 up, 1 right, 2 down, 3 left
env.reset()
for step in range(20):
time.sleep(0.2)
action = np.random.randint(0, 4)
obs, reward, done,truncated, info = env.step(action)
#env.render()
show_state(env,step=step)
#print('step {}: action {}, obs {}, reward {}, done {}, truncated {}, info {}'.format(\
# step, action, obs, reward, done, truncated,info))
display.clear_output(wait=True)
我們先來看看沒有訓練模型,按照隨機的方式會怎么走。

二,定義Agent
import torch
from torch import nn
class QAgent(nn.Module):
def __init__(self, obs_n, act_n, learning_rate=0.01, gamma=0.9, e_greed=0.1):
super().__init__()
self.act_n = act_n # 動作維度,有幾個動作可選
self.lr = learning_rate # 學習率
self.gamma = gamma # reward的衰減率
self.epsilon = e_greed # 按一定概率隨機選動作
self.Q = nn.Parameter(torch.zeros((obs_n, act_n)),requires_grad=False)
# 根據輸入觀察值,采樣輸出的動作值,帶探索
def sample(self, obs):
if np.random.uniform(0, 1) < (1.0 - self.epsilon): #根據table的Q值選動作
action = self.predict(obs)
else:
action = np.random.choice(self.act_n) #有一定概率隨機探索選取一個動作
return action
# 根據輸入觀察值,預測輸出的動作值
def forward(self,obs):
Q_list = self.Q[obs, :]
maxQ = Q_list.max()
action_list = torch.where(Q_list == maxQ)[0].tolist() # maxQ可能對應多個action
action = np.random.choice(action_list)
return action
@torch.no_grad()
def predict(self,obs):
self.eval()
return self.forward(obs)
# 學習方法,也就是更新Q-table的方法
def learn(self, obs, action, reward, next_obs, done):
""" on-policy
obs: 交互前的obs, s_t
action: 本次交互選擇的action, a_t
reward: 本次動作獲得的獎勵r
next_obs: 本次交互后的obs, s_t+1
next_action: 根據當前Q表格, 針對next_obs會選擇的動作, a_t+1
done: episode是否結束
"""
predict_Q = self.Q[obs, action]
if done:
target_Q = reward # 沒有下一個狀態(tài)了
else:
target_Q = reward + self.gamma * self.Q[next_obs, :].max() # Q-learning
self.Q[obs, action] += self.lr * (target_Q - predict_Q) # 修正q
我們創(chuàng)建一下env和agent.
# 使用gym創(chuàng)建懸崖環(huán)境
env = gym.make("CliffWalking-v0") # 0 up, 1 right, 2 down, 3 left
# 創(chuàng)建一個agent實例,輸入超參數
agent = QAgent(
obs_n=env.observation_space.n,
act_n=env.action_space.n,
learning_rate=0.1,
gamma=0.9,
e_greed=0.1)
三,訓練Agent
下面我們將套用torchkeras的訓練模版來對Agent進行訓練。
由于強化學習問題與常用的監(jiān)督學習范式有很大的差異,所以我們對torchkeras的訓練模版在
StepRunner, EpochRunner這2個層級上都有少量的修改。
class DataLoader:
def __init__(self,env,agent,stage='train'):
self.env = env
self.agent = agent
self.stage = stage
def __iter__(self):
obs,info = self.env.reset() # 重置環(huán)境, 重新開一局(即開始新的一個episode)
action = self.agent.sample(obs) # 根據算法選擇一個動作
while True:
next_obs, reward, done, _, _ = self.env.step(action) # 與環(huán)境進行一個交互
if self.stage =='train':
next_action = self.agent.sample(next_obs) # 訓練階段使用探索-利用策略
else:
next_action = self.agent.predict(next_obs) # 驗證階段使用模型預測結果
yield obs, action, reward, next_obs, done
action = next_action
obs = next_obs
if done:
break
dl_train = DataLoader(env,agent,stage='train')
dl_train.size = 1000
dl_val = DataLoader(env,agent,stage='val')
dl_val.size = 200
import sys,datetime
from tqdm import tqdm
import numpy as np
from accelerate import Accelerator
from torchkeras import KerasModel
import pandas as pd
from torchkeras.utils import is_jupyter,colorful
from copy import deepcopy
class StepRunner:
def __init__(self, net, loss_fn, accelerator=None, stage = "train", metrics_dict = None,
optimizer = None, lr_scheduler = None
):
self.net,self.loss_fn,self.metrics_dict,self.stage = net,loss_fn,metrics_dict,stage
self.optimizer,self.lr_scheduler = optimizer,lr_scheduler
self.accelerator = accelerator if accelerator is not None else Accelerator()
def __call__(self, batch):
obs, action, reward, next_obs, done = batch
#backward()
if self.stage=="train":
self.net.learn(obs, action, reward, next_obs, done)
#losses (or plain metric)
step_losses = {self.stage+'_reward':reward,
self.stage+'_done':1.0 if done else 0.0}
#metrics (stateful metric)
step_metrics = {}
if self.stage=='train':
step_metrics['lr'] = self.net.lr
return step_losses,step_metrics
class EpochRunner:
def __init__(self,steprunner,quiet=False):
self.steprunner = steprunner
self.stage = steprunner.stage
self.accelerator = steprunner.accelerator
self.net = steprunner.net
self.quiet = quiet
def __call__(self,dataloader):
dataloader.agent = self.net
n = dataloader.size if hasattr(dataloader,'size') else len(dataloader)
loop = tqdm(enumerate(dataloader,start=1),
total=n,
file=sys.stdout,
disable=not self.accelerator.is_local_main_process or self.quiet,
ncols=100
)
epoch_losses = {}
for step, batch in loop:
step_losses,step_metrics = self.steprunner(batch)
step_log = dict(step_losses,**step_metrics)
for k,v in step_losses.items():
epoch_losses[k] = epoch_losses.get(k,0.0)+v
if step_log[self.stage+'_done']<1 and step<n:
loop.set_postfix(**step_log)
elif step_log[self.stage+'_done']>0.5 or step==n:
epoch_metrics = step_metrics
epoch_metrics.update({self.stage+"_"+name:metric_fn.compute().item()
for name,metric_fn in self.steprunner.metrics_dict.items()})
epoch_losses = {k:v for k,v in epoch_losses.items()}
epoch_log = dict(epoch_losses,**epoch_metrics)
epoch_log[self.stage+'_step']=step
loop.set_postfix(**epoch_log)
for name,metric_fn in self.steprunner.metrics_dict.items():
metric_fn.reset()
loop.close()
else:
break
return epoch_log
KerasModel.StepRunner = StepRunner
KerasModel.EpochRunner = EpochRunner
keras_model = KerasModel(net= agent,loss_fn=None)
dfhistory = keras_model.fit(train_data = dl_train,
val_data=dl_val,
epochs=600,
ckpt_path='checkpoint.pt',
patience=500,
monitor='val_reward',
mode='max',
callbacks=None,
quiet=True,
plot=True,
cpu=True)

dfhistory['val_reward'].max()
-13.0
keras_model.load_ckpt('checkpoint.pt')
agent = keras_model.net
四,測試Agent
def test_agent(env, agent):
total_reward = 0
obs,info = env.reset()
step=0
while True:
action = agent.predict(obs) # greedy
next_obs, reward, done, _ ,_ = env.step(action)
total_reward += reward
obs = next_obs
time.sleep(0.5)
show_state(env,step)
step+=1
if done:
break
plt.close()
return total_reward
# 全部訓練結束,查看算法效果
env = gym.make("CliffWalking-v0",render_mode="rgb_array") # 0 up, 1 right, 2 down, 3 left
test_reward = test_agent(env, agent)
print('test reward = %.1f' % (test_reward))

test reward = -13.0
可以看到,訓練完成后,這個agent非常機智地在懸崖邊上走了一個最優(yōu)路線,但卻沒有掉到懸崖里去。??
五,保存Agent
torch.save(keras_model.net.state_dict(),'best_ckpt.pt')
公眾號算法美食屋后臺回復關鍵詞:torchkeras,獲取本文notebook源代碼以及更多有趣范例。

