generate_obstacles.py:

  1. import statements: Importing necessary libraries like torch, pygame, and os.

  2. def main():: Defining the main function.

  3. pygame.init() and pygame.display.init(): Initializing Pygame and the display.

  4. window_size = 366: Setting the window size to 366 pixels.

  5. window = pygame.display.set_mode((window_size, window_size)): Creating a square window with the specified size.

  6. clock = pygame.time.Clock(): Creating a clock object to control the frame rate.

  7. map = pygame.image.load('map0.png'): Loading a map image file.

  8. idx = 0, Segments = torch.tensor([], dtype=torch.float), Apexes = torch.zeros((4, 2), dtype=torch.float): Initializing variables for storing obstacle segments and their vertices.

  9. Obstacle_Segments = torch.load('Obstacle_Segments.pt'): Loading previously saved obstacle segments from a file.

  10. for _ in range(Obstacle_Segments.shape[0]):: Iterating over the loaded obstacle segments.

  11. pygame.draw.line(map, (0, 0, 0), Obstacle_Segments[_, 0].int().numpy(), Obstacle_Segments[_, 1].int().numpy(), width=4): Drawing the loaded obstacle segments on the map.

  12. pygame.draw.line(...): Drawing red lines at specific coordinates on the map.

  13. while True:: Starting the main game loop.

  14. ev = pygame.event.get(), keys = pygame.key.get_pressed(): Getting the events and key presses.

  15. if keys[pygame.K_ESCAPE]:: Checking if the Escape key is pressed.

  16. torch.save(Segments, 'Obstacle_Segments.pt'): Saving the obstacle segments to a file.

  17. break: Exiting the game loop.

  18. for event in ev:: Iterating over the events.

  19. if event.type == pygame.MOUSEBUTTONDOWN:: Checking if the mouse button is clicked.

  20. pos = pygame.mouse.get_pos(): Getting the mouse position.

  21. Apexes[idx] = torch.tensor(pos): Storing the mouse position as a vertex.

  22. idx += 1: Incrementing the vertex index.

  23. if idx > 1:: If there are at least two vertices.

  24. pygame.draw.line(map, (0, 0, 255), Apexes[idx - 2].int().numpy(), Apexes[idx - 1].int().numpy(), width=4): Drawing a line between the last two vertices on the map.

  25. if idx == 4:: If there are four vertices.

  26. pygame.draw.line(map, (0, 0, 255), Apexes[0].int().numpy(), Apexes[-1].int().numpy(), width=4): Drawing a line between the first and last vertices, forming a quadrilateral.

  27. idx = 0: Resetting the vertex index.

  28. Startpoints = Apexes.clone(), Endpoints = torch.zeros((4, 2)): Initializing start and end points for the obstacle segment.

  29. Endpoints[0:3] = Apexes[1:4].clone(), Endpoints[3] = Apexes[0].clone(): Assigning start and end points for the obstacle segment.

  30. obstacle = torch.stack((Startpoints, Endpoints), dim=1): Creating a tensor for the obstacle segment.

  31. print(obstacle): Printing the obstacle segment tensor.

  32. Segments = torch.cat((Segments, obstacle)): Concatenating the obstacle segment with the existing segments.

  33. window.blit(map, map.get_rect()): Blitting the map onto the window.

  34. pygame.event.pump() and pygame.display.update(): Updating the display.

  35. clock.tick(100): Limiting the frame rate to 100 FPS.

  36. if __name__ == '__main__':: Checking if the script is being run directly.

  37. main(): Calling the main function.

This code creates a simple GUI application using Pygame, where the user can draw obstacles (quadrilaterals) on the map by clicking with the mouse. The obstacle segments are saved to a file (Obstacle_Segments.pt), which can be loaded and used in other applications.

DTPSO_Path_static.py:

