At present, we intend to use python to write an Astar global path algorithm, which is generally divided into three parts: receiving map data, designing path (of course), and publishing path.
1, Build function package
Create a new function package and import the dependency: gmapping map_server amcl move_base roscpp rospy std_msgs
Create a new python file in the function package, and run this file to generate the corresponding path. (relevant configurations are omitted)
2, Accept map data (process upstream)
View the topic and publisher of Map Publishing
First, we need to check where the map data is published and what type of publication is used:
rostopic list #There should be a / map in it rostopic info /map #View the type of publication, and the publisher rosmsg info nav_msgs/OccupancyGrid
Receive and publish messages in the same code
Let's accept the data first:
Normally, the code is as follows:
#! /usr/bin/env python import rospy from nav_msgs.msg import OccupancyGrid #Callback function def doMsg(msg): rospy.loginfo(msg.data) if __name__ == "__main__": #Initialize ROS node rospy.init_node("Astar_globel_path_planning",anonymous=True) #Accept data sub = rospy.Subscriber("/map",OccupancyGrid,doMsg,queue_size=10) rospy.spin() #If the spin function keeps looping, it won't go down #print(1)
Although it can get messages successfully, there is a problem in rospy The code after spin () will not be. To put it bluntly, there is no spinOnce in C + + in ros. If I want to end the spin, I have to close the node, so I can't accept and publish messages in one code.
terms of settlement:
Use rospy wait_ for_ The function of message is similar to spinOnce()
#! /usr/bin/env python import rospy from nav_msgs.msg import OccupancyGrid if __name__ == "__main__": #Initialize ROS node rospy.init_node("Astar_globel_path_planning",anonymous=True) #Accept data OGmap = rospy.wait_for_message("/map",OccupancyGrid,timeout=None) #Can you continue with the following code print(OGmap) print("success")
In this way, you can successfully accept the message and process and publish the data.
python+ros establishes nodes in classes
With the idea of pair oriented programming, of course, I want to write this function as a class, and it is more convenient to write it as a class, so I try to put it into a class.
In if__ name__ == "__main__": Class is used inside, and the class will be executed__ init__(self) function
For example:
class Test(): def __init__(self): print("stay__init__inside") #self._print() def _print(self): print("use_print") if __name__ == '__main__': a = Test()
'in' will be output__ init__ If you want to run other functions, you just need to write them in__ init__ In the (self) function
PS:
Say if__ name__ == "__main__": Role of:
We wrote it py files are sometimes run directly, and sometimes imported into other files as a function py file. In if__ name__ == "__main__": The code in it will run only when the file is run directly. If it is introduced into other files as a function package, it will not run.
We modify the code of the previous section as follows:
#! /usr/bin/env python import rospy from nav_msgs.msg import OccupancyGrid class pathPlanning(): def __init__(self): #Initialize ROS node rospy.init_node("Astar_globel_path_planning",anonymous=True) self.doMap() def doMap(self): ''' get data Process data into a matrix (unknown):-1,Passable:0,Impassable:1) ''' #Accept data self.OGmap = rospy.wait_for_message("/map",OccupancyGrid,timeout=None) if __name__ == "__main__": getmap = pathPlanning()
Understanding map data
Let's take a look at the OccupancyGrid data type
std_msgs/Header header uint32 seq time stamp string frame_id nav_msgs/MapMetaData info time map_load_time float32 resolution #This is the resolution uint32 width #width of matrix uint32 height #height of matrix geometry_msgs/Pose origin geometry_msgs/Point position float64 x float64 y float64 z geometry_msgs/Quaternion orientation float64 x float64 y float64 z float64 w int8[] data
The most important data is int8[] data, where map information is placed. The value of walkable area is 0, the value of obstacle is 100, and the value of unknown field is - 1. However, it is a list, not a matrix, so we need to turn it into a matrix to process the data.
OccupancyGrid puts the width and height of the matrix on it. We use numpy to transform:
def doMap(self): ''' get data Process data into a matrix (unknown):-1,Passable:0,Impassable:1) ''' #Get map data self.OGmap = rospy.wait_for_message("/map",OccupancyGrid,timeout=None) #Width of map self.width = self.OGmap.info.width #Height of map self.height = self.OGmap.info.height #Map resolution self.resolution = self.OGmap.info.resolution #Get the data of the map, the value of walkable area is 0, the value of obstacle is 100, and the value of unknown field is - 1 mapdata = np.array(self.OGmap.data,dtype=np.int8) #Turn map data into a matrix self.map = mapdata.reshape((self.height,self.width)) #Change the obstacles in the map from 100 to 1 self.map[self.map == 100] = 1
The general algorithm is to represent 0 as a feasible area and 1 as a non passable area, so a row transformation needs to be added at the end to turn all 100 in the matrix into 1
Then let's see what the map looks like:
#View map data storage format plt.matshow(self.map, cmap=plt.cm.gray) plt.show()
The results are as follows:
But the map I built is like this in rviz:
That is, it's just the opposite, so turn the matrix to turn it into the result we want. Optimize the doMap function:
def doMap(self): ''' get data Process data into a matrix (unknown):-1,Passable:0,Impassable:1) ''' #Accept data self.OGmap = rospy.wait_for_message("/map",OccupancyGrid,timeout=None) #Width of map self.width = self.OGmap.info.width #Height of map self.height = self.OGmap.info.height #Map resolution self.resolution = self.OGmap.info.resolution #Get the data of the map, the value of walkable area is 0, the value of obstacle is 100, and the value of unknown field is - 1 mapdata = np.array(self.OGmap.data,dtype=np.int8) #Turn map data into a matrix self.map = mapdata.reshape((self.height,self.width)) #Change the obstacles in the map from 100 to 1 self.map[self.map == 100] = 1 #The columns are in reverse order, so the columns should be in reverse order self.map = self.map[:,::-1] # #View map data storage format # plt.matshow(self.map, cmap=plt.cm.gray) # plt.show()
This becomes the data format we want.
Map expansion
Because we all regard the robot as a particle in path planning, it is necessary to expand the map to prevent it from hitting the surrounding.
The idea of map expansion is as follows:
- First find the positions of all 1s in the matrix
- Traverse the matrix. If the distance between the current position and 1 is within the set expansion distance, set it to 1
Write a function in the class:
def obstacleMap(self,obsize): indexList = np.where(self.map == 1)#Locate the position of 1 in the map matrix #Traversal map matrix for x in range(self.map.shape[0]): for y in range(self.map.shape[1]): if self.map[x][y] == 0: for ox,oy in zip(indexList[0],indexList[1]): #If the distance from the position with 1 is less than or equal to the expansion coefficient, it is set to 1 distance = math.sqrt((x-ox)**2+(y-oy)**2) if distance <= obsize: self.map[x][y] = 1
Accelerate with numba
But when I ran this code, I found that it ran for three minutes. After running it several times, my shadow elf began to roar. I knew that python was slow, but it was too slow. I checked the information on the Internet and found a good package to speed up the cycle called numba
Easy to use:
from numba import jit @jit(nopython=True) def fun(): pass
In this way, it can be accelerated, but there is another problem. I found that it can not be used in classes, but can only be used for functions alone, so I have no choice but to take out this function alone to accelerate it
#Speed up the slowest algorithm @jit(nopython=True) def _obstacleMap(map,obsize): ''' Give the map an expansion parameter ''' indexList = np.where(map == 1)#Locate the position of 1 in the map matrix #Traversal map matrix for x in range(map.shape[0]): for y in range(map.shape[1]): if map[x][y] == 0: for ox,oy in zip(indexList[0],indexList[1]): #If the distance from the position with 1 is less than or equal to the expansion coefficient, it is set to 1 distance = math.sqrt((x-ox)**2+(y-oy)**2) if distance <= obsize: map[x][y] = 1 class pathPlanning(): def __init__(self): #Initialize ROS node rospy.init_node("Astar_globel_path_planning",anonymous=True) self.doMap() #obsize is the expansion coefficient, which is based on the distance of the matrix rather than the real distance, so a conversion is required self.obsize=7 #15 is too big print("Now expand the map") ob_time = time.time() _obstacleMap(self.map,self.obsize) print("How long does it take to expand the map:{:.3f}".format(time.time()-ob_time))
Speed up from three minutes to 1.045s!!!! Human miracle
PS: there is no numba package for ros
I also had this problem at the beginning, because I didn't know where to download numba and use it directly
pip install numba
One is the network problem. Secondly, it can't be recognized when the source is changed. Therefore, you should query the location of the python package used by ros:
There is a setting in vscode JSON file, where the address of the python package is given:
/opt/ros/noetic/lib/python3/dist-packages
Or in the terminal:
pip show rospy
You can also find the address
Then you can download numba from domestic source to the corresponding address and use numba in ros
sudo pip install --target=/opt/ros/noetic/lib/python3/dist-packages numba -i https://pypi.tuna.tsinghua.edu.cn/simple