import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical

class PolicyNetwork(nn.Module):
    def __init__(self, num_inputs, num_actions):
        super(PolicyNetwork, self).__init__()
        self.fc1 = nn.Linear(num_inputs, 64)
        self.fc2 = nn.Linear(64, num_actions)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        action_scores = self.fc2(x)
        return action_scores

    def get_action_probs(self, state):
        # Convert a NumPy observation to a batched float tensor and return the
        # softmax action distribution, detached from the autograd graph.
        state = torch.from_numpy(state).float().unsqueeze(0)
        with torch.no_grad():
            action_scores = self.forward(state)
        return F.softmax(action_scores, dim=1).squeeze(0)

    def get_action(self, state):
        # Sample an action and return it together with its log-probability,
        # which PPO later uses as the "old policy" term in the ratio.
        action_probs = self.get_action_probs(state)
        dist = Categorical(action_probs)
        action = dist.sample()
        return action.item(), dist.log_prob(action)
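

# Minimal usage sketch for PolicyNetwork (illustrative, not from the original
# snippet): sample an action for a dummy 4-dimensional observation. The input
# and action sizes are assumptions, e.g. a CartPole-like task.
def _demo_policy_sample():
    import numpy as np
    policy = PolicyNetwork(num_inputs=4, num_actions=2)
    state = np.zeros(4, dtype=np.float32)
    action, log_prob = policy.get_action(state)
    print("sampled action:", action, "log prob:", log_prob.item())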

class ValueNetwork(nn.Module):
    def __init__(self, num_inputs):
        super(ValueNetwork, self).__init__()
        self.fc1 = nn.Linear(num_inputs, 64)
        self.fc2 = nn.Linear(64, 1)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        state_values = self.fc2(x)
        return state_values

    def get_value(self, state):
        # Return the scalar value estimate for a single NumPy observation.
        state = torch.from_numpy(state).float().unsqueeze(0)
        with torch.no_grad():
            state_value = self.forward(state).squeeze().item()
        return state_value

def compute_advantages(batch_size, gamma, lambd, values, rewards, masks, next_values):
    # Generalized Advantage Estimation (GAE):
    #   delta_t = r_t + gamma * V(s_{t+1}) * mask_t - V(s_t)
    #   A_t     = delta_t + gamma * lambda * mask_t * A_{t+1}
    advantages = torch.zeros(batch_size)
    adv_t = 0.0
    for t in reversed(range(len(rewards))):
        delta = rewards[t] + gamma * next_values[t] * masks[t] - values[t]
        adv_t = delta + gamma * lambd * masks[t] * adv_t
        advantages[t] = adv_t
    # Normalize to zero mean / unit variance for a more stable policy update.
    advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)
    return advantages
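

# Hedged sanity check for compute_advantages (illustrative only, not from the
# original source): a 3-step trajectory that terminates on the last step,
# using made-up rewards and value estimates.
def _demo_gae():
    values = [0.5, 0.5, 0.5]
    next_values = [0.5, 0.5, 0.0]   # bootstrap value after the final step is 0
    rewards = [1.0, 1.0, 1.0]
    masks = [1.0, 1.0, 0.0]         # mask = 0 marks episode termination
    adv = compute_advantages(3, 0.99, 0.95, values, rewards, masks, next_values)
    print(adv)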

def train_ppo(env, policy_net, value_net, optimizer_policy, optimizer_value,
              gamma=0.99, lambd=0.95, num_episodes=1000, batch_size=64):
    for i_episode in range(num_episodes):
        state = env.reset()
        log_probs = []
        values = []
        rewards = []
        masks = []
        for t in range(100):
            # Roll out the current policy, storing everything the PPO update needs.
            action, log_prob = policy_net.get_action(state)
            next_state, reward, done, _ = env.step(action)
            log_probs.append(log_prob)
            values.append(value_net.get_value(state))
            rewards.append(reward)
            masks.append(0.0 if done else 1.0)
            state = next_state
            if done:
                break
        # The advantage estimation and clipped-objective update for this
        # trajectory would follow here; ppo_update() below is an illustrative
        # sketch of that step, not part of the original snippet.
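

# The training loop above stops at trajectory collection. Below is a hedged
# sketch (an assumption, not the original author's code) of the PPO update
# that typically follows: given the collected states, actions, old
# log-probabilities, empirical returns, and GAE advantages, optimize the
# clipped surrogate objective and a value-regression loss for a few epochs.
# The function name, clip_eps, and epochs are illustrative choices.
def ppo_update(policy_net, value_net, optimizer_policy, optimizer_value,
               states, actions, old_log_probs, returns, advantages,
               clip_eps=0.2, epochs=4):
    states = torch.as_tensor(states, dtype=torch.float32)
    actions = torch.as_tensor(actions)
    old_log_probs = torch.stack(old_log_probs).detach()
    returns = torch.as_tensor(returns, dtype=torch.float32)
    advantages = advantages.detach()

    for _ in range(epochs):
        # Re-evaluate the current policy on the collected states.
        logits = policy_net(states)
        dist = Categorical(logits=logits)
        new_log_probs = dist.log_prob(actions)

        # Clipped surrogate objective with ratio r_t = exp(new_log_prob - old_log_prob).
        ratio = torch.exp(new_log_probs - old_log_probs)
        surr1 = ratio * advantages
        surr2 = torch.clamp(ratio, 1.0 - clip_eps, 1.0 + clip_eps) * advantages
        policy_loss = -torch.min(surr1, surr2).mean()

        optimizer_policy.zero_grad()
        policy_loss.backward()
        optimizer_policy.step()

        # Fit the value function to the empirical returns
        # (e.g. returns = advantages + values before normalization).
        value_loss = F.mse_loss(value_net(states).squeeze(-1), returns)
        optimizer_value.zero_grad()
        value_loss.backward()
        optimizer_value.step()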