Autonomous Racing  1
f1tenth Project Group of Technical University Dortmund, Germany
reinforcement_learning_node.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 import math
4 import rospy
5 from sensor_msgs.msg import LaserScan
6 from drive_msgs.msg import drive_param
7 
8 import torch
9 
10 from topics import TOPIC_DRIVE_PARAMETERS, TOPIC_SCAN
11 
12 
14  ''' Abstract class for methods that are used both
15  during training and during driving.
16  '''
17 
18  def __init__(self, actions, laser_sample_count):
19  self.scan_indices = None
20  self.laser_sample_count = laser_sample_count
21  self.actions = actions
22  self.drive_parameters_publisher = rospy.Publisher(
23  TOPIC_DRIVE_PARAMETERS, drive_param, queue_size=1)
24  rospy.Subscriber(TOPIC_SCAN, LaserScan, self.on_receive_laser_scan)
25 
26  def perform_action(self, action_index):
27  if action_index < 0 or action_index >= len(self.actions):
28  raise Exception("Invalid action: " + str(action_index))
29 
30  angle, velocity = self.actions[action_index]
31  message = drive_param()
32  message.angle = angle
33  message.velocity = velocity
34  self.drive_parameters_publisher.publish(message)
35 
36  def convert_laser_message_to_tensor(self, message, use_device=True):
37  if self.scan_indices is None:
38  self.scan_indices = [int(i * (len(message.ranges) - 1) / (self.laser_sample_count - 1)) for i in range(self.laser_sample_count)] # nopep8
39 
40  values = [message.ranges[i] for i in self.scan_indices]
41  values = [v if not math.isinf(v) else 100 for v in values]
42 
43  return torch.tensor(
44  values,
45  device=device if use_device else None,
46  dtype=torch.float)
47 
48  def on_receive_laser_scan(self, message):
49  raise Exception("on_receive_laser_scan is not implemented.")
50 
51 
52 device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
def convert_laser_message_to_tensor(self, message, use_device=True)