# SPDX-License-Identifier: MPL-2.0
#
# ODAT-SE -- an open framework for data analysis
# Copyright (C) 2020- The University of Tokyo
#
# This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0.
# If a copy of the MPL was not distributed with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
import typing
from typing import List, Set
from os import PathLike
import sys
import itertools
import numpy as np
from odatse import mpi
try:
from tqdm import tqdm
has_tqdm = True
except:
has_tqdm = False
[docs]
class Cells:
"""
A class to represent a grid of cells for spatial partitioning.
"""
cells: List[Set[int]]
dimension: int
mins: np.ndarray
maxs: np.ndarray
Ns: np.ndarray
ncell: int
cellsize: float
[docs]
def __init__(self, mins: np.ndarray, maxs: np.ndarray, cellsize: float):
"""
Initialize the Cells object.
Parameters
----------
mins : np.ndarray
The minimum coordinates of the grid.
maxs : np.ndarray
The maximum coordinates of the grid.
cellsize : float
The size of each cell.
"""
self.dimension = len(mins)
self.mins = mins
Ls = (maxs - mins) * 1.001
self.Ns = np.ceil(Ls / cellsize).astype(np.int64)
self.maxs = self.mins + cellsize * self.Ns
self.cellsize = cellsize
self.ncell = typing.cast(int, np.prod(self.Ns))
self.cells = [set() for _ in range(self.ncell)]
[docs]
def coord2cellindex(self, x: np.ndarray) -> int:
"""
Convert coordinates to a cell index.
Parameters
----------
x : np.ndarray
The coordinates to convert.
Returns
-------
int
The index of the cell.
"""
return self.cellcoord2cellindex(self.coord2cellcoord(x))
[docs]
def coord2cellcoord(self, x: np.ndarray) -> np.ndarray:
"""
Convert coordinates to cell coordinates.
Parameters
----------
x : np.ndarray
The coordinates to convert.
Returns
-------
np.ndarray
The cell coordinates.
"""
return np.floor((x - self.mins) / self.cellsize).astype(np.int64)
[docs]
def cellcoord2cellindex(self, ns: np.ndarray) -> int:
"""
Convert cell coordinates to a cell index.
Parameters
----------
ns : np.ndarray
The cell coordinates to convert.
Returns
-------
int
The index of the cell.
"""
index = 0
oldN = 1
for n, N in zip(ns, self.Ns):
index *= oldN
index += n
oldN = N
return index
[docs]
def cellindex2cellcoord(self, index: int) -> np.ndarray:
"""
Convert a cell index to cell coordinates.
Parameters
----------
index : int
The index of the cell.
Returns
-------
np.ndarray
The cell coordinates.
"""
ns = np.zeros(self.dimension, dtype=np.int64)
for d in range(self.dimension):
d = self.dimension - d - 1
N = self.Ns[d]
ns[d] = index % N
index = index // N
return ns
[docs]
def out_of_bound(self, ns: np.ndarray) -> bool:
"""
Check if cell coordinates are out of bounds.
Parameters
----------
ns : np.ndarray
The cell coordinates to check.
Returns
-------
bool
True if out of bounds, False otherwise.
"""
if np.any(ns < 0):
return True
if np.any(ns >= self.Ns):
return True
return False
[docs]
def neighborcells(self, index: int) -> List[int]:
"""
Get the indices of neighboring cells.
Parameters
----------
index : int
The index of the cell.
Returns
-------
List[int]
The indices of the neighboring cells.
"""
neighbors: List[int] = []
center_coord = self.cellindex2cellcoord(index)
for diff in itertools.product([-1, 0, 1], repeat=self.dimension):
other_coord = center_coord + np.array(diff)
if self.out_of_bound(other_coord):
continue
other_coord_index = self.cellcoord2cellindex(other_coord)
neighbors.append(other_coord_index)
return neighbors
[docs]
def make_neighbor_list_cell(
X: np.ndarray,
radius: float,
allow_selfloop: bool,
show_progress: bool,
comm: mpi.Comm = None,
) -> List[List[int]]:
"""
Create a neighbor list using cell-based spatial partitioning.
Parameters
----------
X : np.ndarray
The coordinates of the points.
radius : float
The radius within which neighbors are considered.
allow_selfloop : bool
Whether to allow self-loops in the neighbor list.
show_progress : bool
Whether to show a progress bar.
comm : mpi.Comm, optional
The MPI communicator.
Returns
-------
List[List[int]]
The neighbor list.
"""
if comm is None:
mpisize = 1
mpirank = 0
else:
mpisize = comm.size
mpirank = comm.rank
mins = typing.cast(np.ndarray, X.min(axis=0))
maxs = typing.cast(np.ndarray, X.max(axis=0))
cells = Cells(mins, maxs, radius * 1.001)
npoints = X.shape[0]
for n in range(npoints):
xs = X[n, :]
index = cells.coord2cellindex(xs)
cells.cells[index].add(n)
points = np.array_split(range(npoints), mpisize)[mpirank]
npoints_local = len(points)
nnlist: List[List[int]] = [[] for _ in range(npoints_local)]
if show_progress and mpirank == 0:
if has_tqdm:
desc = "rank 0" if mpisize > 1 else None
ns = tqdm(points, desc=desc)
else:
print("WARNING: cannot show progress because tqdm package is not available")
ns = points
else:
ns = points
for n in ns:
xs = X[n, :]
cellindex = cells.coord2cellindex(xs)
for neighborcell in cells.neighborcells(cellindex):
for other in cells.cells[neighborcell]:
if (not allow_selfloop) and n == other:
continue
ys = X[other, :]
r = np.linalg.norm(xs - ys)
if r <= radius:
nnlist[n - points[0]].append(other)
if mpisize > 1:
nnlist = list(itertools.chain.from_iterable(comm.allgather(nnlist)))
return nnlist
[docs]
def make_neighbor_list_naive(
X: np.ndarray,
radius: float,
allow_selfloop: bool,
show_progress: bool,
comm: mpi.Comm = None,
) -> List[List[int]]:
"""
Create a neighbor list using a naive all-pairs approach.
Parameters
----------
X : np.ndarray)
The coordinates of the points.
radius : float
The radius within which neighbors are considered.
allow_selfloop : bool
Whether to allow self-loops in the neighbor list.
show_progress : bool
Whether to show a progress bar.
comm : mpi.Comm, optional
The MPI communicator.
Returns
-------
List[List[int]]
The neighbor list.
"""
if comm is None:
mpisize = 1
mpirank = 0
else:
mpisize = comm.size
mpirank = comm.rank
npoints = X.shape[0]
points = np.array_split(range(npoints), mpisize)[mpirank]
npoints_local = len(points)
nnlist: List[List[int]] = [[] for _ in range(npoints_local)]
if show_progress and mpirank == 0:
if has_tqdm:
desc = "rank 0" if mpisize > 1 else None
ns = tqdm(points, desc=desc)
else:
print("WARNING: cannot show progress because tqdm package is not available")
ns = points
else:
ns = points
for n in ns:
xs = X[n, :]
for m in range(npoints):
if (not allow_selfloop) and n == m:
continue
ys = X[m, :]
r = np.linalg.norm(xs - ys)
if r <= radius:
nnlist[n - points[0]].append(m)
if mpisize > 1:
nnlist = list(itertools.chain.from_iterable(comm.allgather(nnlist)))
return nnlist
[docs]
def make_neighbor_list(
X: np.ndarray,
radius: float,
allow_selfloop: bool = False,
check_allpairs: bool = False,
show_progress: bool = False,
comm: mpi.Comm = None,
) -> List[List[int]]:
"""
Create a neighbor list for given points.
Parameters
----------
X : np.ndarray
The coordinates of the points.
radius : float
The radius within which neighbors are considered.
allow_selfloop : bool
Whether to allow self-loops in the neighbor list.
check_allpairs : bool
Whether to use the naive all-pairs approach.
show_progress : bool
Whether to show a progress bar.
comm : mpi.Comm, optional
The MPI communicator.
Returns
-------
List[List[int]]
The neighbor list.
"""
if check_allpairs:
return make_neighbor_list_naive(
X,
radius,
allow_selfloop=allow_selfloop,
show_progress=show_progress,
comm=comm,
)
else:
return make_neighbor_list_cell(
X,
radius,
allow_selfloop=allow_selfloop,
show_progress=show_progress,
comm=comm,
)
[docs]
def load_neighbor_list(filename: PathLike, nnodes: int = None) -> List[List[int]]:
"""
Load a neighbor list from a file.
Parameters
----------
filename : PathLike
The path to the file containing the neighbor list.
nnodes : int, optional
The number of nodes. If None, it will be determined from the file.
Returns
-------
List[List[int]]
The neighbor list.
"""
if nnodes is None:
nnodes = 0
with open(filename) as f:
for line in f:
line = line.split("#")[0].strip()
if len(line) == 0:
continue
nnodes += 1
neighbor_list: List[List[int]] = [[] for _ in range(nnodes)]
with open(filename) as f:
for line in f:
line = line.strip().split("#")[0]
if len(line) == 0:
continue
words = line.split()
i = int(words[0])
nn = [int(w) for w in words[1:]]
neighbor_list[i] = nn
return neighbor_list
[docs]
def write_neighbor_list(
filename: str,
nnlist: List[List[int]],
radius: float = None,
unit: np.ndarray = None,
):
"""
Write the neighbor list to a file.
Parameters
----------
filename : str
The path to the output file.
nnlist : List[List[int]]
The neighbor list to write.
radius : float, optional
The neighborhood radius. Defaults to None.
unit : np.ndarray, optional
The unit for each coordinate. Defaults to None.
"""
with open(filename, "w") as f:
if radius is not None:
f.write(f"# radius = {radius}\n")
if unit is not None:
f.write(f"# unit =")
for u in unit:
f.write(f" {u}")
f.write("\n")
for i, nn in enumerate(nnlist):
f.write(str(i))
for o in sorted(nn):
f.write(f" {o}")
f.write("\n")
def main():
import argparse
parser = argparse.ArgumentParser(
description="Make neighbor-list file from mesh-data file",
epilog="""
Note:
- The first column of an input file will be ignored.
- UNIT is used for changing aspect ratio (a kind of normalization)
- Each coodinate will be divided by the corresponding unit
- UNIT is given as a string separated with white space
- For example, -u "1.0 0.5 1.0" for 3D case
- tqdm python package is required to show progress bar
""",
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument("input", type=str, help="input file")
parser.add_argument(
"-o", "--output", type=str, default="neighborlist.txt", help="output file"
)
parser.add_argument(
"-r", "--radius", type=float, default=1.0, help="neighborhood radius"
)
parser.add_argument(
"-u",
"--unit",
default=None,
help="length unit for each coordinate (see Note)",
)
parser.add_argument(
"-q", "--quiet", action="store_true", help="Do not show progress bar"
)
parser.add_argument("--allow-selfloop", action="store_true", help="allow self loop")
parser.add_argument(
"--check-allpairs",
action="store_true",
help="check all pairs (bruteforce algorithm)",
)
args = parser.parse_args()
inputfile = args.input
outputfile = args.output
radius = args.radius
X = np.zeros((0, 0))
if mpi.rank() == 0:
X = np.loadtxt(inputfile)
if mpi.size() > 1:
sh = mpi.comm().bcast(X.shape, root=0)
if mpi.rank() != 0:
X = np.zeros(sh)
mpi.comm().Bcast(X, root=0)
D = X.shape[1] - 1
if args.unit is None:
unit = np.ones(D, dtype=float)
else:
unit = np.array([float(w) for w in args.unit.split()])
if len(unit) != D:
print(f"--unit option expects {D} floats as a string but {len(unit)} given")
sys.exit(1)
X = X[:, 1:] / unit
nnlist = make_neighbor_list(
X,
radius,
allow_selfloop=args.allow_selfloop,
check_allpairs=args.check_allpairs,
show_progress=(not args.quiet),
comm=mpi.comm(),
)
write_neighbor_list(outputfile, nnlist, radius=radius, unit=unit)