Autonomous Racing
f1tenth Project Group of Technical University Dortmund, Germany
train_policy_gradient.py
#!/usr/bin/env python
# coding: utf-8

from training_node import TrainingNode
import random
import math
from parameters_policy_gradient import *
import numpy as np

from torch.distributions import Categorical

import torch

import rospy

import simulation_tools.reset_car as reset_car
from simulation_tools.track import track

# Machine epsilon for float32, used to avoid division by zero when
# normalizing the discounted rewards
FLOAT_EPS = np.finfo(np.float32).eps


class PolicyGradientTrainingNode(TrainingNode):
    ''' ROS node that trains the Policy Gradient model
    '''

    def __init__(self):
        TrainingNode.__init__(
            self,
            Policy(),
            ACTIONS,
            LASER_SAMPLE_COUNT,
            MAX_EPISODE_LENGTH,
            LEARNING_RATE)

        if CONTINUE:
            self.policy.load()

        # Episode policy and reward history
        self.policy_history = torch.zeros(MAX_EPISODE_LENGTH)
        self.current_episode_rewards = []

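    # update_policy implements the REINFORCE update: it minimizes the loss
    # -sum_t log(pi(a_t | s_t)) * G_t, where G_t is the normalized discounted
    # return, which performs gradient ascent on the expected episode return.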
    def update_policy(self):
        R = 0
        rewards = []

        # Discount future rewards back to the present using gamma
        for reward in self.current_episode_rewards[::-1]:
            R = reward + DISCOUNT_FACTOR * R
            rewards.insert(0, R)

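        # Worked example, assuming DISCOUNT_FACTOR = 0.99: step rewards
        # [1, 1, 1] become discounted returns [2.9701, 1.99, 1.0], so each
        # step is credited with all rewards that followed it.
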
        # Scale rewards
        rewards = torch.tensor(rewards)
        if rewards.numel() > 1:
            rewards = (rewards - rewards.mean()) / (rewards.std() + FLOAT_EPS)

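        # Normalizing the returns to zero mean and unit variance acts as a
        # simple baseline and reduces the variance of the gradient estimate;
        # FLOAT_EPS guards against division by zero.
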
        # Calculate loss
        loss = torch.sum(self.policy_history[:len(rewards)] * rewards) * -1

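        # The backward pass is skipped when the loss is exactly zero, which
        # happens in particular when no steps were recorded and
        # policy_history still contains only its initial zeros.
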
        # Update network weights
        if loss != 0:
            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()

        # Reset policy history and rewards
        self.policy_history = torch.zeros(MAX_EPISODE_LENGTH)
        self.current_episode_rewards = []

    def select_action(self, state):
        # Select an action by running the policy model and sampling based
        # on the resulting action probabilities
        action_probabilities = self.policy(state.cpu())
        action_distribution = Categorical(action_probabilities)
        action = action_distribution.sample()

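        # Sampling from the categorical distribution instead of taking the
        # argmax keeps the policy stochastic, which provides the exploration
        # needed during training.
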
        # Add log probability of our chosen action to our history
        self.policy_history[self.episode_length] = action_distribution.log_prob(action)  # nopep8

        return action

    def get_reward(self):
        # The faster the car, the more reward it gets
        scaleForSpeed = 0
        if self.action < 2:
            scaleForSpeed = 0.4
        elif self.action < 5:
            scaleForSpeed = 0.7
        else:
            scaleForSpeed = 1

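        # Note: this scaling assumes that higher action indices in ACTIONS
        # correspond to higher speeds.
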
        track_position = track.localize(self.car_position)
        distance = abs(track_position.distance_to_center)

        # The closer the car is to the middle of the race track, the more
        # reward it gets
        if distance < 0.2:
            return 1 * scaleForSpeed
        elif distance < 0.4:
            return 0.7 * scaleForSpeed
        else:
            return -0.4 * scaleForSpeed

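    # Callback presumably invoked by the TrainingNode base class once an
    # episode has finished; it triggers the policy update before the base
    # class bookkeeping runs.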
    def on_complete_episode(self):
        self.update_policy()
        TrainingNode.on_complete_episode(self)

    def on_complete_step(self, state, action, reward, next_state):
        self.current_episode_rewards.append(reward)


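# Register the node with ROS and start training; rospy.spin() keeps the
# process alive while the subscriber callbacks set up in TrainingNode
# (presumably laser scan and simulation updates) drive the training loop.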
rospy.init_node('policy_gradient_training', anonymous=True)
node = PolicyGradientTrainingNode()
rospy.spin()