Autonomous Racing  1
f1tenth Project Group of Technical University Dortmund, Germany
train.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 import math
4 import rospy
5 from sensor_msgs.msg import LaserScan
6 import random
7 from std_msgs.msg import Empty
8 from gazebo_msgs.msg import ModelStates
9 import torch
10 from neural_car_driver import *
11 import simulation_tools.reset_car as reset_car
12 
13 MAX_EPISODE_LENGTH = 5000
14 INITIAL_RANDOM_POPULATION_SIZE = 250
15 CONTINUE_TRAINING = True
16 
17 
18 class TrainingNode():
19  def __init__(self):
20  self.scan_indices = None
21  rospy.Subscriber(TOPIC_SCAN, LaserScan, self.on_receive_laser_scan)
22  rospy.Subscriber(TOPIC_CRASH, Empty, self.on_crash)
23 
24  self.population = []
25 
26  self.is_terminal_step = False
27  self.episode_length = 0
28  self.generation = 0
29  self.test = 0
30 
32  if CONTINUE_TRAINING:
33  for i in range(POPULATION_SIZE):
34  driver = NeuralCarDriver()
35  driver.load(i)
36  self.untested_population.append(driver)
37  else:
38  self.untested_population = [
39  NeuralCarDriver() for _ in range(INITIAL_RANDOM_POPULATION_SIZE)]
40 
42 
43  def on_receive_laser_scan(self, message):
44  self.current_driver.drive(message)
45  self.episode_length += 1
46 
47  if self.is_terminal_step or self.episode_length > MAX_EPISODE_LENGTH:
48  self.on_complete_test()
49 
50  def get_fitness(self):
51  return self.episode_length * \
52  (self.current_driver.total_velocity / self.episode_length)
53 
54  def on_complete_test(self):
55  self.current_driver.fitness = self.get_fitness()
56 
57  self.population.append(self.current_driver)
58  self.untested_population.remove(self.current_driver)
59  self.episode_length = 0
60  self.is_terminal_step = False
61 
62  if len(self.untested_population) == 0:
63  self.current_driver = None
65  else:
66  self.current_driver = self.untested_population[0]
67  self.test += 1
68 
69  reset_car.reset_random(0, 0, self.generation % 2 == 0)
70 
72  self.population.sort(key=lambda driver: driver.fitness, reverse=True)
73 
74  for i in range(POPULATION_SIZE):
75  self.population[i].save(i)
76 
77  rospy.loginfo("Generation {:d}: Fitness of the population: {:s}".format(
78  self.generation + 1,
79  ", ".join(str(int(driver.fitness)) for driver in self.population)
80  ))
81  self.population = self.population[:SURVIVOR_COUNT]
82 
83  self.untested_population = list()
84  for _ in range(POPULATION_SIZE - SURVIVOR_COUNT):
85  parent = random.choice(self.population)
86  offspring = parent.mutate()
87  self.untested_population.append(offspring)
88 
89  self.current_driver = self.untested_population[0]
90  self.generation += 1
91 
92  def on_crash(self, _):
93  if self.episode_length > 5:
94  self.is_terminal_step = True
95 
96 
97 rospy.init_node('evolutionary_training', anonymous=True)
98 reset_car.register_service()
99 node = TrainingNode()
100 rospy.spin()
def on_complete_generation(self)
Definition: train.py:71
def on_receive_laser_scan(self, message)
Definition: train.py:43
def get_fitness(self)
Definition: train.py:50
def on_crash(self, _)
Definition: train.py:92
def __init__(self)
Definition: train.py:19
def on_complete_test(self)
Definition: train.py:54