This code is more complex and involves path planning using the DTPSO (Discrete Time-driven Particle Swarm Optimization) algorithm. I'll explain the key parts:

  1. import statements: Importing necessary libraries.

  2. def str2bool(v):: A helper function to convert string values to boolean for argparse.

  3. parser = argparse.ArgumentParser(): Creating an argument parser.

  4. parser.add_argument(...): Adding command-line arguments for various parameters like device, compilation, number of particles, dimensions, iterations, rendering, and truncation window length.

  5. opt = parser.parse_args(): Parsing the command-line arguments.

  6. device = torch.device(opt.dvc): Setting the device (CPU or GPU) for computations.

  7. class DTPSO_Path_Plan():: Defining the DTPSO_Path_Plan class.

  8. def __init__(self, opt):: Initializing the class with the provided options.

  9. self.Max_iterations = opt.Max_iterations: Setting the maximum number of iterations.

  10. self.dvc = device: Setting the device for computations.

  11. self.G, self.N, self.D = 8, opt.N, opt.D: Setting the number of groups, particles, and dimensions.

  12. self.Search_range = torch.tensor([5., 360.], device=self.dvc): Setting the search range for particles.

  13. self.arange_idx = torch.arange(self.G, device=self.dvc): Creating an index tensor for groups.

  14. self.window_size = 366: Setting the window size for rendering.

  15. self.Obs_Segments = torch.load('Generate_Obstacle_Segments/Obstacle_Segments.pt').to(self.dvc): Loading obstacle segments from a file and transferring them to the device.

  16. self.O = self.Obs_Segments.shape[0] // 4: Calculating the number of obstacles.

  17. self.Grouped_Obs_Segments = self.Obs_Segments.reshape(self.O, 4, 2, 2): Reshaping the obstacle segments tensor.

  18. self.NP = int(opt.D / 2), self.S = self.NP - 1: Calculating the number of path points and segments.

  19. self.P = self.G * self.N * self.S: Calculating the total number of path segments.

  20. self.x_start, self.y_start = opt.start, opt.start, self.x_target, self.y_target = opt.target, opt.target: Setting the start and target coordinates.

  21. self.rd_area = 0.5 * (opt.target - opt.start) / (self.NP - 1): Calculating the random area for initializing particles based on prior knowledge.

  22. self.Random = torch.ones((4, self.G, self.N, 1), device=self.dvc): Initializing a tensor for random numbers used in velocity updates.

  23. self.render = opt.render: Setting the rendering flag.

  24. if self.render:: If rendering is enabled.

  25. pygame.init(), pygame.display.init(): Initializing Pygame and the display.

  26. self.window = pygame.display.set_mode((self.window_size, self.window_size)): Creating a window for rendering.

  27. self.clock = pygame.time.Clock(): Creating a clock object for controlling the frame rate.

  28. `self.canvas = pygame.Surface((self.window_

  29. self.map_pyg = pygame.Surface((self.window_size, self.window_size)): Creating a surface for the map.

  30. self.map_pyg.fill((255, 255, 255)): Filling the map surface with a white color.

  31. for _ in range(self.Grouped_Obs_Segments.shape[0]):: Iterating over the obstacles.

  32. obs_color = (50, 50, 50) if _ < (self.O - 2) else (225, 100, 0): Setting the obstacle color based on its index.

  33. pygame.draw.polygon(self.map_pyg, obs_color, self.Grouped_Obs_Segments[_, :, 0, :].cpu().int().numpy()): Drawing the obstacle polygon on the map surface.

  34. self.canvas.blit(self.map_pyg, self.map_pyg.get_rect()): Blitting the map surface onto the canvas.

  35. def _uniform_random(self, low, high, shape):: A helper function to generate uniformly random numbers within a specified range and shape.

  36. def Reset(self, Priori_X, ratio):: A method to reset the particle swarm optimization (PSO) algorithm with prior knowledge (Priori_X) and a ratio for initializing particles.

  37. self.t0 = time.time(): Recording the start time for iteration monitoring.

  38. self.Obs_Free = False: Initializing the flag for obstacle-free paths.

  39. self.Tbest_values_deque = deque(maxlen=opt.TrucWindow): Creating a deque to store the best values for auto-truncation.

  40. self.w_init = ..., self.w_end = ..., self.w_delta = ...: Initializing the inertia weights for different groups and calculating the weight delta for decay.

  41. self.v_limit_ratio = ..., self.v_min = ..., self.v_max = ..., self.v_init_ratio = ...: Initializing velocity limits and ratios for different groups.

  42. self.Hyper = ...: Initializing a hyperparameter tensor for different groups.

  43. self.Locate = ...: Initializing a tensor to store particle positions.

  44. self._position_init(Priori_X, ratio): Calling the method to initialize particle positions based on prior knowledge.

  45. self.Kinmtc = ...: Initializing a tensor to store particle velocities, best positions, and best values.

  46. self.Pbest_value = ..., self.Gbest_value = ..., self.Tbest_value = ...: Initializing tensors to store the best values for particles, groups, and the entire swarm.

  47. def _position_init(self, Priori_X, ratio):: A method to initialize particle positions based on prior knowledge and a ratio.

  48. self.Locate[1:4] = ...: Initializing particle positions randomly within the search range.

  49. RN = int(ratio * self.N): Calculating the number of particles to be initialized based on prior knowledge.

  50. self.Locate[1:4, :, 0:RN] = ...: Initializing a subset of particles based on prior knowledge and a random area.

  51. self.Locate[1:4].clip_(self.Search_range[0], self.Search_range[1]): Clipping particle positions to the search range.

  52. self.Locate[1:4, :, :, 0] = self.x_start, self.Locate[1:4, :, :, self.NP] = self.y_start, self.Locate[1:4, :, :, self.NP - 1] = self.x_target, self.Locate[1:4, :, :, 2 * self.NP - 1] = self.y_target: Fixing the start and target positions for all particles.

  53. def _Cross_product_for_VectorSet(self, V_PM, V_PP):: A method to calculate the cross product between two sets of vectors.

  54. def _Is_Seg_Ingersection_PtoM(self, P, M):: A method to check for intersections between sets of line segments.

  55. def _get_Obs_Penalty(self):: A method to calculate the obstacle penalty for each particle.

  56. def iterate(self):: The main iteration method for the PSO algorithm.

  57. for i in range(self.Max_iterations):: Iterating over the maximum number of iterations.

  58. if self.Obs_Free and (np.std(self.Tbest_values_deque) < 5):: Checking the condition for auto-truncation based on the obstacle-free flag and the standard deviation of the best values.

  59. print(f'Averate Iteration Time:{(time.time() - self.t0) / (i + 1)}'): Printing the average iteration time.

  60. break: Breaking out of the iteration loop if the auto-truncation condition is met.

  61. lenth = ..., Obs_penalty = ..., fitness = ...: Calculating the path length, obstacle penalty, and overall fitness for each particle.

  62. P_replace = ..., self.Pbest_value[P_replace] = ..., self.Kinmtc[1, P_replace] = ...: Updating the personal best values and positions for particles.

  63. values, indices = ..., G_replace = ..., self.Gbest_value[G_replace] = ..., self.Kinmtc[2, G_replace] = ...: Updating the group best values and positions.

  64. flat_idx = ..., idx0, idx1 = ..., if Obs_penalty[idx0, idx1] == 0: self.Obs_Free = True, else: self.Obs_Free = False, min_fitness = ..., if min_fitness < self.Tbest_value:, self.Tbest_value = ..., self.Kinmtc[3] = ...: Updating the global best value and position, and setting the obstacle-free flag.

  65. self.Tbest_values_deque.append(self.Tbest_value.item()): Appending the global best value to the deque for auto-truncation.

  66. self.Hyper[0] = ...: Updating the inertia weight for the current iteration.

  67. self.Random[1:4] = ...: Generating random numbers for velocity updates.

  68. self.Kinmtc[0] = ...: Calculating the new velocities based on the inertia weight, random numbers, and the current positions, personal best, group best, and global best positions.

  69. self.Kinmtc[0].clip_(self.v_min, self.v_max): Clipping the velocities to the specified limits.

  70. self.Locate[1:4] += self.Kinmtc[0]: Updating the particle positions based on the new velocities.

  71. self.Locate[1:4].clip_(self.Search_range[0], self.Search_range[1]): Clipping the particle positions to the search range.

  72. self.Locate[1:4, :, :, 0] = self.x_start, self.Locate[1:4, :, :, self.NP] = self.y_start, self.Locate[1:4, :, :, self.NP - 1] = self.x_target, self.Locate[1:4, :, :, 2 * self.NP - 1] = self.y_target: Fixing the start and target positions for all particles.

  73. print(f'Iteration:{i + 1}, logMinvalue:{torch.log10(self.Tbest_value)}, TbestValueStd:{np.std(self.Tbest_values_deque)}'): Printing the iteration number, the logarithm of the global best value, and the standard deviation of the best values.

  74. if self.render:: If rendering is enabled.

Sure, let's continue from line 75:

  1. self.canvas.blit(self.map_pyg, self.map_pyg.get_rect()): Blitting the map surface onto the canvas.

  2. for p in range(self.NP - 1):: Iterating over the path segments.

  3. pygame.draw.line(self.canvas, (255, 0, 0), (self.Kinmtc[3, 0, 0, p].item(), self.Kinmtc[3, 0, 0, p + self.NP].item()), (self.Kinmtc[3, 0, 0, p + 1].item(), self.Kinmtc[3, 0, 0, p + self.NP + 1].item()), width=4): Drawing a red line on the canvas for the current path segment of the global best solution.

  4. self.window.blit(self.canvas, self.map_pyg.get_rect()): Blitting the canvas onto the window.

  5. pygame.event.pump() and pygame.display.update(): Updating the display.

  6. self.clock.tick(50): Limiting the frame rate to 50 FPS.

  7. if __name__ == '__main__':: Checking if the script is being run directly.

  8. opt.start = 20, opt.target = 350: Setting the start and target coordinates.

  9. dtpso = DTPSO_Path_Plan(opt): Creating an instance of the DTPSO_Path_Plan class with the provided options.

  10. Priori_X = (opt.start + (opt.target - opt.start) * torch.arange(dtpso.NP, device=device) / (dtpso.NP - 1)): Calculating the prior knowledge (Priori_X) based on the start and target coordinates.

  11. Priori_X = torch.cat((Priori_X, Priori_X)): Duplicating the prior knowledge tensor.

  12. t0 = time.time(): Recording the start time for the overall planning process.

  13. dtpso.Reset(Priori_X, ratio=0.1): Resetting the DTPSO algorithm with the prior knowledge and a ratio of 0.1 for initializing particles.

  14. dtpso.iterate(): Calling the iterate method to perform the path planning process.

  15. print('Time per planning:', time.time() - t0): Printing the total time taken for path planning.

  16. torch.save(dtpso.Kinmtc[3, 0, 0], 'Tbest.pt'): Saving the global best solution to a file.

  17. import draw_Tbest: Importing a module for drawing the global best solution.

This code implements the DTPSO (Discrete Time-driven Particle Swarm Optimization) algorithm for path planning in an environment with obstacles. It initializes particles (representing potential paths) based on prior knowledge and a specified ratio, and then iteratively updates their positions and velocities based on personal best, group best, and global best solutions. The algorithm aims to find an obstacle-free path between the start and target coordinates while minimizing the path length. The progress is visualized using the Pygame library, and the final global best solution is saved to a file.

Sure, here are some potential modifications that could improve the performance of the DTPSO algorithm for path planning:

  1. Adaptive Inertia Weight: Instead of using a fixed inertia weight decay schedule, you could employ an adaptive inertia weight strategy. The inertia weight could be dynamically adjusted based on the current state of the swarm, such as the diversity of particles or the rate of convergence. This could help strike a better balance between exploration and exploitation, leading to faster convergence and better solutions.

  2. Dynamic Neighborhood Topologies: The current implementation uses a fixed neighborhood topology (gbest) for information sharing among particles. However, dynamic neighborhood topologies, where the neighborhood structure changes during the optimization process, could improve the algorithm's performance. This could be achieved by periodically re-evaluating and updating the neighborhoods based on particle fitness or spatial distribution.

  3. Hybrid Approach: You could consider integrating the DTPSO algorithm with other optimization techniques, such as local search methods or evolutionary algorithms, to create a hybrid approach. This could help overcome the limitations of a single algorithm and leverage the strengths of different techniques. For instance, you could employ a local search method to refine the best solutions found by the DTPSO algorithm, potentially leading to better quality paths.

  4. Obstacle Representation: The current implementation represents obstacles as quadrilaterals. However, depending on the application, you could explore alternative obstacle representations, such as polygons with more vertices or even non-convex shapes. This could enable the algorithm to handle more complex obstacle configurations and potentially improve its performance in certain scenarios.

  5. Parallel Computation: If you have access to parallel computing resources (e.g., GPUs or multi-core CPUs), you could explore parallelizing the computations within the DTPSO algorithm. This could involve evaluating particle fitness in parallel, updating particle positions and velocities concurrently, or even running multiple swarms simultaneously and sharing information between them.

  6. Multi-Objective Optimization: In some cases, the path planning problem might involve multiple objectives, such as minimizing path length, avoiding obstacles, and considering other constraints (e.g., energy consumption, terrain characteristics). You could extend the DTPSO algorithm to handle multi-objective optimization, where it tries to find a set of Pareto-optimal solutions that balance the different objectives.

  7. Problem Decomposition: If the search space is particularly large or complex, you could explore decomposing the problem into smaller sub-problems and solving them independently or hierarchically. This could involve dividing the search space into regions or employing a multi-level approach, where the algorithm first finds a coarse path and then refines it in subsequent stages.

  8. Parameter Tuning: While the current implementation uses fixed parameter values (e.g., number of particles, dimensions, truncation window length), you could investigate techniques for automatically tuning these parameters based on the characteristics of the problem instance or the performance of the algorithm. This could involve techniques like meta-heuristics or machine learning-based methods.

It's important to note that the effectiveness of these modifications may depend on the specific characteristics of the path planning problem and the computational resources available. Additionally, some modifications may require significant changes to the algorithm's implementation or the introduction of new techniques, which could increase the overall complexity of the solution.

Sure, I can provide some sample code implementations for the suggestions I made. However, please note that these are just examples, and you may need to adapt them to fit your specific implementation and requirements.

  1. Adaptive Inertia Weight:
def update_inertia_weight(self, iteration):
    # Calculate the inertia weight based on the current iteration
    max_iter = self.Max_iterations
    w_max = 0.9
    w_min = 0.4
    self.w = w_max - (w_max - w_min) * (iteration / max_iter)
    return self.w

In this example, the inertia weight w is updated based on the current iteration number, gradually decreasing from w_max to w_min as the iterations progress.

  1. Dynamic Neighborhood Topologies:
def update_neighborhood_topology(self):
    # Calculate the fitness diversity of the swarm
    fitness_values = torch.tensor([particle.fitness for particle in self.particles])
    fitness_diversity = fitness_values.std()

    # Update the neighborhood topology based on fitness diversity
    if fitness_diversity < self.diversity_threshold:
        self.neighborhood_topology = 'ring'  # Switch to a ring topology for better exploration
    else:
        self.neighborhood_topology = 'gbest'  # Use the global best topology for exploitation

    # Update the neighborhood for each particle
    for particle in self.particles:
        particle.update_neighborhood(self.neighborhood_topology)

In this example, the neighborhood topology is updated based on the fitness diversity of the particles. If the diversity is below a certain threshold, a ring topology is used to encourage exploration. Otherwise, the global best topology is used for exploitation.

  1. Hybrid Approach with Local Search:
def refine_solution_with_local_search(self, solution):
    # Perform local search to refine the solution
    local_search = LocalSearchMethod(solution)
    refined_solution = local_search.optimize()
    return refined_solution

In this example, a local search method is employed to refine the best solution found by the DTPSO algorithm. You would need to implement the LocalSearchMethod class based on the specific local search technique you want to use (e.g., hill climbing, simulated annealing, etc.).

  1. Parallel Computation:
import multiprocessing

def evaluate_particle_fitness(particle):
    # Evaluate the fitness of the particle
    particle.fitness = calculate_fitness(particle.position)
    return particle

def parallel_fitness_evaluation(self):
    pool = multiprocessing.Pool()
    self.particles = pool.map(evaluate_particle_fitness, self.particles)
    pool.close()
    pool.join()

In this example, the multiprocessing module is used to evaluate the fitness of particles in parallel. The evaluate_particle_fitness function calculates the fitness for a single particle, and the parallel_fitness_evaluation method uses a process pool to evaluate the fitness of all particles concurrently.

  1. Multi-Objective Optimization:
def calculate_pareto_front(self):
    # Sort particles based on their fitness values
    self.particles.sort(key=lambda x: x.fitness, reverse=True)

    # Initialize the Pareto front
    pareto_front = []

    # Iterate over the sorted particles
    for particle in self.particles:
        is_dominated = False
        for pareto_particle in pareto_front:
            if particle.dominates(pareto_particle):
                pareto_front.remove(pareto_particle)
            elif pareto_particle.dominates(particle):
                is_dominated = True
                break

        if not is_dominated:
            pareto_front.append(particle)

    return pareto_front

In this example, the calculate_pareto_front method determines the Pareto-optimal solutions based on the fitness values of the particles. The dominates method in the Particle class would need to be implemented to compare particles based on the multiple objectives.

  1. Problem Decomposition:
def decompose_problem(self):
    # Divide the search space into regions
    regions = divide_search_space(self.search_range)

    # Solve each region independently
    solutions = []
    for region in regions:
        sub_problem = SubProblem(region, self.obstacles)
        sub_solution = sub_problem.solve()
        solutions.append(sub_solution)

    # Combine the sub-solutions into a global solution
    global_solution = combine_solutions(solutions)
    return global_solution

In this example, the decompose_problem method divides the search space into regions and solves each region as a sub-problem independently. The divide_search_space function would need to be implemented to divide the search space based on the problem characteristics. The SubProblem class would represent and solve each sub-problem, and the combine_solutions function would combine the sub-solutions into a global solution.

  1. Parameter Tuning:
import optuna

def objective(trial):
    # Define the search space for the hyperparameters
    n_particles = trial.suggest_int('n_particles', 50, 200)
    max_iterations = trial.suggest_int('max_iterations', 50, 500)
    inertia_weight = trial.suggest_uniform('inertia_weight', 0.4, 0.9)

    # Create and run the DTPSO algorithm with the sampled hyperparameters
    dtpso = DTPSO_Path_Plan(n_particles=n_particles, max_iterations=max_iterations, inertia_weight=inertia_weight)
    dtpso.run()

    # Return the objective value (e.g., path length, fitness, etc.)
    return dtpso.best_solution.path_length

# Create an Optuna study and optimize the hyperparameters
study = optuna.create_study(direction='minimize')
study.optimize(objective, n_trials=100)

# Print the best hyperparameters and their corresponding objective value
print('Best hyperparameters: ', study.best_params)
print('Best objective value: ', study.best_value)

In this example, the optuna library is used for hyperparameter tuning. The objective function defines the search space for the hyperparameters (e.g., number of particles, maximum iterations, inertia weight) and creates an instance of the DTPSO algorithm with the sampled hyperparameters. The algorithm is run, and the objective value (e.g., path length, fitness) is returned. The create_study function creates an Optuna study, and the optimize method optimizes the hyperparameters over a specified number of trials.

Please note that these examples are not complete implementations and may require additional code and modifications to work with your existing implementation. Additionally, some of these suggestions may require more substantial changes to the algorithm or the introduction of new techniques, which could increase the overall complexity of the solution.