Skip to content

Agent

polymathera.colony.agents.base.Agent

Bases: BaseModel

Base agent class representing an autonomous computational entity.

All agents have: - Unique ID and type - Lifecycle state - Optional page binding - Access to system services (VCM, LLM, blackboard)

Control flow should be driven by a reasoning LLM given sufficient context, not hardcoded. Agents adapt behavior based on current context.

Attributes:

Name Type Description
agent_id str

Unique identifier

agent_type str

Type of agent ("specialized", "general", "service", "supervisor")

state AgentState

Current lifecycle state

bound_pages list[str]

Page IDs this agent is bound to (empty for unbound agents)

created_at float

Creation timestamp

metadata AgentMetadata

Arbitrary metadata

state = Field(default=(AgentState.INITIALIZED)) class-attribute instance-attribute

agent_id instance-attribute

metadata = Field(default_factory=AgentMetadata) class-attribute instance-attribute

run_step() async

Execute one step of agent logic.

This is the core method that defines agent behavior. Override in subclasses to implement specific agent logic.

This method should be idempotent and handle its own errors.

This method is @hookable, so capabilities can register BEFORE hooks to prepare for the step, AFTER hooks to post-process the step, AROUND hooks to wrap the step execution entirely, and to handle errors during execution.

NOTE: We use the repeated run_step approach instead of a single long-running run method to facilitate easier suspension and state management. This is necessary for distributed agents that may be suspended and resumed across different replicas.

Source code in src/polymathera/colony/agents/base.py
@hookable
async def run_step(self) -> None:
    """Execute one step of agent logic.

    This is the core method that defines agent behavior. Override in
    subclasses to implement specific agent logic.

    This method should be idempotent and handle its own errors.

    This method is @hookable, so capabilities can register BEFORE hooks
    to prepare for the step,
    AFTER hooks to post-process the step,
    AROUND hooks to wrap the step execution entirely,
    and to handle errors during execution.

    NOTE: We use the repeated `run_step` approach instead of a single
    long-running `run` method to facilitate easier suspension and state
    management. This is necessary for distributed agents that may be
    suspended and resumed across different replicas.
    """
    if self.state not in (AgentState.RUNNING, AgentState.IDLE):
        logger.warning(
            f"Agent {self.agent_id} in state {self.state}, cannot run step"
        )
        return

    # Set up session_id context for the ENTIRE run_step
    # Session_id is propagated from requests via action_policy_state.custom["current_session_id"]
    # This ensures ALL operations (control messages, hooks, capabilities) have session_id
    from .sessions.context import session_id_context
    current_session_id = None
    if self.action_policy_state is not None:
        current_session_id = self.action_policy_state.custom.get("current_session_id")

    with session_id_context(current_session_id):
        # Process control messages from parent/children (mailbox)
        await self._process_control_messages()

        # Check child health (periodic health monitoring)
        await self._check_child_health()

        if self.action_policy_state is None:
            self.action_policy_state = ActionPolicyExecutionState()

        logger.warning(
            f"\n"
            f"  ┌──────────────────────────────────────────────┐\n"
            f"  │  ▶ RUN_STEP: calling execute_iteration       │\n"
            f"  │  agent={self.agent_id:<40}\n"
            f"  └──────────────────────────────────────────────┘"
        )
        iteration_result = await self.action_policy.execute_iteration(self.action_policy_state)
        logger.warning(
            f"  ◀ RUN_STEP returned: success={iteration_result.success} "
            f"completed={iteration_result.policy_completed} "
            f"idle={iteration_result.idle} "
            f"action={iteration_result.action_executed}"
        )

        # Policy-driven state transitions
        if iteration_result.policy_completed:
            self.state = AgentState.STOPPED
        elif iteration_result.idle:
            if self.state != AgentState.IDLE:
                self._idle_since = time.time()
            self.state = AgentState.IDLE
        elif self.state == AgentState.IDLE:
            # Policy returned work while we were IDLE — wake up
            self.state = AgentState.RUNNING
            self._idle_since = None

        if self.state == AgentState.SUSPENDED:
            logger.info(f"Agent {self.agent_id} has entered SUSPENDED state")
            # Suspend agent until dependencies resolved
            await self.suspend(reason=f"Blocked: {iteration_result.blocked_reason}")

        if self.state == AgentState.STOPPED:
            logger.info(f"Agent {self.agent_id} has entered STOPPED state")
            # Stop agent gracefully
            await self.stop()

        if self.state == AgentState.IDLE:
            timeout = self.metadata.idle_timeout
            if timeout is not None and hasattr(self, '_idle_since'):
                if (time.time() - self._idle_since) > timeout:
                    logger.info(
                        f"Agent {self.agent_id} idle timeout ({timeout}s) reached, stopping"
                    )
                    await self.stop()
                    return
            await self.on_idle()

        if iteration_result.error_context:
            logger.error(
                f"Agent {self.agent_id} encountered error in run_step: "
                f"{iteration_result.error_context.error_details}"
            )
            # Optionally, could set state to FAILED here
            # self.state = AgentState.FAILED

        # Sleep interval: longer when IDLE to avoid busy-looping
        sleep_interval = (
            self.metadata.idle_sleep_interval
            if self.state == AgentState.IDLE
            else 0.1
        )
        await asyncio.sleep(sleep_interval)

