# A* Shortest Path Algorithm
# http://en.wikipedia.org/wiki/A*
# FB - 201012256
from heapq import heappush, heappop # for priority queue
import math
import time
import random
class node:
    """A search node: a grid cell plus the cost bookkeeping A* needs.

    Instances order themselves by ``priority`` (smaller sorts first), which
    is what lets them be stored directly in a ``heapq`` heap.
    """

    # Class-level defaults; every instance overrides them in __init__.
    xPos = yPos = 0  # grid coordinates
    distance = 0  # total distance already travelled to reach the node
    priority = 0  # distance + remaining distance estimate

    def __init__(self, xPos, yPos, distance, priority):
        self.xPos, self.yPos = xPos, yPos
        self.distance, self.priority = distance, priority

    def __lt__(self, other):
        """Compare by priority only; position and distance are ignored."""
        return self.priority < other.priority

    def updatePriority(self, xDest, yDest):
        """Recompute priority as distance so far plus 10x the estimate."""
        remaining = self.estimate(xDest, yDest)
        self.priority = self.distance + remaining * 10  # A*

    def nextMove(self, dirs, d):
        """Add the cost of one step in direction ``d`` to ``distance``.

        With 8 directions an odd-numbered direction costs 14 and every other
        step costs 10, so straight moves are preferred over the odd ones.
        """
        odd_direction = dirs == 8 and d % 2 != 0
        self.distance += 14 if odd_direction else 10

    def estimate(self, xDest, yDest):
        """Return the straight-line (Euclidean) distance to the destination."""
        xd = xDest - self.xPos
        yd = yDest - self.yPos
        # Alternatives that could be swapped in here:
        #   Manhattan: abs(xd) + abs(yd)
        #   Chebyshev: max(abs(xd), abs(yd))
        return math.sqrt(xd * xd + yd * yd)
# A-star algorithm.
# The path returned will be a string of digits of directions.
def pathFind(the_map, n, m, dirs, dx, dy, xA, yA, xB, yB):
    """Search the_map for a route from (xA, yA) to (xB, yB) using A*.

    Parameters:
        the_map: 2-D list indexed as the_map[y][x]; a cell equal to 1 is an
            obstacle and is never entered.
        n, m: horizontal and vertical size of the map (valid x is 0..n-1,
            valid y is 0..m-1).
        dirs: number of move directions; dx and dy hold the x and y offset of
            each direction index 0..dirs-1.
        xA, yA: start cell.
        xB, yB: goal cell.

    Returns:
        A string with one direction index (as decimal digits) per step from
        start to goal, or '' when the goal cannot be reached. '' is also
        returned when start and goal are the same cell.
    """
    # Floor division keeps direction indices integral even where `/` means
    # true division (Python 3, or `from __future__ import division`).
    half = dirs // 2

    closed_nodes_map = [[0] * n for _ in range(m)]  # 1 = already expanded
    open_nodes_map = [[0] * n for _ in range(m)]  # best queued priority, 0 = none
    dir_map = [[0] * n for _ in range(m)]  # direction leading back to the parent

    pq = []  # heap of open (not-yet-tried) nodes
    # create the start node and push it into the open heap
    start = node(xA, yA, 0, 0)
    start.updatePriority(xB, yB)
    heappush(pq, start)
    open_nodes_map[yA][xA] = start.priority  # mark it on the open nodes map

    # A* search
    while pq:
        # take the open node with the smallest priority value
        n0 = heappop(pq)
        x = n0.xPos
        y = n0.yPos
        if closed_nodes_map[y][x]:
            # Stale entry: a better entry for this cell was pushed later,
            # popped earlier (strictly smaller priority) and closed the cell.
            continue
        open_nodes_map[y][x] = 0
        closed_nodes_map[y][x] = 1  # mark it on the closed nodes map

        # quit searching when the goal is reached
        if x == xB and y == yB:
            # walk from finish back to start along the parent directions,
            # recording the forward direction of every step
            steps = []
            while not (x == xA and y == yA):
                j = dir_map[y][x]
                steps.append(str((j + half) % dirs))
                x += dx[j]
                y += dy[j]
            steps.reverse()
            return ''.join(steps)

        # generate moves (child nodes) in all possible dirs
        for i in range(dirs):
            xdx = x + dx[i]
            ydy = y + dy[i]
            if xdx < 0 or xdx > n - 1 or ydy < 0 or ydy > m - 1:
                continue  # off the map
            if the_map[ydy][xdx] == 1 or closed_nodes_map[ydy][xdx] == 1:
                continue  # obstacle, or already expanded
            # generate a child node
            m0 = node(xdx, ydy, n0.distance, n0.priority)
            m0.nextMove(dirs, i)
            m0.updatePriority(xB, yB)
            queued = open_nodes_map[ydy][xdx]
            if queued == 0 or queued > m0.priority:
                # New cell, or a better way into an already queued cell.
                # The superseded heap entry (if any) is left in place and
                # discarded by the closed-cell check when it is popped.
                open_nodes_map[ydy][xdx] = m0.priority
                # mark its parent node direction (opposite of the move)
                dir_map[ydy][xdx] = (i + half) % dirs
                heappush(pq, m0)
    return ''  # if no route found
