Stopping and Restarting a rosbag2 using snapshot feature through code
Nabil-Miri opened this issue · 2 comments
Goal: I want to develop a code that records rosbag and when ever a trigger happens, a snapshot service is called to save the data that is in the cache to a rosbag. The code also should be able to stop the recording and start it again whenever needed (for example, after 5 snapshot calls, save a rosbag and start recording again). I tried to use the Recorder() class from the rosbag2_py package but the code is stuck at self.recorder.record(storage_options, record_options)
line and cant be do any other task. What would be a potential solution for developing such code?
This is a sample python code:
from rosbag2_snapshot.srv import Snapshot
import rclpy
from rclpy.node import Node
from std_srvs.srv import SetBool
from rosbag2_py import Recorder, RecordOptions, StorageOptions
class BagRecorderNode(Node):
def __init__(self):
super().__init__('bag_recorder')
self.snapshot_client = self.create_client(Snapshot, 'rosbag2_recorder/snapshot')
self.snapshot_control_service = self.create_service(SetBool, 'rosbag_control', self.handle_rosbag_control_request)
self.recorder = Recorder()
self.recording = False
self.snapshot_counter = 0
self.start_recording('/')
def handle_rosbag_control_request(self, request, response):
self.get_logger().info('Snapshot control requested, calling snapshot service.')
while not self.snapshot_client.wait_for_service(timeout_sec=1.0):
self.get_logger().info('Snapshot service not available, waiting...')
snapshot_request = Snapshot.Request()
future = self.snapshot_client.call_async(snapshot_request)
rclpy.spin_until_future_complete(self, future)
if future.result() is not None:
response.success = True
self.get_logger().info('Snapshot service call successful')
self.snapshot_counter += 1
if self.snapshot_counter >= 5:
self.stop_recording()
self.get_logger().info('Stopped recording after 5 snapshots')
else:
response.success = False
self.get_logger().info('Failed to call snapshot service')
return response
def start_recording(self, uri):
storage_options = StorageOptions(uri=uri, snapshot_mode=True, max_cache_size=1024*1024*1, storage_id='mcap')
record_options = RecordOptions()
record_options.all = True
record_options.is_discovery_disabled = False
record_options.topics = ['/rosout']
record_options.compression_format = 'zstd'
record_options.compression_mode = 'file'
self.recorder.record(storage_options, record_options)
self.recording = True
def stop_recording(self):
if self.recording:
self.recorder.cancel()
self.recording = False
self.snapshot_counter = 0
self.start_recording('/')
def main(args=None):
rclpy.init(args=args)
node = BagRecorderNode()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
System info:
Ubuntu 22.04 and ROS2 Humble
Hi I ran into the same problem. From the code I would say this is currently not possible:
But from the comment:
I would guess that starting the recording in a python thread should work.
Nevertheless I decided to simply run the ros2 bag
command via the Popen
as the usage python recorded interface is currently not very well documented .
Yes I ended up doing the same using Popen and it worked. Not sure why there isn't another way.