Memory System

MemoryQuery

polymathera.colony.agents.patterns.memory.types.MemoryQuery

Bases: BaseModel

Query parameters for recalling memories.

Used by MemoryCapability.recall() and AgentContextEngine.gather_context().

Supports three query modes, depending on which fields are populated:
- Semantic: query text for vector similarity search
- Logical: tag_filter, time_range, key_pattern for structured filtering
- Hybrid: both semantic and logical (results are filtered, then ranked by similarity)

The LLM planner constructs MemoryQuery objects. Use the list_tags action to discover available tags before constructing tag-based queries.

Example
# Semantic query
MemoryQuery(query="What authentication approach was used?")

# Logical query by tags
MemoryQuery(tag_filter=TagFilter(all_of={"action", "success"}))

# Hybrid: semantic + tag filter
MemoryQuery(
    query="security analysis results",
    tag_filter=TagFilter(any_of={"action_type:infer", "action_type:plan"}),
    max_results=10,
)

has_semantic property

Whether this query requests semantic similarity search.

has_logical property

Whether this query has logical filter constraints.
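A minimal sketch of how the two properties could follow from the populated fields. QuerySketch is a hypothetical stand-in, not the library class; only the field names query, tag_filter, time_range, and key_pattern come from the description above, and the mode property is added purely for illustration.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class QuerySketch:
    """Hypothetical stand-in for MemoryQuery (not the library class)."""
    query: Optional[str] = None
    tag_filter: Optional[Any] = None
    time_range: Optional[tuple] = None
    key_pattern: Optional[str] = None

    @property
    def has_semantic(self) -> bool:
        # Assumption: any non-empty query text requests similarity search.
        return bool(self.query)

    @property
    def has_logical(self) -> bool:
        # Assumption: any populated structured field counts as a logical constraint.
        return any(
            value is not None
            for value in (self.tag_filter, self.time_range, self.key_pattern)
        )

    @property
    def mode(self) -> str:
        # Illustrative helper only; the source does not document a "mode" attribute.
        if self.has_semantic and self.has_logical:
            return "hybrid"
        if self.has_semantic:
            return "semantic"
        if self.has_logical:
            return "logical"
        return "unconstrained"


print(QuerySketch(query="auth approach").mode)
print(QuerySketch(key_pattern="action:*").mode)
print(QuerySketch(query="auth", key_pattern="action:*").mode)
```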

TagFilter

polymathera.colony.agents.patterns.memory.types.TagFilter

Bases: BaseModel

Logical filter expression over entry tags.

Supports AND (all_of), OR (any_of), NOT (none_of) combinators. The LLM planner constructs these to do precise tag-based retrieval.

Example
# Entries that are actions AND successful
TagFilter(all_of={"action", "success"})

# Entries that are either infer or plan actions
TagFilter(any_of={"action_type:infer", "action_type:plan"})

# Successful actions, excluding infer
TagFilter(all_of={"action", "success"}, none_of={"action_type:infer"})

matches(entry_tags)

Check if a set of entry tags satisfies this filter.

Source code in src/polymathera/colony/agents/patterns/memory/types.py
def matches(self, entry_tags: set[str]) -> bool:
    """Check if a set of entry tags satisfies this filter."""
    if self.all_of and not self.all_of.issubset(entry_tags):
        return False
    if self.any_of and not self.any_of.intersection(entry_tags):
        return False
    if self.none_of and self.none_of.intersection(entry_tags):
        return False
    return True
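The combinator semantics can be checked in isolation. The sketch below copies the matches() logic shown above into a stand-in dataclass (TagFilterSketch is hypothetical, and its empty-set defaults are an assumption). It also illustrates that an empty filter accepts every entry, because each empty combinator is skipped.

```python
from dataclasses import dataclass, field


@dataclass
class TagFilterSketch:
    """Stand-in mirroring the matches() logic above; not the library class."""
    all_of: set = field(default_factory=set)   # assumed default: empty set
    any_of: set = field(default_factory=set)
    none_of: set = field(default_factory=set)

    def matches(self, entry_tags: set) -> bool:
        # AND: every required tag must be present.
        if self.all_of and not self.all_of.issubset(entry_tags):
            return False
        # OR: at least one of the alternatives must be present.
        if self.any_of and not self.any_of.intersection(entry_tags):
            return False
        # NOT: no excluded tag may be present.
        if self.none_of and self.none_of.intersection(entry_tags):
            return False
        return True


entry = {"action", "success", "action_type:plan"}
results = {
    "all_of": TagFilterSketch(all_of={"action", "success"}).matches(entry),
    "any_of_miss": TagFilterSketch(any_of={"action_type:infer"}).matches(entry),
    "excluded": TagFilterSketch(all_of={"action"}, none_of={"action_type:plan"}).matches(entry),
    "empty_filter": TagFilterSketch().matches(entry),
}
print(results)
```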

MemoryCapability

polymathera.colony.agents.patterns.memory.capability.MemoryCapability(agent, scope_id, *, producers=None, ingestion_policy=None, ttl_seconds=None, max_entries=None, storage_backend_factory=None, retrieval_strategy=None, lenses=None, maintenance_policies=None, maintenance_interval_seconds=60.0, utility_scorer=None, maintenance=None, map_to_vcm=False, vcm_config=None, capability_key=None)

Bases: AgentCapability

Unified memory capability: ingestion + storage + maintenance (cognitive processes).

A MemoryCapability manages ONE memory scope. It:
1. INGESTS data from sources (subscriptions + producers)
   - Zero or more producers (hook-based capture of memory-producing methods)
   - Zero or more subscriptions (pull/push from other scopes)
2. Transforms ALL inputs together via ingestion_policy.transformer
3. Writes transformed data to THIS scope
4. MAINTAINS this scope in background cognitive processes (decay, prune, dedupe, consolidation)
5. Provides conscious LLM-plannable actions (store, recall, forget, transfer)
6. Provides query interface for recall

Pull model: if another scope wants data from this scope, it subscribes. No push/export logic. Each capability manages its own scope.

This pull model (observer pattern) is motivated by the evolutionary history of the brain:
- A new memory layer evolves to observe and consolidate data from existing layers.
- An existing memory layer does not have to be aware of new layers, which makes the memory system more extensible.
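The pull model can be illustrated with a toy observer setup. This is a sketch under stated assumptions, not the library's implementation: ScopeSketch, subscribe_to, write, and ingest are hypothetical names, and the transformer here simply joins the pending batch into one consolidated entry.

```python
from typing import Callable, Optional


class ScopeSketch:
    """Hypothetical stand-in for one memory scope; not the library class."""

    def __init__(self, name: str, transformer: Optional[Callable[[list], list]] = None):
        self.name = name
        self.entries: list = []      # data written to THIS scope
        self.pending: list = []      # collected from subscriptions, not yet transformed
        self.transformer = transformer
        self._observers: list = []   # scopes that subscribed to this one

    def subscribe_to(self, source: "ScopeSketch") -> None:
        # The new layer registers itself; the source needs no knowledge of it.
        source._observers.append(self)

    def write(self, entry) -> None:
        self.entries.append(entry)
        for observer in self._observers:
            observer.pending.append(entry)

    def ingest(self) -> None:
        # Transform ALL pending entries together, then write the result here.
        batch, self.pending = self.pending, []
        if not batch:
            return
        transformed = self.transformer(batch) if self.transformer else batch
        for entry in transformed:
            self.write(entry)


working = ScopeSketch("working")
episodic = ScopeSketch("episodic", transformer=lambda batch: [" | ".join(batch)])
episodic.subscribe_to(working)

working.write("opened auth.py")
working.write("found JWT check")
episodic.ingest()
print(episodic.entries)
```

Adding a further layer on top of episodic would only require another subscribe_to call; neither existing scope changes.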

The producers parameter enables hook-based memory capture: the memory capability registers AFTER hooks on specified methods to automatically capture their outputs. This implements the principle: "memory is an observer of agent behavior."
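As a rough illustration of hook-based capture, the sketch below wraps a method so that its return value is recorded after it runs. The after_hook decorator, the captured list, and ToyAgent are all hypothetical; the real hook registration API and MemoryProducerConfig fields are not shown in this section.

```python
import functools
from typing import Any, Callable

# Stand-in for a memory scope; the real capability writes to its storage backend.
captured: list = []


def after_hook(tags: set) -> Callable:
    """Hypothetical decorator: record a method's return value after it runs."""
    def decorate(fn: Callable) -> Callable:
        @functools.wraps(fn)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            result = fn(*args, **kwargs)
            # The observed method is unaware that its output is being captured.
            captured.append({"source": fn.__name__, "value": result, "tags": set(tags)})
            return result
        return wrapper
    return decorate


class ToyAgent:
    @after_hook(tags={"action", "action_type:plan"})
    def plan(self, goal: str) -> str:
        return f"plan for {goal}"


agent_result = ToyAgent().plan("audit auth module")
print(captured)
```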

Attributes:

Name Type Description
scope_id

Blackboard scope for this memory level

producers

Hook-based memory capture configs

ingestion_policy

Ingestion policy: which memory scopes to listen to and pull data from, when to trigger ingestion (default: OnDemand), and how to consolidate the entries.

ttl_seconds

Default TTL for stored memories

max_entries

Maximum entries before eviction

retrieval RetrievalStrategy

Strategy for memory retrieval

lenses dict[str, MemoryLens]

Named query configurations

maintenance_policies list[MaintenancePolicy]

List of maintenance policies

utility_scorer UtilityScorer | None

Scorer for memory utility (SEDM-inspired)

Initialize memory capability.

Parameters:

Name Type Description Default
agent 'Agent'

Agent that owns this capability

required
scope_id str

Blackboard scope for this memory level

required
producers list[MemoryProducerConfig] | None

Hook-based memory capture configurations

None
ingestion_policy MemoryIngestPolicy | None

Dataflow edges from other scopes, when to trigger ingestion (default: OnDemand), and how to consolidate the entries. Each subscription listens and collects entries into a pending queue. ALL pending entries are then transformed together before being written to this scope. If transformer=None, entries are stored as-is.

None
ttl_seconds float | None

Default TTL for stored memories (None = no expiration)

None
max_entries int | None

Maximum entries before eviction (None = no limit)

None
storage_backend_factory StorageBackendFactory | None

Factory for creating storage backends for any scope (e.g., this scope or other scopes). Defaults to BlackboardStorageBackendFactory.

None
retrieval_strategy RetrievalStrategy | None

Strategy for memory retrieval (default: RecencyRetrieval)

None
lenses list[MemoryLens] | None

Named query configurations

None
maintenance_policies list[MaintenancePolicy] | None

List of maintenance policies to run. Use ConsolidationMaintenancePolicy for within-scope consolidation.

None
maintenance_interval_seconds float

How often to run maintenance (default: 60s)

60.0
utility_scorer UtilityScorer | None

Scorer for memory utility

None
maintenance MaintenanceConfig | None

Legacy MaintenanceConfig (deprecated, use maintenance_policies)

None
map_to_vcm bool

If True, map this scope into VCM pages during initialize(). Enables attention-based discovery of this scope's contents.

False
vcm_config MmapConfig | None

Configuration for VCM mapping (controls flushing, locality, etc.). Only used if map_to_vcm=True. Defaults to MmapConfig() if None.

None
capability_key str | None

Override for the capability dict key (allows multiple instances of MemoryCapability with distinct keys).

None
Source code in src/polymathera/colony/agents/patterns/memory/capability.py
def __init__(
    self,
    agent: "Agent",
    scope_id: str,
    *,
    # === INGESTION (Pull data INTO this scope) ===
    producers: list[MemoryProducerConfig] | None = None,
    ingestion_policy: MemoryIngestPolicy | None = None,

    # === STORAGE ===
    ttl_seconds: float | None = None,
    max_entries: int | None = None,
    storage_backend_factory: StorageBackendFactory | None = None,

    # === RETRIEVAL (Retrieval is the bottleneck) ===
    retrieval_strategy: RetrievalStrategy | None = None,
    lenses: list[MemoryLens] | None = None,

    # === MAINTENANCE (Keep this scope healthy) ===
    maintenance_policies: list[MaintenancePolicy] | None = None,
    maintenance_interval_seconds: float = 60.0,

    # === UTILITY SCORING (Memory as self-optimizing) ===
    utility_scorer: UtilityScorer | None = None,

    # === LEGACY COMPAT ===
    maintenance: MaintenanceConfig | None = None,  # TODO: Remove

    # === VCM MAPPING ===
    map_to_vcm: bool = False,
    vcm_config: MmapConfig | None = None,

    # === IDENTITY ===
    capability_key: str | None = None,
):
    """Initialize memory capability.

    Args:
        agent: Agent that owns this capability
        scope_id: Blackboard scope for this memory level

        producers: Hook-based memory capture configurations
        ingestion_policy: Dataflow edges from other scopes, and
            when to trigger ingestion (default: OnDemand) and
            how to consolidate the entries. Each subscription listens
            and collects entries into pending queue. ALL pending
            entries are then transformed together before writing
            to this scope. If transformer=None, entries stored as-is.

        ttl_seconds: Default TTL for stored memories (None = no expiration)
        max_entries: Maximum entries before eviction (None = no limit)
        storage_backend_factory: Factory for creating storage backends for
            any scope (e.g., this scope or other scopes). Defaults to
            BlackboardStorageBackendFactory.

        retrieval_strategy: Strategy for memory retrieval (default: RecencyRetrieval)
        lenses: Named query configurations

        maintenance_policies: List of maintenance policies to run.
            Use ConsolidationMaintenancePolicy for within-scope consolidation.
        maintenance_interval_seconds: How often to run maintenance (default: 60s)

        utility_scorer: Scorer for memory utility

        maintenance: Legacy MaintenanceConfig (deprecated, use maintenance_policies)

        map_to_vcm: If True, map this scope into VCM pages during initialize().
            Enables attention-based discovery of this scope's contents.
        vcm_config: Configuration for VCM mapping (controls flushing, locality, etc.)
            Only used if map_to_vcm=True. Defaults to MmapConfig() if None.

        capability_key: Override for the capability dict key (allows multiple
            instances of MemoryCapability with distinct keys).
    """
    super().__init__(agent=agent, scope_id=scope_id, capability_key=capability_key)

    # Ingestion: sources and transformation
    self.producers = producers or []

    # Storage
    self.ttl_seconds = ttl_seconds
    self.max_entries = max_entries
    self._storage_backend = None  # Resolved in initialize()
    self._storage_backend_factory = storage_backend_factory  # Resolved in initialize()

    # Retrieval
    self._retrieval_strategy = retrieval_strategy or RecencyRetrieval()
    self._lenses: dict[str, MemoryLens] = {lens.name: lens for lens in (lenses or [])}

    # Maintenance
    self._maintenance_policies = maintenance_policies
    self._maintenance_interval = maintenance_interval_seconds
    self._legacy_maintenance = maintenance  # For backwards compat

    # VCM mapping
    self.map_to_vcm = map_to_vcm
    self.vcm_config = vcm_config

    # Ingestion policy (when to process pending entries)
    self._ingestion_policy = ingestion_policy or MemoryIngestPolicy()

    # Utility
    self._utility_scorer = utility_scorer

    # Pending entries from subscriptions (collected, not yet transformed)
    self._pending_entries: list[PendingEntry] = []

    # Background task handles
    self._maintenance_task: asyncio.Task | None = None
    self._subscription_tasks: list[asyncio.Task] = []
    self._ingestion_task: asyncio.Task | None = None  # Checks policy, triggers ingestion
    self._producer_hook_ids: list[str] = []  # Track registered hooks for cleanup
    self._running = False
    self._initialized = False

    # Track last maintenance run times
    self._last_maintenance_run: dict[int, float] = {}
    self._last_ingestion_time: float | None = None

recall(query=None, lens=None, context=None) async

Recall memories (conscious read).

Parameters:

Name Type Description Default
query MemoryQuery | str | None

Query string or MemoryQuery object. LLM planners typically pass a plain string which is auto-wrapped.

None
lens str | None

Name of a predefined lens to apply

None
context RetrievalContext | None

Retrieval context (goal, agent state) for relevance

None

Returns:

Type Description
list[BlackboardEntry]

List of matching BlackboardEntry objects

Source code in src/polymathera/colony/agents/patterns/memory/capability.py
@action_executor(action_key="memory_recall", planning_summary="Recall memories. Supports semantic search (query text), logical filters (tag_filter with all_of/any_of/none_of), or hybrid. Use list_tags first to discover available tags.")
async def recall(
    self,
    query: MemoryQuery | str | None = None,
    lens: str | None = None,
    context: RetrievalContext | None = None,
) -> list[BlackboardEntry]:
    """Recall memories (conscious read).

    Args:
        query: Query string or MemoryQuery object.
            LLM planners typically pass a plain string which is auto-wrapped.
        lens: Name of a predefined lens to apply
        context: Retrieval context (goal, agent state) for relevance

    Returns:
        List of matching BlackboardEntry objects
    """
    scored_entries = await self.recall_with_scores(
        query=query,
        lens=lens,
        context=context,
    )
    return [se.entry for se in scored_entries]
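The relationship between the two recall methods can be sketched with stand-ins: recall() delegates to recall_with_scores() and drops the scores. Everything below is hypothetical (MemorySketch, EntrySketch, ScoredEntrySketch, and the toy word-overlap score); the real ranking is done by the configured retrieval strategy.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class EntrySketch:
    key: str
    value: str


@dataclass
class ScoredEntrySketch:
    entry: EntrySketch
    score: float


class MemorySketch:
    """Hypothetical stand-in; only the delegation pattern mirrors the source."""

    def __init__(self, entries: list):
        self._entries = entries

    async def recall_with_scores(self, query: str) -> list:
        # Toy relevance: fraction of query words that occur in the entry text.
        words = query.lower().split()
        scored = [
            ScoredEntrySketch(e, sum(w in e.value.lower() for w in words) / max(len(words), 1))
            for e in self._entries
        ]
        return sorted(scored, key=lambda se: se.score, reverse=True)

    async def recall(self, query: str) -> list:
        # Same retrieval path, scores stripped.
        return [se.entry for se in await self.recall_with_scores(query)]


memory = MemorySketch([
    EntrySketch("k2", "database migration done"),
    EntrySketch("k1", "JWT authentication chosen"),
])
recalled_keys = [e.key for e in asyncio.run(memory.recall("authentication approach"))]
print(recalled_keys)
```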

recall_with_scores(query=None, lens=None, context=None) async

Recall memories (conscious read) with detailed scoring for debugging/analysis.

This action is exposed to the agent action policy for memory retrieval. The agent can search for relevant memories using semantic similarity, tags, or recency filtering.

Supports:
- Query-based filtering (tags, recency, etc.)
- Lens-based views (predefined query configurations)
- Goal-aware retrieval via RetrievalContext

Parameters:

Name Type Description Default
query MemoryQuery | str | None

Query string or MemoryQuery object for filtering/ranking. LLM planners typically pass a plain string which is auto-wrapped.

None
lens str | None

Name of a predefined lens to apply

None
context RetrievalContext | None

Retrieval context (goal, agent state) for relevance

None

Returns:

Type Description
list[ScoredEntry]

List of matching ScoredEntry objects with score breakdowns

Source code in src/polymathera/colony/agents/patterns/memory/capability.py
@action_executor(action_key="memory_recall_with_scores", planning_summary="Recall memories with relevance scores. Supports semantic search (query text), logical filters (tag_filter), or hybrid. Use list_tags first to discover available tags.")
async def recall_with_scores(
    self,
    query: MemoryQuery | str | None = None,
    lens: str | None = None,
    context: RetrievalContext | None = None,
) -> list[ScoredEntry]:
    """Recall memories (conscious read) with detailed scoring for debugging/analysis.

    This action is exposed to the agent action policy for memory retrieval. The agent
    can search for relevant memories using semantic similarity, tags,
    or recency filtering.

    Supports:
    - Query-based filtering (tags, recency, etc.)
    - Lens-based views (predefined query configurations)
    - Goal-aware retrieval via RetrievalContext

    Args:
        query: Query string or MemoryQuery object for filtering/ranking.
            LLM planners typically pass a plain string which is auto-wrapped.
        lens: Name of a predefined lens to apply
        context: Retrieval context (goal, agent state) for relevance

    Returns:
        List of matching ScoredEntry objects with score breakdowns
    """
    if isinstance(query, str):
        query = MemoryQuery(query=query)
    elif isinstance(query, dict):
        query = MemoryQuery(**query)
    effective_query = query or MemoryQuery()

    # Apply lens if specified
    if lens and lens in self._lenses:
        effective_query = self._apply_lens(self._lenses[lens], effective_query)

    # Use retrieval strategy
    scored_entries = await self.retrieval.retrieve(
        query=effective_query,
        backend=self.storage,
        context=context,
    )

    # Track access if configured
    if self._legacy_maintenance and self._legacy_maintenance.track_access:
        for se in scored_entries[:effective_query.max_results]:
            await self._track_access(se.entry.key)

    return scored_entries
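The input normalization at the top of recall_with_scores() can be exercised on its own. The sketch below mirrors that branch logic with a hypothetical QuerySketch dataclass (the max_results default of 10 is an assumption); note that a dict is accepted at runtime even though the annotated type lists only MemoryQuery, str, and None.

```python
from dataclasses import dataclass
from typing import Optional, Union


@dataclass
class QuerySketch:
    """Hypothetical stand-in for MemoryQuery."""
    query: Optional[str] = None
    max_results: int = 10  # assumed default, for illustration only


def normalize_query(query: Union[QuerySketch, str, dict, None]) -> QuerySketch:
    # Plain strings (typical for LLM planners) become semantic queries.
    if isinstance(query, str):
        return QuerySketch(query=query)
    # Dicts are expanded into keyword arguments.
    if isinstance(query, dict):
        return QuerySketch(**query)
    # None falls back to an empty query; an existing object passes through.
    return query or QuerySketch()


print(normalize_query("what auth approach was used?"))
print(normalize_query({"query": "security results", "max_results": 3}))
print(normalize_query(None))
```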

list_tags() async

List all unique tags and their counts in this scope.

Returns:

Type Description
dict[str, int]

Dict mapping tag name to entry count, sorted by count descending.

Source code in src/polymathera/colony/agents/patterns/memory/capability.py
@action_executor(
    action_key="memory_list_tags",
    planning_summary=(
        "List all tags stored in this memory scope with their counts. "
        "Use this BEFORE constructing tag-based queries to discover available tags."
    ),
)
async def list_tags(self) -> dict[str, int]:
    """List all unique tags and their counts in this scope.

    Returns:
        Dict mapping tag name to entry count, sorted by count descending.
    """
    entries = await self.storage.query(limit=100000)  # TODO: Hard-coded bulk fetch; there are better ways.
    tag_counts: dict[str, int] = {}
    for entry in entries:
        for tag in entry.tags:
            tag_counts[tag] = tag_counts.get(tag, 0) + 1
    return dict(sorted(tag_counts.items(), key=lambda x: -x[1]))
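The counting step can also be written with collections.Counter, which gives the same count-descending order. This is an equivalent sketch over plain tag lists rather than a proposed change to the library; count_tags is a hypothetical helper name.

```python
from collections import Counter
from typing import Iterable


def count_tags(entries_tags: Iterable[Iterable[str]]) -> dict:
    """Count tag occurrences across entries, most frequent first."""
    counts = Counter(tag for tags in entries_tags for tag in tags)
    # most_common() sorts by count descending; ties keep first-seen order.
    return dict(counts.most_common())


tag_report = count_tags([
    ["action", "success"],
    ["action", "action_type:plan"],
    ["action"],
    ["success"],
])
print(tag_report)
```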

stats() async

Get statistics about stored memories.

Returns:

Type Description
dict[str, Any]

Dict with entry_count, unique_tags, oldest/newest entry age, and top 20 tags by count.

Source code in src/polymathera/colony/agents/patterns/memory/capability.py
@action_executor(
    action_key="memory_stats",
    planning_summary=(
        "Get statistics about this memory scope: entry count, "
        "tag distribution, age range."
    ),
)
async def stats(self) -> dict[str, Any]:
    """Get statistics about stored memories.

    Returns:
        Dict with entry_count, unique_tags, oldest/newest entry age,
        and top 20 tags by count.
    """
    entries = await self.storage.query(limit=100000)  # TODO: Hard-coded bulk fetch; there are better ways.
    if not entries:
        return {"entry_count": 0}

    now = time.time()
    tag_counts: dict[str, int] = {}
    for entry in entries:
        for tag in entry.tags:
            tag_counts[tag] = tag_counts.get(tag, 0) + 1

    ages = [now - e.created_at for e in entries]
    top_tags = dict(sorted(tag_counts.items(), key=lambda x: -x[1])[:20])

    return {
        "entry_count": len(entries),
        "unique_tags": len(tag_counts),
        "oldest_entry_age_seconds": round(max(ages), 1),
        "newest_entry_age_seconds": round(min(ages), 1),
        "tag_counts": top_tags,
    }
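One consequence of the code above is that an empty scope returns only entry_count, so callers should read the other keys defensively. The sketch below reproduces the computation over hypothetical stand-in entries with a fixed clock so the result is deterministic; EntrySketch and summarize are illustrative names.

```python
from dataclasses import dataclass, field


@dataclass
class EntrySketch:
    """Hypothetical stand-in exposing only the fields stats() reads."""
    created_at: float
    tags: set = field(default_factory=set)


def summarize(entries: list, now: float) -> dict:
    if not entries:
        # Empty scope: no age or tag keys are present.
        return {"entry_count": 0}
    tag_counts: dict = {}
    for entry in entries:
        for tag in entry.tags:
            tag_counts[tag] = tag_counts.get(tag, 0) + 1
    ages = [now - e.created_at for e in entries]
    return {
        "entry_count": len(entries),
        "unique_tags": len(tag_counts),
        "oldest_entry_age_seconds": round(max(ages), 1),
        "newest_entry_age_seconds": round(min(ages), 1),
        "tag_counts": dict(sorted(tag_counts.items(), key=lambda x: -x[1])[:20]),
    }


report = summarize(
    [EntrySketch(900.0, {"action"}), EntrySketch(990.0, {"action", "success"})],
    now=1000.0,
)
empty_report = summarize([], now=1000.0)
# Use .get() so an empty scope does not raise KeyError.
oldest_in_empty = empty_report.get("oldest_entry_age_seconds")
print(report, empty_report, oldest_in_empty)
```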