# MAIN
# Demo driver: builds a map with a '+'-shaped wall, picks a start/finish pair
# at random, runs pathFind and prints the map with the route drawn on it.
# Written to run unchanged on Python 2 and Python 3: every print call takes a
# single string argument and all index arithmetic uses floor division.


def _report(label, *values):
    # Same output as the Python 2 statement `print label, v1, v2, ...`:
    # the items converted with str() and separated by single spaces.
    print(' '.join([label] + [str(v) for v in values]))


dirs = 8  # number of possible directions to move on the map
if dirs == 4:
    dx = [1, 0, -1, 0]
    dy = [0, 1, 0, -1]
elif dirs == 8:
    dx = [1, 1, 0, -1, -1, -1, 0, 1]
    dy = [0, 1, 1, 1, 0, -1, -1, -1]
else:
    # Without this, dx/dy would be undefined and fail later with a NameError.
    raise ValueError('dirs must be 4 or 8, got %r' % (dirs,))

n = 30  # horizontal size of the map
m = 30  # vertical size of the map
the_map = [[0] * n for _ in range(m)]  # create empty map

# fillout the map with a '+' pattern
for x in range(n // 8, n * 7 // 8):
    the_map[m // 2][x] = 1
for y in range(m // 8, m * 7 // 8):
    the_map[y][n // 2] = 1

# randomly select start and finish locations from a list
# each entry is (xA, yA, xB, yB)
sf = [
    (0, 0, n - 1, m - 1),
    (0, m - 1, n - 1, 0),
    (n // 2 - 1, m // 2 - 1, n // 2 + 1, m // 2 + 1),
    (n // 2 - 1, m // 2 + 1, n // 2 + 1, m // 2 - 1),
    (n // 2 - 1, 0, n // 2 + 1, m - 1),
    (n // 2 + 1, m - 1, n // 2 - 1, 0),
    (0, m // 2 - 1, n - 1, m // 2 + 1),
    (n - 1, m // 2 + 1, 0, m // 2 - 1),
]
(xA, yA, xB, yB) = random.choice(sf)

_report('Map size (X,Y): ', n, m)
_report('Start: ', xA, yA)
_report('Finish: ', xB, yB)
t = time.time()
route = pathFind(the_map, n, m, dirs, dx, dy, xA, yA, xB, yB)
_report('Time to generate the route (seconds): ', time.time() - t)
print('Route:')
print(route)

# mark the route on the map: 2 = start, 3 = route, 4 = finish
if len(route) > 0:
    x = xA
    y = yA
    the_map[y][x] = 2
    for step in route:
        j = int(step)
        x += dx[j]
        y += dy[j]
        the_map[y][x] = 3
    the_map[y][x] = 4  # the last cell reached overrides its route mark

# display the map with the route added
symbols = {
    0: '.',  # space
    1: 'O',  # obstacle
    2: 'S',  # start
    3: 'R',  # route
    4: 'F',  # finish
}
print('Map:')
for y in range(m):
    # cells on a row are separated by single spaces; unknown values are skipped
    print(' '.join(symbols[cell] for cell in the_map[y][:n] if cell in symbols))

# raw_input only exists on Python 2; Python 3 renamed it to input
try:
    wait_for_enter = raw_input
except NameError:
    wait_for_enter = input
wait_for_enter('Press Enter...')
Diff to Previous Revision
--- revision 2 2010-12-26 01:54:28
+++ revision 3 2010-12-26 08:31:16
@@ -7,29 +7,25 @@
import random
class node:
- # current position
- xPos = 0
- yPos = 0
- # total distance already travelled to reach the node
- distance = 0
- # priority = distance + remaining distance estimate
- priority = 0 # smaller: higher priority
+ xPos = 0 # x position
+ yPos = 0 # y position
+ distance = 0 # total distance already travelled to reach the node
+ priority = 0 # priority = distance + remaining distance estimate
def __init__(self, xPos, yPos, distance, priority):
self.xPos = xPos
self.yPos = yPos
self.distance = distance
self.priority = priority
- def __lt__(self, other): # for priority queue
+ def __lt__(self, other): # comparison method for priority queue
return self.priority < other.priority
def updatePriority(self, xDest, yDest):
self.priority = self.distance + self.estimate(xDest, yDest) * 10 # A*
- # give better priority to going straight instead of diagonally
- def nextdistance(self, directions, i): # i: direction
- if directions == 8 and i % 2 != 0:
+ # give higher priority to going straight instead of diagonally
+ def nextMove(self, dirs, d): # d: direction to move
+ if dirs == 8 and d % 2 != 0:
self.distance += 14
else:
self.distance += 10
-
# Estimation function for the remaining distance to the goal.
def estimate(self, xDest, yDest):
xd = xDest - self.xPos
@@ -43,11 +39,11 @@
return(d)
# A-star algorithm.
-# Path returned will be a string of digits of directions.
-def pathFind(the_map, directions, dx, dy, xStart, yStart, xFinish, yFinish):
+# The path returned will be a string of digits of directions.
+def pathFind(the_map, n, m, dirs, dx, dy, xA, yA, xB, yB):
closed_nodes_map = [] # map of closed (tried-out) nodes
open_nodes_map = [] # map of open (not-yet-tried) nodes
- dir_map = [] # map of directions
+ dir_map = [] # map of dirs
row = [0] * n
for i in range(m): # create 2d arrays
closed_nodes_map.append(list(row))
@@ -57,10 +53,10 @@
pq = [[], []] # priority queues of open (not-yet-tried) nodes
pqi = 0 # priority queue index
# create the start node and push into list of open nodes
- n0 = node(xStart, yStart, 0, 0)
- n0.updatePriority(xFinish, yFinish)
+ n0 = node(xA, yA, 0, 0)
+ n0.updatePriority(xB, yB)
heappush(pq[pqi], n0)
- open_nodes_map[yStart][xStart] = n0.priority # mark it on the open nodes map
+ open_nodes_map[yA][xA] = n0.priority # mark it on the open nodes map
# A* search
while len(pq[pqi]) > 0:
@@ -72,44 +68,43 @@
y = n0.yPos
heappop(pq[pqi]) # remove the node from the open list
open_nodes_map[y][x] = 0
- # mark it on the closed nodes map
- closed_nodes_map[y][x] = 1
+ closed_nodes_map[y][x] = 1 # mark it on the closed nodes map
- # quit searching when the goal state is reached
- # if n0.estimate(xFinish, yFinish) == 0:
- if x == xFinish and y == yFinish:
+ # quit searching when the goal is reached
+ # if n0.estimate(xB, yB) == 0:
+ if x == xB and y == yB:
# generate the path from finish to start
- # by following the directions
+ # by following the dirs
path = ''
- while not (x == xStart and y == yStart):
+ while not (x == xA and y == yA):
j = dir_map[y][x]
- c = str((j + directions / 2) % directions)
+ c = str((j + dirs / 2) % dirs)
path = c + path
x += dx[j]
y += dy[j]
return path
- # generate moves (child nodes) in all possible directions
- for i in range(directions):
+ # generate moves (child nodes) in all possible dirs
+ for i in range(dirs):
xdx = x + dx[i]
ydy = y + dy[i]
if not (xdx < 0 or xdx > n-1 or ydy < 0 or ydy > m - 1
or the_map[ydy][xdx] == 1 or closed_nodes_map[ydy][xdx] == 1):
# generate a child node
m0 = node(xdx, ydy, n0.distance, n0.priority)
- m0.nextdistance(directions, i)
- m0.updatePriority(xFinish, yFinish)
+ m0.nextMove(dirs, i)
+ m0.updatePriority(xB, yB)
# if it is not in the open list then add into that
if open_nodes_map[ydy][xdx] == 0:
open_nodes_map[ydy][xdx] = m0.priority
heappush(pq[pqi], m0)
# mark its parent node direction
- dir_map[ydy][xdx] = (i + directions / 2) % directions
+ dir_map[ydy][xdx] = (i + dirs / 2) % dirs
elif open_nodes_map[ydy][xdx] > m0.priority:
- # update the priority info
+ # update the priority
open_nodes_map[ydy][xdx] = m0.priority
- # update the parent direction info
- dir_map[ydy][xdx] = (i + directions / 2) % directions
+ # update the parent direction
+ dir_map[ydy][xdx] = (i + dirs / 2) % dirs
# replace the node
# by emptying one pq to the other one
# except the node to be replaced will be ignored
@@ -117,8 +112,8 @@
while not (pq[pqi][0].xPos == xdx and pq[pqi][0].yPos == ydy):
heappush(pq[1 - pqi], pq[pqi][0])
heappop(pq[pqi])
- heappop(pq[pqi]) # remove the wanted node
- # empty the larger size pq to the smaller one
+ heappop(pq[pqi]) # remove the target node
+ # empty the larger size priority queue to the smaller one
if len(pq[pqi]) > len(pq[1 - pqi]):
pqi = 1 - pqi
while len(pq[pqi]) > 0:
@@ -126,26 +121,25 @@
heappop(pq[pqi])
pqi = 1 - pqi
heappush(pq[pqi], m0) # add the better node instead
- return '' # no route found
+ return '' # if no route found
# MAIN
-directions = 8 # number of possible directions to move on the map
-if directions == 4:
+dirs = 8 # number of possible directions to move on the map
+if dirs == 4:
dx = [1, 0, -1, 0]
dy = [0, 1, 0, -1]
-elif directions == 8:
+elif dirs == 8:
dx = [1, 1, 0, -1, -1, -1, 0, 1]
dy = [0, 1, 1, 1, 0, -1, -1, -1]
-# map matrix
-n = 30 # horizontal size
-m = 30 # vertical size
+n = 30 # horizontal size of the map
+m = 30 # vertical size of the map
the_map = []
row = [0] * n
-for i in range(m):
+for i in range(m): # create empty map
the_map.append(list(row))
-# fillout the map matrix with a '+' pattern
+# fillout the map with a '+' pattern
for x in range(n / 8, n * 7 / 8):
the_map[m / 2][x] = 1
for y in range(m/8, m * 7 / 8):
@@ -163,12 +157,12 @@
sf.append((n - 1, m / 2 + 1, 0, m / 2 - 1))
(xA, yA, xB, yB) = random.choice(sf)
-print 'Map Size (X,Y): ', n, m
+print 'Map size (X,Y): ', n, m
print 'Start: ', xA, yA
print 'Finish: ', xB, yB
t = time.time()
-route = pathFind(the_map, directions, dx, dy, xA, yA, xB, yB)
-print 'Time to generate the route (s): ', time.time() - t
+route = pathFind(the_map, n, m, dirs, dx, dy, xA, yA, xB, yB)
+print 'Time to generate the route (seconds): ', time.time() - t
print 'Route:'
print route
@@ -184,7 +178,7 @@
the_map[y][x] = 3
the_map[y][x] = 4
-# display the map with the route
+# display the map with the route added
print 'Map:'
for y in range(m):
for x in range(n):