Differences
This shows you the differences between two versions of the page.
documentation:robots:spot:spot_ros:spot_ros_example [2023/04/24 19:43] – created mings | documentation:robots:spot:spot_ros:spot_ros_example [2023/11/27 08:38] (current) – marcusk | ||
---|---|---|---|
Line 3: | Line 3: | ||
This was some example code used to traverse a GraphNav map and record fiducial positions along the way from the World Objects model of Spot. | This was some example code used to traverse a GraphNav map and record fiducial positions along the way from the World Objects model of Spot. | ||
- | | + | <code python> |
- | import typing | + | # |
- | import pickle | + | import typing |
- | import time | + | import pickle |
- | import rospy | + | import time |
- | import actionlib | + | import rospy |
- | from actionlib_msgs.msg import GoalStatus | + | import actionlib |
- | from rosservice import get_service_class_by_name | + | from actionlib_msgs.msg import GoalStatus |
- | from std_msgs.msg import String | + | from rosservice import get_service_class_by_name |
- | from geometry_msgs.msg import PoseStamped | + | from std_msgs.msg import String |
- | from spot_msgs.msg import WorldObjectArray, | + | from geometry_msgs.msg import PoseStamped |
- | from spot_msgs.msg import AprilTagProperties | + | from spot_msgs.msg import WorldObjectArray, |
- | from spot_msgs.msg import FrameTreeSnapshot, | + | from spot_msgs.msg import AprilTagProperties |
- | from spot_msgs.msg import NavigateInitAction, | + | from spot_msgs.msg import FrameTreeSnapshot, |
- | from spot_msgs.msg import NavigateToAction, | + | from spot_msgs.msg import NavigateInitAction, |
- | from spot_msgs.srv import ListGraphResponse | + | from spot_msgs.msg import NavigateToAction, |
- | from std_srvs.srv import TriggerRequest | + | from spot_msgs.srv import ListGraphResponse |
- | from fiducial import Fiducial | + | from std_srvs.srv import TriggerRequest |
- | class SpotNav: | + | from fiducial import Fiducial |
- | def __init__(self): | + | class SpotNav: |
- | self.world_objects = None | + | def __init__(self): |
- | self.fiducials_seen: | + | self.world_objects = None |
- | self.waypoint_fiducial: | + | self.fiducials_seen: |
- | def initialize_subscribers(self): | + | self.waypoint_fiducial: |
- | """ | + | def initialize_subscribers(self): |
- | # Create a subscriber for the / | + | """ |
- | self.world_objects_sub = rospy.Subscriber( | + | # Create a subscriber for the / |
- | "/ | + | self.world_objects_sub = rospy.Subscriber( |
- | ) | + | "/ |
- | def initialize_publishers(self): | + | ) |
- | """ | + | def initialize_publishers(self): |
- | self.reached_waypoint_pub = rospy.Publisher( | + | """ |
- | "/ | + | self.reached_waypoint_pub = rospy.Publisher( |
- | ) | + | "/ |
- | def initialize_action_clients(self): | + | ) |
- | """ | + | def initialize_action_clients(self): |
- | # Create an action client for the / | + | """ |
- | self.navigate_to_client = actionlib.SimpleActionClient( | + | # Create an action client for the / |
- | "/ | + | self.navigate_to_client = actionlib.SimpleActionClient( |
- | ) | + | "/ |
- | # Create an action client for the / | + | ) |
- | self.navigate_init_client = actionlib.SimpleActionClient( | + | # Create an action client for the / |
- | "/ | + | self.navigate_init_client = actionlib.SimpleActionClient( |
- | ) | + | "/ |
- | def world_objects_callback(self, | + | ) |
- | # Save the message to a class variable tracking the fiducials in the message | + | def world_objects_callback(self, |
- | self.world_objects: | + | # Save the message to a class variable tracking the fiducials in the message |
- | # Iterate through the fiducials in the message, append the x,y,z coordinates to a dictionary | + | self.world_objects: |
- | for world_object in self.world_objects: | + | # Iterate through the fiducials in the message, append the x,y,z coordinates to a dictionary |
- | april_tag: AprilTagProperties = world_object.apriltag_properties | + | for world_object in self.world_objects: |
- | latest_snapshot: | + | april_tag: AprilTagProperties = world_object.apriltag_properties |
- | # Check if april_tag is None | + | latest_snapshot: |
- | if april_tag is None: | + | # Check if april_tag is None |
- | continue | + | if april_tag is None: |
- | # Create the FrameTreeSnapshot as a dictionary | + | continue |
- | frame_tree_snapshot: | + | # Create the FrameTreeSnapshot as a dictionary |
- | for child, parent_edge in zip( | + | frame_tree_snapshot: |
- | latest_snapshot.child_edges, | + | for child, parent_edge in zip( |
- | ): | + | latest_snapshot.child_edges, |
- | parent_edge_transform = PoseStamped() | + | ): |
- | parent_edge_transform.header.stamp = world_object.acquisition_time | + | parent_edge_transform = PoseStamped() |
- | parent_edge_transform.header.frame_id = parent_edge.parent_frame_name | + | parent_edge_transform.header.stamp = world_object.acquisition_time |
- | parent_edge_transform.pose = parent_edge.parent_tform_child | + | parent_edge_transform.header.frame_id = parent_edge.parent_frame_name |
- | frame_tree_snapshot[child] = parent_edge_transform | + | parent_edge_transform.pose = parent_edge.parent_tform_child |
- | april_tag_pose = frame_tree_snapshot[f" | + | frame_tree_snapshot[child] = parent_edge_transform |
- | april_tag_pose_filtered = frame_tree_snapshot[ | + | april_tag_pose = frame_tree_snapshot[f" |
- | f" | + | april_tag_pose_filtered = frame_tree_snapshot[ |
- | ] | + | f" |
- | # Build the april_tag into the Fiducial class | + | ] |
- | fiducial = Fiducial( | + | # Build the april_tag into the Fiducial class |
- | tag_id=april_tag.tag_id, | + | fiducial = Fiducial( |
- | dim_x=april_tag.x, | + | tag_id=april_tag.tag_id, |
- | dim_y=april_tag.y, | + | dim_x=april_tag.x, |
- | fiducial_pose=april_tag_pose, | + | dim_y=april_tag.y, |
- | filtered_fiducial_pose=april_tag_pose_filtered, | + | fiducial_pose=april_tag_pose, |
- | pose_covariance=april_tag.detection_covariance, | + | filtered_fiducial_pose=april_tag_pose_filtered, |
- | pose_covariance_frame=april_tag.detection_covariance_reference_frame, | + | pose_covariance=april_tag.detection_covariance, |
- | ) | + | pose_covariance_frame=april_tag.detection_covariance_reference_frame, |
- | # Save the fiducial to the class variable | + | ) |
- | if fiducial.tag_id in self.fiducials_seen: | + | # Save the fiducial to the class variable |
- | self.fiducials_seen[fiducial.tag_id].append(fiducial) | + | if fiducial.tag_id in self.fiducials_seen: |
- | else: | + | self.fiducials_seen[fiducial.tag_id].append(fiducial) |
- | self.fiducials_seen[fiducial.tag_id] = [fiducial] | + | else: |
- | rospy.logdebug( | + | self.fiducials_seen[fiducial.tag_id] = [fiducial] |
- | f" | + | rospy.logdebug( |
- | ) | + | f" |
- | def call_service(self, | + | ) |
- | """ | + | def call_service(self, |
- | try: | + | """ |
- | rospy.wait_for_service(service_name) | + | try: |
- | service_type = get_service_class_by_name(service_name) | + | rospy.wait_for_service(service_name) |
- | proxy = rospy.ServiceProxy(service_name, | + | service_type = get_service_class_by_name(service_name) |
- | return proxy(*args, | + | proxy = rospy.ServiceProxy(service_name, |
- | except rospy.ServiceException as e: | + | return proxy(*args, |
- | rospy.logerr(" | + | except rospy.ServiceException as e: |
- | def walk_current_graph(self): | + | rospy.logerr(" |
- | """ | + | def walk_current_graph(self): |
- | rospy.loginfo(" | + | """ |
- | # Call the ListGraph service | + | rospy.loginfo(" |
- | list_graph: ListGraphResponse = self.call_service("/ | + | # Call the ListGraph service |
- | waypoints = list_graph.waypoint_ids | + | list_graph: ListGraphResponse = self.call_service("/ |
- | # Call the / | + | waypoints = list_graph.waypoint_ids |
- | navigate_init_goal = NavigateInitGoal( | + | # Call the / |
- | upload_path="", | + | navigate_init_goal = NavigateInitGoal( |
- | initial_localization_fiducial=True, | + | upload_path="", |
- | initial_localization_waypoint=" | + | initial_localization_fiducial=True, |
- | ) | + | initial_localization_waypoint=" |
- | self.navigate_init_client.send_goal(navigate_init_goal) | + | ) |
- | self.navigate_init_client.wait_for_result() | + | self.navigate_init_client.send_goal(navigate_init_goal) |
- | # Check if the action succeeded | + | self.navigate_init_client.wait_for_result() |
- | if self.navigate_init_client.get_state() == GoalStatus.SUCCEEDED: | + | # Check if the action succeeded |
- | rospy.loginfo(" | + | if self.navigate_init_client.get_state() == GoalStatus.SUCCEEDED: |
- | for waypoint in waypoints: | + | rospy.loginfo(" |
- | # Call the / | + | for waypoint in waypoints: |
- | navigate_to_goal = NavigateToGoal(navigate_to=waypoint) | + | # Call the / |
- | self.navigate_to_client.send_goal(navigate_to_goal) | + | navigate_to_goal = NavigateToGoal(navigate_to=waypoint) |
- | self.navigate_to_client.wait_for_result() | + | self.navigate_to_client.send_goal(navigate_to_goal) |
- | # Check if the action succeeded | + | self.navigate_to_client.wait_for_result() |
- | if self.navigate_to_client.get_state() == GoalStatus.SUCCEEDED: | + | # Check if the action succeeded |
- | rospy.loginfo(f" | + | if self.navigate_to_client.get_state() == GoalStatus.SUCCEEDED: |
- | latest_world_objects_msg = rospy.wait_for_message( | + | rospy.loginfo(f" |
- | "/ | + | latest_world_objects_msg = rospy.wait_for_message( |
- | ) | + | "/ |
- | latest_fiducial = self.extract_fiducials(latest_world_objects_msg) | + | ) |
- | if waypoint in self.waypoint_fiducial: | + | latest_fiducial = self.extract_fiducials(latest_world_objects_msg) |
- | self.waypoint_fiducial[waypoint].append(latest_fiducial) | + | if waypoint in self.waypoint_fiducial: |
- | else: | + | self.waypoint_fiducial[waypoint].append(latest_fiducial) |
- | self.waypoint_fiducial[waypoint] = [latest_fiducial] | + | else: |
- | # Publish to the / | + | self.waypoint_fiducial[waypoint] = [latest_fiducial] |
- | self.reached_waypoint_pub.publish(waypoint) | + | # Publish to the / |
- | time.sleep(0.5) | + | self.reached_waypoint_pub.publish(waypoint) |
- | def extract_fiducials(self, | + | time.sleep(0.5) |
- | self.world_objects: | + | def extract_fiducials(self, |
- | fiducial_list = [] | + | self.world_objects: |
- | # Iterate through the fiducials in the message, append the x,y,z coordinates to a dictionary | + | fiducial_list = [] |
- | for world_object in self.world_objects: | + | # Iterate through the fiducials in the message, append the x,y,z coordinates to a dictionary |
- | april_tag: AprilTagProperties = world_object.apriltag_properties | + | for world_object in self.world_objects: |
- | latest_snapshot: | + | april_tag: AprilTagProperties = world_object.apriltag_properties |
- | # Check if april_tag is None | + | latest_snapshot: |
- | if april_tag is None: | + | # Check if april_tag is None |
- | continue | + | if april_tag is None: |
- | # Create the FrameTreeSnapshot as a dictionary | + | continue |
- | frame_tree_snapshot: | + | # Create the FrameTreeSnapshot as a dictionary |
- | for child, parent_edge in zip( | + | frame_tree_snapshot: |
- | latest_snapshot.child_edges, | + | for child, parent_edge in zip( |
- | ): | + | latest_snapshot.child_edges, |
- | parent_edge_transform = PoseStamped() | + | ): |
- | parent_edge_transform.header.stamp = world_object.acquisition_time | + | parent_edge_transform = PoseStamped() |
- | parent_edge_transform.header.frame_id = parent_edge.parent_frame_name | + | parent_edge_transform.header.stamp = world_object.acquisition_time |
- | parent_edge_transform.pose = parent_edge.parent_tform_child | + | parent_edge_transform.header.frame_id = parent_edge.parent_frame_name |
- | frame_tree_snapshot[child] = parent_edge_transform | + | parent_edge_transform.pose = parent_edge.parent_tform_child |
- | # Use tf2 to get the april_tag pose in the body frame | + | frame_tree_snapshot[child] = parent_edge_transform |
- | april_tag_pose = frame_tree_snapshot[f" | + | # Use tf2 to get the april_tag pose in the body frame |
- | april_tag_pose_filtered = frame_tree_snapshot[ | + | april_tag_pose = frame_tree_snapshot[f" |
- | f" | + | april_tag_pose_filtered = frame_tree_snapshot[ |
- | ] | + | f" |
- | # Build the april_tag into the Fiducial class | + | ] |
- | fiducial = Fiducial( | + | # Build the april_tag into the Fiducial class |
- | tag_id=april_tag.tag_id, | + | fiducial = Fiducial( |
- | dim_x=april_tag.x, | + | tag_id=april_tag.tag_id, |
- | dim_y=april_tag.y, | + | dim_x=april_tag.x, |
- | fiducial_pose=april_tag_pose, | + | dim_y=april_tag.y, |
- | filtered_fiducial_pose=april_tag_pose_filtered, | + | fiducial_pose=april_tag_pose, |
- | pose_covariance=april_tag.detection_covariance, | + | filtered_fiducial_pose=april_tag_pose_filtered, |
- | pose_covariance_frame=april_tag.detection_covariance_reference_frame, | + | pose_covariance=april_tag.detection_covariance, |
- | ) | + | pose_covariance_frame=april_tag.detection_covariance_reference_frame, |
- | # Append the fiducial to the list | + | ) |
- | fiducial_list.append(fiducial) | + | # Append the fiducial to the list |
- | return fiducial_list | + | fiducial_list.append(fiducial) |
- | def startup(self): | + | return fiducial_list |
- | rospy.loginfo(" | + | def startup(self): |
- | # Call the / | + | rospy.loginfo(" |
- | self.call_service("/ | + | # Call the / |
- | self.call_service("/ | + | self.call_service("/ |
- | def shutdown(self): | + | self.call_service("/ |
- | rospy.loginfo(" | + | def shutdown(self): |
- | # Save the fiducials to a pickle file | + | rospy.loginfo(" |
- | with open(f" | + | # Save the fiducials to a pickle file |
- | pickle.dump(self.fiducials_seen, | + | with open(f" |
- | with open(f" | + | pickle.dump(self.fiducials_seen, |
- | pickle.dump(self.waypoint_fiducial, | + | with open(f" |
- | # Call the /spot/sit, / | + | pickle.dump(self.waypoint_fiducial, |
- | self.call_service("/ | + | # Call the /spot/sit, / |
- | time.sleep(5) | + | self.call_service("/ |
- | self.call_service("/ | + | time.sleep(5) |
- | self.call_service("/ | + | self.call_service("/ |
- | def main(self): | + | self.call_service("/ |
- | rospy.init_node(" | + | def main(self): |
- | self.rates = rospy.get_param(" | + | rospy.init_node(" |
- | if " | + | self.rates = rospy.get_param(" |
- | loop_rate = self.rates[" | + | if " |
- | else: | + | loop_rate = self.rates[" |
- | loop_rate = 50 | + | else: |
- | # Initialize the node | + | loop_rate = 50 |
- | rate = rospy.Rate(loop_rate) | + | # Initialize the node |
- | rospy.loginfo(" | + | rate = rospy.Rate(loop_rate) |
- | self.initialize_subscribers() | + | rospy.loginfo(" |
- | self.initialize_publishers() | + | self.initialize_subscribers() |
- | self.initialize_action_clients() | + | self.initialize_publishers() |
- | rospy.on_shutdown(self.shutdown) | + | self.initialize_action_clients() |
- | # Walk the current graph | + | rospy.on_shutdown(self.shutdown) |
- | self.startup() | + | # Walk the current graph |
- | self.walk_current_graph() | + | self.startup() |
- | while not rospy.is_shutdown(): | + | self.walk_current_graph() |
- | rate.sleep() | + | while not rospy.is_shutdown(): |
- | if __name__ == " | + | rate.sleep() |
- | spot_nav = SpotNav() | + | if __name__ == " |
- | spot_nav.main() | + | spot_nav = SpotNav() |
+ | spot_nav.main() | ||
+ | </ |