Skip to content

Action Policies

Base Action Policy

polymathera.colony.agents.patterns.actions.policies.BaseActionPolicy(agent, action_map=None, action_providers=[], io=None)

Bases: ActionPolicy

Base class for action policies with dataflow and nested policy support.

Provides: - Automatic action dispatcher creation - Integration with agent capabilities - Nested policy execution with scope inheritance - Dispatch with automatic Ref resolution

Subclasses implement plan_step to produce the next action or child policy. The base execute_iteration handles: - Delegating to active child policies - Executing actions returned by plan_step - Setting up child policies returned by plan_step

TODO: For example, we can orchestrate iterative reasoning to follow the pattern (PLAN → ACT → REFLECT → CRITIQUE → ADAPT) by adding AgentCapabilities that implement each step as an action executor, and then implementing plan_step to select the next action based on the current state. This can be enforced by: - Restricting available actions in the action dispatcher depending on the last completed step, or - Using an ActionPolicy subclass that implements the iterative pattern by overriding execute_iteration to enforce the sequence of steps, and only calling plan_step to get parameters for each step, or - Prompting the LLM planner with this workflow.

Example
class MyPolicy(BaseActionPolicy):
    io = ActionPolicyIO(
        inputs={"query": str},
        outputs={"result": dict}
    )

    async def plan_step(self, state) -> Action | None:
        # Return None when policy is complete
        if state.custom.get("done"):
            return None

        # Return an Action to execute
        return Action(
            action_id="analyze_001",
            action_type="analyze",
            parameters={"query": state.scope.get("query")}
        )

        # Or return an ActionPolicy for nested execution
        # return ChildPolicy(self.agent)
Source code in src/polymathera/colony/agents/patterns/actions/policies.py
def __init__(
    self,
    agent: Agent,
    action_map: list[ActionGroup] | None = None,
    action_providers: list[Any] = [],
    io: ActionPolicyIO | None = None, # Declare I/O contract (override in subclasses)
):
    super().__init__(agent)
    self._action_map = action_map
    self._action_providers = action_providers
    self._action_dispatcher: ActionDispatcher | None = None
    self.io: ActionPolicyIO = io or ActionPolicyIO()

execute_iteration(state) async

Execute one iteration of this policy.

This method is @hookable so memory capabilities can observe iterations.

Calls plan_step to get next action, then dispatches it.

For hierarchical composition (nested policies), spawn child agents instead of nesting policies. Use self.agent.spawn_child_agents().

Parameters:

Name Type Description Default
state ActionPolicyExecutionState

Execution state for this policy (all mutable state lives here)

required

Returns:

Type Description
ActionPolicyIterationResult

Iteration result

