Differences
This shows you the differences between two versions of the page.
| documentation:robots:spot:spot_ros:spot_ros_example [2023-04-24 21:43] – created Ming See | documentation:robots:spot:spot_ros:spot_ros_example [2023-11-27 09:38] (current) – Marcus Klang | ||
|---|---|---|---|
| Line 3: | Line 3: | ||
| This was some example code used to traverse a GraphNav map and record fiducial positions along the way from the World Objects model of Spot. | This was some example code used to traverse a GraphNav map and record fiducial positions along the way from the World Objects model of Spot. | ||
| - | | + | <code python> |
| - | import typing | + | # |
| - | import pickle | + | import typing |
| - | import time | + | import pickle |
| - | import rospy | + | import time |
| - | import actionlib | + | import rospy |
| - | from actionlib_msgs.msg import GoalStatus | + | import actionlib |
| - | from rosservice import get_service_class_by_name | + | from actionlib_msgs.msg import GoalStatus |
| - | from std_msgs.msg import String | + | from rosservice import get_service_class_by_name |
| - | from geometry_msgs.msg import PoseStamped | + | from std_msgs.msg import String |
| - | from spot_msgs.msg import WorldObjectArray, | + | from geometry_msgs.msg import PoseStamped |
| - | from spot_msgs.msg import AprilTagProperties | + | from spot_msgs.msg import WorldObjectArray, |
| - | from spot_msgs.msg import FrameTreeSnapshot, | + | from spot_msgs.msg import AprilTagProperties |
| - | from spot_msgs.msg import NavigateInitAction, | + | from spot_msgs.msg import FrameTreeSnapshot, |
| - | from spot_msgs.msg import NavigateToAction, | + | from spot_msgs.msg import NavigateInitAction, |
| - | from spot_msgs.srv import ListGraphResponse | + | from spot_msgs.msg import NavigateToAction, |
| - | from std_srvs.srv import TriggerRequest | + | from spot_msgs.srv import ListGraphResponse |
| - | from fiducial import Fiducial | + | from std_srvs.srv import TriggerRequest |
| - | class SpotNav: | + | from fiducial import Fiducial |
| - | def __init__(self): | + | class SpotNav: |
| - | self.world_objects = None | + | def __init__(self): |
| - | self.fiducials_seen: | + | self.world_objects = None |
| - | self.waypoint_fiducial: | + | self.fiducials_seen: |
| - | def initialize_subscribers(self): | + | self.waypoint_fiducial: |
| - | """ | + | def initialize_subscribers(self): |
| - | # Create a subscriber for the / | + | """ |
| - | self.world_objects_sub = rospy.Subscriber( | + | # Create a subscriber for the / |
| - | "/ | + | self.world_objects_sub = rospy.Subscriber( |
| - | ) | + | "/ |
| - | def initialize_publishers(self): | + | ) |
| - | """ | + | def initialize_publishers(self): |
| - | self.reached_waypoint_pub = rospy.Publisher( | + | """ |
| - | "/ | + | self.reached_waypoint_pub = rospy.Publisher( |
| - | ) | + | "/ |
| - | def initialize_action_clients(self): | + | ) |
| - | """ | + | def initialize_action_clients(self): |
| - | # Create an action client for the / | + | """ |
| - | self.navigate_to_client = actionlib.SimpleActionClient( | + | # Create an action client for the / |
| - | "/ | + | self.navigate_to_client = actionlib.SimpleActionClient( |
| - | ) | + | "/ |
| - | # Create an action client for the / | + | ) |
| - | self.navigate_init_client = actionlib.SimpleActionClient( | + | # Create an action client for the / |
| - | "/ | + | self.navigate_init_client = actionlib.SimpleActionClient( |
| - | ) | + | "/ |
| - | def world_objects_callback(self, | + | ) |
| - | # Save the message to a class variable tracking the fiducials in the message | + | def world_objects_callback(self, |
| - | self.world_objects: | + | # Save the message to a class variable tracking the fiducials in the message |
| - | # Iterate through the fiducials in the message, append the x,y,z coordinates to a dictionary | + | self.world_objects: |
| - | for world_object in self.world_objects: | + | # Iterate through the fiducials in the message, append the x,y,z coordinates to a dictionary |
| - | april_tag: AprilTagProperties = world_object.apriltag_properties | + | for world_object in self.world_objects: |
| - | latest_snapshot: | + | april_tag: AprilTagProperties = world_object.apriltag_properties |
| - | # Check if april_tag is None | + | latest_snapshot: |
| - | if april_tag is None: | + | # Check if april_tag is None |
| - | continue | + | if april_tag is None: |
| - | # Create the FrameTreeSnapshot as a dictionary | + | continue |
| - | frame_tree_snapshot: | + | # Create the FrameTreeSnapshot as a dictionary |
| - | for child, parent_edge in zip( | + | frame_tree_snapshot: |
| - | latest_snapshot.child_edges, | + | for child, parent_edge in zip( |
| - | ): | + | latest_snapshot.child_edges, |
| - | parent_edge_transform = PoseStamped() | + | ): |
| - | parent_edge_transform.header.stamp = world_object.acquisition_time | + | parent_edge_transform = PoseStamped() |
| - | parent_edge_transform.header.frame_id = parent_edge.parent_frame_name | + | parent_edge_transform.header.stamp = world_object.acquisition_time |
| - | parent_edge_transform.pose = parent_edge.parent_tform_child | + | parent_edge_transform.header.frame_id = parent_edge.parent_frame_name |
| - | frame_tree_snapshot[child] = parent_edge_transform | + | parent_edge_transform.pose = parent_edge.parent_tform_child |
| - | april_tag_pose = frame_tree_snapshot[f" | + | frame_tree_snapshot[child] = parent_edge_transform |
| - | april_tag_pose_filtered = frame_tree_snapshot[ | + | april_tag_pose = frame_tree_snapshot[f" |
| - | f" | + | april_tag_pose_filtered = frame_tree_snapshot[ |
| - | ] | + | f" |
| - | # Build the april_tag into the Fiducial class | + | ] |
| - | fiducial = Fiducial( | + | # Build the april_tag into the Fiducial class |
| - | tag_id=april_tag.tag_id, | + | fiducial = Fiducial( |
| - | dim_x=april_tag.x, | + | tag_id=april_tag.tag_id, |
| - | dim_y=april_tag.y, | + | dim_x=april_tag.x, |
| - | fiducial_pose=april_tag_pose, | + | dim_y=april_tag.y, |
| - | filtered_fiducial_pose=april_tag_pose_filtered, | + | fiducial_pose=april_tag_pose, |
| - | pose_covariance=april_tag.detection_covariance, | + | filtered_fiducial_pose=april_tag_pose_filtered, |
| - | pose_covariance_frame=april_tag.detection_covariance_reference_frame, | + | pose_covariance=april_tag.detection_covariance, |
| - | ) | + | pose_covariance_frame=april_tag.detection_covariance_reference_frame, |
| - | # Save the fiducial to the class variable | + | ) |
| - | if fiducial.tag_id in self.fiducials_seen: | + | # Save the fiducial to the class variable |
| - | self.fiducials_seen[fiducial.tag_id].append(fiducial) | + | if fiducial.tag_id in self.fiducials_seen: |
| - | else: | + | self.fiducials_seen[fiducial.tag_id].append(fiducial) |
| - | self.fiducials_seen[fiducial.tag_id] = [fiducial] | + | else: |
| - | rospy.logdebug( | + | self.fiducials_seen[fiducial.tag_id] = [fiducial] |
| - | f" | + | rospy.logdebug( |
| - | ) | + | f" |
| - | def call_service(self, | + | ) |
| - | """ | + | def call_service(self, |
| - | try: | + | """ |
| - | rospy.wait_for_service(service_name) | + | try: |
| - | service_type = get_service_class_by_name(service_name) | + | rospy.wait_for_service(service_name) |
| - | proxy = rospy.ServiceProxy(service_name, | + | service_type = get_service_class_by_name(service_name) |
| - | return proxy(*args, | + | proxy = rospy.ServiceProxy(service_name, |
| - | except rospy.ServiceException as e: | + | return proxy(*args, |
| - | rospy.logerr(" | + | except rospy.ServiceException as e: |
| - | def walk_current_graph(self): | + | rospy.logerr(" |
| - | """ | + | def walk_current_graph(self): |
| - | rospy.loginfo(" | + | """ |
| - | # Call the ListGraph service | + | rospy.loginfo(" |
| - | list_graph: ListGraphResponse = self.call_service("/ | + | # Call the ListGraph service |
| - | waypoints = list_graph.waypoint_ids | + | list_graph: ListGraphResponse = self.call_service("/ |
| - | # Call the / | + | waypoints = list_graph.waypoint_ids |
| - | navigate_init_goal = NavigateInitGoal( | + | # Call the / |
| - | upload_path="", | + | navigate_init_goal = NavigateInitGoal( |
| - | initial_localization_fiducial=True, | + | upload_path="", |
| - | initial_localization_waypoint=" | + | initial_localization_fiducial=True, |
| - | ) | + | initial_localization_waypoint=" |
| - | self.navigate_init_client.send_goal(navigate_init_goal) | + | ) |
| - | self.navigate_init_client.wait_for_result() | + | self.navigate_init_client.send_goal(navigate_init_goal) |
| - | # Check if the action succeeded | + | self.navigate_init_client.wait_for_result() |
| - | if self.navigate_init_client.get_state() == GoalStatus.SUCCEEDED: | + | # Check if the action succeeded |
| - | rospy.loginfo(" | + | if self.navigate_init_client.get_state() == GoalStatus.SUCCEEDED: |
| - | for waypoint in waypoints: | + | rospy.loginfo(" |
| - | # Call the / | + | for waypoint in waypoints: |
| - | navigate_to_goal = NavigateToGoal(navigate_to=waypoint) | + | # Call the / |
| - | self.navigate_to_client.send_goal(navigate_to_goal) | + | navigate_to_goal = NavigateToGoal(navigate_to=waypoint) |
| - | self.navigate_to_client.wait_for_result() | + | self.navigate_to_client.send_goal(navigate_to_goal) |
| - | # Check if the action succeeded | + | self.navigate_to_client.wait_for_result() |
| - | if self.navigate_to_client.get_state() == GoalStatus.SUCCEEDED: | + | # Check if the action succeeded |
| - | rospy.loginfo(f" | + | if self.navigate_to_client.get_state() == GoalStatus.SUCCEEDED: |
| - | latest_world_objects_msg = rospy.wait_for_message( | + | rospy.loginfo(f" |
| - | "/ | + | latest_world_objects_msg = rospy.wait_for_message( |
| - | ) | + | "/ |
| - | latest_fiducial = self.extract_fiducials(latest_world_objects_msg) | + | ) |
| - | if waypoint in self.waypoint_fiducial: | + | latest_fiducial = self.extract_fiducials(latest_world_objects_msg) |
| - | self.waypoint_fiducial[waypoint].append(latest_fiducial) | + | if waypoint in self.waypoint_fiducial: |
| - | else: | + | self.waypoint_fiducial[waypoint].append(latest_fiducial) |
| - | self.waypoint_fiducial[waypoint] = [latest_fiducial] | + | else: |
| - | # Publish to the / | + | self.waypoint_fiducial[waypoint] = [latest_fiducial] |
| - | self.reached_waypoint_pub.publish(waypoint) | + | # Publish to the / |
| - | time.sleep(0.5) | + | self.reached_waypoint_pub.publish(waypoint) |
| - | def extract_fiducials(self, | + | time.sleep(0.5) |
| - | self.world_objects: | + | def extract_fiducials(self, |
| - | fiducial_list = [] | + | self.world_objects: |
| - | # Iterate through the fiducials in the message, append the x,y,z coordinates to a dictionary | + | fiducial_list = [] |
| - | for world_object in self.world_objects: | + | # Iterate through the fiducials in the message, append the x,y,z coordinates to a dictionary |
| - | april_tag: AprilTagProperties = world_object.apriltag_properties | + | for world_object in self.world_objects: |
| - | latest_snapshot: | + | april_tag: AprilTagProperties = world_object.apriltag_properties |
| - | # Check if april_tag is None | + | latest_snapshot: |
| - | if april_tag is None: | + | # Check if april_tag is None |
| - | continue | + | if april_tag is None: |
| - | # Create the FrameTreeSnapshot as a dictionary | + | continue |
| - | frame_tree_snapshot: | + | # Create the FrameTreeSnapshot as a dictionary |
| - | for child, parent_edge in zip( | + | frame_tree_snapshot: |
| - | latest_snapshot.child_edges, | + | for child, parent_edge in zip( |
| - | ): | + | latest_snapshot.child_edges, |
| - | parent_edge_transform = PoseStamped() | + | ): |
| - | parent_edge_transform.header.stamp = world_object.acquisition_time | + | parent_edge_transform = PoseStamped() |
| - | parent_edge_transform.header.frame_id = parent_edge.parent_frame_name | + | parent_edge_transform.header.stamp = world_object.acquisition_time |
| - | parent_edge_transform.pose = parent_edge.parent_tform_child | + | parent_edge_transform.header.frame_id = parent_edge.parent_frame_name |
| - | frame_tree_snapshot[child] = parent_edge_transform | + | parent_edge_transform.pose = parent_edge.parent_tform_child |
| - | # Use tf2 to get the april_tag pose in the body frame | + | frame_tree_snapshot[child] = parent_edge_transform |
| - | april_tag_pose = frame_tree_snapshot[f" | + | # Use tf2 to get the april_tag pose in the body frame |
| - | april_tag_pose_filtered = frame_tree_snapshot[ | + | april_tag_pose = frame_tree_snapshot[f" |
| - | f" | + | april_tag_pose_filtered = frame_tree_snapshot[ |
| - | ] | + | f" |
| - | # Build the april_tag into the Fiducial class | + | ] |
| - | fiducial = Fiducial( | + | # Build the april_tag into the Fiducial class |
| - | tag_id=april_tag.tag_id, | + | fiducial = Fiducial( |
| - | dim_x=april_tag.x, | + | tag_id=april_tag.tag_id, |
| - | dim_y=april_tag.y, | + | dim_x=april_tag.x, |
| - | fiducial_pose=april_tag_pose, | + | dim_y=april_tag.y, |
| - | filtered_fiducial_pose=april_tag_pose_filtered, | + | fiducial_pose=april_tag_pose, |
| - | pose_covariance=april_tag.detection_covariance, | + | filtered_fiducial_pose=april_tag_pose_filtered, |
| - | pose_covariance_frame=april_tag.detection_covariance_reference_frame, | + | pose_covariance=april_tag.detection_covariance, |
| - | ) | + | pose_covariance_frame=april_tag.detection_covariance_reference_frame, |
| - | # Append the fiducial to the list | + | ) |
| - | fiducial_list.append(fiducial) | + | # Append the fiducial to the list |
| - | return fiducial_list | + | fiducial_list.append(fiducial) |
| - | def startup(self): | + | return fiducial_list |
| - | rospy.loginfo(" | + | def startup(self): |
| - | # Call the / | + | rospy.loginfo(" |
| - | self.call_service("/ | + | # Call the / |
| - | self.call_service("/ | + | self.call_service("/ |
| - | def shutdown(self): | + | self.call_service("/ |
| - | rospy.loginfo(" | + | def shutdown(self): |
| - | # Save the fiducials to a pickle file | + | rospy.loginfo(" |
| - | with open(f" | + | # Save the fiducials to a pickle file |
| - | pickle.dump(self.fiducials_seen, | + | with open(f" |
| - | with open(f" | + | pickle.dump(self.fiducials_seen, |
| - | pickle.dump(self.waypoint_fiducial, | + | with open(f" |
| - | # Call the /spot/sit, / | + | pickle.dump(self.waypoint_fiducial, |
| - | self.call_service("/ | + | # Call the /spot/sit, / |
| - | time.sleep(5) | + | self.call_service("/ |
| - | self.call_service("/ | + | time.sleep(5) |
| - | self.call_service("/ | + | self.call_service("/ |
| - | def main(self): | + | self.call_service("/ |
| - | rospy.init_node(" | + | def main(self): |
| - | self.rates = rospy.get_param(" | + | rospy.init_node(" |
| - | if " | + | self.rates = rospy.get_param(" |
| - | loop_rate = self.rates[" | + | if " |
| - | else: | + | loop_rate = self.rates[" |
| - | loop_rate = 50 | + | else: |
| - | # Initialize the node | + | loop_rate = 50 |
| - | rate = rospy.Rate(loop_rate) | + | # Initialize the node |
| - | rospy.loginfo(" | + | rate = rospy.Rate(loop_rate) |
| - | self.initialize_subscribers() | + | rospy.loginfo(" |
| - | self.initialize_publishers() | + | self.initialize_subscribers() |
| - | self.initialize_action_clients() | + | self.initialize_publishers() |
| - | rospy.on_shutdown(self.shutdown) | + | self.initialize_action_clients() |
| - | # Walk the current graph | + | rospy.on_shutdown(self.shutdown) |
| - | self.startup() | + | # Walk the current graph |
| - | self.walk_current_graph() | + | self.startup() |
| - | while not rospy.is_shutdown(): | + | self.walk_current_graph() |
| - | rate.sleep() | + | while not rospy.is_shutdown(): |
| - | if __name__ == " | + | rate.sleep() |
| - | spot_nav = SpotNav() | + | if __name__ == " |
| - | spot_nav.main() | + | spot_nav = SpotNav() |
| + | spot_nav.main() | ||
| + | </ | ||