Autonomous Racing  1
f1tenth Project Group of Technical University Dortmund, Germany
neural_car_driver.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 import torch
4 import torch.nn as nn
5 import numpy as np
6 import math
7 import os
8 import random
9 
10 import rospy
11 
12 from rospkg import RosPack
13 from drive_msgs.msg import drive_param
14 from sensor_msgs.msg import LaserScan
15 
16 # TOPICS
17 TOPIC_DRIVE_PARAMETERS = "/input/drive_param/autonomous"
18 TOPIC_SCAN = "/scan"
19 TOPIC_CRASH = "/crash"
20 TOPIC_GAZEBO_MODEL_STATE = "/gazebo/model_states"
21 
22 STATE_SIZE = 8
23 
24 MIN_SPEED = 0.1
25 MAX_SPEED = 0.4
26 
27 POPULATION_SIZE = 10
28 SURVIVOR_COUNT = 4
29 
30 RANDOM_ACTIONS = 0.05
31 
32 LEARN_RATE = 0.1
33 normal_distribution = torch.distributions.normal.Normal(0, LEARN_RATE)
34 
35 MODEL_FILENAME = os.path.join(
36  RosPack().get_path("evolutionary"),
37  "evolutionary_{:d}.to")
38 
39 drive_parameters_publisher = rospy.Publisher(
40  TOPIC_DRIVE_PARAMETERS, drive_param, queue_size=1)
41 
42 
43 class NeuralCarDriver(nn.Module):
44  def __init__(self):
45  super(NeuralCarDriver, self).__init__()
46 
47  self.layers = nn.Sequential(
48  nn.Linear(STATE_SIZE, 8),
49  nn.ReLU(),
50  nn.Linear(8, 8),
51  nn.ReLU(),
52  nn.Linear(8, 2),
53  nn.Tanh()
54  )
55 
56  self.fitness = None
57  self.scan_indices = None
58  self.total_velocity = 0
59 
60  def drive(self, laser_message):
61  if self.scan_indices is None:
62  self.scan_indices = [int(i * (len(laser_message.ranges) - 1) / (STATE_SIZE - 1)) for i in range(STATE_SIZE)] # nopep8
63 
64  values = [laser_message.ranges[i] for i in self.scan_indices]
65  values = [v if not math.isinf(v) else 100 for v in values]
66  state = torch.tensor(values, dtype=torch.float)
67 
68  with torch.no_grad():
69  action = self.layers.forward(state)
70 
71  message = drive_param()
72  if random.random() < RANDOM_ACTIONS:
73  message.angle = random.uniform(-1, 1)
74  message.velocity = random.uniform(0, 1)
75  else:
76  message.angle = action[0].item()
77  message.velocity = (MIN_SPEED + MAX_SPEED) / 2 + \
78  action[1].item() * (MAX_SPEED - MIN_SPEED) / 2
79 
80  self.total_velocity += message.velocity
81 
82  drive_parameters_publisher.publish(message)
83 
84  def to_vector(self):
85  state_dict = self.layers.state_dict()
86  tensors = [state_dict[key] for key in sorted(state_dict.keys())]
87  tensors = [torch.flatten(tensor) for tensor in tensors]
88  vector = torch.cat(tensors)
89 
90  return vector
91 
92  def load_vector(self, vector):
93  state_dict = self.layers.state_dict()
94 
95  position = 0
96  for key in sorted(state_dict.keys()):
97  old_value = state_dict[key]
98  size = np.prod(old_value.shape)
99  new_value = vector[position:position +
100  size].reshape(old_value.shape)
101  state_dict[key] = new_value
102  position += size
103  self.layers.load_state_dict(state_dict)
104 
105  def mutate(self):
106  parameters = self.to_vector()
107  parameters += normal_distribution.sample(parameters.shape)
108  offspring = NeuralCarDriver()
109  offspring.load_vector(parameters)
110  return offspring
111 
112  def load(self, index):
113  self.load_state_dict(torch.load(MODEL_FILENAME.format(index)))
114  rospy.loginfo("Model parameters loaded.")
115 
116  def save(self, index):
117  torch.save(self.state_dict(), MODEL_FILENAME.format(index))
118 
119 
120 if __name__ == '__main__':
121  rospy.init_node('evolutionary_driver', anonymous=True)
122  driver = NeuralCarDriver()
123 
124  try:
125  driver.load(0)
126  except IOError:
127  message = "Model parameters for the neural net not found. You need to train it first."
128  rospy.logerr(message)
129  rospy.signal_shutdown(message)
130  exit(1)
131 
132  rospy.Subscriber(TOPIC_SCAN, LaserScan, driver.drive)
133  rospy.spin()
def drive(self, laser_message)