Source code in src/polymathera/colony/agents/patterns/actions/policies.py
@hookable
@override
async def execute_iteration(
    self,
    state: ActionPolicyExecutionState
) -> ActionPolicyIterationResult:
    """Execute one iteration of this policy.

    This method is @hookable so memory capabilities can observe iterations.

    Calls `plan_step` to get next action, then dispatches it.

    For hierarchical composition (nested policies), spawn child agents
    instead of nesting policies. Use `self.agent.spawn_child_agents()`.

    Args:
        state: Execution state for this policy (all mutable state lives here)

    Returns:
        Iteration result
    """

    # TODO: Add iteration time limit check here
    ### if len(self.iteration_history) >= self.max_iterations:
    ###     logger.info("Analysis complete (max iterations reached)")
    ###     return ActionPolicyIterationResult(
    ###         success=True,
    ###         policy_completed=True
    ###     )

    # Set up session_id context for the ENTIRE iteration
    # Session_id may be set by a previous iteration's plan_step() from event metadata
    # This ensures all memory operations, hooks, and capabilities have session_id
    from ...sessions.context import session_id_context
    current_session_id = state.custom.get("current_session_id")

    with session_id_context(current_session_id):
        # Ensure dispatcher is initialized
        await self._create_action_dispatcher()

        # Get next action from subclass (plan_step may update current_session_id)
        logger.warning(
            f"\n"
            f"    ┌────────────────────────────────────────────┐\n"
            f"    │  ⚙ EXEC_ITER: calling plan_step            │\n"
            f"    │  agent={self.agent.agent_id:<38}\n"
            f"    └────────────────────────────────────────────┘"
        )
        next_action = await self.plan_step(state)
        logger.warning(f"    ⚙ EXEC_ITER: plan_step returned → {type(next_action).__name__}: {next_action}")

        # Re-check session_id in case plan_step updated it from a new event
        updated_session_id = state.custom.get("current_session_id")
        if updated_session_id != current_session_id:
            # Session changed mid-iteration, update context for dispatch
            # This handles the case where plan_step processes a new event with different session_id
            from ...sessions.context import set_current_session_id
            set_current_session_id(updated_session_id)

        if next_action is None:
            # Check if policy signaled completion
            if state.custom.get("policy_complete"):
                logger.warning("    ⚙ EXEC_ITER: policy_complete=True → TERMINATING")
                return ActionPolicyIterationResult(
                    success=True,
                    policy_completed=True
                )

            # Check if policy signaled idle (no work, but not completed)
            if state.custom.get("idle"):
                logger.warning("    ⚙ EXEC_ITER: idle=True → IDLE")
                return ActionPolicyIterationResult(
                    success=True,
                    policy_completed=False,
                    idle=True,
                )

            # Otherwise just skip this iteration (policy continues)
            logger.warning("    ⚙ EXEC_ITER: next_action=None → skipping iteration")
            return ActionPolicyIterationResult(
                success=True,
                policy_completed=False
            )

        # dispatch is @hookable, memory captures action there
        logger.warning(
            f"\n"
            f"    ╔════════════════════════════════════════════╗\n"
            f"    ║  🚀 DISPATCHING ACTION                    ║\n"
            f"    ║  id={next_action.action_id:<40}\n"
            f"    ║  type={next_action.action_type:<38}\n"
            f"    ╚════════════════════════════════════════════╝"
        )
        result = await self.dispatch(next_action)
        logger.warning(f"    🚀 DISPATCH returned: success={result.success}")

        return ActionPolicyIterationResult(
            success=result.success,
            policy_completed=False,
            action_executed=next_action,
            result=result,
        )

plan_step(state) async

Produce the next action to execute.

Override this method to implement policy-specific planning logic.

For hierarchical composition, spawn child agents instead of nesting policies. Use self.agent.spawn_child_agents() with appropriate action policies for child agents.

Parameters:

Name Type Description Default
state ActionPolicyExecutionState

Execution state for this policy

required

Returns:

Type Description
Action | None
  • Action: Execute this action
Action | None
  • None: Skip this iteration. Set state.custom["policy_complete"] = True before returning None to signal that the policy is finished.
Example
async def plan_step(self, state) -> Action | None:
    phase = state.custom.get("phase", "act")

    if phase == "act":
        action = self._get_next_action(state)
        if action is None:
            state.custom["policy_complete"] = True
            return None
        state.custom["phase"] = "process"
        return action

    elif phase == "process":
        # Do some processing without dispatching an action
        self._process_results(state)
        state.custom["phase"] = "act"
        return None  # Skip iteration, continue policy
Source code in src/polymathera/colony/agents/patterns/actions/policies.py
async def plan_step(
    self,
    state: ActionPolicyExecutionState
) -> Action | None:
    """Produce the next action to execute.

    Override this method to implement policy-specific planning logic.

    For hierarchical composition, spawn child agents instead of nesting
    policies. Use `self.agent.spawn_child_agents()` with appropriate
    action policies for child agents.

    Args:
        state: Execution state for this policy

    Returns:
        - Action: Execute this action
        - None: Skip this iteration. Set `state.custom["policy_complete"] = True`
            before returning None to signal that the policy is finished.

    Example:
        ```python
        async def plan_step(self, state) -> Action | None:
            phase = state.custom.get("phase", "act")

            if phase == "act":
                action = self._get_next_action(state)
                if action is None:
                    state.custom["policy_complete"] = True
                    return None
                state.custom["phase"] = "process"
                return action

            elif phase == "process":
                # Do some processing without dispatching an action
                self._process_results(state)
                state.custom["phase"] = "act"
                return None  # Skip iteration, continue policy
        ```
    """
    raise NotImplementedError(
        f"{self.__class__.__name__} must implement plan_step"
    )

action_executor decorator