stop() async

Stop agent execution.

This method is @hookable, so capabilities can register BEFORE hooks to flush memories, AFTER hooks for cleanup, etc.

Override in subclasses to perform cleanup logic.

Source code in src/polymathera/colony/agents/base.py
@hookable
async def stop(self) -> None:
    """Stop agent execution.

    This method is @hookable, so capabilities can register BEFORE hooks
    to flush memories, AFTER hooks for cleanup, etc.

    Override in subclasses to perform cleanup logic.
    """
    self._stop_requested = True
    self.state = AgentState.STOPPED
    self._running = False
    self.action_policy_state = None

get_blackboard(scope='shared', scope_id=None, backend_type='redis', enable_events=True) async

Get blackboard for reading/writing shared state.

Parameters:

Name Type Description Default
scope str

Blackboard scope ("local", "shared", "global")

'shared'
scope_id str | None

Scope identifier (defaults to agent_id for shared scope)

None
backend_type str

Backend type for the blackboard (e.g., "redis")

'redis'
enable_events bool

Whether to enable events on the blackboard

True

Returns:

Type Description
EnhancedBlackboard

Blackboard instance

Example
# Get shared blackboard with parent
parent_id = self.metadata.parent_id
board = await self.get_blackboard(scope="shared", scope_id=parent_id)

# Write results
await board.write("analysis_complete", my_results)

# Parent reads
result = await board.read("analysis_complete")
Source code in src/polymathera/colony/agents/base.py
async def get_blackboard(
    self,
    scope: str = "shared",
    scope_id: str | None = None,
    backend_type: str = "redis",
    enable_events: bool = True,
) -> EnhancedBlackboard:
    """Get blackboard for reading/writing shared state.

    Args:
        scope: Blackboard scope (`"local"`, `"shared"`, `"global"`)
        scope_id: Scope identifier (defaults to `agent_id` for shared scope)
        backend_type: Backend type for the blackboard (e.g., "redis")
        enable_events: Whether to enable events on the blackboard

    Returns:
        Blackboard instance

    Example:
        ```python
        # Get shared blackboard with parent
        parent_id = self.metadata.parent_id
        board = await self.get_blackboard(scope="shared", scope_id=parent_id)

        # Write results
        await board.write("analysis_complete", my_results)

        # Parent reads
        result = await board.read("analysis_complete")
        ```
    """
    if not self._manager:
        raise RuntimeError(f"Agent {self.agent_id} not attached to manager")

    # Default scope_id to agent_id for shared scope
    if scope == "shared" and scope_id is None:
        scope_id = self.agent_id

    return await self._manager.get_blackboard(
        scope=scope,
        scope_id=scope_id,
        backend_type=backend_type,
        enable_events=enable_events,
    )