Skip to content

capturegraph.scheduling.organize.assignment_store #

Assignment Store - Per-User Session Management#

Manages per-user notification assignments with automatic expiry and drift reconciliation for robust scheduling persistence.

Example
from capturegraph.scheduling.collaboration import AssignmentStore

store = AssignmentStore(
    persistence=persistence,
    expire_after=timedelta(hours=0.5),
    last_capture=sessions[-1].date,
)

# Get existing assignment or assign next available session
next_session = store.assign_next(user_id, candidate_sessions)

AssignmentStore #

Manages per-user session assignments with automatic expiry.

Stores assignments in persistence.assignments and handles:

  • Automatic cleanup: Removes stale assignments on initialization
  • Capture-aware expiry: Discards assignments after a new capture
  • Drift reconciliation: Rounds assignments to nearest candidate sessions

Sessions must have a date attribute (datetime) for sorting and expiry checks.

Parameters:

Name Type Description Default
persistence Dict

Server persistence dict (must support attribute access).

required
expire_after timedelta

Duration before an assignment expires. Defaults to 30 minutes.

timedelta(hours=0.5)
last_capture datetime | None

Timestamp of the most recent capture session. If provided, every assignment whose session date is not later than last_capture + expire_after is discarded on initialization.

None
Example
from capturegraph.scheduling.collaboration import AssignmentStore

store = AssignmentStore(
    persistence=persistence,
    expire_after=timedelta(hours=3.0),
    last_capture=target.Sunflower[-1].date,
)

# Check existing assignment
existing = store[user_id]
if existing is None:
    # Assign to next available session
    store[user_id] = candidate_sessions[0]
Source code in capturegraph-lib/capturegraph/scheduling/organize/assignment_store.py
class AssignmentStore:
    """Manages per-user session assignments with automatic expiry.

    Stores assignments in `persistence.assignments` and handles:

    - **Automatic cleanup**: Removes stale assignments on initialization
    - **Capture-aware expiry**: Discards assignments after a new capture
    - **Drift reconciliation**: Rounds assignments to nearest candidate sessions

    Sessions must have a `date` attribute (datetime) for sorting and expiry checks.

    Args:
        persistence: Server persistence dict (must support attribute access).
        expire_after: Duration before an assignment expires. Defaults to 30 minutes.
        last_capture: Timestamp of most recent capture session.
            If provided, all assignments older than `last_capture + expiry`
            are immediately discarded.

    Example:
        ```python
        from capturegraph.scheduling.collaboration import AssignmentStore

        store = AssignmentStore(
            persistence=persistence,
            expire_after=timedelta(hours=3.0),
            last_capture=target.Sunflower[-1].date,
        )

        # Check existing assignment
        existing = store[user_id]
        if existing is None:
            # Assign to next available session
            store[user_id] = candidate_sessions[0]
        ```
    """

    def __init__(
        self,
        persistence: cg.Dict,
        expire_after: timedelta = timedelta(hours=0.5),
        last_capture: datetime | None = None,
    ):
        self._persistence = persistence
        if cg.is_missing(self._persistence.assignments):
            self._persistence.assignments = cg.Dict({})

        self._last_capture = last_capture
        self._expire_after = expire_after

        # An assignment survives only if its session date is later than this.
        # NOTE(review): datetime.now() is naive; a timezone-aware
        # `last_capture` (or aware session dates) cannot be compared with it.
        # Confirm that callers only pass naive datetimes.
        threshold = datetime.now() - self._expire_after

        if last_capture is not None and not cg.is_missing(last_capture):
            threshold = max(threshold, last_capture + self._expire_after)

        # Clean up expired assignments, keeping the rest ordered by date
        self._persistence.assignments = {
            k: v
            for k, v in sorted(
                self._assignments.items(),
                key=lambda x: x[1].date,
            )
            if v.date > threshold
        }

    @property
    def _assignments(self) -> cg.Dict:
        # Always read through persistence so reassignments are picked up.
        return self._persistence.assignments

    def __getitem__(self, user_id: str) -> cg.Dict[Any] | None:
        """Get the current assignment for a user.

        Args:
            user_id: The user's unique identifier.

        Returns:
            The assigned session Dict, or None if not assigned.
        """
        if user_id not in self._assignments:
            return None

        return self._assignments[user_id]

    def __setitem__(self, user_id: str, session: cg.Dict[Any]) -> None:
        """Set an assignment for a user (no-op if already assigned).

        After inserting, the stored mapping is rebuilt in ascending order of
        the sessions' `date`.

        Args:
            user_id: The user's unique identifier.
            session: The session Dict to assign (must have a `date` attribute).
        """
        if user_id in self._assignments:
            return

        self._assignments[user_id] = session
        self._persistence.assignments = {
            k: v for k, v in sorted(self._assignments.items(), key=lambda x: x[1].date)
        }

    def assign_next(
        self, user_id: str, sessions: list[cg.Dict[Any]]
    ) -> cg.Dict[Any] | None:
        """Assign the next available session to a user.

        If the user already has an assignment, returns it. Otherwise,
        finds the first session not taken by another user and assigns it.

        Args:
            user_id: The user's unique identifier.
            sessions: Candidate sessions in chronological order.

        Returns:
            The assigned session, `sessions[0]` as fallback if all are taken
            (the fallback is not stored), or None if `sessions` is empty.

        Example:
            ```python
            next_session = store.assign_next(user_id, candidate_sessions)
            ```
        """
        if user_id in self._assignments:
            return self._assignments[user_id]

        # Nothing to hand out; skip reconciliation, which needs candidates.
        if not sessions:
            return None

        taken_dates = {s.date for s in self.values(sessions)}
        for session in sessions:
            if session.date not in taken_dates:
                self[user_id] = session
                return session

        # Every candidate is taken: fall back to the first one without storing it.
        return sessions[0]

    def values(
        self,
        sessions: list[cg.Dict[Any]] | None = None,
    ) -> list[cg.Dict[Any]]:
        """Get all assigned sessions, optionally rounded to candidates.

        When `sessions` is provided, performs drift reconciliation:
        rounds each assignment to the nearest candidate session using
        microsecond-precision distance checks on the `date` attribute.
        Assignments that are not within `expire_after` of any candidate are
        left out of the result.

        Args:
            sessions: Optional candidate sessions for rounding.

        Returns:
            List of assigned sessions. Without `sessions`, the stored sessions
            as-is. With `sessions`, the matching candidates in ascending order
            of assignment date; an empty list if there are no candidates or no
            assignments.

        Raises:
            ValueError: If any candidate lacks a `date` attribute or its
                `date` is not a `datetime`.
        """
        if sessions is None:
            return list(self._assignments.values())

        if not all(
            hasattr(s, "date") and isinstance(s.date, datetime) for s in sessions
        ):
            raise ValueError("All sessions must have a `date` attribute.")

        # Sort sessions by date
        sorted_sessions = sorted(sessions, key=lambda s: s.date)

        # Without candidates there is nothing to round to; the index clamping
        # below would otherwise index into an empty array.
        if not sorted_sessions:
            return []

        session_dates = np.array([s.date for s in sorted_sessions])

        assignment_dates = np.array(
            sorted([a.date for a in self._assignments.values()])
        )

        if len(assignment_dates) == 0:
            return []

        # searchsorted returns insertion point (first element >= value)
        # We need to compare with element at insertion point AND the previous element
        insert_pos = np.searchsorted(session_dates, assignment_dates)

        # Right candidate: element at insertion point (or last if past end)
        right_idx = np.minimum(insert_pos, len(session_dates) - 1)
        right_distance = np.abs(assignment_dates - session_dates[right_idx])

        # Left candidate: element before insertion point (or first if at start)
        left_idx = np.maximum(insert_pos - 1, 0)
        left_distance = np.abs(assignment_dates - session_dates[left_idx])

        # Choose the closer candidate (ties go to the earlier one)
        search = np.where(left_distance <= right_distance, left_idx, right_idx)
        distance = np.minimum(left_distance, right_distance)
        expiry = np.array([self._expire_after])

        # Keep only assignments that landed within `expire_after` of a candidate
        valid_indices = search[np.where(distance < expiry)]
        return [sorted_sessions[i] for i in valid_indices]

__getitem__(user_id) #

Get the current assignment for a user.

Parameters:

Name Type Description Default
user_id str

The user's unique identifier.

required

Returns:

Type Description
Dict[Any] | None

The assigned session Dict, or None if not assigned.

Source code in capturegraph-lib/capturegraph/scheduling/organize/assignment_store.py
def __getitem__(self, user_id: str) -> cg.Dict[Any] | None:
    """Look up the session currently assigned to a user.

    Args:
        user_id: The user's unique identifier.

    Returns:
        The session Dict stored for `user_id`, or None when the user has
        no assignment.
    """
    assigned = self._assignments
    return assigned[user_id] if user_id in assigned else None

__setitem__(user_id, session) #

Set an assignment for a user (no-op if already assigned).

Parameters:

Name Type Description Default
user_id str

The user's unique identifier.

required
session Dict[Any]

The session Dict to assign (must have a date attribute).

required
Source code in capturegraph-lib/capturegraph/scheduling/organize/assignment_store.py
def __setitem__(self, user_id: str, session: cg.Dict[Any]) -> None:
    """Record a session for a user unless one is already recorded.

    An existing assignment is never overwritten. After a new entry is
    added, the stored mapping is rebuilt in ascending order of `date`.

    Args:
        user_id: The user's unique identifier.
        session: The session Dict to assign (must have a `date` attribute).
    """
    current = self._assignments
    if user_id in current:
        # First assignment wins; later writes for the same user are ignored.
        return

    current[user_id] = session

    # Re-store the assignments ordered by session date.
    by_date = sorted(current.items(), key=lambda pair: pair[1].date)
    self._persistence.assignments = dict(by_date)

assign_next(user_id, sessions) #

Assign the next available session to a user.

If the user already has an assignment, returns it. Otherwise, finds the first session not taken by another user and assigns it.

Parameters:

Name Type Description Default
user_id str

The user's unique identifier.

required
sessions list[Dict[Any]]

Candidate sessions in chronological order.

required

Returns:

Type Description
Dict[Any] | None

The assigned session, or sessions[0] as fallback if all taken.

Example
next_session = store.assign_next(user_id, candidate_sessions)
Source code in capturegraph-lib/capturegraph/scheduling/organize/assignment_store.py
def assign_next(
    self, user_id: str, sessions: list[cg.Dict[Any]]
) -> cg.Dict[Any] | None:
    """Assign the next available session to a user.

    If the user already has an assignment, returns it. Otherwise,
    finds the first session not taken by another user and assigns it.

    Args:
        user_id: The user's unique identifier.
        sessions: Candidate sessions in chronological order.

    Returns:
        The assigned session, `sessions[0]` as fallback if all are taken
        (the fallback is not stored), or None if `sessions` is empty.

    Example:
        ```python
        next_session = store.assign_next(user_id, candidate_sessions)
        ```
    """
    if user_id in self._assignments:
        return self._assignments[user_id]

    # Nothing to hand out; skip reconciliation, which needs at least one
    # candidate to round existing assignments against.
    if not sessions:
        return None

    taken_dates = {s.date for s in self.values(sessions)}
    for session in sessions:
        if session.date not in taken_dates:
            self[user_id] = session
            return session

    # Every candidate is taken: fall back to the first one without storing it.
    return sessions[0]

values(sessions=None) #

Get all assigned sessions, optionally rounded to candidates.

When sessions is provided, performs drift reconciliation: rounds each assignment to the nearest candidate session using microsecond-precision distance checks on the date attribute.

Parameters:

Name Type Description Default
sessions list[Dict[Any]] | None

Optional candidate sessions for rounding.

None

Returns:

Type Description
list[Dict[Any]]

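List of assigned sessions. When sessions is given, each assignment is replaced by its nearest candidate, and assignments that are not within expire_after of any candidate are omitted.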

Source code in capturegraph-lib/capturegraph/scheduling/organize/assignment_store.py
def values(
    self,
    sessions: list[cg.Dict[Any]] | None = None,
) -> list[cg.Dict[Any]]:
    """Get all assigned sessions, optionally rounded to candidates.

    When `sessions` is provided, performs drift reconciliation:
    rounds each assignment to the nearest candidate session using
    microsecond-precision distance checks on the `date` attribute.
    Assignments that are not within `expire_after` of any candidate are
    left out of the result.

    Args:
        sessions: Optional candidate sessions for rounding.

    Returns:
        List of assigned sessions. Without `sessions`, the stored sessions
        as-is. With `sessions`, the matching candidates in ascending order
        of assignment date; an empty list if there are no candidates or no
        assignments.

    Raises:
        ValueError: If any candidate lacks a `date` attribute or its
            `date` is not a `datetime`.
    """
    if sessions is None:
        return list(self._assignments.values())

    if not all(
        hasattr(s, "date") and isinstance(s.date, datetime) for s in sessions
    ):
        raise ValueError("All sessions must have a `date` attribute.")

    # Sort sessions by date
    sorted_sessions = sorted(sessions, key=lambda s: s.date)

    # Without candidates there is nothing to round to; the index clamping
    # below would otherwise index into an empty array.
    if not sorted_sessions:
        return []

    session_dates = np.array([s.date for s in sorted_sessions])

    assignment_dates = np.array(
        sorted([a.date for a in self._assignments.values()])
    )

    if len(assignment_dates) == 0:
        return []

    # searchsorted returns insertion point (first element >= value)
    # We need to compare with element at insertion point AND the previous element
    insert_pos = np.searchsorted(session_dates, assignment_dates)

    # Right candidate: element at insertion point (or last if past end)
    right_idx = np.minimum(insert_pos, len(session_dates) - 1)
    right_distance = np.abs(assignment_dates - session_dates[right_idx])

    # Left candidate: element before insertion point (or first if at start)
    left_idx = np.maximum(insert_pos - 1, 0)
    left_distance = np.abs(assignment_dates - session_dates[left_idx])

    # Choose the closer candidate (ties go to the earlier one)
    search = np.where(left_distance <= right_distance, left_idx, right_idx)
    distance = np.minimum(left_distance, right_distance)
    expiry = np.array([self._expire_after])

    # Keep only assignments that landed within `expire_after` of a candidate
    valid_indices = search[np.where(distance < expiry)]
    return [sorted_sessions[i] for i in valid_indices]