Welcome to part 3 of the Carla self-driving car with Python programming tutorial series. In this tutorial, we will use our knowledge of the Carla API and try to turn this problem into a reinforcement learning problem.
After OpenAI led the way with open-source reinforcement learning environments and solutions, we finally have something close to a standardized way of structuring reinforcement learning environments.
The idea here is that your environment has a step method that returns observation, reward, done, extra_info, and a reset method that restarts the environment (typically called whenever the done flag comes back True).
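To make the pattern concrete, here is a minimal sketch of that interface and the loop an agent runs against it. DummyEnv and its always-zero observation are purely illustrative placeholders, not anything from Carla:

import random

class DummyEnv:
    """Illustrative stand-in for a gym-style environment (hypothetical)."""

    def reset(self):
        # Restart the episode and return the first observation.
        return 0

    def step(self, action):
        # Return observation, reward, done, extra_info.
        return 0, random.choice([-1, 1]), random.random() < 0.1, None

env = DummyEnv()
obs = env.reset()
done = False
while not done:
    obs, reward, done, _ = env.step(random.choice([0, 1, 2]))

Our CarEnv class will follow exactly this shape, just backed by the Carla simulator.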
All we need to do is write the code to represent this. We start with our imports. Beyond the usual Carla egg setup, we also pull in the other modules the rest of the code will use (random, time, math, numpy, and cv2):
import glob
import math
import os
import random
import sys
import time

import cv2
import numpy as np

try:
    sys.path.append(glob.glob('../carla/dist/carla-*%d.%d-%s.egg' % (
        sys.version_info.major,
        sys.version_info.minor,
        'win-amd64' if os.name == 'nt' else 'linux-x86_64'))[0])
except IndexError:
    pass

import carla
Next, we will create a class for the environment, which we will call CarEnv. It will be convenient to have some constants at the top of our script for the model, agent, and environment, so the first one we'll create is a flag for showing a camera preview:
SHOW_PREVIEW = False
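The class below also references a few constants we haven't defined yet: the camera image dimensions and an episode time limit used later by the step method. The names come from the code; the values here are just reasonable defaults of our choosing, so tune them as you like:

IMG_WIDTH = 640           # camera frame width in pixels (our choice)
IMG_HEIGHT = 480          # camera frame height in pixels (our choice)
SECONDS_PER_EPISODE = 10  # episode time cap in seconds (our choice; tune as needed)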
Now let's set some initial values for our environment class:
class CarEnv:
    SHOW_CAM = SHOW_PREVIEW
    STEER_AMT = 1.0
    im_width = IMG_WIDTH
    im_height = IMG_HEIGHT
    actor_list = []
    front_camera = None
    collision_hist = []
I think these are fairly self-evident, but SHOW_CAM is whether or not we want to show a preview. It can be useful for debugging purposes, but you won't necessarily want it on all the time, since rendering it can be resource-intensive.
STEER_AMT is how much we want to apply to steering. At the moment, it's a full turn. Later, we may find it better to steer more gently, or maybe to do something cumulative instead. For now, full steer!
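As an aside, here is one hypothetical sketch of what "cumulative" steering could look like, where each action nudges a persistent steer value instead of setting it outright. Nothing in this tutorial uses this yet; the function and delta are made up for illustration, using the same 0/1/2 action mapping we'll define in the step method:

def nudge_steer(current_steer, action, delta=0.1):
    # Hypothetical cumulative steering: 1 nudges left, 2 nudges right,
    # 0 relaxes back toward center. CARLA expects steer in [-1.0, 1.0],
    # so clamp the result.
    if action == 1:
        current_steer -= delta
    elif action == 2:
        current_steer += delta
    else:
        current_steer *= 0.9
    return max(-1.0, min(1.0, current_steer))

steer = 0.0
for a in (2, 2, 2, 0):      # nudge right three times, then relax
    steer = nudge_steer(steer, a)
print(steer)                # 0.27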
We use collision_hist because the collision sensor reports a history of events. Basically, if there's anything in this list, we'll say we crashed. Now for the init method. All of this you saw in the previous tutorial:
    def __init__(self):
        self.client = carla.Client('localhost', 2000)
        self.client.set_timeout(2.0)

        # Once we have a client, we can retrieve the world that is currently running.
        self.world = self.client.get_world()

        # The world contains the list of blueprints that we can use for adding new
        # actors into the simulation.
        blueprint_library = self.world.get_blueprint_library()

        # Now let's filter all the blueprints of type 'vehicle' and choose one.
        # print(blueprint_library.filter('vehicle'))
        self.model_3 = blueprint_library.filter('model3')[0]
Next, we will create our reset method.
    def reset(self):
        self.collision_hist = []
        self.actor_list = []

        self.transform = random.choice(self.world.get_map().get_spawn_points())
        self.vehicle = self.world.spawn_actor(self.model_3, self.transform)
        self.actor_list.append(self.vehicle)

        self.rgb_cam = self.world.get_blueprint_library().find('sensor.camera.rgb')
        self.rgb_cam.set_attribute('image_size_x', f'{self.im_width}')
        self.rgb_cam.set_attribute('image_size_y', f'{self.im_height}')
        self.rgb_cam.set_attribute('fov', '110')

        transform = carla.Transform(carla.Location(x=2.5, z=0.7))
        self.sensor = self.world.spawn_actor(self.rgb_cam, transform, attach_to=self.vehicle)
        self.actor_list.append(self.sensor)
        self.sensor.listen(lambda data: self.process_img(data))

        # Initially passing some commands seems to help with startup time. Not sure why.
        self.vehicle.apply_control(carla.VehicleControl(throttle=0.0, brake=0.0))
Everything here is what you have seen before. There is nothing new, just OOP. Next, let's add a collision sensor to this method:
        time.sleep(4)  # sleep to get things started and to not detect a collision when the car spawns/falls from the sky

        colsensor = self.world.get_blueprint_library().find('sensor.other.collision')
        self.colsensor = self.world.spawn_actor(colsensor, transform, attach_to=self.vehicle)
        self.actor_list.append(self.colsensor)
        self.colsensor.listen(lambda event: self.collision_data(event))
We sleep for four seconds here because the car literally "drops" into the simulator. Often, when the car hits the ground, we get a collision registered. Also, these sensors take a moment to initialize and start returning values, so we'll just go with a safe and sure four full seconds. In case it takes even longer, we can also wait for the camera explicitly:
        while self.front_camera is None:
            time.sleep(0.01)
We can't rely on this alone, however, because we need to be certain the car has finished falling from the sky. Finally, let's log the actual start time of the episode, make sure the brake and throttle aren't being applied, and return our first observation:
        self.episode_start = time.time()
        self.vehicle.apply_control(carla.VehicleControl(brake=0.0, throttle=0.0))

        return self.front_camera
Now, let's add our collision_data and process_img methods:
    def collision_data(self, event):
        self.collision_hist.append(event)

    def process_img(self, image):
        i = np.array(image.raw_data)
        # np.save("iout.npy", i)
        i2 = i.reshape((self.im_height, self.im_width, 4))
        i3 = i2[:, :, :3]
        if self.SHOW_CAM:
            cv2.imshow("", i3)
            cv2.waitKey(1)
        self.front_camera = i3
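A quick note on the camera data: image.raw_data arrives as a flat buffer of BGRA bytes, which is why we reshape it to four channels and then slice off the alpha. The resulting frame is in BGR order, which is exactly what cv2.imshow expects, so the preview works directly. If you later feed frames to a model that wants RGB, you would reverse the channel axis. A tiny sketch (the bgr_to_rgb name is just illustrative):

import numpy as np

def bgr_to_rgb(frame):
    # Reversing the last axis swaps BGR -> RGB; this returns a view, not a copy.
    return frame[:, :, ::-1]

frame = np.zeros((480, 640, 3), dtype=np.uint8)  # dummy BGR frame
rgb = bgr_to_rgb(frame)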
Now we need our step method. This method takes an action, then returns the observation, reward, done flag, and any extra info, per the usual reinforcement learning paradigm. To begin:
    def step(self, action):
        '''
        For now let's just pass steer left, center, right?
        0, 1, 2
        '''
        if action == 0:
            self.vehicle.apply_control(carla.VehicleControl(throttle=1.0, steer=0))
        elif action == 1:
            self.vehicle.apply_control(carla.VehicleControl(throttle=1.0, steer=-1*self.STEER_AMT))
        elif action == 2:
            self.vehicle.apply_control(carla.VehicleControl(throttle=1.0, steer=1*self.STEER_AMT))
The above shows how we apply a control based on the numerical action passed in. Now we just need to handle the observation, possible collision, and reward:
        v = self.vehicle.get_velocity()
        kmh = int(3.6 * math.sqrt(v.x**2 + v.y**2 + v.z**2))

        if len(self.collision_hist) != 0:
            done = True
            reward = -200
        elif kmh < 50:
            done = False
            reward = -1
        else:
            done = False
            reward = 1

        if self.episode_start + SECONDS_PER_EPISODE < time.time():
            done = True

        return self.front_camera, reward, done, None
We grab the vehicle's velocity and convert it to kilometers per hour. I did this to prevent the agent from learning to drive in a tight little circle: if a certain speed is required to earn a reward, that should discourage this behavior.
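For reference, the 3.6 factor is just the m/s to km/h conversion. A quick worked example with arbitrary values:

import math

# CARLA reports velocity in meters per second per axis. Multiplying the
# magnitude by 3.6 converts m/s to km/h (3600 s/h divided by 1000 m/km).
vx, vy, vz = 10.0, 5.0, 0.0                      # example velocity in m/s
speed_ms = math.sqrt(vx**2 + vy**2 + vz**2)      # ~11.18 m/s
speed_kmh = 3.6 * speed_ms                       # ~40.25 km/h
print(f"{speed_ms:.2f} m/s = {speed_kmh:.2f} km/h")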
Next, we check whether we've run out of episode time, and then return everything. With that, our environment is done!
Complete code:
import glob
import math
import os
import random
import sys
import time

import cv2
import numpy as np

try:
    sys.path.append(glob.glob('../carla/dist/carla-*%d.%d-%s.egg' % (
        sys.version_info.major,
        sys.version_info.minor,
        'win-amd64' if os.name == 'nt' else 'linux-x86_64'))[0])
except IndexError:
    pass

import carla

SHOW_PREVIEW = False
IMG_WIDTH = 640           # camera frame width in pixels (our choice)
IMG_HEIGHT = 480          # camera frame height in pixels (our choice)
SECONDS_PER_EPISODE = 10  # episode time cap in seconds (our choice; tune as needed)


class CarEnv:
    SHOW_CAM = SHOW_PREVIEW
    STEER_AMT = 1.0
    im_width = IMG_WIDTH
    im_height = IMG_HEIGHT
    actor_list = []
    front_camera = None
    collision_hist = []

    def __init__(self):
        self.client = carla.Client('localhost', 2000)
        self.client.set_timeout(2.0)

        # Once we have a client, we can retrieve the world that is currently running.
        self.world = self.client.get_world()

        # The world contains the list of blueprints that we can use for adding new
        # actors into the simulation.
        blueprint_library = self.world.get_blueprint_library()

        # Now let's filter all the blueprints of type 'vehicle' and choose one.
        # print(blueprint_library.filter('vehicle'))
        self.model_3 = blueprint_library.filter('model3')[0]

    def reset(self):
        self.collision_hist = []
        self.actor_list = []

        self.transform = random.choice(self.world.get_map().get_spawn_points())
        self.vehicle = self.world.spawn_actor(self.model_3, self.transform)
        self.actor_list.append(self.vehicle)

        self.rgb_cam = self.world.get_blueprint_library().find('sensor.camera.rgb')
        self.rgb_cam.set_attribute('image_size_x', f'{self.im_width}')
        self.rgb_cam.set_attribute('image_size_y', f'{self.im_height}')
        self.rgb_cam.set_attribute('fov', '110')

        transform = carla.Transform(carla.Location(x=2.5, z=0.7))
        self.sensor = self.world.spawn_actor(self.rgb_cam, transform, attach_to=self.vehicle)
        self.actor_list.append(self.sensor)
        self.sensor.listen(lambda data: self.process_img(data))

        self.vehicle.apply_control(carla.VehicleControl(throttle=0.0, brake=0.0))
        time.sleep(4)  # sleep to get things started and to not detect a collision when the car spawns/falls from the sky

        colsensor = self.world.get_blueprint_library().find('sensor.other.collision')
        self.colsensor = self.world.spawn_actor(colsensor, transform, attach_to=self.vehicle)
        self.actor_list.append(self.colsensor)
        self.colsensor.listen(lambda event: self.collision_data(event))

        while self.front_camera is None:
            time.sleep(0.01)

        self.episode_start = time.time()
        self.vehicle.apply_control(carla.VehicleControl(brake=0.0, throttle=0.0))

        return self.front_camera

    def collision_data(self, event):
        self.collision_hist.append(event)

    def process_img(self, image):
        i = np.array(image.raw_data)
        # np.save("iout.npy", i)
        i2 = i.reshape((self.im_height, self.im_width, 4))
        i3 = i2[:, :, :3]
        if self.SHOW_CAM:
            cv2.imshow("", i3)
            cv2.waitKey(1)
        self.front_camera = i3

    def step(self, action):
        '''
        For now let's just pass steer left, center, right?
        0, 1, 2
        '''
        if action == 0:
            self.vehicle.apply_control(carla.VehicleControl(throttle=1.0, steer=0))
        elif action == 1:
            self.vehicle.apply_control(carla.VehicleControl(throttle=1.0, steer=-1*self.STEER_AMT))
        elif action == 2:
            self.vehicle.apply_control(carla.VehicleControl(throttle=1.0, steer=1*self.STEER_AMT))

        v = self.vehicle.get_velocity()
        kmh = int(3.6 * math.sqrt(v.x**2 + v.y**2 + v.z**2))

        if len(self.collision_hist) != 0:
            done = True
            reward = -200
        elif kmh < 50:
            done = False
            reward = -1
        else:
            done = False
            reward = 1

        if self.episode_start + SECONDS_PER_EPISODE < time.time():
            done = True

        return self.front_camera, reward, done, None
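Before moving on, it's worth sanity-checking the environment by driving it with random actions. A minimal smoke test might look like the following, assuming a CARLA server is running on localhost:2000. The manual actor cleanup at the end is our own addition here, since we haven't built a tidy shutdown into the class yet:

env = CarEnv()
obs = env.reset()
done = False
total_reward = 0

while not done:
    action = random.randint(0, 2)  # 0 = straight, 1 = left, 2 = right (per step above)
    obs, reward, done, _ = env.step(action)
    total_reward += reward

print(f"Episode finished, total reward: {total_reward}")

# Clean up the spawned actors so repeated runs don't clutter the server.
for actor in env.actor_list:
    actor.destroy()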
In the next tutorial, we will write our Agent class, which will interact with this environment and accommodate our actual reinforcement learning model.