diff --git a/pylot/drivers/carla_camera_driver_operator.py b/pylot/drivers/carla_camera_driver_operator.py index 447451036..36ee1eaf4 100644 --- a/pylot/drivers/carla_camera_driver_operator.py +++ b/pylot/drivers/carla_camera_driver_operator.py @@ -56,8 +56,8 @@ def __init__(self, ground_vehicle_id_stream: erdos.ReadStream, self._vehicle = None # The camera sensor actor object we obtain from the simulator. self._camera = None - self._pickle_lock = threading.Lock() - self._pickled_messages = {} + self._message_lock = threading.Lock() + self._messages = {} # Lock to ensure that the callbacks do not execute simultaneously. self._lock = threading.Lock() # If false then the operator does not send data until it receives @@ -81,14 +81,13 @@ def release_data(self, timestamp): self._logger.debug("@{}: {} releasing sensor data".format( timestamp, self.config.name)) watermark_msg = erdos.WatermarkMessage(timestamp) - self._camera_stream.send_pickled(timestamp, - self._pickled_messages[timestamp]) + self._camera_stream.send(self._messages[timestamp]) # Note: The operator is set not to automatically propagate # watermark messages received on input streams. Thus, we can # issue watermarks only after the simulator callback is invoked. self._camera_stream.send(watermark_msg) - with self._pickle_lock: - del self._pickled_messages[timestamp] + with self._message_lock: + del self._messages[timestamp] def run(self): # Read the vehicle id from the vehicle id stream @@ -170,9 +169,6 @@ def process_images(self, simulator_image): self._camera_stream.send(msg) self._camera_stream.send(watermark_msg) else: - # Pickle the data, and release it upon release msg receipt. - pickled_msg = pickle.dumps( - msg, protocol=pickle.HIGHEST_PROTOCOL) - with self._pickle_lock: - self._pickled_messages[msg.timestamp] = pickled_msg + with self._message_lock: + self._messages[msg.timestamp] = msg self._notify_reading_stream.send(watermark_msg) diff --git a/pylot/drivers/carla_lidar_driver_operator.py b/pylot/drivers/carla_lidar_driver_operator.py index e7f9c92fa..377bec14c 100644 --- a/pylot/drivers/carla_lidar_driver_operator.py +++ b/pylot/drivers/carla_lidar_driver_operator.py @@ -50,8 +50,8 @@ def __init__(self, ground_vehicle_id_stream: erdos.ReadStream, self._vehicle = None # Handle to the Lidar simulator actor. self._lidar = None - self._pickle_lock = threading.Lock() - self._pickled_messages = {} + self._message_lock = threading.Lock() + self._messages = {} self._lock = threading.Lock() # If false then the operator does not send data until it receives # release data watermark. Otherwise, it sends as soon as it @@ -71,14 +71,13 @@ def release_data(self, timestamp): self._release_data = True else: watermark_msg = erdos.WatermarkMessage(timestamp) - self._lidar_stream.send_pickled(timestamp, - self._pickled_messages[timestamp]) + self._lidar_stream.send(self._messages[timestamp]) # Note: The operator is set not to automatically propagate # watermark messages received on input streams. Thus, we can # issue watermarks only after the simulator callback is invoked. self._lidar_stream.send(watermark_msg) - with self._pickle_lock: - del self._pickled_messages[timestamp] + with self._message_lock: + del self._messages[timestamp] def process_point_clouds(self, simulator_pc): """ Invoked when a point cloud is received from the simulator. @@ -105,11 +104,8 @@ def process_point_clouds(self, simulator_pc): self._lidar_stream.send(msg) self._lidar_stream.send(watermark_msg) else: - # Pickle the data, and release it upon release msg receipt. - pickled_msg = pickle.dumps( - msg, protocol=pickle.HIGHEST_PROTOCOL) - with self._pickle_lock: - self._pickled_messages[msg.timestamp] = pickled_msg + with self._message_lock: + self._messages[msg.timestamp] = msg self._notify_reading_stream.send(watermark_msg) def run(self):