Autonomous Racing
f1tenth Project Group of Technical University Dortmund, Germany
train_q_learning.py
#!/usr/bin/env python
# coding: utf-8

from training_node import TrainingNode, device
import random
import math
from collections import deque
from parameters_q_learning import *

import torch
import torch.nn.functional as F

import rospy

import simulation_tools.reset_car as reset_car
from simulation_tools.track import track
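
# BATCH_INDICES enumerates the rows of a minibatch; in replay() it is paired
# with the sampled action indices to pick each transition's Q(s, a) value out
# of the network output.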
BATCH_INDICES = torch.arange(0, BATCH_SIZE, device=device, dtype=torch.long)


class QLearningTrainingNode(TrainingNode):
    ''' ROS node to train the Q-Learning model
    '''

    def __init__(self):
        TrainingNode.__init__(
            self,
            NeuralQEstimator().to(device),
            ACTIONS,
            LASER_SAMPLE_COUNT,
            MAX_EPISODE_LENGTH,
            LEARNING_RATE)

        self.memory = deque(maxlen=MEMORY_SIZE)
        self.optimization_step_count = 0

        if CONTINUE:
            self.policy.load()

    def replay(self):
        if len(self.memory) < 500 or len(self.memory) < BATCH_SIZE:
            return

        if self.optimization_step_count == 0:
            rospy.loginfo("Model optimization started.")

        transitions = random.sample(self.memory, BATCH_SIZE)  # nopep8
        states, actions, rewards, next_states, is_terminal = tuple(zip(*transitions))  # nopep8

        states = torch.stack(states)
        actions = torch.tensor(actions, device=device, dtype=torch.long)
        rewards = torch.tensor(rewards, device=device, dtype=torch.float)
        next_states = torch.stack(next_states)
        is_terminal = torch.tensor(
            is_terminal, device=device, dtype=torch.uint8)

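        # Q-learning target: r + DISCOUNT_FACTOR * max_a' Q(s', a'), evaluated
        # with the current network and detached from the graph; terminal
        # transitions use the immediate reward only.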
        next_state_values = self.policy.forward(next_states).max(1)[0].detach()
        q_updates = rewards + next_state_values * DISCOUNT_FACTOR
        q_updates[is_terminal] = rewards[is_terminal]

        self.optimizer.zero_grad()
        net_output = self.policy.forward(states)
        loss = F.smooth_l1_loss(net_output[BATCH_INDICES, actions], q_updates)
        loss.backward()
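        # Clamp every gradient element to [-1, 1] before stepping the
        # optimizer, a common DQN-style measure against exploding updates.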
        for param in self.policy.parameters():
            param.grad.data.clamp_(-1, 1)
        self.optimizer.step()
        self.optimization_step_count += 1

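    # The exploration rate decays exponentially from EPS_START towards EPS_END
    # with time constant EPS_DECAY, measured in total simulation steps.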
    def get_epsilon_greedy_threshold(self):
        return EPS_END + (EPS_START - EPS_END) * \
            math.exp(-1. * self.total_step_count / EPS_DECAY)

    def select_action(self, state):
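        # Epsilon-greedy exploration is applied only on even-numbered episodes;
        # odd episodes always act greedily (presumably to observe learning
        # progress without exploration noise).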
        use_epsilon_greedy = self.episode_count % 2 == 0
        if use_epsilon_greedy and random.random() < self.get_epsilon_greedy_threshold():
            return random.randrange(ACTION_COUNT)

        with torch.no_grad():
            output = self.policy(state)
            if self.episode_length < 10:
                self.net_output_debug_string = ", ".join(
                    ["{0:.1f}".format(v).rjust(5) for v in output.tolist()])
            return output.max(0)[1].item()

    def get_reward(self):
        track_position = track.localize(self.car_position)
        distance = abs(track_position.distance_to_center)

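        # Tiered reward based on the car's distance to the track centerline:
        # staying close to the center yields the highest reward.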
        if distance < 0.2:
            return 1
        elif distance < 0.4:
            return 0.7
        else:
            return 0.4

    def get_episode_summary(self):
        return TrainingNode.get_episode_summary(self) + ' ' \
            + ("memory: {0:d} / {1:d}, ".format(len(self.memory), MEMORY_SIZE) if len(self.memory) < MEMORY_SIZE else "") \
            + "ε-greedy: " + str(int(self.get_epsilon_greedy_threshold() * 100)) + "% random, " \
            + "replays: " + str(self.optimization_step_count) + ", " \
            + "q: [" + self.net_output_debug_string + "], "

    def on_complete_step(self, state, action, reward, next_state):
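        # Store the transition in replay memory and immediately run one
        # optimization step on a random minibatch.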
        self.memory.append((state, action, reward, next_state, self.is_terminal_step))  # nopep8
        self.replay()


rospy.init_node('q_learning_training', anonymous=True)
node = QLearningTrainingNode()
rospy.spin()