Fiducial Localization Test Example

This was some example code used to traverse a GraphNav map and record fiducial positions along the way from the World Objects model of Spot.

#!/usr/bin/env python3
import typing
import pickle
import time
import rospy
import actionlib
from actionlib_msgs.msg import GoalStatus
from rosservice import get_service_class_by_name
from std_msgs.msg import String
from geometry_msgs.msg import PoseStamped
from spot_msgs.msg import WorldObjectArray, WorldObject
from spot_msgs.msg import AprilTagProperties
from spot_msgs.msg import FrameTreeSnapshot, ParentEdge
from spot_msgs.msg import NavigateInitAction, NavigateInitGoal
from spot_msgs.msg import NavigateToAction, NavigateToGoal
from spot_msgs.srv import ListGraphResponse
from std_srvs.srv import TriggerRequest
from fiducial import Fiducial
class SpotNav:
    def __init__(self):
        self.world_objects = None
        self.fiducials_seen: typing.Dict[int, typing.List["Fiducial"]] = {}
        self.waypoint_fiducial: typing.Dict[str, typing.List["Fiducial"]] = {}
    def initialize_subscribers(self):
        """Initialize ROS subscribers"""
        # Create a subscriber for the /spot/world_objects topic for WorldObjectArray messages
        self.world_objects_sub = rospy.Subscriber(
            "/spot/world_objects", WorldObjectArray, self.world_objects_callback
        )
    def initialize_publishers(self):
        """Initialize ROS publishers"""
        self.reached_waypoint_pub = rospy.Publisher(
            "/spot/nav/reached_waypoint", String, queue_size=1
        )
    def initialize_action_clients(self):
        """Initialize ROS action clients"""
        # Create an action client for the /spot/navigate_to action
        self.navigate_to_client = actionlib.SimpleActionClient(
            "/spot/navigate_to", NavigateToAction
        )
        # Create an action client for the /spot/navigate_init action
        self.navigate_init_client = actionlib.SimpleActionClient(
            "/spot/navigate_init", NavigateInitAction
        )
    def world_objects_callback(self, msg: WorldObjectArray):
        # Save the message to a class variable tracking the fiducials in the message
        self.world_objects: typing.List[WorldObject] = msg.world_objects
        # Iterate through the fiducials in the message, append the x,y,z coordinates to a dictionary
        for world_object in self.world_objects:
            april_tag: AprilTagProperties = world_object.apriltag_properties
            latest_snapshot: FrameTreeSnapshot = world_object.frame_tree_snapshot
            # Check if april_tag is None
            if april_tag is None:
                continue
            # Create the FrameTreeSnapshot as a dictionary
            frame_tree_snapshot: typing.Dict[str, PoseStamped] = {}
            for child, parent_edge in zip(
                latest_snapshot.child_edges, latest_snapshot.parent_edges
            ):
                parent_edge_transform = PoseStamped()
                parent_edge_transform.header.stamp = world_object.acquisition_time
                parent_edge_transform.header.frame_id = parent_edge.parent_frame_name
                parent_edge_transform.pose = parent_edge.parent_tform_child
                frame_tree_snapshot[child] = parent_edge_transform
            april_tag_pose = frame_tree_snapshot[f"fiducial_{april_tag.tag_id}"]
            april_tag_pose_filtered = frame_tree_snapshot[
                f"filtered_fiducial_{april_tag.tag_id}"
            ]
            # Build the april_tag into the Fiducial class
            fiducial = Fiducial(
                tag_id=april_tag.tag_id,
                dim_x=april_tag.x,
                dim_y=april_tag.y,
                fiducial_pose=april_tag_pose,
                filtered_fiducial_pose=april_tag_pose_filtered,
                pose_covariance=april_tag.detection_covariance,
                pose_covariance_frame=april_tag.detection_covariance_reference_frame,
            )
            # Save the fiducial to the class variable
            if fiducial.tag_id in self.fiducials_seen:
                self.fiducials_seen[fiducial.tag_id].append(fiducial)
            else:
                self.fiducials_seen[fiducial.tag_id] = [fiducial]
            rospy.logdebug(
                f"fiducial: {fiducial.tag_id}\n position_x: {fiducial.fiducial_pose.pose.position.x:.2f}\n filtered_position_x: {fiducial.filtered_fiducial_pose.pose.position.x:.2f}"
            )
    def call_service(self, service_name: str, *args, **kwargs):
        """Call a service and wait for it to be available"""
        try:
            rospy.wait_for_service(service_name)
            service_type = get_service_class_by_name(service_name)
            proxy = rospy.ServiceProxy(service_name, service_type)
            return proxy(*args, **kwargs)
        except rospy.ServiceException as e:
            rospy.logerr("Service call failed: %s" % e)
    def walk_current_graph(self):
        """Walk the current graph in GraphNav"""
        rospy.loginfo("Walking the current graph")
        # Call the ListGraph service
        list_graph: ListGraphResponse = self.call_service("/spot/list_graph")
        waypoints = list_graph.waypoint_ids
        # Call the /spot/navigate_init action
        navigate_init_goal = NavigateInitGoal(
            upload_path="",
            initial_localization_fiducial=True,
            initial_localization_waypoint="mm",
        )
        self.navigate_init_client.send_goal(navigate_init_goal)
        self.navigate_init_client.wait_for_result()
        # Check if the action succeeded
        if self.navigate_init_client.get_state() == GoalStatus.SUCCEEDED:
            rospy.loginfo("NavigateInit action succeeded")
        for waypoint in waypoints:
            # Call the /spot/navigate_to action
            navigate_to_goal = NavigateToGoal(navigate_to=waypoint)
            self.navigate_to_client.send_goal(navigate_to_goal)
            self.navigate_to_client.wait_for_result()
            # Check if the action succeeded
            if self.navigate_to_client.get_state() == GoalStatus.SUCCEEDED:
                rospy.loginfo(f"NavigateTo {waypoint} action succeeded")
                latest_world_objects_msg = rospy.wait_for_message(
                    "/spot/world_objects", WorldObjectArray
                )
                latest_fiducial = self.extract_fiducials(latest_world_objects_msg)
                if waypoint in self.waypoint_fiducial:
                    self.waypoint_fiducial[waypoint].append(latest_fiducial)
                else:
                    self.waypoint_fiducial[waypoint] = [latest_fiducial]
                # Publish to the /spot/nav/reached_waypoint topic
                self.reached_waypoint_pub.publish(waypoint)
                time.sleep(0.5)
    def extract_fiducials(self, msg: WorldObjectArray) -> typing.List["Fiducial"]:
        self.world_objects: typing.List[WorldObject] = msg.world_objects
        fiducial_list = []
        # Iterate through the fiducials in the message, append the x,y,z coordinates to a dictionary
        for world_object in self.world_objects:
            april_tag: AprilTagProperties = world_object.apriltag_properties
            latest_snapshot: FrameTreeSnapshot = world_object.frame_tree_snapshot
            # Check if april_tag is None
            if april_tag is None:
                continue
            # Create the FrameTreeSnapshot as a dictionary
            frame_tree_snapshot: typing.Dict[str, PoseStamped] = {}
            for child, parent_edge in zip(
                latest_snapshot.child_edges, latest_snapshot.parent_edges
            ):
                parent_edge_transform = PoseStamped()
                parent_edge_transform.header.stamp = world_object.acquisition_time
                parent_edge_transform.header.frame_id = parent_edge.parent_frame_name
                parent_edge_transform.pose = parent_edge.parent_tform_child
                frame_tree_snapshot[child] = parent_edge_transform
            # Use tf2 to get the april_tag pose in the body frame
            april_tag_pose = frame_tree_snapshot[f"fiducial_{april_tag.tag_id}"]
            april_tag_pose_filtered = frame_tree_snapshot[
                f"filtered_fiducial_{april_tag.tag_id}"
            ]
            # Build the april_tag into the Fiducial class
            fiducial = Fiducial(
                tag_id=april_tag.tag_id,
                dim_x=april_tag.x,
                dim_y=april_tag.y,
                fiducial_pose=april_tag_pose,
                filtered_fiducial_pose=april_tag_pose_filtered,
                pose_covariance=april_tag.detection_covariance,
                pose_covariance_frame=april_tag.detection_covariance_reference_frame,
            )
            # Append the fiducial to the list
            fiducial_list.append(fiducial)
        return fiducial_list
    def startup(self):
        rospy.loginfo("SpotNav robot starting up")
        # Call the /spot/claim, /spot/power_on, /spot/stand service
        self.call_service("/spot/claim", TriggerRequest())
        self.call_service("/spot/power_on", TriggerRequest())
    def shutdown(self):
        rospy.loginfo("SpotNav node shutting down")
        # Save the fiducials to a pickle file
        with open(f"fiducials_seen.pickle_{time.time()}", "wb") as f:
            pickle.dump(self.fiducials_seen, f)
        with open(f"waypoint_fiducial.pickle_{time.time()}", "wb") as f:
            pickle.dump(self.waypoint_fiducial, f)
        # Call the /spot/sit, /spot/power_off, /spot/release service
        self.call_service("/spot/sit", TriggerRequest())
        time.sleep(5)
        self.call_service("/spot/power_off", TriggerRequest())
        self.call_service("/spot/release", TriggerRequest())
    def main(self):
        rospy.init_node("spot_nav", anonymous=True)
        self.rates = rospy.get_param("~rates", {})
        if "loop_frequency" in self.rates:
            loop_rate = self.rates["loop_frequency"]
        else:
            loop_rate = 50
        # Initialize the node
        rate = rospy.Rate(loop_rate)
        rospy.loginfo("SpotNav node started")
        self.initialize_subscribers()
        self.initialize_publishers()
        self.initialize_action_clients()
        rospy.on_shutdown(self.shutdown)
        # Walk the current graph
        self.startup()
        self.walk_current_graph()
        while not rospy.is_shutdown():
            rate.sleep()
if __name__ == "__main__":
    spot_nav = SpotNav()
    spot_nav.main()