Source code for odatse.util.neighborlist

# SPDX-License-Identifier: MPL-2.0
#
# ODAT-SE -- an open framework for data analysis
# Copyright (C) 2020- The University of Tokyo
#
# This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0.
# If a copy of the MPL was not distributed with this file, You can obtain one at http://mozilla.org/MPL/2.0/.

import typing
from typing import List, Set
from os import PathLike

import sys
import itertools

import numpy as np

from odatse import mpi

try:
    from tqdm import tqdm

    has_tqdm = True
except:
    has_tqdm = False


[docs] class Cells: """ A class to represent a grid of cells for spatial partitioning. This class divides a spatial region into a grid of cells to efficiently find neighboring points within a specified radius. Each cell contains a set of point indices that fall within its spatial boundaries. Attributes ---------- cells : List[Set[int]] List of sets, where each set contains indices of points in that cell. dimension : int Number of spatial dimensions. mins : np.ndarray Minimum coordinates of the grid in each dimension. maxs : np.ndarray Maximum coordinates of the grid in each dimension. Ns : np.ndarray Number of cells in each dimension. ncell : int Total number of cells in the grid. cellsize : float Size of each cell. Examples -------- >>> mins = np.array([0.0, 0.0, 0.0]) >>> maxs = np.array([10.0, 10.0, 10.0]) >>> cells = Cells(mins, maxs, cellsize=2.0) >>> point_index = 0 >>> point_coords = np.array([1.5, 3.2, 4.7]) >>> cell_index = cells.coord2cellindex(point_coords) >>> cells.cells[cell_index].add(point_index) """ cells: List[Set[int]] dimension: int mins: np.ndarray maxs: np.ndarray Ns: np.ndarray ncell: int cellsize: float
[docs] def __init__(self, mins: np.ndarray, maxs: np.ndarray, cellsize: float) -> None: """ Initialize the Cells object. Parameters ---------- mins : np.ndarray The minimum coordinates of the grid in each dimension. maxs : np.ndarray The maximum coordinates of the grid in each dimension. cellsize : float The size of each cell. Returns ------- None This method initializes the object's attributes. Examples -------- >>> mins = np.array([0.0, 0.0, 0.0]) >>> maxs = np.array([10.0, 10.0, 10.0]) >>> cells = Cells(mins, maxs, cellsize=2.0) """ self.dimension = len(mins) self.mins = mins Ls = (maxs - mins) * 1.001 self.Ns = np.ceil(Ls / cellsize).astype(np.int64) self.maxs = self.mins + cellsize * self.Ns self.cellsize = cellsize self.ncell = typing.cast(int, np.prod(self.Ns)) self.cells = [set() for _ in range(self.ncell)]
[docs] def coord2cellindex(self, x: np.ndarray) -> int: """ Convert spatial coordinates to a cell index. This method transforms a point's spatial coordinates into the index of the cell containing it, by first converting to cell coordinates and then to a cell index. Parameters ---------- x : np.ndarray The spatial coordinates of a point. Returns ------- int The index of the cell containing the point. Examples -------- >>> cells = Cells(np.array([0, 0]), np.array([10, 10]), 2.0) >>> cells.coord2cellindex(np.array([3.5, 4.2])) 12 """ return self.cellcoord2cellindex(self.coord2cellcoord(x))
[docs] def coord2cellcoord(self, x: np.ndarray) -> np.ndarray: """ Convert spatial coordinates to cell coordinates. Cell coordinates are integer indices that identify the position of a cell within the grid along each dimension. Parameters ---------- x : np.ndarray The spatial coordinates of a point. Returns ------- np.ndarray The cell coordinates (integer indices for each dimension). Examples -------- >>> cells = Cells(np.array([0, 0]), np.array([10, 10]), 2.0) >>> cells.coord2cellcoord(np.array([3.5, 4.2])) array([1, 2]) """ return np.floor((x - self.mins) / self.cellsize).astype(np.int64)
[docs] def cellcoord2cellindex(self, ns: np.ndarray) -> int: """ Convert cell coordinates to a cell index. This method converts multi-dimensional cell coordinates to a single index that uniquely identifies the cell in the flat cells list. Parameters ---------- ns : np.ndarray The cell coordinates (integer indices for each dimension). Returns ------- int The index of the cell in the flat cells list. Examples -------- >>> cells = Cells(np.array([0, 0]), np.array([10, 10]), 2.0) >>> cells.cellcoord2cellindex(np.array([1, 2])) 12 """ index = 0 oldN = 1 for n, N in zip(ns, self.Ns): index *= oldN index += n oldN = N return index
[docs] def cellindex2cellcoord(self, index: int) -> np.ndarray: """ Convert a cell index to cell coordinates. This method is the inverse of cellcoord2cellindex and converts a flat cell index back to multi-dimensional cell coordinates. Parameters ---------- index : int The index of the cell in the flat cells list. Returns ------- np.ndarray The cell coordinates (integer indices for each dimension). Examples -------- >>> cells = Cells(np.array([0, 0]), np.array([10, 10]), 2.0) >>> cells.cellindex2cellcoord(12) array([1, 2]) """ ns = np.zeros(self.dimension, dtype=np.int64) for d in range(self.dimension): d = self.dimension - d - 1 N = self.Ns[d] ns[d] = index % N index = index // N return ns
[docs] def out_of_bound(self, ns: np.ndarray) -> bool: """ Check if cell coordinates are out of bounds. This method verifies whether the given cell coordinates are within the valid range of the grid. Parameters ---------- ns : np.ndarray The cell coordinates to check. Returns ------- bool True if the coordinates are out of bounds, False otherwise. Examples -------- >>> cells = Cells(np.array([0, 0]), np.array([10, 10]), 2.0) >>> cells.out_of_bound(np.array([-1, 2])) True >>> cells.out_of_bound(np.array([3, 4])) False """ if np.any(ns < 0): return True if np.any(ns >= self.Ns): return True return False
[docs] def neighborcells(self, index: int) -> List[int]: """ Get the indices of neighboring cells, including the cell itself. This method returns the indices of all cells that are adjacent to the specified cell in all dimensions, along with the cell itself. Parameters ---------- index : int The index of the cell. Returns ------- List[int] The indices of the neighboring cells (including the cell itself). Examples -------- >>> cells = Cells(np.array([0, 0]), np.array([10, 10]), 2.0) >>> neighbors = cells.neighborcells(12) >>> len(neighbors) # 3x3 neighborhood in 2D 9 """ neighbors: List[int] = [] center_coord = self.cellindex2cellcoord(index) for diff in itertools.product([-1, 0, 1], repeat=self.dimension): other_coord = center_coord + np.array(diff) if self.out_of_bound(other_coord): continue other_coord_index = self.cellcoord2cellindex(other_coord) neighbors.append(other_coord_index) return neighbors
[docs] def make_neighbor_list_cell( X: np.ndarray, radius: float, allow_selfloop: bool, show_progress: bool, comm: mpi.Comm = None, ) -> List[List[int]]: """ Create a neighbor list using cell-based spatial partitioning. This function uses the Cells class to efficiently find neighboring points within the specified radius by only considering points in adjacent cells. Parameters ---------- X : np.ndarray The coordinates of the points (N x D array where N is the number of points and D is the dimensionality). radius : float The radius within which points are considered neighbors. allow_selfloop : bool Whether to allow a point to be its own neighbor. show_progress : bool Whether to show a progress bar during computation. comm : mpi.Comm, optional The MPI communicator for parallel processing, by default None. Returns ------- List[List[int]] A list of lists, where each inner list contains the indices of neighboring points for a given point. Examples -------- >>> coords = np.array([[0.0, 0.0], [1.0, 0.0], [0.0, 1.0], [5.0, 5.0]]) >>> neighbor_list = make_neighbor_list_cell(coords, radius=1.5, ... allow_selfloop=False, ... show_progress=False) >>> neighbor_list [[1, 2], [0, 2], [0, 1], []] """ if comm is None: mpisize = 1 mpirank = 0 else: mpisize = comm.size mpirank = comm.rank mins = typing.cast(np.ndarray, X.min(axis=0)) maxs = typing.cast(np.ndarray, X.max(axis=0)) cells = Cells(mins, maxs, radius * 1.001) npoints = X.shape[0] for n in range(npoints): xs = X[n, :] index = cells.coord2cellindex(xs) cells.cells[index].add(n) points = np.array_split(range(npoints), mpisize)[mpirank] npoints_local = len(points) nnlist: List[List[int]] = [[] for _ in range(npoints_local)] if mpirank == 0 and show_progress and has_tqdm: desc = "rank 0" if mpisize > 1 else None ns = tqdm(points, desc=desc) else: ns = points for n in ns: xs = X[n, :] cellindex = cells.coord2cellindex(xs) for neighborcell in cells.neighborcells(cellindex): for other in cells.cells[neighborcell]: if (not allow_selfloop) and n == other: continue ys = X[other, :] r = np.linalg.norm(xs - ys) if r <= radius: nnlist[n - points[0]].append(other) if mpisize > 1: nnlist = list(itertools.chain.from_iterable(comm.allgather(nnlist))) nnlist = [sorted(nn) for nn in nnlist] return nnlist
[docs] def make_neighbor_list_naive( X: np.ndarray, radius: float, allow_selfloop: bool, show_progress: bool, comm: mpi.Comm = None, ) -> List[List[int]]: """ Create a neighbor list using a naive all-pairs approach. This function computes the distance between all pairs of points to find neighbors within the specified radius. This is less efficient than the cell-based approach for large point sets but may be more straightforward. Parameters ---------- X : np.ndarray The coordinates of the points (N x D array where N is the number of points and D is the dimensionality). radius : float The radius within which points are considered neighbors. allow_selfloop : bool Whether to allow a point to be its own neighbor. show_progress : bool Whether to show a progress bar during computation. comm : mpi.Comm, optional The MPI communicator for parallel processing, by default None. Returns ------- List[List[int]] A list of lists, where each inner list contains the indices of neighboring points for a given point. Examples -------- >>> coords = np.array([[0.0, 0.0], [1.0, 0.0], [0.0, 1.0], [5.0, 5.0]]) >>> neighbor_list = make_neighbor_list_naive(coords, radius=1.5, ... allow_selfloop=False, ... show_progress=False) >>> neighbor_list [[1, 2], [0, 2], [0, 1], []] """ if comm is None: mpisize = 1 mpirank = 0 else: mpisize = comm.size mpirank = comm.rank npoints = X.shape[0] points = np.array_split(range(npoints), mpisize)[mpirank] npoints_local = len(points) nnlist: List[List[int]] = [[] for _ in range(npoints_local)] if mpirank == 0 and show_progress and has_tqdm: desc = "rank 0" if mpisize > 1 else None ns = tqdm(points, desc=desc) else: ns = points for n in ns: xs = X[n, :] for m in range(npoints): if (not allow_selfloop) and n == m: continue ys = X[m, :] r = np.linalg.norm(xs - ys) if r <= radius: nnlist[n - points[0]].append(m) if mpisize > 1: nnlist = list(itertools.chain.from_iterable(comm.allgather(nnlist))) nnlist = [sorted(nn) for nn in nnlist] return nnlist
[docs] def make_neighbor_list( X: np.ndarray, radius: float, allow_selfloop: bool = False, check_allpairs: bool = False, show_progress: bool = False, comm: mpi.Comm = None, ) -> List[List[int]]: """ Create a neighbor list for given points. This function serves as a unified interface to create neighbor lists, choosing between cell-based or naive implementation based on the parameters. Parameters ---------- X : np.ndarray The coordinates of the points (N x D array where N is the number of points and D is the dimensionality). radius : float The radius within which points are considered neighbors. allow_selfloop : bool, optional Whether to allow a point to be its own neighbor, by default False. check_allpairs : bool, optional Whether to use the naive all-pairs approach instead of the cell-based one, by default False. show_progress : bool, optional Whether to show a progress bar during computation, by default False. comm : mpi.Comm, optional The MPI communicator for parallel processing, by default None. Returns ------- List[List[int]] A list of lists, where each inner list contains the indices of neighboring points for a given point. Examples -------- >>> coords = np.array([[0.0, 0.0], [1.0, 0.0], [0.0, 1.0], [5.0, 5.0]]) >>> neighbor_list = make_neighbor_list(coords, radius=1.5) >>> neighbor_list [[1, 2], [0, 2], [0, 1], []] >>> # Force all-pairs algorithm >>> neighbor_list = make_neighbor_list(coords, radius=1.5, check_allpairs=True) >>> neighbor_list [[1, 2], [0, 2], [0, 1], []] """ if check_allpairs: return make_neighbor_list_naive( X, radius, allow_selfloop=allow_selfloop, show_progress=show_progress, comm=comm, ) else: return make_neighbor_list_cell( X, radius, allow_selfloop=allow_selfloop, show_progress=show_progress, comm=comm, )
[docs] def load_neighbor_list(filename: PathLike, nnodes: int = None) -> List[List[int]]: """ Load a neighbor list from a file. The file format is expected to have one line per node, with the first number being the node index and subsequent numbers being its neighbors. Parameters ---------- filename : PathLike The path to the file containing the neighbor list. nnodes : int, optional The number of nodes. If None, it will be determined from the file by counting the number of non-empty lines. Returns ------- List[List[int]] The neighbor list loaded from the file. Examples -------- >>> # Example file content: >>> # 0 1 2 >>> # 1 0 2 >>> # 2 0 1 >>> neighbor_list = load_neighbor_list("neighborlist.txt") >>> neighbor_list [[1, 2], [0, 2], [0, 1]] """ if nnodes is None: nnodes = 0 with open(filename) as f: for line in f: line = line.split("#")[0].strip() if len(line) == 0: continue nnodes += 1 neighbor_list: List[List[int]] = [[] for _ in range(nnodes)] with open(filename) as f: for line in f: line = line.strip().split("#")[0] if len(line) == 0: continue words = line.split() i = int(words[0]) nn = [int(w) for w in words[1:]] neighbor_list[i] = nn return neighbor_list
[docs] def write_neighbor_list( filename: str, nnlist: List[List[int]], radius: float = None, unit: np.ndarray = None, ) -> None: """ Write the neighbor list to a file. The file format has one line per node, with the first number being the node index and subsequent numbers being its neighbors. Optional metadata can be included as comments at the beginning of the file. Parameters ---------- filename : str The path to the output file. nnlist : List[List[int]] The neighbor list to write. radius : float, optional The neighborhood radius, written as a comment in the file, by default None. unit : np.ndarray, optional The unit lengths for each coordinate dimension, written as a comment in the file, by default None. Returns ------- None This function writes the neighbor list to a file. Examples -------- >>> neighbor_list = [[1, 2], [0, 2], [0, 1]] >>> write_neighbor_list("neighborlist.txt", neighbor_list, radius=1.5, ... unit=np.array([1.0, 1.0, 1.0])) """ with open(filename, "w") as f: if radius is not None: f.write(f"# radius = {radius}\n") if unit is not None: f.write(f"# unit =") for u in unit: f.write(f" {u}") f.write("\n") for idx, nn in enumerate(nnlist): f.write(" ".join(map(str, [idx, *nn])) + "\n")
[docs] def main() -> None: """ Command-line utility for creating neighbor lists from mesh data files. This function parses command-line arguments and creates a neighbor list file from a given input mesh file. Returns ------- None This function is the main entry point for the command-line utility. """ import argparse parser = argparse.ArgumentParser( description="Make neighbor-list file from mesh-data file", epilog=""" Note: - The first column of an input file will be ignored. - UNIT is used for changing aspect ratio (a kind of normalization) - Each coodinate will be divided by the corresponding unit - UNIT is given as a string separated with white space - For example, -u "1.0 0.5 1.0" for 3D case - tqdm python package is required to show progress bar """, formatter_class=argparse.RawDescriptionHelpFormatter, ) parser.add_argument("input", type=str, help="input file") parser.add_argument( "-o", "--output", type=str, default="neighborlist.txt", help="output file" ) parser.add_argument( "-r", "--radius", type=float, default=1.0, help="neighborhood radius" ) parser.add_argument( "-u", "--unit", default=None, help="length unit for each coordinate (see Note)", ) parser.add_argument( "-q", "--quiet", action="store_true", help="Do not show progress bar" ) parser.add_argument( "--progress", action="store_true", help="show progress bar" ) parser.add_argument("--allow-selfloop", action="store_true", help="allow self loop") parser.add_argument( "--check-allpairs", action="store_true", help="check all pairs (bruteforce algorithm)", ) args = parser.parse_args() inputfile = args.input outputfile = args.output radius = args.radius if (args.progress or not args.quiet) and not has_tqdm: print("WARNING: cannot show progress because tqdm package is not available") X = np.zeros((0, 0)) if mpi.rank() == 0: X = np.loadtxt(inputfile) if mpi.size() > 1: sh = mpi.comm().bcast(X.shape, root=0) if mpi.rank() != 0: X = np.zeros(sh) mpi.comm().Bcast(X, root=0) D = X.shape[1] - 1 if args.unit is None: unit = np.ones(D, dtype=float) else: unit = np.array([float(w) for w in args.unit.split()]) if len(unit) != D: print(f"--unit option expects {D} floats as a string but {len(unit)} given") sys.exit(1) X = X[:, 1:] / unit nnlist = make_neighbor_list( X, radius, allow_selfloop=args.allow_selfloop, check_allpairs=args.check_allpairs, show_progress=(args.progress or not args.quiet), comm=mpi.comm(), ) write_neighbor_list(outputfile, nnlist, radius=radius, unit=unit)