12 from rospkg
import RosPack
13 from drive_msgs.msg
import drive_param
14 from sensor_msgs.msg
import LaserScan
17 TOPIC_DRIVE_PARAMETERS =
"/input/drive_param/autonomous" 19 TOPIC_CRASH =
"/crash" 20 TOPIC_GAZEBO_MODEL_STATE =
"/gazebo/model_states" 33 normal_distribution = torch.distributions.normal.Normal(0, LEARN_RATE)
35 MODEL_FILENAME = os.path.join(
36 RosPack().get_path(
"evolutionary"),
37 "evolutionary_{:d}.to")
39 drive_parameters_publisher = rospy.Publisher(
40 TOPIC_DRIVE_PARAMETERS, drive_param, queue_size=1)
45 super(NeuralCarDriver, self).
__init__()
48 nn.Linear(STATE_SIZE, 8),
62 self.
scan_indices = [int(i * (len(laser_message.ranges) - 1) / (STATE_SIZE - 1))
for i
in range(STATE_SIZE)]
64 values = [laser_message.ranges[i]
for i
in self.
scan_indices]
65 values = [v
if not math.isinf(v)
else 100
for v
in values]
66 state = torch.tensor(values, dtype=torch.float)
69 action = self.layers.forward(state)
71 message = drive_param()
72 if random.random() < RANDOM_ACTIONS:
73 message.angle = random.uniform(-1, 1)
74 message.velocity = random.uniform(0, 1)
76 message.angle = action[0].item()
77 message.velocity = (MIN_SPEED + MAX_SPEED) / 2 + \
78 action[1].item() * (MAX_SPEED - MIN_SPEED) / 2
82 drive_parameters_publisher.publish(message)
85 state_dict = self.layers.state_dict()
86 tensors = [state_dict[key]
for key
in sorted(state_dict.keys())]
87 tensors = [torch.flatten(tensor)
for tensor
in tensors]
88 vector = torch.cat(tensors)
93 state_dict = self.layers.state_dict()
96 for key
in sorted(state_dict.keys()):
97 old_value = state_dict[key]
98 size = np.prod(old_value.shape)
99 new_value = vector[position:position +
100 size].reshape(old_value.shape)
101 state_dict[key] = new_value
103 self.layers.load_state_dict(state_dict)
107 parameters += normal_distribution.sample(parameters.shape)
109 offspring.load_vector(parameters)
113 self.load_state_dict(torch.load(MODEL_FILENAME.format(index)))
114 rospy.loginfo(
"Model parameters loaded.")
117 torch.save(self.state_dict(), MODEL_FILENAME.format(index))
120 if __name__ ==
'__main__':
121 rospy.init_node(
'evolutionary_driver', anonymous=
True)
127 message =
"Model parameters for the neural net not found. You need to train it first." 128 rospy.logerr(message)
129 rospy.signal_shutdown(message)
132 rospy.Subscriber(TOPIC_SCAN, LaserScan, driver.drive)
def drive(self, laser_message)
def load_vector(self, vector)