documentation:robots:spot:spot_ros:spot_ros_example

Differences

This shows you the differences between two versions of the page.

Link to this comparison view

documentation:robots:spot:spot_ros:spot_ros_example [2023/04/24 19:43] – created mingsdocumentation:robots:spot:spot_ros:spot_ros_example [2023/11/27 08:38] (current) marcusk
Line 3: Line 3:
 This was some example code used to traverse a GraphNav map and record fiducial positions along the way from the World Objects model of Spot. This was some example code used to traverse a GraphNav map and record fiducial positions along the way from the World Objects model of Spot.
  
-  #!/usr/bin/env python3 +<code python> 
-  import typing +#!/usr/bin/env python3 
-  import pickle +import typing 
-  import time +import pickle 
-  import rospy +import time 
-  import actionlib +import rospy 
-  from actionlib_msgs.msg import GoalStatus +import actionlib 
-  from rosservice import get_service_class_by_name +from actionlib_msgs.msg import GoalStatus 
-  from std_msgs.msg import String +from rosservice import get_service_class_by_name 
-  from geometry_msgs.msg import PoseStamped +from std_msgs.msg import String 
-  from spot_msgs.msg import WorldObjectArray, WorldObject +from geometry_msgs.msg import PoseStamped 
-  from spot_msgs.msg import AprilTagProperties +from spot_msgs.msg import WorldObjectArray, WorldObject 
-  from spot_msgs.msg import FrameTreeSnapshot, ParentEdge +from spot_msgs.msg import AprilTagProperties 
-  from spot_msgs.msg import NavigateInitAction, NavigateInitGoal +from spot_msgs.msg import FrameTreeSnapshot, ParentEdge 
-  from spot_msgs.msg import NavigateToAction, NavigateToGoal +from spot_msgs.msg import NavigateInitAction, NavigateInitGoal 
-  from spot_msgs.srv import ListGraphResponse +from spot_msgs.msg import NavigateToAction, NavigateToGoal 
-  from std_srvs.srv import TriggerRequest +from spot_msgs.srv import ListGraphResponse 
-  from fiducial import Fiducial +from std_srvs.srv import TriggerRequest 
-  class SpotNav: +from fiducial import Fiducial 
-      def __init__(self): +class SpotNav: 
-          self.world_objects = None +    def __init__(self): 
-          self.fiducials_seen: typing.Dict[int, typing.List["Fiducial"]] = {} +        self.world_objects = None 
-          self.waypoint_fiducial: typing.Dict[str, typing.List["Fiducial"]] = {} +        self.fiducials_seen: typing.Dict[int, typing.List["Fiducial"]] = {} 
-      def initialize_subscribers(self): +        self.waypoint_fiducial: typing.Dict[str, typing.List["Fiducial"]] = {} 
-          """Initialize ROS subscribers""" +    def initialize_subscribers(self): 
-          # Create a subscriber for the /spot/world_objects topic for WorldObjectArray messages +        """Initialize ROS subscribers""" 
-          self.world_objects_sub = rospy.Subscriber( +        # Create a subscriber for the /spot/world_objects topic for WorldObjectArray messages 
-              "/spot/world_objects", WorldObjectArray, self.world_objects_callback +        self.world_objects_sub = rospy.Subscriber( 
-          +            "/spot/world_objects", WorldObjectArray, self.world_objects_callback 
-      def initialize_publishers(self): +        
-          """Initialize ROS publishers""" +    def initialize_publishers(self): 
-          self.reached_waypoint_pub = rospy.Publisher( +        """Initialize ROS publishers""" 
-              "/spot/nav/reached_waypoint", String, queue_size=1 +        self.reached_waypoint_pub = rospy.Publisher( 
-          +            "/spot/nav/reached_waypoint", String, queue_size=1 
-      def initialize_action_clients(self): +        
-          """Initialize ROS action clients""" +    def initialize_action_clients(self): 
-          # Create an action client for the /spot/navigate_to action +        """Initialize ROS action clients""" 
-          self.navigate_to_client = actionlib.SimpleActionClient( +        # Create an action client for the /spot/navigate_to action 
-              "/spot/navigate_to", NavigateToAction +        self.navigate_to_client = actionlib.SimpleActionClient( 
-          +            "/spot/navigate_to", NavigateToAction 
-          # Create an action client for the /spot/navigate_init action +        
-          self.navigate_init_client = actionlib.SimpleActionClient( +        # Create an action client for the /spot/navigate_init action 
-              "/spot/navigate_init", NavigateInitAction +        self.navigate_init_client = actionlib.SimpleActionClient( 
-          +            "/spot/navigate_init", NavigateInitAction 
-      def world_objects_callback(self, msg: WorldObjectArray): +        
-          # Save the message to a class variable tracking the fiducials in the message +    def world_objects_callback(self, msg: WorldObjectArray): 
-          self.world_objects: typing.List[WorldObject] = msg.world_objects +        # Save the message to a class variable tracking the fiducials in the message 
-          # Iterate through the fiducials in the message, append the x,y,z coordinates to a dictionary +        self.world_objects: typing.List[WorldObject] = msg.world_objects 
-          for world_object in self.world_objects: +        # Iterate through the fiducials in the message, append the x,y,z coordinates to a dictionary 
-              april_tag: AprilTagProperties = world_object.apriltag_properties +        for world_object in self.world_objects: 
-              latest_snapshot: FrameTreeSnapshot = world_object.frame_tree_snapshot +            april_tag: AprilTagProperties = world_object.apriltag_properties 
-              # Check if april_tag is None +            latest_snapshot: FrameTreeSnapshot = world_object.frame_tree_snapshot 
-              if april_tag is None: +            # Check if april_tag is None 
-                  continue +            if april_tag is None: 
-              # Create the FrameTreeSnapshot as a dictionary +                continue 
-              frame_tree_snapshot: typing.Dict[str, PoseStamped] = {} +            # Create the FrameTreeSnapshot as a dictionary 
-              for child, parent_edge in zip( +            frame_tree_snapshot: typing.Dict[str, PoseStamped] = {} 
-                  latest_snapshot.child_edges, latest_snapshot.parent_edges +            for child, parent_edge in zip( 
-              ): +                latest_snapshot.child_edges, latest_snapshot.parent_edges 
-                  parent_edge_transform = PoseStamped() +            ): 
-                  parent_edge_transform.header.stamp = world_object.acquisition_time +                parent_edge_transform = PoseStamped() 
-                  parent_edge_transform.header.frame_id = parent_edge.parent_frame_name +                parent_edge_transform.header.stamp = world_object.acquisition_time 
-                  parent_edge_transform.pose = parent_edge.parent_tform_child +                parent_edge_transform.header.frame_id = parent_edge.parent_frame_name 
-                  frame_tree_snapshot[child] = parent_edge_transform +                parent_edge_transform.pose = parent_edge.parent_tform_child 
-              april_tag_pose = frame_tree_snapshot[f"fiducial_{april_tag.tag_id}"+                frame_tree_snapshot[child] = parent_edge_transform 
-              april_tag_pose_filtered = frame_tree_snapshot[ +            april_tag_pose = frame_tree_snapshot[f"fiducial_{april_tag.tag_id}"
-                  f"filtered_fiducial_{april_tag.tag_id}" +            april_tag_pose_filtered = frame_tree_snapshot[ 
-              +                f"filtered_fiducial_{april_tag.tag_id}" 
-              # Build the april_tag into the Fiducial class +            
-              fiducial = Fiducial( +            # Build the april_tag into the Fiducial class 
-                  tag_id=april_tag.tag_id, +            fiducial = Fiducial( 
-                  dim_x=april_tag.x, +                tag_id=april_tag.tag_id, 
-                  dim_y=april_tag.y, +                dim_x=april_tag.x, 
-                  fiducial_pose=april_tag_pose, +                dim_y=april_tag.y, 
-                  filtered_fiducial_pose=april_tag_pose_filtered, +                fiducial_pose=april_tag_pose, 
-                  pose_covariance=april_tag.detection_covariance, +                filtered_fiducial_pose=april_tag_pose_filtered, 
-                  pose_covariance_frame=april_tag.detection_covariance_reference_frame, +                pose_covariance=april_tag.detection_covariance, 
-              +                pose_covariance_frame=april_tag.detection_covariance_reference_frame, 
-              # Save the fiducial to the class variable +            
-              if fiducial.tag_id in self.fiducials_seen: +            # Save the fiducial to the class variable 
-                  self.fiducials_seen[fiducial.tag_id].append(fiducial) +            if fiducial.tag_id in self.fiducials_seen: 
-              else: +                self.fiducials_seen[fiducial.tag_id].append(fiducial) 
-                  self.fiducials_seen[fiducial.tag_id] = [fiducial] +            else: 
-              rospy.logdebug( +                self.fiducials_seen[fiducial.tag_id] = [fiducial] 
-                  f"fiducial: {fiducial.tag_id}\n position_x: {fiducial.fiducial_pose.pose.position.x:.2f}\n filtered_position_x: {fiducial.filtered_fiducial_pose.pose.position.x:.2f}" +            rospy.logdebug( 
-              +                f"fiducial: {fiducial.tag_id}\n position_x: {fiducial.fiducial_pose.pose.position.x:.2f}\n filtered_position_x: {fiducial.filtered_fiducial_pose.pose.position.x:.2f}" 
-      def call_service(self, service_name: str, *args, **kwargs): +            
-          """Call a service and wait for it to be available""" +    def call_service(self, service_name: str, *args, **kwargs): 
-          try: +        """Call a service and wait for it to be available""" 
-              rospy.wait_for_service(service_name) +        try: 
-              service_type = get_service_class_by_name(service_name) +            rospy.wait_for_service(service_name) 
-              proxy = rospy.ServiceProxy(service_name, service_type) +            service_type = get_service_class_by_name(service_name) 
-              return proxy(*args, **kwargs) +            proxy = rospy.ServiceProxy(service_name, service_type) 
-          except rospy.ServiceException as e: +            return proxy(*args, **kwargs) 
-              rospy.logerr("Service call failed: %s" % e) +        except rospy.ServiceException as e: 
-      def walk_current_graph(self): +            rospy.logerr("Service call failed: %s" % e) 
-          """Walk the current graph in GraphNav""" +    def walk_current_graph(self): 
-          rospy.loginfo("Walking the current graph"+        """Walk the current graph in GraphNav""" 
-          # Call the ListGraph service +        rospy.loginfo("Walking the current graph"
-          list_graph: ListGraphResponse = self.call_service("/spot/list_graph"+        # Call the ListGraph service 
-          waypoints = list_graph.waypoint_ids +        list_graph: ListGraphResponse = self.call_service("/spot/list_graph"
-          # Call the /spot/navigate_init action +        waypoints = list_graph.waypoint_ids 
-          navigate_init_goal = NavigateInitGoal( +        # Call the /spot/navigate_init action 
-              upload_path="", +        navigate_init_goal = NavigateInitGoal( 
-              initial_localization_fiducial=True, +            upload_path="", 
-              initial_localization_waypoint="mm", +            initial_localization_fiducial=True, 
-          +            initial_localization_waypoint="mm", 
-          self.navigate_init_client.send_goal(navigate_init_goal) +        
-          self.navigate_init_client.wait_for_result() +        self.navigate_init_client.send_goal(navigate_init_goal) 
-          # Check if the action succeeded +        self.navigate_init_client.wait_for_result() 
-          if self.navigate_init_client.get_state() == GoalStatus.SUCCEEDED: +        # Check if the action succeeded 
-              rospy.loginfo("NavigateInit action succeeded"+        if self.navigate_init_client.get_state() == GoalStatus.SUCCEEDED: 
-          for waypoint in waypoints: +            rospy.loginfo("NavigateInit action succeeded"
-              # Call the /spot/navigate_to action +        for waypoint in waypoints: 
-              navigate_to_goal = NavigateToGoal(navigate_to=waypoint) +            # Call the /spot/navigate_to action 
-              self.navigate_to_client.send_goal(navigate_to_goal) +            navigate_to_goal = NavigateToGoal(navigate_to=waypoint) 
-              self.navigate_to_client.wait_for_result() +            self.navigate_to_client.send_goal(navigate_to_goal) 
-              # Check if the action succeeded +            self.navigate_to_client.wait_for_result() 
-              if self.navigate_to_client.get_state() == GoalStatus.SUCCEEDED: +            # Check if the action succeeded 
-                  rospy.loginfo(f"NavigateTo {waypoint} action succeeded"+            if self.navigate_to_client.get_state() == GoalStatus.SUCCEEDED: 
-                  latest_world_objects_msg = rospy.wait_for_message( +                rospy.loginfo(f"NavigateTo {waypoint} action succeeded"
-                      "/spot/world_objects", WorldObjectArray +                latest_world_objects_msg = rospy.wait_for_message( 
-                  +                    "/spot/world_objects", WorldObjectArray 
-                  latest_fiducial = self.extract_fiducials(latest_world_objects_msg) +                
-                  if waypoint in self.waypoint_fiducial: +                latest_fiducial = self.extract_fiducials(latest_world_objects_msg) 
-                      self.waypoint_fiducial[waypoint].append(latest_fiducial) +                if waypoint in self.waypoint_fiducial: 
-                  else: +                    self.waypoint_fiducial[waypoint].append(latest_fiducial) 
-                      self.waypoint_fiducial[waypoint] = [latest_fiducial] +                else: 
-                  # Publish to the /spot/nav/reached_waypoint topic +                    self.waypoint_fiducial[waypoint] = [latest_fiducial] 
-                  self.reached_waypoint_pub.publish(waypoint) +                # Publish to the /spot/nav/reached_waypoint topic 
-                  time.sleep(0.5) +                self.reached_waypoint_pub.publish(waypoint) 
-      def extract_fiducials(self, msg: WorldObjectArray) -> typing.List["Fiducial"]: +                time.sleep(0.5) 
-          self.world_objects: typing.List[WorldObject] = msg.world_objects +    def extract_fiducials(self, msg: WorldObjectArray) -> typing.List["Fiducial"]: 
-          fiducial_list = [] +        self.world_objects: typing.List[WorldObject] = msg.world_objects 
-          # Iterate through the fiducials in the message, append the x,y,z coordinates to a dictionary +        fiducial_list = [] 
-          for world_object in self.world_objects: +        # Iterate through the fiducials in the message, append the x,y,z coordinates to a dictionary 
-              april_tag: AprilTagProperties = world_object.apriltag_properties +        for world_object in self.world_objects: 
-              latest_snapshot: FrameTreeSnapshot = world_object.frame_tree_snapshot +            april_tag: AprilTagProperties = world_object.apriltag_properties 
-              # Check if april_tag is None +            latest_snapshot: FrameTreeSnapshot = world_object.frame_tree_snapshot 
-              if april_tag is None: +            # Check if april_tag is None 
-                  continue +            if april_tag is None: 
-              # Create the FrameTreeSnapshot as a dictionary +                continue 
-              frame_tree_snapshot: typing.Dict[str, PoseStamped] = {} +            # Create the FrameTreeSnapshot as a dictionary 
-              for child, parent_edge in zip( +            frame_tree_snapshot: typing.Dict[str, PoseStamped] = {} 
-                  latest_snapshot.child_edges, latest_snapshot.parent_edges +            for child, parent_edge in zip( 
-              ): +                latest_snapshot.child_edges, latest_snapshot.parent_edges 
-                  parent_edge_transform = PoseStamped() +            ): 
-                  parent_edge_transform.header.stamp = world_object.acquisition_time +                parent_edge_transform = PoseStamped() 
-                  parent_edge_transform.header.frame_id = parent_edge.parent_frame_name +                parent_edge_transform.header.stamp = world_object.acquisition_time 
-                  parent_edge_transform.pose = parent_edge.parent_tform_child +                parent_edge_transform.header.frame_id = parent_edge.parent_frame_name 
-                  frame_tree_snapshot[child] = parent_edge_transform +                parent_edge_transform.pose = parent_edge.parent_tform_child 
-              # Use tf2 to get the april_tag pose in the body frame +                frame_tree_snapshot[child] = parent_edge_transform 
-              april_tag_pose = frame_tree_snapshot[f"fiducial_{april_tag.tag_id}"+            # Use tf2 to get the april_tag pose in the body frame 
-              april_tag_pose_filtered = frame_tree_snapshot[ +            april_tag_pose = frame_tree_snapshot[f"fiducial_{april_tag.tag_id}"
-                  f"filtered_fiducial_{april_tag.tag_id}" +            april_tag_pose_filtered = frame_tree_snapshot[ 
-              +                f"filtered_fiducial_{april_tag.tag_id}" 
-              # Build the april_tag into the Fiducial class +            
-              fiducial = Fiducial( +            # Build the april_tag into the Fiducial class 
-                  tag_id=april_tag.tag_id, +            fiducial = Fiducial( 
-                  dim_x=april_tag.x, +                tag_id=april_tag.tag_id, 
-                  dim_y=april_tag.y, +                dim_x=april_tag.x, 
-                  fiducial_pose=april_tag_pose, +                dim_y=april_tag.y, 
-                  filtered_fiducial_pose=april_tag_pose_filtered, +                fiducial_pose=april_tag_pose, 
-                  pose_covariance=april_tag.detection_covariance, +                filtered_fiducial_pose=april_tag_pose_filtered, 
-                  pose_covariance_frame=april_tag.detection_covariance_reference_frame, +                pose_covariance=april_tag.detection_covariance, 
-              +                pose_covariance_frame=april_tag.detection_covariance_reference_frame, 
-              # Append the fiducial to the list +            
-              fiducial_list.append(fiducial) +            # Append the fiducial to the list 
-          return fiducial_list +            fiducial_list.append(fiducial) 
-      def startup(self): +        return fiducial_list 
-          rospy.loginfo("SpotNav robot starting up") +    def startup(self): 
-          # Call the /spot/claim, /spot/power_on, /spot/stand service +        rospy.loginfo("SpotNav robot starting up") 
-          self.call_service("/spot/claim", TriggerRequest()) +        # Call the /spot/claim, /spot/power_on, /spot/stand service 
-          self.call_service("/spot/power_on", TriggerRequest()) +        self.call_service("/spot/claim", TriggerRequest()) 
-      def shutdown(self): +        self.call_service("/spot/power_on", TriggerRequest()) 
-          rospy.loginfo("SpotNav node shutting down"+    def shutdown(self): 
-          # Save the fiducials to a pickle file +        rospy.loginfo("SpotNav node shutting down"
-          with open(f"fiducials_seen.pickle_{time.time()}", "wb") as f: +        # Save the fiducials to a pickle file 
-              pickle.dump(self.fiducials_seen, f) +        with open(f"fiducials_seen.pickle_{time.time()}", "wb") as f: 
-          with open(f"waypoint_fiducial.pickle_{time.time()}", "wb") as f: +            pickle.dump(self.fiducials_seen, f) 
-              pickle.dump(self.waypoint_fiducial, f) +        with open(f"waypoint_fiducial.pickle_{time.time()}", "wb") as f: 
-          # Call the /spot/sit, /spot/power_off, /spot/release service +            pickle.dump(self.waypoint_fiducial, f) 
-          self.call_service("/spot/sit", TriggerRequest()) +        # Call the /spot/sit, /spot/power_off, /spot/release service 
-          time.sleep(5) +        self.call_service("/spot/sit", TriggerRequest()) 
-          self.call_service("/spot/power_off", TriggerRequest()) +        time.sleep(5) 
-          self.call_service("/spot/release", TriggerRequest()) +        self.call_service("/spot/power_off", TriggerRequest()) 
-      def main(self): +        self.call_service("/spot/release", TriggerRequest()) 
-          rospy.init_node("spot_nav", anonymous=True) +    def main(self): 
-          self.rates = rospy.get_param("~rates", {}) +        rospy.init_node("spot_nav", anonymous=True) 
-          if "loop_frequency" in self.rates: +        self.rates = rospy.get_param("~rates", {}) 
-              loop_rate = self.rates["loop_frequency"+        if "loop_frequency" in self.rates: 
-          else: +            loop_rate = self.rates["loop_frequency"
-              loop_rate = 50 +        else: 
-          # Initialize the node +            loop_rate = 50 
-          rate = rospy.Rate(loop_rate) +        # Initialize the node 
-          rospy.loginfo("SpotNav node started"+        rate = rospy.Rate(loop_rate) 
-          self.initialize_subscribers() +        rospy.loginfo("SpotNav node started"
-          self.initialize_publishers() +        self.initialize_subscribers() 
-          self.initialize_action_clients() +        self.initialize_publishers() 
-          rospy.on_shutdown(self.shutdown) +        self.initialize_action_clients() 
-          # Walk the current graph +        rospy.on_shutdown(self.shutdown) 
-          self.startup() +        # Walk the current graph 
-          self.walk_current_graph() +        self.startup() 
-          while not rospy.is_shutdown(): +        self.walk_current_graph() 
-              rate.sleep() +        while not rospy.is_shutdown(): 
-  if __name__ == "__main__": +            rate.sleep() 
-      spot_nav = SpotNav() +if __name__ == "__main__": 
-      spot_nav.main() +    spot_nav = SpotNav() 
 +    spot_nav.main() 
 +</code>
  • documentation/robots/spot/spot_ros/spot_ros_example.txt
  • Last modified: 2023/11/27 08:38
  • by marcusk