capturegraph.scheduling.organize.batched_distance_store

Batched Distance Store - Distribution-Aware Assignment Management

Manages session assignments using a distance function to detect when an assignment has been fulfilled by a new capture. When new sessions are captured, any assignment whose distance to a new capture is below the threshold is treated as fulfilled and removed automatically.

Example
import capturegraph.scheduling as cgsh

store = cgsh.organize.BatchedDistanceStore(
    persistence=persistence,
    sessions=target.captures,
    distance_fn=cgsh.distance.location(sigma_m=50),
    threshold=1.0,
)

# Add candidate slots to assign from
store.set_batch(candidate_slots)

# Assign to a user
slot = store.request_slot(user_id)

# Later, create new store with updated sessions to detect fulfillment
store = cgsh.organize.BatchedDistanceStore(
    persistence=persistence,
    sessions=target.captures,  # New captures fulfill nearby assignments
    distance_fn=cgsh.distance.location(sigma_m=50),
    threshold=1.0,
)

BatchedDistanceStore

Manages slot assignments with distance-aware fulfillment detection.

When new sessions (captures) are added, the store computes distances from the new captures to all currently assigned slots. Any assigned slot within threshold distance of a new capture is considered "fulfilled" and removed.

State is persisted in the provided persistence dict:

- `persistence.batched_assignments`: user_id -> assigned slot
- `persistence.batched_batch`: list of unassigned candidate slots
- `persistence.batched_prior_count`: session count baseline
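The fulfillment check described above can be sketched in plain Python. This is a minimal illustration rather than the library's code: `find_fulfilled`, the 1-D positions, and the absolute-difference distance are all assumptions made for the example. The real store keeps its state in the persistence dict and computes distances in a batch.

```python
def find_fulfilled(sessions, prior_count, assignments, distance_fn, threshold=1.0):
    """Return user_ids whose assigned slot is strictly closer than
    `threshold` to at least one session captured after `prior_count`."""
    new_captures = sessions[prior_count:]  # only sessions beyond the baseline
    fulfilled = []
    for user_id, slot in assignments.items():
        # Minimum distance from any new capture to this assigned slot.
        min_dist = min(
            (distance_fn(capture, slot) for capture in new_captures),
            default=float("inf"),  # no new captures: nothing can be fulfilled
        )
        if min_dist < threshold:
            fulfilled.append(user_id)
    return fulfilled


# Hypothetical 1-D data: positions in units where the threshold is 1.0.
sessions = [0.0, 10.0, 4.8]                # the last capture is new
assignments = {"alice": 5.0, "bob": 20.0}  # user_id -> assigned slot
fulfilled = find_fulfilled(
    sessions,
    prior_count=2,
    assignments=assignments,
    distance_fn=lambda a, b: abs(a - b),
)
print(fulfilled)  # ['alice']
```

Only sessions past the baseline count are compared, so a capture that already existed when a slot was assigned does not fulfill that assignment.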

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `persistence` | `Dict` | Server persistence dict (must support attribute access). | required |
| `sessions` | `List[Dict[Any]]` | Current list of captured sessions. Used to establish baseline count and detect new captures. Sessions beyond the prior count are checked for fulfillment of existing assignments. | required |
| `distance_fn` | `Callable[[Any, Any], float]` | A distance function `(a, b) -> float`. Should ideally support batch computation via the `BatchedDistanceFunction` protocol. | required |
| `threshold` | `float` | Distance threshold for fulfillment. Assignments with distance < threshold to any new capture are removed. Default is 1.0. | `1.0` |
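To illustrate how `distance_fn` and `threshold` interact, here is a sketch of a hand-written distance function with the `(a, b) -> float` shape. It is not the implementation of `cgsh.distance.location`: the name `scaled_haversine`, the `lat`/`lon` keys, and the idea of dividing meters by `sigma_m` are assumptions made for this example. Under that assumed scaling, a threshold of 1.0 would mean "within `sigma_m` meters".

```python
import math


def scaled_haversine(sigma_m):
    """Build a distance function (a, b) -> float that returns the
    great-circle distance in meters divided by `sigma_m` (hypothetical)."""
    earth_radius_m = 6_371_000.0

    def distance(a, b):
        lat1, lon1 = math.radians(a["lat"]), math.radians(a["lon"])
        lat2, lon2 = math.radians(b["lat"]), math.radians(b["lon"])
        # Haversine formula for the central angle between two points.
        h = (
            math.sin((lat2 - lat1) / 2) ** 2
            + math.cos(lat1) * math.cos(lat2) * math.sin((lon2 - lon1) / 2) ** 2
        )
        meters = 2 * earth_radius_m * math.asin(math.sqrt(h))
        return meters / sigma_m

    return distance


distance_fn = scaled_haversine(sigma_m=50)
slot = {"lat": 40.0, "lon": -75.0}
near = {"lat": 40.0002, "lon": -75.0}  # roughly 22 m north of the slot
far = {"lat": 40.01, "lon": -75.0}     # roughly 1.1 km north of the slot
print(distance_fn(slot, near) < 1.0)  # True
print(distance_fn(slot, far) < 1.0)   # False
```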
Example
import capturegraph.scheduling as cgsh

store = cgsh.organize.BatchedDistanceStore(
    persistence=persistence,
    sessions=target.captures,
    distance_fn=cgsh.distance.location(sigma_m=50),
    threshold=1.0,
)

store.set_batch(candidate_slots)
slot = store.request_slot("user_123")  # Returns first available
slot = store.request_slot("user_123")  # Returns same slot (sticky)

# When new captures arrive, create new store to detect fulfillment
store = cgsh.organize.BatchedDistanceStore(
    persistence=persistence,
    sessions=target.captures,  # Fulfilled assignments removed
    distance_fn=cgsh.distance.location(sigma_m=50),
    threshold=1.0,
)
Source code in capturegraph-lib/capturegraph/scheduling/organize/batched_distance_store.py
class BatchedDistanceStore:
    """Manages slot assignments with distance-aware fulfillment detection.

    When new sessions (captures) are added, the store computes distances from
    the new captures to all currently assigned slots. Any assigned slot within
    `threshold` distance of a new capture is considered "fulfilled" and removed.

    State is persisted in the provided persistence dict:
    - `persistence.batched_assignments`: user_id -> assigned slot
    - `persistence.batched_batch`: list of unassigned candidate slots
    - `persistence.batched_prior_count`: session count baseline

    Args:
        persistence: Server persistence dict (must support attribute access).
        sessions: Current list of captured sessions. Used to establish baseline
            count and detect new captures. Sessions beyond the prior count are
            checked for fulfillment of existing assignments.
        distance_fn: A distance function `(a, b) -> float`. Should ideally
            support batch computation via the `BatchedDistanceFunction` protocol.
        threshold: Distance threshold for fulfillment. Assignments with distance
            < threshold to any new capture are removed. Default is 1.0.

    Example:
        ```python
        import capturegraph.scheduling as cgsh

        store = cgsh.organize.BatchedDistanceStore(
            persistence=persistence,
            sessions=target.captures,
            distance_fn=cgsh.distance.location(sigma_m=50),
            threshold=1.0,
        )

        store.set_batch(candidate_slots)
        slot = store.request_slot("user_123")  # Returns first available
        slot = store.request_slot("user_123")  # Returns same slot (sticky)

        # When new captures arrive, create new store to detect fulfillment
        store = cgsh.organize.BatchedDistanceStore(
            persistence=persistence,
            sessions=target.captures,  # Fulfilled assignments removed
            distance_fn=cgsh.distance.location(sigma_m=50),
            threshold=1.0,
        )
        ```
    """

    def __init__(
        self,
        persistence: cg.Dict,
        sessions: cg.List[cg.Dict[Any]],
        distance_fn: Callable[[Any, Any], float],
        threshold: float = 1.0,
    ):
        self._persistence = persistence
        self._distance_fn = distance_fn
        self._threshold = threshold

        # Initialize persistence fields if missing
        if cg.is_missing(self._persistence.batched_assignments):
            self._persistence.batched_assignments = cg.Dict({})

        if cg.is_missing(self._persistence.batched_batch):
            self._persistence.batched_batch = cg.List([])

        if cg.is_missing(self._persistence.batched_prior_count):
            self._persistence.batched_prior_count = 0

        # Process initial sessions to detect fulfilled assignments
        self._process_new_sessions(sessions)

    @property
    def _assignments(self) -> cg.Dict:
        return self._persistence.batched_assignments

    @property
    def _batch(self) -> cg.List:
        return self._persistence.batched_batch

    @property
    def _prior_count(self) -> int:
        return self._persistence.batched_prior_count

    def is_empty(self) -> bool:
        """Check if the batch is exhausted."""
        return len(self._batch) == 0

    def request_slot(self, user_id: str) -> cg.Dict[Any] | None:
        """Request a slot for a user.

        If the user already has an assignment, returns it (sticky assignment).
        Otherwise, pops the next slot from the batch and assigns it.

        Args:
            user_id: The user's unique identifier.

        Returns:
            The assigned slot, or None if the batch is empty.

        Example:
            ```python
            slot = store.request_slot("user_123")
            if slot is None:
                # Batch exhausted, need to add more slots
                store.set_batch(more_candidates)
                slot = store.request_slot("user_123")
            ```
        """
        # Return existing assignment if present
        if user_id in self._assignments:
            return self._assignments[user_id]

        # Batch exhausted
        if len(self._batch) == 0:
            return None

        # Assign next available slot
        current_batch = list(self._batch)
        slot = current_batch.pop(0)
        self._persistence.batched_batch = cg.List(current_batch)
        self._persistence.batched_assignments[user_id] = slot
        return slot

    def _process_new_sessions(self, sessions: list[cg.Dict[Any]]) -> None:
        """Internal: detect and remove fulfilled assignments."""
        new_count = len(sessions)
        prior_count = self._prior_count
        self._persistence.batched_prior_count = new_count

        if new_count <= prior_count:
            # No new sessions
            return

        # Get only the new sessions (captures)
        new_captures = sessions[prior_count:]

        if len(self._assignments) == 0:
            # No assignments to check
            return

        # Get assigned slots as a list
        assigned_user_ids = list(self._assignments.keys())
        assigned_slots = cg.List([self._assignments[uid] for uid in assigned_user_ids])
        new_captures_list = cg.List(new_captures)

        if not assigned_slots:
            return

        if not new_captures_list:
            return

        try:
            distances = batch_matrix(
                self._distance_fn, new_captures_list, assigned_slots
            )
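        except Exception:
            # Best-effort: if the distance computation fails, keep all
            # assignments. The prior count was already advanced above, so
            # these sessions are not re-checked by a later store.
            return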

        # Find minimum distance from any new capture to each assigned slot
        min_distances = distances.min(axis=0)  # Shape: (num_assigned,)

        # Remove assignments that were fulfilled (capture close to assigned slot)
        users_to_remove = []
        for user_id, min_dist in zip(assigned_user_ids, min_distances):
            if min_dist < self._threshold:
                users_to_remove.append(user_id)

        # Remove fulfilled assignments
        for user_id in users_to_remove:
            del self._persistence.batched_assignments[user_id]

    def set_batch(self, slots: list[cg.Dict[Any]]) -> None:
        """Replace the current batch with new candidate slots.

        Slots are reordered using a greedy nearest-neighbor TSP approximation
        so that consecutive slots are close together, making it easier for
        users to visit them in sequence.

        Args:
            slots: New candidate slots to assign from.
        """
        if len(slots) <= 1:
            self._persistence.batched_batch = cg.List(slots)
            return

        slots = cg.List(slots)
        ordered = self._tsp_nearest_neighbor(slots)
        self._persistence.batched_batch = cg.List(ordered)

    def _tsp_nearest_neighbor(self, slots: cg.List) -> list[cg.Dict[Any]]:
        """Greedy nearest-neighbor TSP approximation.

        Starts from the first slot and repeatedly picks the closest unvisited
        slot. This is O(n²) but simple and typically gives a reasonable tour.

        Args:
            slots: List of slots to order.

        Returns:
            Slots reordered to minimize consecutive distances.
        """
        import numpy as np

        n = len(slots)
        if n <= 1:
            return list(slots)

        # Compute full distance matrix
        distance_matrix = batch_matrix(self._distance_fn, slots, slots)

        # Set diagonal to infinity to avoid self-selection
        np.fill_diagonal(distance_matrix, np.inf)

        visited = [False] * n
        order = []

        # Start from first slot
        current = 0
        visited[current] = True
        order.append(current)

        # Greedily pick nearest unvisited neighbor
        for _ in range(n - 1):
            distances = distance_matrix[current].copy()
            # Mask visited nodes
            for j in range(n):
                if visited[j]:
                    distances[j] = np.inf

            nearest = int(np.argmin(distances))
            visited[nearest] = True
            order.append(nearest)
            current = nearest

        return [slots[i] for i in order]

is_empty()

Check if the batch is exhausted.

Source code in capturegraph-lib/capturegraph/scheduling/organize/batched_distance_store.py
def is_empty(self) -> bool:
    """Check if the batch is exhausted."""
    return len(self._batch) == 0

request_slot(user_id)

Request a slot for a user.

If the user already has an assignment, returns it (sticky assignment). Otherwise, pops the next slot from the batch and assigns it.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `user_id` | `str` | The user's unique identifier. | required |

Returns:

| Type | Description |
| --- | --- |
| `Dict[Any] \| None` | The assigned slot, or None if the batch is empty. |

Example
slot = store.request_slot("user_123")
if slot is None:
    # Batch exhausted, need to add more slots
    store.set_batch(more_candidates)
    slot = store.request_slot("user_123")
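
The sticky behavior can be modeled without the library. The class below is a toy, in-memory stand-in written for illustration: `StickySlots` and the string slot names are hypothetical, and no persistence or fulfillment detection is involved.

```python
class StickySlots:
    """Toy model of sticky slot assignment (hypothetical, in-memory only)."""

    def __init__(self, batch):
        self.batch = list(batch)  # unassigned candidate slots, in order
        self.assignments = {}     # user_id -> assigned slot

    def request_slot(self, user_id):
        if user_id in self.assignments:  # sticky: repeat requests return the same slot
            return self.assignments[user_id]
        if not self.batch:               # batch exhausted
            return None
        slot = self.batch.pop(0)         # take the next slot in batch order
        self.assignments[user_id] = slot
        return slot


store = StickySlots(["slot_a", "slot_b"])
first = store.request_slot("user_123")
again = store.request_slot("user_123")
other = store.request_slot("user_456")
none_left = store.request_slot("user_789")
print(first, again, other, none_left)  # slot_a slot_a slot_b None
```

Because the existing-assignment check comes first, a user who already holds a slot still receives it after the batch is exhausted. Only users without an assignment see None.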
Source code in capturegraph-lib/capturegraph/scheduling/organize/batched_distance_store.py
def request_slot(self, user_id: str) -> cg.Dict[Any] | None:
    """Request a slot for a user.

    If the user already has an assignment, returns it (sticky assignment).
    Otherwise, pops the next slot from the batch and assigns it.

    Args:
        user_id: The user's unique identifier.

    Returns:
        The assigned slot, or None if the batch is empty.

    Example:
        ```python
        slot = store.request_slot("user_123")
        if slot is None:
            # Batch exhausted, need to add more slots
            store.set_batch(more_candidates)
            slot = store.request_slot("user_123")
        ```
    """
    # Return existing assignment if present
    if user_id in self._assignments:
        return self._assignments[user_id]

    # Batch exhausted
    if len(self._batch) == 0:
        return None

    # Assign next available slot
    current_batch = list(self._batch)
    slot = current_batch.pop(0)
    self._persistence.batched_batch = cg.List(current_batch)
    self._persistence.batched_assignments[user_id] = slot
    return slot

set_batch(slots)

Replace the current batch with new candidate slots.

Slots are reordered using a greedy nearest-neighbor TSP approximation so that consecutive slots are close together, making it easier for users to visit them in sequence.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `slots` | `list[Dict[Any]]` | New candidate slots to assign from. | required |
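
The greedy nearest-neighbor ordering can be sketched on plain 2-D points. This is an illustration under assumptions: `nearest_neighbor_order` and the Euclidean distance are stand-ins for the example, whereas the store orders slots using its configured `distance_fn`.

```python
import math


def nearest_neighbor_order(points):
    """Return indices of `points` in greedy nearest-neighbor order:
    start at index 0, then repeatedly move to the closest unvisited point."""
    n = len(points)
    if n <= 1:
        return list(range(n))

    unvisited = set(range(1, n))
    order = [0]  # the tour always starts from the first point
    while unvisited:
        current = points[order[-1]]
        # Closest unvisited point; ties are broken by the lower index.
        nearest = min(unvisited, key=lambda j: (math.dist(current, points[j]), j))
        unvisited.remove(nearest)
        order.append(nearest)
    return order


points = [(0, 0), (10, 0), (1, 0), (9, 0)]
order = nearest_neighbor_order(points)
print(order)  # [0, 2, 3, 1]
```

The result depends on the starting point and is not guaranteed to be the shortest tour. It only keeps consecutive entries close together, which is the stated goal of the reordering.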
Source code in capturegraph-lib/capturegraph/scheduling/organize/batched_distance_store.py
def set_batch(self, slots: list[cg.Dict[Any]]) -> None:
    """Replace the current batch with new candidate slots.

    Slots are reordered using a greedy nearest-neighbor TSP approximation
    so that consecutive slots are close together, making it easier for
    users to visit them in sequence.

    Args:
        slots: New candidate slots to assign from.
    """
    if len(slots) <= 1:
        self._persistence.batched_batch = cg.List(slots)
        return

    slots = cg.List(slots)
    ordered = self._tsp_nearest_neighbor(slots)
    self._persistence.batched_batch = cg.List(ordered)