Differences
This shows you the differences between two versions of the page.
| documentation:robots:spot:spot_ros:spot_ros_example [2023-04-24 21:43] – created Ming See | documentation:robots:spot:spot_ros:spot_ros_example [2023-11-27 09:38] (current) – Marcus Klang | ||
|---|---|---|---|
| Line 3: | Line 3: | ||
| This was some example code used to traverse a GraphNav map and record fiducial positions along the way from the World Objects model of Spot. | This was some example code used to traverse a GraphNav map and record fiducial positions along the way from the World Objects model of Spot. | ||
| - |    | + | <code python> | 
| - | import typing | + | # | 
| - | import pickle | + | import typing | 
| - | import time | + | import pickle | 
| - | import rospy | + | import time | 
| - | import actionlib | + | import rospy | 
| - | from actionlib_msgs.msg import GoalStatus | + | import actionlib | 
| - | from rosservice import get_service_class_by_name | + | from actionlib_msgs.msg import GoalStatus | 
| - | from std_msgs.msg import String | + | from rosservice import get_service_class_by_name | 
| - | from geometry_msgs.msg import PoseStamped | + | from std_msgs.msg import String | 
| - |   from spot_msgs.msg import WorldObjectArray, | + | from geometry_msgs.msg import PoseStamped | 
| - | from spot_msgs.msg import AprilTagProperties | + | from spot_msgs.msg import WorldObjectArray, | 
| - |   from spot_msgs.msg import FrameTreeSnapshot, | + | from spot_msgs.msg import AprilTagProperties | 
| - |   from spot_msgs.msg import NavigateInitAction, | + | from spot_msgs.msg import FrameTreeSnapshot, | 
| - |   from spot_msgs.msg import NavigateToAction, | + | from spot_msgs.msg import NavigateInitAction, | 
| - | from spot_msgs.srv import ListGraphResponse | + | from spot_msgs.msg import NavigateToAction, | 
| - | from std_srvs.srv import TriggerRequest | + | from spot_msgs.srv import ListGraphResponse | 
| - | from fiducial import Fiducial | + | from std_srvs.srv import TriggerRequest | 
| - | class SpotNav: | + | from fiducial import Fiducial | 
| - |       def __init__(self): | + | class SpotNav: | 
| - | self.world_objects = None | + |     def __init__(self): | 
| - |           self.fiducials_seen: | + | self.world_objects = None | 
| - |           self.waypoint_fiducial: | + |         self.fiducials_seen: | 
| - |       def initialize_subscribers(self): | + |         self.waypoint_fiducial: | 
| - |           """ | + |     def initialize_subscribers(self): | 
| - |           # Create a subscriber for the / | + |         """ | 
| - | self.world_objects_sub = rospy.Subscriber( | + |         # Create a subscriber for the / | 
| - |               "/ | + | self.world_objects_sub = rospy.Subscriber( | 
| - | ) | + |             "/ | 
| - |       def initialize_publishers(self): | + | ) | 
| - |           """ | + |     def initialize_publishers(self): | 
| - | self.reached_waypoint_pub = rospy.Publisher( | + |         """ | 
| - |               "/ | + | self.reached_waypoint_pub = rospy.Publisher( | 
| - | ) | + |             "/ | 
| - |       def initialize_action_clients(self): | + | ) | 
| - |           """ | + |     def initialize_action_clients(self): | 
| - |           # Create an action client for the / | + |         """ | 
| - | self.navigate_to_client = actionlib.SimpleActionClient( | + |         # Create an action client for the / | 
| - |               "/ | + | self.navigate_to_client = actionlib.SimpleActionClient( | 
| - | ) | + |             "/ | 
| - |           # Create an action client for the / | + | ) | 
| - | self.navigate_init_client = actionlib.SimpleActionClient( | + |         # Create an action client for the / | 
| - |               "/ | + | self.navigate_init_client = actionlib.SimpleActionClient( | 
| - | ) | + |             "/ | 
| - |       def world_objects_callback(self, | + | ) | 
| - | # Save the message to a class variable tracking the fiducials in the message | + |     def world_objects_callback(self, | 
| - |           self.world_objects: | + | # Save the message to a class variable tracking the fiducials in the message | 
| - | # Iterate through the fiducials in the message, append the x,y,z coordinates to a dictionary | + |         self.world_objects: | 
| - |           for world_object in self.world_objects: | + | # Iterate through the fiducials in the message, append the x,y,z coordinates to a dictionary | 
| - | april_tag: AprilTagProperties = world_object.apriltag_properties | + |         for world_object in self.world_objects: | 
| - |               latest_snapshot: | + | april_tag: AprilTagProperties = world_object.apriltag_properties | 
| - | # Check if april_tag is None | + |             latest_snapshot: | 
| - | if april_tag is None: | + | # Check if april_tag is None | 
| - | continue | + | if april_tag is None: | 
| - | # Create the FrameTreeSnapshot as a dictionary | + | continue | 
| - |               frame_tree_snapshot: | + | # Create the FrameTreeSnapshot as a dictionary | 
| - | for child, parent_edge in zip( | + |             frame_tree_snapshot: | 
| - |                   latest_snapshot.child_edges, | + | for child, parent_edge in zip( | 
| - | ): | + |                 latest_snapshot.child_edges, | 
| - | parent_edge_transform = PoseStamped() | + | ): | 
| - | parent_edge_transform.header.stamp = world_object.acquisition_time | + | parent_edge_transform = PoseStamped() | 
| - | parent_edge_transform.header.frame_id = parent_edge.parent_frame_name | + | parent_edge_transform.header.stamp = world_object.acquisition_time | 
| - | parent_edge_transform.pose = parent_edge.parent_tform_child | + | parent_edge_transform.header.frame_id = parent_edge.parent_frame_name | 
| - | frame_tree_snapshot[child] = parent_edge_transform | + | parent_edge_transform.pose = parent_edge.parent_tform_child | 
| - |               april_tag_pose = frame_tree_snapshot[f" | + | frame_tree_snapshot[child] = parent_edge_transform | 
| - | april_tag_pose_filtered = frame_tree_snapshot[ | + |             april_tag_pose = frame_tree_snapshot[f" | 
| - |                   f" | + | april_tag_pose_filtered = frame_tree_snapshot[ | 
| - | ] | + |                 f" | 
| - | # Build the april_tag into the Fiducial class | + | ] | 
| - | fiducial = Fiducial( | + | # Build the april_tag into the Fiducial class | 
| - |                   tag_id=april_tag.tag_id, | + | fiducial = Fiducial( | 
| - |                   dim_x=april_tag.x, | + |                 tag_id=april_tag.tag_id, | 
| - |                   dim_y=april_tag.y, | + |                 dim_x=april_tag.x, | 
| - |                   fiducial_pose=april_tag_pose, | + |                 dim_y=april_tag.y, | 
| - |                   filtered_fiducial_pose=april_tag_pose_filtered, | + |                 fiducial_pose=april_tag_pose, | 
| - |                   pose_covariance=april_tag.detection_covariance, | + |                 filtered_fiducial_pose=april_tag_pose_filtered, | 
| - |                   pose_covariance_frame=april_tag.detection_covariance_reference_frame, | + |                 pose_covariance=april_tag.detection_covariance, | 
| - | ) | + |                 pose_covariance_frame=april_tag.detection_covariance_reference_frame, | 
| - | # Save the fiducial to the class variable | + | ) | 
| - |               if fiducial.tag_id in self.fiducials_seen: | + | # Save the fiducial to the class variable | 
| - | self.fiducials_seen[fiducial.tag_id].append(fiducial) | + |             if fiducial.tag_id in self.fiducials_seen: | 
| - | else: | + | self.fiducials_seen[fiducial.tag_id].append(fiducial) | 
| - | self.fiducials_seen[fiducial.tag_id] = [fiducial] | + | else: | 
| - | rospy.logdebug( | + | self.fiducials_seen[fiducial.tag_id] = [fiducial] | 
| - |                   f" | + | rospy.logdebug( | 
| - | ) | + |                 f" | 
| - |       def call_service(self, | + | ) | 
| - |           """ | + |     def call_service(self, | 
| - | try: | + |         """ | 
| - | rospy.wait_for_service(service_name) | + | try: | 
| - | service_type = get_service_class_by_name(service_name) | + | rospy.wait_for_service(service_name) | 
| - |               proxy = rospy.ServiceProxy(service_name, | + | service_type = get_service_class_by_name(service_name) | 
| - |               return proxy(*args, | + |             proxy = rospy.ServiceProxy(service_name, | 
| - | except rospy.ServiceException as e: | + |             return proxy(*args, | 
| - |               rospy.logerr(" | + | except rospy.ServiceException as e: | 
| - |       def walk_current_graph(self): | + |             rospy.logerr(" | 
| - |           """ | + |     def walk_current_graph(self): | 
| - |           rospy.loginfo(" | + |         """ | 
| - | # Call the ListGraph service | + |         rospy.loginfo(" | 
| - |           list_graph: ListGraphResponse = self.call_service("/ | + | # Call the ListGraph service | 
| - | waypoints = list_graph.waypoint_ids | + |         list_graph: ListGraphResponse = self.call_service("/ | 
| - |           # Call the / | + | waypoints = list_graph.waypoint_ids | 
| - | navigate_init_goal = NavigateInitGoal( | + |         # Call the / | 
| - |               upload_path="", | + | navigate_init_goal = NavigateInitGoal( | 
| - |               initial_localization_fiducial=True, | + |             upload_path="", | 
| - |               initial_localization_waypoint=" | + |             initial_localization_fiducial=True, | 
| - | ) | + |             initial_localization_waypoint=" | 
| - | self.navigate_init_client.send_goal(navigate_init_goal) | + | ) | 
| - | self.navigate_init_client.wait_for_result() | + | self.navigate_init_client.send_goal(navigate_init_goal) | 
| - | # Check if the action succeeded | + | self.navigate_init_client.wait_for_result() | 
| - |           if self.navigate_init_client.get_state() == GoalStatus.SUCCEEDED: | + | # Check if the action succeeded | 
| - |               rospy.loginfo(" | + |         if self.navigate_init_client.get_state() == GoalStatus.SUCCEEDED: | 
| - |           for waypoint in waypoints: | + |             rospy.loginfo(" | 
| - |               # Call the / | + |         for waypoint in waypoints: | 
| - | navigate_to_goal = NavigateToGoal(navigate_to=waypoint) | + |             # Call the / | 
| - | self.navigate_to_client.send_goal(navigate_to_goal) | + | navigate_to_goal = NavigateToGoal(navigate_to=waypoint) | 
| - | self.navigate_to_client.wait_for_result() | + | self.navigate_to_client.send_goal(navigate_to_goal) | 
| - | # Check if the action succeeded | + | self.navigate_to_client.wait_for_result() | 
| - |               if self.navigate_to_client.get_state() == GoalStatus.SUCCEEDED: | + | # Check if the action succeeded | 
| - |                   rospy.loginfo(f" | + |             if self.navigate_to_client.get_state() == GoalStatus.SUCCEEDED: | 
| - | latest_world_objects_msg = rospy.wait_for_message( | + |                 rospy.loginfo(f" | 
| - |                       "/ | + | latest_world_objects_msg = rospy.wait_for_message( | 
| - | ) | + |                     "/ | 
| - | latest_fiducial = self.extract_fiducials(latest_world_objects_msg) | + | ) | 
| - |                   if waypoint in self.waypoint_fiducial: | + | latest_fiducial = self.extract_fiducials(latest_world_objects_msg) | 
| - | self.waypoint_fiducial[waypoint].append(latest_fiducial) | + |                 if waypoint in self.waypoint_fiducial: | 
| - | else: | + | self.waypoint_fiducial[waypoint].append(latest_fiducial) | 
| - | self.waypoint_fiducial[waypoint] = [latest_fiducial] | + | else: | 
| - |                   # Publish to the / | + | self.waypoint_fiducial[waypoint] = [latest_fiducial] | 
| - | self.reached_waypoint_pub.publish(waypoint) | + |                 # Publish to the / | 
| - | time.sleep(0.5) | + | self.reached_waypoint_pub.publish(waypoint) | 
| - |       def extract_fiducials(self, | + | time.sleep(0.5) | 
| - |           self.world_objects: | + |     def extract_fiducials(self, | 
| - | fiducial_list = [] | + |         self.world_objects: | 
| - | # Iterate through the fiducials in the message, append the x,y,z coordinates to a dictionary | + | fiducial_list = [] | 
| - |           for world_object in self.world_objects: | + | # Iterate through the fiducials in the message, append the x,y,z coordinates to a dictionary | 
| - | april_tag: AprilTagProperties = world_object.apriltag_properties | + |         for world_object in self.world_objects: | 
| - |               latest_snapshot: | + | april_tag: AprilTagProperties = world_object.apriltag_properties | 
| - | # Check if april_tag is None | + |             latest_snapshot: | 
| - | if april_tag is None: | + | # Check if april_tag is None | 
| - | continue | + | if april_tag is None: | 
| - | # Create the FrameTreeSnapshot as a dictionary | + | continue | 
| - |               frame_tree_snapshot: | + | # Create the FrameTreeSnapshot as a dictionary | 
| - | for child, parent_edge in zip( | + |             frame_tree_snapshot: | 
| - |                   latest_snapshot.child_edges, | + | for child, parent_edge in zip( | 
| - | ): | + |                 latest_snapshot.child_edges, | 
| - | parent_edge_transform = PoseStamped() | + | ): | 
| - | parent_edge_transform.header.stamp = world_object.acquisition_time | + | parent_edge_transform = PoseStamped() | 
| - | parent_edge_transform.header.frame_id = parent_edge.parent_frame_name | + | parent_edge_transform.header.stamp = world_object.acquisition_time | 
| - | parent_edge_transform.pose = parent_edge.parent_tform_child | + | parent_edge_transform.header.frame_id = parent_edge.parent_frame_name | 
| - | frame_tree_snapshot[child] = parent_edge_transform | + | parent_edge_transform.pose = parent_edge.parent_tform_child | 
| - | # Use tf2 to get the april_tag pose in the body frame | + | frame_tree_snapshot[child] = parent_edge_transform | 
| - |               april_tag_pose = frame_tree_snapshot[f" | + | # Use tf2 to get the april_tag pose in the body frame | 
| - | april_tag_pose_filtered = frame_tree_snapshot[ | + |             april_tag_pose = frame_tree_snapshot[f" | 
| - |                   f" | + | april_tag_pose_filtered = frame_tree_snapshot[ | 
| - | ] | + |                 f" | 
| - | # Build the april_tag into the Fiducial class | + | ] | 
| - | fiducial = Fiducial( | + | # Build the april_tag into the Fiducial class | 
| - |                   tag_id=april_tag.tag_id, | + | fiducial = Fiducial( | 
| - |                   dim_x=april_tag.x, | + |                 tag_id=april_tag.tag_id, | 
| - |                   dim_y=april_tag.y, | + |                 dim_x=april_tag.x, | 
| - |                   fiducial_pose=april_tag_pose, | + |                 dim_y=april_tag.y, | 
| - |                   filtered_fiducial_pose=april_tag_pose_filtered, | + |                 fiducial_pose=april_tag_pose, | 
| - |                   pose_covariance=april_tag.detection_covariance, | + |                 filtered_fiducial_pose=april_tag_pose_filtered, | 
| - |                   pose_covariance_frame=april_tag.detection_covariance_reference_frame, | + |                 pose_covariance=april_tag.detection_covariance, | 
| - | ) | + |                 pose_covariance_frame=april_tag.detection_covariance_reference_frame, | 
| - | # Append the fiducial to the list | + | ) | 
| - | fiducial_list.append(fiducial) | + | # Append the fiducial to the list | 
| - | return fiducial_list | + | fiducial_list.append(fiducial) | 
| - |       def startup(self): | + | return fiducial_list | 
| - |           rospy.loginfo(" | + |     def startup(self): | 
| - |           # Call the / | + |         rospy.loginfo(" | 
| - |           self.call_service("/ | + |         # Call the / | 
| - |           self.call_service("/ | + |         self.call_service("/ | 
| - |       def shutdown(self): | + |         self.call_service("/ | 
| - |           rospy.loginfo(" | + |     def shutdown(self): | 
| - | # Save the fiducials to a pickle file | + |         rospy.loginfo(" | 
| - |           with open(f" | + | # Save the fiducials to a pickle file | 
| - |               pickle.dump(self.fiducials_seen, | + |         with open(f" | 
| - |           with open(f" | + |             pickle.dump(self.fiducials_seen, | 
| - |               pickle.dump(self.waypoint_fiducial, | + |         with open(f" | 
| - |           # Call the /spot/sit, / | + |             pickle.dump(self.waypoint_fiducial, | 
| - |           self.call_service("/ | + |         # Call the /spot/sit, / | 
| - | time.sleep(5) | + |         self.call_service("/ | 
| - |           self.call_service("/ | + | time.sleep(5) | 
| - |           self.call_service("/ | + |         self.call_service("/ | 
| - |       def main(self): | + |         self.call_service("/ | 
| - |           rospy.init_node(" | + |     def main(self): | 
| - |           self.rates = rospy.get_param(" | + |         rospy.init_node(" | 
| - |           if " | + |         self.rates = rospy.get_param(" | 
| - |               loop_rate = self.rates[" | + |         if " | 
| - | else: | + |             loop_rate = self.rates[" | 
| - | loop_rate = 50 | + | else: | 
| - | # Initialize the node | + | loop_rate = 50 | 
| - | rate = rospy.Rate(loop_rate) | + | # Initialize the node | 
| - |           rospy.loginfo(" | + | rate = rospy.Rate(loop_rate) | 
| - | self.initialize_subscribers() | + |         rospy.loginfo(" | 
| - | self.initialize_publishers() | + | self.initialize_subscribers() | 
| - | self.initialize_action_clients() | + | self.initialize_publishers() | 
| - | rospy.on_shutdown(self.shutdown) | + | self.initialize_action_clients() | 
| - | # Walk the current graph | + | rospy.on_shutdown(self.shutdown) | 
| - | self.startup() | + | # Walk the current graph | 
| - | self.walk_current_graph() | + | self.startup() | 
| - |           while not rospy.is_shutdown(): | + | self.walk_current_graph() | 
| - | rate.sleep() | + |         while not rospy.is_shutdown(): | 
| - |   if __name__ == " | + | rate.sleep() | 
| - | spot_nav = SpotNav() | + | if __name__ == " | 
| - | spot_nav.main() | + | spot_nav = SpotNav() | 
| + | spot_nav.main() | ||
| + | </ | ||