ros2/rosbag2

Stopping and Restarting a rosbag2 using snapshot feature through code

Nabil-Miri opened this issue · 2 comments

Goal: I want to develop a code that records rosbag and when ever a trigger happens, a snapshot service is called to save the data that is in the cache to a rosbag. The code also should be able to stop the recording and start it again whenever needed (for example, after 5 snapshot calls, save a rosbag and start recording again). I tried to use the Recorder() class from the rosbag2_py package but the code is stuck at self.recorder.record(storage_options, record_options) line and cant be do any other task. What would be a potential solution for developing such code?

This is a sample python code:

from rosbag2_snapshot.srv import Snapshot
import rclpy
from rclpy.node import Node
from std_srvs.srv import SetBool
from rosbag2_py import Recorder, RecordOptions, StorageOptions

class BagRecorderNode(Node):
   def __init__(self):
       super().__init__('bag_recorder')
       self.snapshot_client = self.create_client(Snapshot, 'rosbag2_recorder/snapshot')
       self.snapshot_control_service = self.create_service(SetBool, 'rosbag_control', self.handle_rosbag_control_request)
       self.recorder = Recorder()
       self.recording = False
       self.snapshot_counter = 0
       self.start_recording('/')

   def handle_rosbag_control_request(self, request, response):
       self.get_logger().info('Snapshot control requested, calling snapshot service.')
       while not self.snapshot_client.wait_for_service(timeout_sec=1.0):
           self.get_logger().info('Snapshot service not available, waiting...')
       snapshot_request = Snapshot.Request()
       future = self.snapshot_client.call_async(snapshot_request)
       rclpy.spin_until_future_complete(self, future)
       if future.result() is not None:
           response.success = True
           self.get_logger().info('Snapshot service call successful')
           self.snapshot_counter += 1
           if self.snapshot_counter >= 5:
               self.stop_recording()
               self.get_logger().info('Stopped recording after 5 snapshots')
       else:
           response.success = False
           self.get_logger().info('Failed to call snapshot service')
       return response

   def start_recording(self, uri):
       storage_options = StorageOptions(uri=uri, snapshot_mode=True, max_cache_size=1024*1024*1, storage_id='mcap')
       record_options = RecordOptions()
       record_options.all = True
       record_options.is_discovery_disabled = False
       record_options.topics = ['/rosout']
       record_options.compression_format = 'zstd'
       record_options.compression_mode = 'file'
       self.recorder.record(storage_options, record_options)
       self.recording = True

   def stop_recording(self):
       if self.recording:
           self.recorder.cancel()
           self.recording = False
           self.snapshot_counter = 0
           self.start_recording('/')

def main(args=None):
   rclpy.init(args=args)
   node = BagRecorderNode()
   rclpy.spin(node)
   node.destroy_node()
   rclpy.shutdown()

if __name__ == '__main__':
   main()

System info:
Ubuntu 22.04 and ROS2 Humble

Hi I ran into the same problem. From the code I would say this is currently not possible:

But from the comment:

// Release the GIL for long-running record, so that calling Python code

I would guess that starting the recording in a python thread should work.

Nevertheless I decided to simply run the ros2 bag command via the Popen as the usage python recorded interface is currently not very well documented .

Yes I ended up doing the same using Popen and it worked. Not sure why there isn't another way.