"""Random spherical cluster generation and growth algorithms.
This module provides functions for generating initial atomic cluster structures
through random placement in spherical volumes and convex-hull-guided growth
strategies that operate on the current cluster geometry.
"""
from __future__ import annotations
from collections import Counter, defaultdict
import numpy as np
from ase import Atom, Atoms
from ase.data import atomic_masses, atomic_numbers, covalent_radii
from scipy.spatial import KDTree
from scgo.utils.helpers import get_composition_counts
from scgo.utils.logging import get_logger
from .atomic_radii import cluster_passes_ga_blmin, resolve_steric_floor
from .geometry_helpers import (
_check_composition_feasibility,
_generate_batch_positions_on_convex_hull,
_verify_exact_composition,
analyze_disconnection,
compute_bond_distance_params,
format_placement_error_message,
get_covalent_radius,
get_structure_diagnostics,
is_cluster_connected,
validate_cluster,
)
from .initialization_config import (
BLMIN_RATIO_DEFAULT,
CONNECTIVITY_FACTOR,
GROWTH_ORDER_STRATEGY_COUNT,
KDTREE_THRESHOLD,
MASS_FIRST_PLACEMENT_PROB,
MAX_CONNECTIVITY_RETRIES,
MAX_CONSECUTIVE_FAILURES,
MAX_PLACEMENT_ATTEMPTS_PER_ATOM,
MIN_DISTANCE_FACTOR_DEFAULT,
MIN_DISTANCE_THRESHOLD_HIGH,
MIN_DISTANCE_THRESHOLD_LOW,
PLACEMENT_RADIUS_SCALING_DEFAULT,
PLACEMENT_RELAXATION_FACTOR,
)
logger = get_logger(__name__)
# Growth order strategy implementations
def _group_atoms_by_element(atoms_to_add: list[str]) -> dict[str, list[str]]:
"""Group atoms by element symbol.
Args:
atoms_to_add: List of element symbols to group
Returns:
Dictionary mapping element symbol to list of occurrences
"""
element_groups: dict[str, list[str]] = {}
for atom in atoms_to_add:
if atom not in element_groups:
element_groups[atom] = []
element_groups[atom].append(atom)
return element_groups
def _growth_order_random(
atoms_to_add: list[str], rng: np.random.Generator
) -> list[str]:
"""Strategy 0: Keep random order (already shuffled).
Args:
atoms_to_add: List of element symbols to add
rng: Random number generator (unused, kept for API consistency)
Returns:
List of element symbols in original random order
"""
return atoms_to_add
def _growth_order_by_element(
atoms_to_add: list[str], rng: np.random.Generator
) -> list[str]:
"""Strategy 1: Group by element, add all of one element first.
Args:
atoms_to_add: List of element symbols to add
rng: Random number generator for shuffling element order
Returns:
List of element symbols grouped by element type
"""
element_groups = _group_atoms_by_element(atoms_to_add)
element_order = list(element_groups.keys())
rng.shuffle(element_order)
return [atom for element in element_order for atom in element_groups[element]]
def _growth_order_alternating(
atoms_to_add: list[str], rng: np.random.Generator
) -> list[str]:
"""Strategy 2: Interleave elements to promote mixing.
Args:
atoms_to_add: List of element symbols to add
rng: Random number generator (unused, kept for API consistency)
Returns:
List of element symbols with interleaved element types
"""
element_groups = _group_atoms_by_element(atoms_to_add)
max_len = max(len(group) for group in element_groups.values())
result = [
element_groups[element][i]
for i in range(max_len)
for element in element_groups
if i < len(element_groups[element])
]
return result
def _atomic_mass(symbol: str) -> float:
return float(atomic_masses[atomic_numbers[symbol]])
def _growth_order_by_mass(
atoms_to_add: list[str], rng: np.random.Generator
) -> list[str]:
"""Place heavier element groups first; shuffle within each element group.
All randomness is drawn from ``rng`` so callers with paired generators
get identical placement orders for the same seed.
"""
element_groups = _group_atoms_by_element(atoms_to_add)
if len(element_groups) <= 1:
shuffled = list(atoms_to_add)
rng.shuffle(shuffled)
return shuffled
mass_by_element = {element: _atomic_mass(element) for element in element_groups}
mass_tiers = sorted(set(mass_by_element.values()), reverse=True)
result: list[str] = []
for mass in mass_tiers:
tier_elements = [
element for element in element_groups if mass_by_element[element] == mass
]
rng.shuffle(tier_elements)
for element in tier_elements:
group = list(element_groups[element])
rng.shuffle(group)
result.extend(group)
return result
def _sample_atoms_to_add_order(
atoms_to_add: list[str],
rng: np.random.Generator,
*,
base_composition: list[str] | None = None,
target_composition: list[str] | None = None,
) -> list[str]:
"""Sample a placement order: mass-biased by default, exploratory otherwise.
Consumes randomness only from ``rng``. Each call advances the generator
state, so paired RNGs with the same seed yield identical orders.
"""
if rng.random() < MASS_FIRST_PLACEMENT_PROB:
return _growth_order_by_mass(atoms_to_add, rng)
strategy = int(rng.integers(0, GROWTH_ORDER_STRATEGY_COUNT))
return _apply_growth_order_strategy(
atoms_to_add,
strategy,
rng,
base_composition=base_composition,
target_composition=target_composition,
)
def _growth_order_by_size(
atoms_to_add: list[str], rng: np.random.Generator
) -> list[str]:
"""Strategy 3: Sort by atomic size (covalent radius).
Args:
atoms_to_add: List of element symbols to add
rng: Random number generator for selecting sort direction
Returns:
List of element symbols sorted by covalent radius
"""
size_direction = rng.integers(0, 2) # 0=larger first, 1=smaller first
atoms_with_sizes = [(atom, get_covalent_radius(atom)) for atom in atoms_to_add]
atoms_with_sizes.sort(key=lambda x: x[1], reverse=(size_direction == 0))
return [atom for atom, _ in atoms_with_sizes]
def _growth_order_element_clustering(
atoms_to_add: list[str], rng: np.random.Generator
) -> list[str]:
"""Strategy 4: Maximize or minimize element clustering.
Args:
atoms_to_add: List of element symbols to add
rng: Random number generator for selecting clustering preference
Returns:
List of element symbols ordered to maximize or minimize clustering
"""
element_counts = get_composition_counts(atoms_to_add)
if len(element_counts) <= 1:
# Single element - keep random order
return atoms_to_add
cluster_preference = rng.integers(0, 2) # 0=maximize clustering, 1=minimize
element_groups = _group_atoms_by_element(atoms_to_add)
if cluster_preference == 0:
# Maximize clustering: add all of one element, then all of next
element_order = sorted(
element_groups.keys(),
key=lambda e: len(element_groups[e]),
reverse=True,
)
return [atom for element in element_order for atom in element_groups[element]]
else:
# Minimize clustering: maximize interleaving
max_len = max(len(group) for group in element_groups.values())
result: list[str] = []
for i in range(max_len):
# Shuffle element order each iteration for more dispersion
element_order = list(element_groups.keys())
rng.shuffle(element_order)
result.extend(
element_groups[element][i]
for element in element_order
if i < len(element_groups[element])
)
return result
def _growth_order_by_composition_balance(
atoms_to_add: list[str],
base_composition: list[str],
target_composition: list[str],
rng: np.random.Generator,
) -> list[str]:
"""Strategy 5: Order growth to maintain target composition ratios.
Calculates composition deficit for each element and prioritizes
adding atoms that bring composition closer to target ratios.
This helps maintain balanced composition throughout growth.
Args:
atoms_to_add: List of element symbols to add
base_composition: Current composition of the seed/cluster
target_composition: Target composition to achieve
rng: Random number generator
Returns:
Reordered list of atoms to add, prioritizing elements with largest deficits
"""
if not base_composition or not target_composition:
# Fallback to random if compositions are missing
return atoms_to_add
base_counts = get_composition_counts(base_composition)
target_counts = get_composition_counts(target_composition)
# Calculate current composition (base + what we're adding)
current_counts = base_counts.copy()
for atom in atoms_to_add:
current_counts[atom] = current_counts.get(atom, 0) + 1
# Calculate deficit for each element (how far from target ratio)
deficits = {}
all_elements = set(target_composition) | set(atoms_to_add)
for elem in all_elements:
target_count = target_counts.get(elem, 0)
current_count = current_counts.get(elem, 0)
deficit = target_count - current_count
deficits[elem] = deficit
atoms_with_deficit = [(atom, deficits.get(atom, 0)) for atom in atoms_to_add]
atoms_with_deficit.sort(key=lambda x: x[1], reverse=True)
# Group atoms by deficit and shuffle within each group for diversity
deficit_groups = defaultdict(list)
for atom, deficit in atoms_with_deficit:
deficit_groups[deficit].append(atom)
# Sort by deficit (descending) and shuffle within each group
result = []
for deficit in sorted(deficit_groups.keys(), reverse=True):
group = deficit_groups[deficit]
rng.shuffle(group)
result.extend(group)
return result
# Growth order strategy dispatch table
_GROWTH_ORDER_STRATEGIES = {
0: _growth_order_random,
1: _growth_order_by_element,
2: _growth_order_alternating,
3: _growth_order_by_size,
4: _growth_order_element_clustering,
5: _growth_order_by_composition_balance,
}
def _compute_effective_placement_params(
attempt_ratio: float,
min_distance_factor: float,
placement_radius_scaling: float,
*,
steric_floor: float | None = None,
) -> tuple[float, float]:
"""Compute effective placement parameters with progressive relaxation.
Args:
attempt_ratio: Ratio of current attempt to max attempts (0.0 to 1.0)
min_distance_factor: Base minimum distance factor
placement_radius_scaling: Base placement radius scaling
steric_floor: Never relax clash checks below this value (e.g. GA blmin).
Returns:
Tuple of (effective_scaling, effective_min_distance)
"""
scale_growth = min(1.0, min_distance_factor)
if placement_radius_scaling < PLACEMENT_RELAXATION_FACTOR:
effective_scaling = placement_radius_scaling
else:
effective_scaling = placement_radius_scaling * (
1.0 + scale_growth * attempt_ratio
)
relaxed_factor = min_distance_factor * (
1.0 - PLACEMENT_RELAXATION_FACTOR * attempt_ratio
)
if steric_floor is not None:
absolute_floor = steric_floor
elif min_distance_factor >= MIN_DISTANCE_THRESHOLD_LOW:
absolute_floor = MIN_DISTANCE_THRESHOLD_LOW
else:
absolute_floor = 0.0
if min_distance_factor >= MIN_DISTANCE_THRESHOLD_HIGH:
effective_min_distance = min_distance_factor
else:
effective_min_distance = max(absolute_floor, relaxed_factor)
return effective_scaling, effective_min_distance
def _raise_if_connectivity_below_steric_floor(
connectivity_factor: float,
steric_floor: float,
) -> None:
"""Reject parameter combinations that cannot yield a connected, clash-free cluster."""
if connectivity_factor + 1e-9 < steric_floor:
raise ValueError(
f"connectivity_factor ({connectivity_factor}) must be >= steric floor "
f"({steric_floor}) when GA blmin enforcement is active: bonded pairs "
f"need distance <= connectivity_factor*(r_i+r_j) and >= "
f"steric_floor*(r_i+r_j). Increase connectivity_factor or pass "
f"blmin_ratio=None to disable the GA steric floor."
)
def _effective_placement_scaling_for_retry(
placement_radius_scaling: float,
retry_attempt: int,
max_retries: int,
*,
steric_floor: float,
) -> float:
"""Widen the placement shell on outer retries using the inner-attempt curve."""
retry_ratio = retry_attempt / max(1, max_retries - 1)
effective_scaling, _ = _compute_effective_placement_params(
retry_ratio,
steric_floor,
placement_radius_scaling,
steric_floor=steric_floor,
)
return effective_scaling
def _sample_connectivity_anchored_position(
anchor_pos: np.ndarray,
bond_distance: float,
rng: np.random.Generator,
) -> np.ndarray:
"""Return a uniform random point on the sphere of radius ``bond_distance``."""
direction = rng.standard_normal(3)
direction /= np.linalg.norm(direction)
return anchor_pos + direction * bond_distance
def _apply_growth_order_strategy(
atoms_to_add: list[str],
strategy: int,
rng: np.random.Generator,
base_composition: list[str] | None = None,
target_composition: list[str] | None = None,
) -> list[str]:
"""Apply a growth order strategy to atoms_to_add list.
Args:
atoms_to_add: List of element symbols to add
strategy: Strategy index (0-5)
rng: Random number generator
base_composition: Current composition (required for strategy 5)
target_composition: Target composition (required for strategy 5)
Returns:
Reordered list of atoms to add
"""
strategy_func = _GROWTH_ORDER_STRATEGIES.get(strategy, _growth_order_random)
# Strategy 5 requires base and target composition
if strategy == 5:
if base_composition is None or target_composition is None:
# Fallback to random if compositions not provided
return _growth_order_random(atoms_to_add, rng)
return strategy_func(atoms_to_add, base_composition, target_composition, rng)
# Other strategies only need atoms_to_add and rng
return strategy_func(atoms_to_add, rng)
[docs]
def random_spherical(
composition: list[str],
cell_side: float,
rng: np.random.Generator,
placement_radius_scaling: float = PLACEMENT_RADIUS_SCALING_DEFAULT,
min_distance_factor: float = MIN_DISTANCE_FACTOR_DEFAULT,
connectivity_factor: float = CONNECTIVITY_FACTOR,
max_connectivity_retries: int = MAX_CONNECTIVITY_RETRIES,
blmin_ratio: float | None = BLMIN_RATIO_DEFAULT,
) -> Atoms:
"""Place atoms randomly within a compact sphere, ensuring minimum distances.
Atoms are added iteratively with covalent-radii-based clash checks and
connectivity enforcement. For each retry attempt the algorithm slightly
relaxes the effective placement radius and distance thresholds within the
user-specified bounds to improve the chance of finding a valid connected
configuration. Placement order is sampled on each attempt (mass-biased by
default, exploratory otherwise); see
:mod:`scgo.initialization.initialization_config`.
When ``blmin_ratio`` is set (default: ``BLMIN_RATIO_DEFAULT``), placement
and final validation enforce the same steric floor used by GA operators
(``ratio_of_covalent_radii`` / ``build_blmin``). Progressive placement
relaxation never drops below that floor. Pass ``blmin_ratio=None`` to
disable the GA floor and rely only on ``min_distance_factor``.
Args:
composition: List of element symbols for the atoms.
cell_side: The side length of the cubic cell for the returned Atoms object.
placement_radius_scaling: A scaling factor used to determine the initial
spherical volume for atom placement. Larger values result in a larger
initial volume.
min_distance_factor: Factor to scale the sum of covalent radii for
minimum allowed distance between atoms. A value of 1.0 means no
overlap, while < 1.0 allows some overlap.
connectivity_factor: Factor to multiply sum of covalent radii for
connectivity threshold.
max_connectivity_retries: Maximum number of retries if connectivity
validation fails.
blmin_ratio: GA-compatible steric floor (covalent-radius scale). ``None``
disables the extra floor beyond ``min_distance_factor``.
rng: Numpy ``Generator`` supplying all randomness for this call
(placement order, coordinates, retries).
Returns:
An :class:`ase.Atoms` instance with the randomly placed cluster.
Raises:
ValueError: If all atoms cannot be placed within the given constraints
after a maximum number of attempts, or if connectivity validation
fails after all retries.
"""
# Handle empty composition
if not composition:
return Atoms()
if placement_radius_scaling <= 0:
raise ValueError(
f"placement_radius_scaling must be positive, got {placement_radius_scaling}"
)
if min_distance_factor < 0:
raise ValueError(
f"min_distance_factor must be non-negative, got {min_distance_factor}"
)
if cell_side <= 0:
raise ValueError(f"cell_side must be positive, got {cell_side}")
n_atoms = len(composition)
if n_atoms == 0:
return Atoms()
steric_floor = resolve_steric_floor(min_distance_factor, blmin_ratio)
_raise_if_connectivity_below_steric_floor(connectivity_factor, steric_floor)
# Retry logic for connectivity validation
for retry_attempt in range(max_connectivity_retries):
effective_prs = _effective_placement_scaling_for_retry(
placement_radius_scaling,
retry_attempt,
max_connectivity_retries,
steric_floor=steric_floor,
)
new_atoms = Atoms()
new_atoms.set_cell([cell_side, cell_side, cell_side])
atoms_to_add = _sample_atoms_to_add_order(list(composition), rng)
final_atoms = _add_atoms_to_cluster_iteratively(
base_atoms=new_atoms,
atoms_to_add=atoms_to_add,
min_distance_factor=steric_floor,
placement_radius_scaling=effective_prs,
rng=rng,
connectivity_factor=connectivity_factor,
steric_floor=steric_floor,
)
if final_atoms is None:
if retry_attempt == max_connectivity_retries - 1:
error_msg = format_placement_error_message(
context=f"place all {n_atoms} atoms after {max_connectivity_retries} attempts",
composition=composition,
n_atoms=n_atoms,
placement_radius_scaling=placement_radius_scaling,
min_distance_factor=min_distance_factor,
connectivity_factor=connectivity_factor,
cell_side=cell_side,
additional_info=(
f"This may indicate:\n"
f" - Cell too small: cell_side={cell_side:.2f} Å may be insufficient for {n_atoms} atoms\n"
f" - Placement volume too small\n"
f" - Distance constraints too strict\n"
f" - Connectivity factor too strict"
),
)
raise ValueError(error_msg)
continue # Try again with different random placement
if len(final_atoms) > 2 and not is_cluster_connected(
final_atoms, connectivity_factor, use_mic=False
):
if retry_attempt == max_connectivity_retries - 1:
diagnostics = get_structure_diagnostics(
final_atoms, steric_floor, connectivity_factor, use_mic=False
)
error_msg = format_placement_error_message(
context=f"create connected cluster after {max_connectivity_retries} attempts",
composition=composition,
n_atoms=n_atoms,
placement_radius_scaling=placement_radius_scaling,
min_distance_factor=min_distance_factor,
connectivity_factor=connectivity_factor,
diagnostics=diagnostics,
additional_info="The cluster is disconnected.",
)
raise ValueError(error_msg)
continue # Try again with different random placement
final_atoms.center()
validated_atoms, is_valid, _ = validate_cluster(
final_atoms,
composition=composition,
min_distance_factor=steric_floor,
connectivity_factor=connectivity_factor,
sort_atoms=True,
raise_on_failure=False,
source="random_spherical",
)
if not is_valid:
if retry_attempt == max_connectivity_retries - 1:
validate_cluster(
final_atoms,
composition=composition,
min_distance_factor=steric_floor,
connectivity_factor=connectivity_factor,
sort_atoms=True,
raise_on_failure=True,
source="random_spherical",
)
continue
if blmin_ratio is not None and not cluster_passes_ga_blmin(
validated_atoms, blmin_ratio
):
if retry_attempt == max_connectivity_retries - 1:
raise ValueError(
f"[random_spherical] Cluster failed GA blmin validation at "
f"ratio={blmin_ratio} after {max_connectivity_retries} attempts."
)
continue
return validated_atoms
# Should never reach here; raise to indicate placement failure
raise ValueError(
"Failed to place atoms into a connected cluster within allowed attempts"
)
[docs]
def grow_from_seed(
seed_atoms: Atoms,
target_composition: list[str],
placement_radius_scaling: float,
cell_side: float,
rng: np.random.Generator,
min_distance_factor: float = MIN_DISTANCE_FACTOR_DEFAULT,
connectivity_factor: float = CONNECTIVITY_FACTOR,
blmin_ratio: float | None = BLMIN_RATIO_DEFAULT,
) -> Atoms | None:
"""Try to grow a smaller candidate :class:`ase.Atoms` to the target composition.
Growth is performed by repeatedly adding atoms to the existing seed using
convex-hull-based placement (via :func:`_add_atoms_to_cluster_iteratively`),
with covalent-radii-based clash checks and connectivity enforcement.
Args:
seed_atoms: The seed :class:`ase.Atoms` object to grow from.
target_composition: The target composition as a list of element symbols.
placement_radius_scaling: A scaling factor to determine the placement shell
radius.
min_distance_factor: Factor to scale covalent radii for minimum distance
checks.
cell_side: The side length of the cubic cell for the new :class:`ase.Atoms`
object.
connectivity_factor: Factor to multiply sum of covalent radii for
connectivity threshold.
rng: Optional numpy random number generator.
Returns:
A new :class:`ase.Atoms` object of the target composition on success,
or ``None`` on failure.
"""
base_atoms = seed_atoms.copy()
base_composition = base_atoms.get_chemical_symbols()
# Handle empty target composition - return seed with proper cell/centering
if not target_composition:
base_atoms.set_cell([cell_side, cell_side, cell_side])
base_atoms.center()
return base_atoms
# Check composition feasibility before attempting growth
is_feasible, error_message = _check_composition_feasibility(
base_composition, target_composition, operation="grow"
)
if not is_feasible:
logger.warning(
f"Composition growth not feasible: {error_message}. "
f"Base: {base_composition}, Target: {target_composition}"
)
return None
base_counts = get_composition_counts(base_composition)
target_counts = get_composition_counts(target_composition)
atoms_to_add = list((target_counts - base_counts).elements())
if not atoms_to_add:
base_atoms.set_cell([cell_side, cell_side, cell_side])
base_atoms.center()
return base_atoms
atoms_to_add = _sample_atoms_to_add_order(
atoms_to_add,
rng,
base_composition=base_composition,
target_composition=target_composition,
)
steric_floor = resolve_steric_floor(min_distance_factor, blmin_ratio)
final_atoms = _add_atoms_to_cluster_iteratively(
base_atoms=base_atoms,
atoms_to_add=atoms_to_add,
min_distance_factor=steric_floor,
placement_radius_scaling=placement_radius_scaling,
rng=rng,
connectivity_factor=connectivity_factor,
steric_floor=steric_floor,
)
if final_atoms:
final_atoms.set_cell([cell_side, cell_side, cell_side])
final_atoms.center()
# Verify exact composition match after growth
if not _verify_exact_composition(final_atoms, target_composition):
expected_counts = get_composition_counts(target_composition)
actual_counts = get_composition_counts(final_atoms.get_chemical_symbols())
raise ValueError(
f"Growth operation produced incorrect composition. "
f"Expected {target_composition} (counts: {expected_counts}), "
f"got {final_atoms.get_chemical_symbols()} (counts: {actual_counts})"
)
validated_atoms, is_valid, _ = validate_cluster(
final_atoms,
composition=target_composition,
min_distance_factor=steric_floor,
connectivity_factor=connectivity_factor,
sort_atoms=True,
raise_on_failure=False,
source="grow_from_seed",
)
if not is_valid:
return None
if blmin_ratio is not None and not cluster_passes_ga_blmin(
validated_atoms, blmin_ratio
):
return None
return validated_atoms
# If growth could not complete above, explicitly signal failure
return None
def _add_atoms_to_cluster_iteratively(
base_atoms: Atoms,
atoms_to_add: list[str],
min_distance_factor: float,
placement_radius_scaling: float,
rng: np.random.Generator,
connectivity_factor: float = CONNECTIVITY_FACTOR,
*,
steric_floor: float | None = None,
) -> Atoms | None:
"""Iteratively adds atoms to a base Atoms object within a spherical volume.
This function places new atoms in batches when possible (for clusters with ≥4 atoms),
using convex hull facets to place multiple atoms per hull computation. For smaller
clusters or edge cases, it falls back to single-atom placement. The placement volume
is dynamically adjusted. Connectivity is checked during growth to ensure the cluster
remains connected.
Args:
base_atoms: The starting Atoms object to add atoms to
atoms_to_add: List of element symbols to add.
min_distance_factor: Factor for minimum distance validation.
placement_radius_scaling: Scaling for placement radius.
rng: Random number generator.
connectivity_factor: Factor for connectivity threshold.
Returns:
A new Atoms object with all atoms added, or None if addition failed
"""
if not atoms_to_add:
return base_atoms.copy()
atoms = base_atoms.copy()
atoms_to_add = list(atoms_to_add)
new_atoms = atoms.copy()
radii_to_add = {s: get_covalent_radius(s) for s in set(atoms_to_add)}
# Determine if we're creating a 2-atom cluster
total_atoms = len(base_atoms) + len(atoms_to_add)
is_two_atom_cluster = total_atoms == 2
# Handle first atom placement (always at origin)
if not new_atoms:
if not atoms_to_add:
return new_atoms
first_symbol = atoms_to_add.pop(0)
new_atoms.append(Atom(first_symbol, np.array([0.0, 0.0, 0.0])))
# Use batch placement for clusters with ≥4 atoms unless connectivity is as
# tight as the steric floor (bond length fixed at blmin); then anchor-based
# single-atom growth is more reliable than convex-hull batch placement.
max_attempts_per_atom = MAX_PLACEMENT_ATTEMPTS_PER_ATOM
steric_reference = steric_floor if steric_floor is not None else min_distance_factor
tight_connectivity = connectivity_factor <= steric_reference * (1.0 + 1e-6)
if len(new_atoms) < 4 or tight_connectivity:
return _add_atoms_single_mode(
new_atoms,
atoms_to_add,
radii_to_add,
min_distance_factor,
placement_radius_scaling,
rng,
connectivity_factor,
is_two_atom_cluster,
max_attempts_per_atom,
logger,
base_atoms,
steric_floor=steric_floor,
)
# Batch placement mode for clusters with ≥4 atoms
return _add_atoms_batch_mode(
new_atoms,
atoms_to_add,
radii_to_add,
min_distance_factor,
placement_radius_scaling,
rng,
connectivity_factor,
max_attempts_per_atom,
logger,
base_atoms,
steric_floor=steric_floor,
)
def _add_atoms_single_mode(
new_atoms: Atoms,
atoms_to_add: list[str],
radii_to_add: dict[str, float],
min_distance_factor: float,
placement_radius_scaling: float,
rng: np.random.Generator,
connectivity_factor: float,
is_two_atom_cluster: bool,
max_attempts_per_atom: int,
logger,
base_atoms: Atoms,
*,
steric_floor: float | None = None,
) -> Atoms | None:
"""Single-atom placement mode for clusters with <4 atoms."""
for atom_idx, atom_symbol in enumerate(atoms_to_add):
atom_radius = radii_to_add[atom_symbol]
new_pos = None
consecutive_failures = 0
# Create KDTree once per atom if cluster is large enough
use_kdtree = len(new_atoms) >= KDTREE_THRESHOLD
tree = None
if use_kdtree and len(new_atoms) > 0:
tree = KDTree(new_atoms.get_positions())
for attempt in range(max_attempts_per_atom):
attempt_ratio = (attempt + 1) / max_attempts_per_atom
effective_scaling, effective_min_distance = (
_compute_effective_placement_params(
attempt_ratio,
min_distance_factor,
placement_radius_scaling,
steric_floor=steric_floor,
)
)
current_positions = new_atoms.get_positions()
n_current = len(new_atoms)
if n_current > 0:
center = new_atoms.get_center_of_mass()
# Special handling for 2-atom clusters: bond length must respect
# both the steric floor and connectivity_factor (not raw r_i+r_j).
if is_two_atom_cluster and n_current == 1:
existing_radius = covalent_radii[new_atoms.numbers[0]]
bond_distance, _, _ = compute_bond_distance_params(
existing_radius,
atom_radius,
connectivity_factor,
min_distance_factor,
placement_radius_scaling,
effective_min_distance=effective_min_distance,
effective_scaling=effective_scaling,
)
anchor_pos = current_positions[0]
candidate_pos = _sample_connectivity_anchored_position(
anchor_pos, bond_distance, rng
)
elif n_current == 1:
existing_radius = covalent_radii[new_atoms.numbers[0]]
base_extent = existing_radius
placement_distance = (
existing_radius + atom_radius * effective_scaling
)
else:
max_dist_from_center = np.max(
np.linalg.norm(current_positions - center, axis=1),
)
base_extent = max_dist_from_center
placement_distance = (
max_dist_from_center + atom_radius * effective_scaling
)
if not (is_two_atom_cluster and n_current == 1):
available_shell = placement_distance - base_extent
if available_shell < atom_radius * effective_min_distance:
continue
current_radii = np.array(
[covalent_radii[n] for n in new_atoms.numbers]
)
max_existing_radius = (
np.max(current_radii) if len(current_radii) > 0 else atom_radius
)
bond_distance, min_dist, max_connectivity_dist = (
compute_bond_distance_params(
max_existing_radius,
atom_radius,
connectivity_factor,
min_distance_factor,
placement_radius_scaling,
effective_min_distance=effective_min_distance,
effective_scaling=effective_scaling,
)
)
candidates = _generate_batch_positions_on_convex_hull(
new_atoms,
n_candidates=1,
bond_distance=bond_distance,
rng=rng,
min_connectivity_dist=min_dist,
max_connectivity_dist=max_connectivity_dist,
use_all_facets=False,
connectivity_factor=connectivity_factor,
)
if not candidates:
# Convex hull needs >=4 atoms; anchor on an existing atom so
# bond_distance is measured from a real neighbour, not the COM.
anchor_idx = int(rng.integers(n_current))
anchor_pos = current_positions[anchor_idx]
anchor_radius = covalent_radii[new_atoms.numbers[anchor_idx]]
anchor_bond_distance, _, _ = compute_bond_distance_params(
anchor_radius,
atom_radius,
connectivity_factor,
min_distance_factor,
placement_radius_scaling,
effective_min_distance=effective_min_distance,
effective_scaling=effective_scaling,
)
candidate_pos = _sample_connectivity_anchored_position(
anchor_pos, anchor_bond_distance, rng
)
else:
candidate_pos = candidates[0]
if len(current_positions) > 0:
current_radii = np.array([covalent_radii[n] for n in new_atoms.numbers])
# Use KDTree for large clusters to optimize distance checks
if use_kdtree and tree is not None:
dists, _ = tree.query(candidate_pos, k=len(current_positions))
dists = np.asarray(dists).flatten()
else:
dists = np.linalg.norm(current_positions - candidate_pos, axis=1)
min_allowed_dists = (
current_radii + atom_radius
) * effective_min_distance
if np.any(dists < min_allowed_dists):
consecutive_failures += 1
if consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
break # Early termination
continue
max_connectivity_dists = (
current_radii + atom_radius
) * connectivity_factor
if not np.any(dists <= max_connectivity_dists):
consecutive_failures += 1
if consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
break # Early termination
continue
new_pos = candidate_pos
consecutive_failures = 0 # Reset on success
break
if new_pos is None:
current_count = len(new_atoms)
total_target = len(base_atoms) + len(atoms_to_add)
attempts_made = min(attempt + 1, max_attempts_per_atom)
early_term = consecutive_failures >= MAX_CONSECUTIVE_FAILURES
reason = "early termination" if early_term else f"{attempts_made} attempts"
# Get current composition state
current_composition = new_atoms.get_chemical_symbols()
current_counts = get_composition_counts(current_composition)
remaining_atoms = atoms_to_add[atom_idx:]
remaining_counts = get_composition_counts(remaining_atoms)
diagnostics = get_structure_diagnostics(
new_atoms, min_distance_factor, connectivity_factor, use_mic=False
)
additional_info = (
f"Failed to place atom {atom_symbol} ({current_count + 1}/{total_target}) after {reason}.\n"
f"Current state: {current_count} atoms placed, composition counts: {current_counts}\n"
f"Remaining atoms to place: {len(remaining_atoms)} atoms, composition counts: {remaining_counts}"
)
error_msg = format_placement_error_message(
context="place atom",
composition=None,
n_atoms=None,
placement_radius_scaling=placement_radius_scaling,
min_distance_factor=min_distance_factor,
connectivity_factor=connectivity_factor,
diagnostics=diagnostics,
additional_info=additional_info,
)
logger.debug(error_msg)
return None
new_atoms.append(Atom(atom_symbol, new_pos))
if len(new_atoms) >= 2 and not is_cluster_connected(
new_atoms, connectivity_factor, use_mic=False
):
disconnection_distance, suggested_factor, analysis_msg = (
analyze_disconnection(new_atoms, connectivity_factor, use_mic=False)
)
current_composition = new_atoms.get_chemical_symbols()
current_counts = get_composition_counts(current_composition)
remaining_atoms = atoms_to_add[atom_idx + 1 :]
remaining_counts = (
get_composition_counts(remaining_atoms) if remaining_atoms else {}
)
diagnostics = get_structure_diagnostics(
new_atoms, min_distance_factor, connectivity_factor, use_mic=False
)
additional_info = (
f"Cluster became disconnected after placing atom {atom_symbol} "
f"({len(new_atoms)}/{len(base_atoms) + len(atoms_to_add)} atoms placed).\n"
f"Current state: {len(new_atoms)} atoms, composition counts: {current_counts}\n"
f"Remaining atoms: {len(remaining_atoms)} atoms, composition counts: {remaining_counts}\n"
f"Analysis: {analysis_msg}\n"
f"Suggested connectivity_factor: {suggested_factor:.2f}"
)
error_msg = format_placement_error_message(
context="maintain connectivity",
composition=None,
n_atoms=None,
placement_radius_scaling=placement_radius_scaling,
min_distance_factor=min_distance_factor,
connectivity_factor=connectivity_factor,
diagnostics=diagnostics,
additional_info=additional_info,
)
logger.debug(error_msg)
return None
return new_atoms
def _add_atoms_batch_mode(
new_atoms: Atoms,
atoms_to_add: list[str],
radii_to_add: dict[str, float],
min_distance_factor: float,
placement_radius_scaling: float,
rng: np.random.Generator,
connectivity_factor: float,
max_attempts_per_atom: int,
logger,
base_atoms: Atoms,
*,
steric_floor: float | None = None,
) -> Atoms | None:
"""Batch placement mode for clusters with ≥4 atoms."""
total_target = len(base_atoms) + len(atoms_to_add)
batch_attempt = 0
max_batch_attempts = max_attempts_per_atom # Limit total batch attempts
while atoms_to_add:
batch_attempt += 1
if batch_attempt > max_batch_attempts:
current_count = len(new_atoms)
current_composition = new_atoms.get_chemical_symbols()
current_counts = get_composition_counts(current_composition)
remaining_counts = get_composition_counts(atoms_to_add)
diagnostics = get_structure_diagnostics(
new_atoms, min_distance_factor, connectivity_factor, use_mic=False
)
additional_info = (
f"Failed to place remaining {len(atoms_to_add)} atoms "
f"({current_count}/{total_target} placed) after {max_batch_attempts} batch attempts.\n"
f"Current state: {current_count} atoms placed, composition counts: {current_counts}\n"
f"Remaining atoms: {len(atoms_to_add)} atoms, composition counts: {remaining_counts}"
)
error_msg = format_placement_error_message(
context="place remaining atoms in batch mode",
composition=None,
n_atoms=None,
placement_radius_scaling=placement_radius_scaling,
min_distance_factor=min_distance_factor,
connectivity_factor=connectivity_factor,
diagnostics=diagnostics,
additional_info=additional_info,
)
logger.debug(error_msg)
return None
# Progressive relaxation for batch attempts
attempt_ratio = batch_attempt / max_batch_attempts
effective_scaling, effective_min_distance = _compute_effective_placement_params(
attempt_ratio,
min_distance_factor,
placement_radius_scaling,
steric_floor=steric_floor,
)
current_positions = new_atoms.get_positions()
# Calculate placement parameters
current_radii = np.array([covalent_radii[n] for n in new_atoms.numbers])
max_existing_radius = float(
np.max(current_radii) if len(current_radii) > 0 else 0.0
)
avg_atom_radius = float(np.mean([radii_to_add[s] for s in atoms_to_add]))
bond_distance, min_dist, max_connectivity_dist = compute_bond_distance_params(
max_existing_radius,
avg_atom_radius,
connectivity_factor,
min_distance_factor,
placement_radius_scaling,
effective_min_distance=effective_min_distance,
effective_scaling=effective_scaling,
)
# Generate batch of candidate positions
batch_size = min(len(atoms_to_add), 100) # Limit batch size for efficiency
candidates = _generate_batch_positions_on_convex_hull(
new_atoms,
batch_size,
bond_distance,
rng,
min_connectivity_dist=min_dist,
max_connectivity_dist=max_connectivity_dist,
connectivity_factor=connectivity_factor,
)
if not candidates:
# Fall back to single-atom mode if batch generation fails
return _add_atoms_single_mode(
new_atoms,
atoms_to_add,
radii_to_add,
min_distance_factor,
placement_radius_scaling,
rng,
connectivity_factor,
False,
max_attempts_per_atom,
logger,
base_atoms,
steric_floor=steric_floor,
)
# Validate and place candidates
atoms_to_place = atoms_to_add[: len(candidates)]
valid_placements = []
# Use KDTree for large clusters to optimize distance checks
use_kdtree = len(new_atoms) >= KDTREE_THRESHOLD
if use_kdtree and len(current_positions) > 0:
tree = KDTree(current_positions)
for i, (atom_symbol, candidate_pos) in enumerate(
zip(atoms_to_place, candidates, strict=True)
):
atom_radius = radii_to_add[atom_symbol]
# Check distance to existing atoms
if len(current_positions) > 0:
if use_kdtree:
dists_to_existing, _ = tree.query(
candidate_pos, k=len(current_positions)
)
dists_to_existing = np.asarray(dists_to_existing).flatten()
else:
dists_to_existing = np.linalg.norm(
current_positions - candidate_pos, axis=1
)
min_allowed_dists = (
current_radii + atom_radius
) * effective_min_distance
if np.any(dists_to_existing < min_allowed_dists):
continue # Clash with existing atoms
# Check connectivity
max_connectivity_dists = (
current_radii + atom_radius
) * connectivity_factor
if not np.any(dists_to_existing <= max_connectivity_dists):
continue # Too far for connectivity
# Check distance to other candidates in batch
valid = True
for j, (other_symbol, other_pos) in enumerate(
zip(atoms_to_place, candidates, strict=True)
):
if i == j:
continue
other_radius = radii_to_add[other_symbol]
dist = np.linalg.norm(candidate_pos - other_pos)
min_allowed = (atom_radius + other_radius) * effective_min_distance
if dist < min_allowed:
valid = False
break
if valid:
valid_placements.append((atom_symbol, candidate_pos))
# Place all valid candidates
if not valid_placements:
# No valid placements in this batch, try again
continue
# Remove placed atoms from atoms_to_add (one per placement, not all of each symbol)
placed_counts = Counter(sym for sym, _ in valid_placements)
remaining = []
for atom in atoms_to_add:
if placed_counts.get(atom, 0) > 0:
placed_counts[atom] -= 1
else:
remaining.append(atom)
atoms_to_add = remaining
for atom_symbol, pos in valid_placements:
new_atoms.append(Atom(atom_symbol, pos))
if len(new_atoms) >= 2 and not is_cluster_connected(
new_atoms, connectivity_factor, use_mic=False
):
disconnection_distance, suggested_factor, analysis_msg = (
analyze_disconnection(new_atoms, connectivity_factor, use_mic=False)
)
current_composition = new_atoms.get_chemical_symbols()
current_counts = get_composition_counts(current_composition)
remaining_counts = get_composition_counts(atoms_to_add)
diagnostics = get_structure_diagnostics(
new_atoms, min_distance_factor, connectivity_factor, use_mic=False
)
additional_info = (
f"Cluster became disconnected after batch placement "
f"({len(new_atoms)}/{total_target} atoms placed).\n"
f"Current state: {len(new_atoms)} atoms, composition counts: {current_counts}\n"
f"Remaining atoms: {len(atoms_to_add)} atoms, composition counts: {remaining_counts}\n"
f"Analysis: {analysis_msg}\n"
f"Suggested connectivity_factor: {suggested_factor:.2f}"
)
error_msg = format_placement_error_message(
context="maintain connectivity after batch placement",
composition=None,
n_atoms=None,
placement_radius_scaling=placement_radius_scaling,
min_distance_factor=min_distance_factor,
connectivity_factor=connectivity_factor,
diagnostics=diagnostics,
additional_info=additional_info,
)
logger.warning(error_msg)
return None
# Reset batch attempt counter on successful placement
if valid_placements:
batch_attempt = 0
return new_atoms