polymathera.colony.agents.patterns.actions.policies.action_executor(action_key=None, *, input_schema=None, output_schema=None, reads=None, writes=None, exclude_from_planning=False, planning_summary=None, tags=None)

Decorator to turn any method into an action executor.

Automatically infers input/output schemas from type hints if not provided.

Parameters:

Name Type Description Default
action_key str | ActionType | None

Key identifying the action type. If None, uses method name.

None
input_schema type[BaseModel] | None

Optional Pydantic model for input validation. If None, inferred from method signature.

None
output_schema type[BaseModel] | None

Optional Pydantic model for output validation. If None, inferred from return type hint.

None
reads list[str] | None

List of scope variable names this action reads.

None
writes list[str] | None

List of scope variable names this action writes.

None
exclude_from_planning bool

If True, this action is not exposed to the LLM planner. Use this for actions that are only meant to be invoked programmatically in response to events (e.g., game moves in response to spawned agent events). Default is False.

False
tags frozenset[str] | None

Optional domain/modality tags for this action (e.g., frozenset({"memory", "expensive"})). Used for future per-action tag-based filtering and grouping.

None
Example
@action_executor()
async def route_query(
    self,
    query: str,
    max_results: int = 10
) -> list[str]:
    '''Route query to find relevant pages.'''
    ...

@action_executor(writes=["analysis_result"])
async def analyze_pages(
    self,
    page_ids: list[str],
    goal: str
) -> AnalysisResult:
    '''Analyze pages for the given goal.'''
    ...

# Event-driven action not visible to planner
@action_executor(exclude_from_planning=True)
async def submit_move(self, game_id: str, move: dict) -> None:
    '''Submit move in response to game event.'''
    ...
Source code in src/polymathera/colony/agents/patterns/actions/policies.py
def action_executor(
    action_key: str | ActionType | None = None,
    *,
    input_schema: type[BaseModel] | None = None,
    output_schema: type[BaseModel] | None = None,
    reads: list[str] | None = None,
    writes: list[str] | None = None,
    exclude_from_planning: bool = False,
    planning_summary: str | None = None,
    tags: frozenset[str] | None = None,
):
    """Decorator to turn any method into an action executor.

    Automatically infers input/output schemas from type hints if not provided.

    Args:
        action_key: Key identifying the action type. If None, uses method name.
        input_schema: Optional Pydantic model for input validation.
            If None, inferred from method signature.
        output_schema: Optional Pydantic model for output validation.
            If None, inferred from return type hint.
        reads: List of scope variable names this action reads.
        writes: List of scope variable names this action writes.
        exclude_from_planning: If True, this action is not exposed to the LLM
            planner. Use this for actions that are only meant to be invoked
            programmatically in response to events (e.g., game moves in response
            to spawned agent events). Default is False.
        tags: Optional domain/modality tags for this action (e.g., frozenset({"memory", "expensive"})).
            Used for future per-action tag-based filtering and grouping.

    Example:
        ```python
        @action_executor()
        async def route_query(
            self,
            query: str,
            max_results: int = 10
        ) -> list[str]:
            '''Route query to find relevant pages.'''
            ...

        @action_executor(writes=["analysis_result"])
        async def analyze_pages(
            self,
            page_ids: list[str],
            goal: str
        ) -> AnalysisResult:
            '''Analyze pages for the given goal.'''
            ...

        # Event-driven action not visible to planner
        @action_executor(exclude_from_planning=True)
        async def submit_move(self, game_id: str, move: dict) -> None:
            '''Submit move in response to game event.'''
            ...
        ```
    """
    def decorator(func):
        # Store action key
        func._action_key = action_key or func.__name__

        # Infer schemas from type hints if not provided
        func._action_input_schema = input_schema or _infer_input_schema(func)
        func._action_output_schema = output_schema or _infer_output_schema(func)

        # Store scope read/write declarations
        func._action_reads = reads or []
        func._action_writes = writes or []

        # Store planning visibility flag
        func._action_exclude_from_planning = exclude_from_planning

        # Store concise planning summary (used instead of full docstring in prompts)
        func._action_planning_summary = planning_summary

        # Store tags for future per-action tag-based filtering
        func._action_tags = tags or frozenset()

        return func

    return decorator