-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrt_rrt_star.py
More file actions
1295 lines (1059 loc) · 50.2 KB
/
rt_rrt_star.py
File metadata and controls
1295 lines (1059 loc) · 50.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
Path planning Sample Code with RRT*
author: Atsushi Sakai(@Atsushi_twi)
"""
import math
import os
import sys
import time
import random
import pickle, dill
from collections import deque
import itertools, more_itertools
import matplotlib.pyplot as plt
import matplotlib.patches as patches
import numpy as np
from ellipsis import get_random_node_ellipsis_sampling
sys.setrecursionlimit(10000)
show_animation = False
class RRTStar():
"""
Class for RRT Star planning
"""
class Node:
"""
RRT Node
"""
def __init__(self, x, y):
self.x = x
self.y = y
self.path_x = []
self.path_y = []
self.parent = None
self.children = {}
self.cost = 0.0
self.blocking_cost = 0.0
self.total_cost = lambda: self.cost + self.blocking_cost
self.trajectory_until_node = []
def __str__(self):
return '('+str(round(self.x))+','+str(round(self.y))+')'
def __repr__(self):
return '('+str(round(self.x))+','+str(round(self.y))+')'
def __hash__(self):
return hash((self.x, self.y))
def __lt__(self, other):
return (self.x, self.y) < (other.x, other.y)
def __eq__(self, other):
try:
return (self.x, self.y) == (other.x, other.y)
except AttributeError:
return False
def __init__(self,
start,
goal,
obstacle_list,
rand_area,
expand_dis=8.0,
path_resolution=1.0,
goal_sample_rate=20,
max_iter=300,
grid_size=20,
max_time=0.1,
connect_circle_dist=50.0,
search_until_max_iter=False,
warm_start=True,
warm_start_tree_size=6000,
robot_radius=0.0):
"""
Setting Parameter
start:Start Position [x,y]
goal:Goal Position [x,y]
obstacleList:obstacle Positions [[x,y,size],...]
randArea:Random Sampling Area [min,max]
"""
self.start = self.Node(start[0], start[1])
self.end = self.Node(goal[0], goal[1])
self.min_xrand = rand_area[0]
self.max_xrand = rand_area[1]
self.min_yrand = rand_area[2]
self.max_yrand = rand_area[3]
self.expand_dis = expand_dis
self.path_resolution = path_resolution
self.goal_sample_rate = goal_sample_rate
self.max_iter = max_iter
self.obstacle_list = obstacle_list
self.node_list = []
self.robot_radius = robot_radius
self.connect_circle_dist = connect_circle_dist
self.goal_node = self.Node(goal[0], goal[1])
self.search_until_max_iter = search_until_max_iter
self.max_time=max_time
#The RT-RRT* queues (to rewire from tree root and recursively the children)
self.Q_r = deque()
self.Q_s = deque()
#for plot purposes
self.rewired_r = []
self.rewired_s = []
self.rewired_s_success = []
#Grid for grid indexing of the nodes
self.grid = {}
self.cells = []
self.grid_size=grid_size
for x in range((rand_area[0]//grid_size)-2,(rand_area[1]//grid_size)+3):
self.grid[x] = {}
for y in range((rand_area[2]//grid_size)-2,(rand_area[3]//grid_size)+3):
self.grid[x][y] = []
self.cells.append((x,y))
#If warm start required, perform a warm start, but without any consideration for obstacles
if warm_start:
start_time = time.time()
obstacles = self.obstacle_list
self.warm_start(warm_start_tree_size)
self.obstacle_list = obstacles
print("warm start calculation terminated in ",time.time() - start_time)
#Blocked nodes by obstacles
#we block the cell containing an obstacle + all adjacent cells
self.blocked_cells = {}
for obstacle in self.obstacle_list:
self.blocked_cells[(obstacle[0]//self.grid_size, obstacle[1]//self.grid_size)] = None
for x,y in itertools.product([(obstacle[0]//self.grid_size)-1,obstacle[0]//self.grid_size,(obstacle[0]//self.grid_size)+1],[(obstacle[1]//self.grid_size)-1,obstacle[1]//self.grid_size,(obstacle[1]//self.grid_size)+1]):
self.blocked_cells[(x,y)] = None
# self.blocked_cells[(obstacle[0]//self.grid_size, obstacle[1]//self.grid_size)] = None
def draw_graph(self, rnd=None, room_area=None):
plt.clf()
if room_area is not None:
plt.xlim(room_area[:2])
plt.ylim(room_area[2:])
else:
plt.axis("equal")
plt.axis([-2, 15, -2, 15])
# for stopping simulation with the esc key.
plt.gcf().canvas.mpl_connect(
'key_release_event',
lambda event: [exit(0) if event.key == 'escape' else None])
if rnd is not None:
plt.plot(rnd.x, rnd.y, "^k")
if self.robot_radius > 0.0:
self.plot_circle(rnd.x, rnd.y, self.robot_radius, '-r')
for node in self.node_list:
if node.parent:
if node.blocking_cost == float("inf") or node.parent.blocking_cost == float("inf"):
plt.plot([node.x,node.parent.x], [node.y,node.parent.y], "-y")
else:
plt.plot([node.x,node.parent.x], [node.y,node.parent.y], "-g")
# for node in self.rewired_r:
# plt.plot(node.x, node.y, "1b")
# for node in self.rewired_s:
# # plt.plot(node.x, node.y, "2c")
for node in self.rewired_s_success:
plt.plot(node.x, node.y, "2m")
for (ox, oy, size) in self.obstacle_list:
self.plot_circle(ox, oy, size)
# self.plot_ellipsis(ox, oy, size)
plt.plot(self.start.x, self.start.y, "or")
plt.plot(self.end.x, self.end.y, "xr")
plt.grid(True)
# plt.pause(0.01)
def calc_dist_to_goal(self, x, y):
dx = x - self.end.x
dy = y - self.end.y
return math.hypot(dx, dy)
def nodes_inside_ellipsis(self,obstaclex,obstacley):
g_ell_center = (obstaclex-40, obstacley-40)
g_ell_width = 100
g_ell_height = 40
angle = 45.
cos_angle = np.cos(np.radians(180.-angle))
sin_angle = np.sin(np.radians(180.-angle))
x = np.array([node.x for node in self.node_list])
y = np.array([node.y for node in self.node_list])
xc = x - g_ell_center[0]
yc = y - g_ell_center[1]
xct = xc * cos_angle - yc * sin_angle
yct = xc * sin_angle + yc * cos_angle
rad_cc = (xct**2/(g_ell_width/2.)**2) + (yct**2/(g_ell_height/2.)**2)
nodes_inside_ellipsis = [self.node_list[index] for index in np.where(rad_cc <= 1.)[0]]
# print(nodes_inside_ellipsis)
return nodes_inside_ellipsis
@staticmethod
def plot_circle(x, y, size, color="-b"): # pragma: no cover
deg = list(range(0, 360, 5))
deg.append(0)
xl = [x + size * math.cos(np.deg2rad(d)) for d in deg]
yl = [y + size * math.sin(np.deg2rad(d)) for d in deg]
plt.plot(xl, yl, color)
@staticmethod
def plot_ellipsis(x,y,size,color="-b"):
g_ell_center = (x-40, y-40)
g_ell_width = 100
g_ell_height = 40
angle = 45.
g_ellipse = patches.Ellipse(g_ell_center, g_ell_width, g_ell_height, angle=angle, fill=False, edgecolor='green', linewidth=2)
plt.gca().add_patch(g_ellipse)
@staticmethod
def get_nearest_node_index(node_list, rnd_node):
dlist = [(node.x - rnd_node.x)**2 + (node.y - rnd_node.y)**2
for node in node_list]
minind = dlist.index(min(dlist))
return minind
@staticmethod
def check_collision(node, obstacleList, robot_radius):
if node is None:
return False
for (ox, oy, size) in obstacleList:
dx_list = [ox - x for x in node.path_x]
dy_list = [oy - y for y in node.path_y]
d_list = [dx * dx + dy * dy for (dx, dy) in zip(dx_list, dy_list)]
if min(d_list) <= (size+robot_radius)**2:
return False # collision
return True # safe
@staticmethod
def calc_distance_and_angle(from_node, to_node):
dx = to_node.x - from_node.x
dy = to_node.y - from_node.y
d = math.hypot(dx, dy)
theta = math.atan2(dy, dx)
return d, theta
def steer(self, from_node, to_node, extend_length=float("inf")):
new_node = self.Node(from_node.x, from_node.y)
d, theta = self.calc_distance_and_angle(new_node, to_node)
new_node.path_x = [new_node.x]
new_node.path_y = [new_node.y]
if extend_length > d:
extend_length = d
n_expand = math.floor(extend_length / self.path_resolution)
for _ in range(n_expand):
new_node.x += self.path_resolution * math.cos(theta)
new_node.y += self.path_resolution * math.sin(theta)
new_node.path_x.append(new_node.x)
new_node.path_y.append(new_node.y)
# d, alpha = self.calc_distance_and_angle(new_node, to_node)
d, _ = self.calc_distance_and_angle(new_node, to_node)
if d <= self.path_resolution:
# if d <= self.path_resolution and abs(math.degrees(alpha))<=10:
new_node.path_x.append(to_node.x)
new_node.path_y.append(to_node.y)
new_node.x = to_node.x
new_node.y = to_node.y
new_node.parent = from_node
new_node.blocking_cost = from_node.blocking_cost
return new_node
def line_is_free(self, from_node, to_node):
if from_node is None or to_node is None:
return False
path_x = [from_node.x]
path_y = [from_node.y]
nb_points = math.ceil(math.hypot(to_node.x - from_node.x, to_node.y - from_node.y)/self.path_resolution)
x_spacing = (to_node.x - from_node.x) / (nb_points + 1)
y_spacing = (to_node.y - from_node.y) / (nb_points + 1)
for i in range(1, nb_points+1):
path_x.append(from_node.x + i * x_spacing)
path_y.append(from_node.y + i * y_spacing)
path_x.append(to_node.x)
path_y.append(to_node.y)
for (ox, oy, size) in self.obstacle_list:
dx_list = [ox - x for x in path_x]
dy_list = [oy - y for y in path_y]
d_list = [dx * dx + dy * dy for (dx, dy) in zip(dx_list, dy_list)]
if min(d_list) <= (size+self.robot_radius)**2:
return False # collision
return True # safe
def get_random_node(self):
#random sampling
if random.randint(0, 100) > self.goal_sample_rate:
rnd = self.Node(
random.uniform(self.min_xrand, self.max_xrand),
random.uniform(self.min_yrand, self.max_yrand))
else:
# goal point sampling
if random.randint(0, 100) < self.goal_sample_rate:
rnd = self.Node(self.end.x, self.end.y)
#ellipsis sampling
else:
x,y = get_random_node_ellipsis_sampling([self.start.x,self.start.y],[self.end.x,self.end.y])
rnd = self.Node(x,y)
return rnd
def generate_partial_course(self, goal_ind):
node = self.node_list[goal_ind]
path = [[round(node.x), round(node.y)]]
path_nodes = [node]
while node.parent is not None and node.parent != self.start:
path.append([round(node.x), round(node.y)])
path_nodes.append(node)
node = node.parent
path.append([round(node.x), round(node.y)])
path_nodes.append(node)
path.append([round(self.start.x), round(self.start.y)])
path_nodes.append(self.start)
return path, path_nodes
#Finds all the nodes near to a given node (the nodes in the grid of the given node + adjacent grids)
def find_nodes_near(self, node):
X_near = []
for x,y in itertools.product([(node.x//self.grid_size)-1,node.x//self.grid_size,(node.x//self.grid_size)+1],[(node.y//self.grid_size)-1,node.y//self.grid_size,(node.y//self.grid_size)+1]):
X_near.extend(self.grid[x][y])
try:
X_near.remove(node)
except ValueError:
pass
return X_near
#Returns the nearest node of a given node (in the ones in the grid of the given node + adjacent grids)
def find_nearest_node(self, node):
X_near = self.find_nodes_near(node)
s = sorted(X_near, key=lambda x_near: math.hypot(node.x - x_near.x, node.y - x_near.y))
return s[0]
#Returns the nearest node in a path of a given node
def find_nearest_node_path(self, measured_robot_position, path_nodes):
s = sorted(path_nodes, key=lambda path_node: math.hypot(measured_robot_position[0] - path_node.x, measured_robot_position[1] - path_node.y))
return s[0]
def find_nodes_near_within_expanddis(self, node):
X_near = []
for x,y in itertools.product([(node.x//self.grid_size)-1,node.x//self.grid_size,(node.x//self.grid_size)+1],[(node.y//self.grid_size)-1,node.y//self.grid_size,(node.y//self.grid_size)+1]):
X_near.extend(self.grid[x][y])
#Remove itself
try:
X_near.remove(node)
except ValueError:
pass
#Remove nodes with distance greater than expand_dis
X_near = [x_near for x_near in X_near if math.hypot(node.x - x_near.x, node.y - x_near.y) <= self.expand_dis]
return X_near
def find_nodes_near_within_expanddis_and_not_infcost(self, node, unblocked_cells, new_blocked_cells):
X_near = []
neighbouring_cells = [(x,y) for x,y in itertools.product([(node.x//self.grid_size)-1,node.x//self.grid_size,(node.x//self.grid_size)+1],[(node.y//self.grid_size)-1,node.y//self.grid_size,(node.y//self.grid_size)+1])]
neighbouring_cells_not_blocked = list(set(neighbouring_cells)-set(new_blocked_cells)-set(unblocked_cells))
for cell in neighbouring_cells_not_blocked:
X_near.extend(self.grid[cell[0]][cell[1]])
#Remove itself
try:
X_near.remove(node)
except ValueError:
pass
#Remove nodes with distance greater than expand_dis
X_near = [x_near for x_near in X_near if (math.hypot(node.x - x_near.x, node.y - x_near.y) <= self.expand_dis and x_near.blocking_cost != float("inf"))]
s = sorted(X_near, key=lambda x_near: math.hypot(node.x - x_near.x, node.y - x_near.y))
# for x_near in s:
# print("\t\t",node,x_near,math.hypot(node.x - x_near.x, node.y - x_near.y))
return s
def recursively_swap_parent_child(parent,child):
node = self.node_list[goal_ind]
while node.parent is not None and node.parent != self.start:
path.append([round(node.x), round(node.y)])
path_nodes.append(node)
node = node.parent
def set_new_start_new_goal(self,new_start,new_goal):
new_start_node = self.find_nearest_node(self.Node(new_start[0],new_start[1]))
new_end_node = self.find_nearest_node(self.Node(new_goal[0],new_goal[1]))
old_start = self.start
self.start = new_start_node
processing_node = self.start
chain = []
while processing_node != old_start:
chain.append(processing_node)
processing_node = processing_node.parent
chain.append(old_start)
for elt in more_itertools.windowed(chain,n=2):
child, par = elt[0], elt[1]
child.children[par] = None
try:
del par.children[child]
except KeyError:
pass
par.parent = child
self.start.parent = None
self.start.cost = 0
self.update_cost_to_leaves(self.start)
self.end = new_end_node
#Rewire the entire tree from the root node
print("updating tree with start and end goal")
self.rewired_s = []
self.rewired_s_success = []
if not self.Q_s:
self.Q_s.append(self.start)
time_start = time.time()
while self.Q_s:
x_s = self.Q_s.popleft()
for child in x_s.children:
self.Q_s.append(child)
X_near = self.find_nodes_near_within_expanddis(x_s)
for x_near in X_near:
c_old = x_near.total_cost()
c_new = x_s.total_cost()+math.hypot(x_s.x-x_near.x,x_s.y-x_near.y)
if c_new<c_old and self.line_is_free(x_s, x_near) and x_s not in self.successors(x_near):
try:
del x_near.parent.children[x_near]
except KeyError:
#TODO
pass
x_near.parent = x_s
x_s.children[x_near] = None
x_near.cost = x_s.cost+math.hypot(x_near.x - x_s.x, x_near.y - x_s.y)
x_near.blocking_cost = x_s.blocking_cost
self.update_cost_to_leaves(x_near)
self.rewired_s_success.append(x_near)
self.Q_s.appendleft(x_near)
self.rewired_s.append(x_near)
def warm_start(self, number_nodes):
"""
warm start rt-rrt* planning
"""
self.node_list = [self.start]
self.obstacle_list = []
i = -1
#Recursively add the desired number of nodes in the tree
while len(self.node_list) < number_nodes:
i+=1
if len(self.node_list) % 100 == 0:
print("warm start in progress: ", round((len(self.node_list)/number_nodes)*100,1), "% completed")
#ADD NODE TO THE TREE
rnd = self.get_random_node()
nearest_ind = self.get_nearest_node_index(self.node_list, rnd)
new_node = self.steer(self.node_list[nearest_ind], rnd,
self.expand_dis)
#rewire the end node
if new_node == self.end:
near_node = self.node_list[nearest_ind]
self.end.cost = near_node.cost + math.hypot(self.end.x-near_node.x, self.end.y-near_node.y)
self.end.blocking_cost = near_node.blocking_cost
try:
del self.end.parent.children[self.end]
except Exception:
pass
self.end.parent = near_node
self.end.parent.children[self.end] = None
continue
#Find the nearest node and connect to it. Rewire if needed.
near_node = self.node_list[nearest_ind]
new_node.cost = near_node.cost + \
math.hypot(new_node.x-near_node.x,
new_node.y-near_node.y)
new_node.blocking_cost = near_node.blocking_cost
if self.check_collision(
new_node, self.obstacle_list, self.robot_radius):
near_inds = self.find_near_nodes(new_node)
node_with_updated_parent = self.choose_parent(
new_node, near_inds)
if node_with_updated_parent:
self.rewire(node_with_updated_parent, near_inds)
node_with_updated_parent.parent.children[node_with_updated_parent] = None
self.node_list.append(node_with_updated_parent)
self.grid[node_with_updated_parent.x//self.grid_size][node_with_updated_parent.y//self.grid_size].append(node_with_updated_parent)
self.Q_r.append(node_with_updated_parent)
else:
new_node.parent.children[new_node] = None
self.node_list.append(new_node)
self.grid[new_node.x//self.grid_size][new_node.y//self.grid_size].append(new_node)
self.Q_r.append(new_node)
#Rewire the entire tree from the root node
print("rewire from tree root")
self.rewired_s = []
self.rewired_s_success = []
#REWIRE FROM TREE ROOT
if not self.Q_s:
self.Q_s.append(self.start)
time_start = time.time()
while self.Q_s:
x_s = self.Q_s.popleft()
for child in x_s.children:
self.Q_s.append(child)
X_near = self.find_nodes_near_within_expanddis(x_s)
for x_near in X_near:
c_old = x_near.total_cost()
c_new = x_s.total_cost()+math.hypot(x_s.x-x_near.x,x_s.y-x_near.y)
if c_new<c_old and self.line_is_free(x_s, x_near) and x_s not in self.successors(x_near):
try:
del x_near.parent.children[x_near]
except KeyError:
#TODO
pass
x_near.parent = x_s
x_s.children[x_near] = None
x_near.cost = x_s.cost+math.hypot(x_near.x - x_s.x, x_near.y - x_s.y)
x_near.blocking_cost = x_s.blocking_cost
self.update_cost_to_leaves(x_near)
self.rewired_s_success.append(x_near)
self.Q_s.appendleft(x_near)
self.rewired_s.append(x_near)
print("rewire from tree root processed in ",time.time()-time_start,"s")
print("end of warm start computation, tree size is ",len(self.node_list))
def rewire_unblocked_nodes(self,unblocked_cells,new_blocked_cells):
#a dict with key the unblocked node, and value the min dist to a no blocked node
unblocked_nodes = {}
for unblocked_cell in unblocked_cells:
neighbouring_cells = [(x,y) for x,y in itertools.product([unblocked_cell[0]-1,unblocked_cell[0],unblocked_cell[0]+1],[unblocked_cell[1]-1,unblocked_cell[1],unblocked_cell[1]+1])]
neighbouring_cells_not_blocked = list(set(neighbouring_cells)-set(new_blocked_cells)-set(unblocked_cells))
#if there is no unblocked neighbouring cell, set the min dist to a no blocked node to inf
if not neighbouring_cells_not_blocked:
for node in self.grid[unblocked_cell[0]][unblocked_cell[1]]:
unblocked_nodes[node] = float("inf")
continue
for node in self.grid[unblocked_cell[0]][unblocked_cell[1]]:
list_dist_neighbours = []
for neighbour_cell in neighbouring_cells_not_blocked:
for neighbour in self.grid[neighbour_cell[0]][neighbour_cell[1]]:
list_dist_neighbours.append(math.hypot(neighbour.x - node.x, neighbour.y - node.y))
try:
unblocked_nodes[node] = min(list_dist_neighbours)
except ValueError:
unblocked_nodes[node] = float("inf")
unblocked_nodes = dict(sorted(unblocked_nodes.items(), key=lambda item: item[1]))
#now rewire the nodes by acending distance to the nearest non-blocked node (that is not recently unblocked)
for node in unblocked_nodes:
X_near = self.find_nodes_near_within_expanddis_and_not_infcost(node,unblocked_cells,new_blocked_cells)
if not X_near:
continue
costs = [x_near.total_cost()+math.hypot(node.x - x_near.x, node.y - x_near.y) for x_near in X_near]
new_parent = list(X_near)[costs.index(min(costs))]
if self.line_is_free(new_parent, node) and new_parent not in self.successors(node):
try:
del node.parent.children[node]
except KeyError:
pass
node.parent = new_parent
new_parent.children[node] = None
node.cost = new_parent.cost+math.hypot(node.x - new_parent.x, node.y - new_parent.y)
node.blocking_cost = new_parent.blocking_cost
self.update_cost_to_leaves(node)
self.rewired_s_success.append(node)
def planning(self, animation=True, current_pos=None, updated_obstacle_list=None):
"""
rrt star path planning
animation: flag for animation on or off .
"""
#Update costs and tree's root
time_start = time.time()
if current_pos is None:
self.node_list = [self.start]
elif self.start == current_pos:
#do nothing
pass
else:
# nearest_ind = self.get_nearest_node_index(self.node_list, self.Node(current_pos[0],current_pos[1]))
# if self.node_list[nearest_ind] == self.start:
# print("no move was done")
# else:
#for the new start node, set its parent as children, and set no parent. Also, set the cost of the old parent as "inf" UNLESS the goal position is changed!
# old_start = self.start
# # self.start = self.node_list[nearest_ind]
# self.start = current_pos
# self.start.parent = None
# del old_start.children[self.start]
# old_start.parent = self.start
# self.start.children[old_start] = None
# self.start.cost = 0
# self.update_cost_to_leaves(self.start)
old_start = self.start
self.start = current_pos
processing_node = self.start
chain = []
while processing_node != old_start:
chain.append(processing_node)
processing_node = processing_node.parent
chain.append(old_start)
for elt in more_itertools.windowed(chain,n=2):
child, par = elt[0], elt[1]
child.children[par] = None
try:
del par.children[child]
except KeyError:
pass
par.parent = child
self.start.parent = None
self.start.cost = 0
self.update_cost_to_leaves(self.start)
self.rewired_s = []
self.rewired_s_success = []
if updated_obstacle_list is not None:
self.obstacle_list = updated_obstacle_list
new_blocked_cells = {}
for obstacle in self.obstacle_list:
# new_blocked_cells[(obstacle[0]//self.grid_size, obstacle[1]//self.grid_size)] = None
for x,y in itertools.product([(obstacle[0]//self.grid_size)-1,obstacle[0]//self.grid_size,(obstacle[0]//self.grid_size)+1],[(obstacle[1]//self.grid_size)-1,obstacle[1]//self.grid_size,(obstacle[1]//self.grid_size)+1]):
new_blocked_cells[(x,y)] = None
became_blocked = list(set(list(new_blocked_cells))-set(list(self.blocked_cells)))
became_unblocked = list(set(list(self.blocked_cells))-set(list(new_blocked_cells)))
#set unblocked nodes blocking cost to 0.0 and propagate to leaves
for cell in became_unblocked:
for node in self.grid[cell[0]][cell[1]]:
node.blocking_cost = 0.0
self.unblock_leaves(node)
#rewiring of unblocked nodes, since some of them might be rewired to parts of the tree that are not blocked!
self.rewire_unblocked_nodes(became_unblocked,new_blocked_cells)
#nodes in front of the obstacle (ellipsis) are set to a high blocking cost
# nodes_inside_ellipsis = []
# for obstacle in self.obstacle_list:
# nodes_inside_ellipsis.extend(self.nodes_inside_ellipsis(obstacle[0],obstacle[1]))
# for node in nodes_inside_ellipsis:
# node.blocking_cost = float("inf")
# self.block_leaves(node)
#set newly blocked nodes blocking cost to inf and propagate to leaves
for cell in new_blocked_cells:
for node in self.grid[cell[0]][cell[1]]:
node.blocking_cost = float("inf")
self.block_leaves(node)
self.blocked_cells = new_blocked_cells
remaining_time = self.max_time - (time.time() - time_start)
# print("rewire from tree root")
#REWIRE FROM TREE ROOT
time_start = time.time()
# while time.time()-time_start < self.max_time * 0.9:
while time.time()-time_start < remaining_time * 0.9:
if not self.Q_s:
self.Q_s.append(self.start)
x_s = self.Q_s.popleft()
for child in x_s.children:
self.Q_s.append(child)
X_near = self.find_nodes_near_within_expanddis(x_s)
for x_near in X_near:
c_old = x_near.total_cost()
c_new = x_s.total_cost()+math.hypot(x_s.x-x_near.x,x_s.y-x_near.y)
if c_new<c_old and self.line_is_free(x_s, x_near) and x_s not in self.successors(x_near):
try:
del x_near.parent.children[x_near]
except KeyError:
#TODO fix this KeyError
pass
x_near.parent = x_s
x_s.children[x_near] = None
x_near.cost = x_s.cost+math.hypot(x_near.x - x_s.x, x_near.y - x_s.y)
x_near.blocking_cost = x_s.blocking_cost
self.update_cost_to_leaves(x_near)
self.rewired_s_success.append(x_near)
# self.Q_s.append(x_near)
self.rewired_s.append(x_near)
# self.Q_s.append(x_near)
# for child in x_s.children:
# self.Q_s.append(child)
#we reblock the cells in the area of the obstacles, becaume maybe because of rewiring some cells that are actually blocked became unblocked
for cell in new_blocked_cells:
for node in self.grid[cell[0]][cell[1]]:
node.blocking_cost = float("inf")
self.block_leaves(node)
return self.generate_path()
def choose_parent(self, new_node, near_inds):
"""
Computes the cheapest point to new_node contained in the list
near_inds and set such a node as the parent of new_node.
Arguments:
--------
new_node, Node
randomly generated node with a path from its neared point
There are not coalitions between this node and th tree.
near_inds: list
Indices of indices of the nodes what are near to new_node
Returns.
------
Node, a copy of new_node
"""
if not near_inds:
return None
# search nearest cost in near_inds
costs = []
for i in near_inds:
near_node = self.node_list[i]
t_node = self.steer(near_node, new_node)
if t_node and self.check_collision(
t_node, self.obstacle_list, self.robot_radius):
costs.append(self.calc_new_cost(near_node, new_node))
else:
costs.append(float("inf")) # the cost of collision node
min_cost = min(costs)
if min_cost == float("inf"):
print("There is no good path.(min_cost is inf)")
return None
min_ind = near_inds[costs.index(min_cost)]
new_node = self.steer(self.node_list[min_ind], new_node)
new_node.cost = min_cost
new_node.blocking_cost = self.node_list[min_ind].blocking_cost
return new_node
def search_best_goal_node(self):
dist_to_goal_list = [
self.calc_dist_to_goal(n.x, n.y) for n in self.node_list
]
goal_inds = [
dist_to_goal_list.index(i) for i in dist_to_goal_list
if i <= self.expand_dis
]
safe_goal_inds = []
for goal_ind in goal_inds:
t_node = self.steer(self.node_list[goal_ind], self.goal_node)
if self.check_collision(
t_node, self.obstacle_list, self.robot_radius):
safe_goal_inds.append(goal_ind)
if not safe_goal_inds:
return None
min_cost = min([self.node_list[i].cost for i in safe_goal_inds])
for i in safe_goal_inds:
if self.node_list[i].cost == min_cost:
print("COST",i,self.node_list[i],self.node_list[i].cost)
return i
return None
def search_best_temporary_goal_node(self):
#In case no goal can be reached, find the node the closest to it, so we can already go towards it
dist_to_goal_list = [
self.calc_dist_to_goal(n.x, n.y) for n in self.node_list
]
return dist_to_goal_list.index(min(dist_to_goal_list))
def generate_path(self):
if self.end.parent is not None and self.end.total_cost() != float("inf"):
path = [[round(self.end.x), round(self.end.y)]]
path_nodes = [self.end]
node = self.end
while node.parent is not None and node.parent != self.start:
path.append([round(node.x), round(node.y)])
path_nodes.append(node)
node = node.parent
path.append([round(node.x), round(node.y)])
path_nodes.append(node)
path.append([round(self.start.x), round(self.start.y)])
path_nodes.append(self.start)
return path, path_nodes
else:
print(self.end,"is inf or has no parent",self.end.total_cost())
dist_to_goal_list = [self.calc_dist_to_goal(n.x, n.y) if n.total_cost()!=float("inf") else float("inf") for n in self.node_list]
if min(dist_to_goal_list) == float("inf"):
return [[round(self.start.x), round(self.start.y)]], [self.start]
return self.generate_partial_course(dist_to_goal_list.index(min(dist_to_goal_list)))
def find_near_nodes(self, new_node):
"""
1) defines a ball centered on new_node
2) Returns all nodes of the three that are inside this ball
Arguments:
---------
new_node: Node
new randomly generated node, without collisions between
its nearest node
Returns:
-------
list
List with the indices of the nodes inside the ball of
radius r
"""
nnode = len(self.node_list) + 1
r = self.connect_circle_dist * math.sqrt((math.log(nnode) / nnode))
# if expand_dist exists, search vertices in a range no more than
# expand_dist
if hasattr(self, 'expand_dis'):
r = min(r, self.expand_dis)
dist_list = [(node.x - new_node.x)**2 + (node.y - new_node.y)**2
for node in self.node_list]
near_inds = [dist_list.index(i) for i in dist_list if i <= r**2]
return near_inds
def rewire(self, new_node, near_inds):
"""
For each node in near_inds, this will check if it is cheaper to
arrive to them from new_node.
In such a case, this will re-assign the parent of the nodes in
near_inds to new_node.
Parameters:
----------
new_node, Node
Node randomly added which can be joined to the tree
near_inds, list of uints
A list of indices of the self.new_node which contains
nodes within a circle of a given radius.
Remark: parent is designated in choose_parent.
"""
for i in near_inds:
near_node = self.node_list[i]
edge_node = self.steer(new_node, near_node)
if not edge_node:
continue
edge_node.cost = self.calc_new_cost(new_node, near_node)
no_collision = self.check_collision(
edge_node, self.obstacle_list, self.robot_radius)
improved_cost = near_node.cost > edge_node.cost
if no_collision and improved_cost:
try:
del near_node.parent.children[near_node]
except KeyError:
print("\t",near_node,"was not in the children list of",near_node.parent)
near_node.x = edge_node.x
near_node.y = edge_node.y
near_node.cost = edge_node.cost
near_node.blocking_cost = edge_node.blocking_cost
near_node.path_x = edge_node.path_x
near_node.path_y = edge_node.path_y
near_node.parent = edge_node.parent
self.propagate_cost_to_leaves(new_node)
def calc_new_cost(self, from_node, to_node):
d, _ = self.calc_distance_and_angle(from_node, to_node)
return from_node.cost + d
def propagate_cost_to_leaves(self, parent_node):
for node in self.node_list:
if node.parent == parent_node:
node.cost = self.calc_new_cost(parent_node, node)
self.propagate_cost_to_leaves(node)
def block_leaves(self, parent_node):
for node in parent_node.children:
node.blocking_cost = float("inf")
self.block_leaves(node)
def unblock_leaves(self, parent_node):
for node in parent_node.children:
node.blocking_cost = 0.0
self.unblock_leaves(node)
def find_near_nodes_close_updated_obstacles(self):
for node in self.node_list:
try:
for (ox, oy, size) in self.obstacle_list:
if math.hypot(ox-node.x, oy-node.y) <= (size+self.robot_radius):
node.cost = float("inf")
self.propagate_cost_to_leaves(node)
print("update",node.x,node.y)
except ValueError:
continue
def successors(self,node):
lst = []
for child in node.children:
lst.append(child)
lst.extend(self.successors(child))
return lst