Skip to content

Blackboard

EnhancedBlackboard

polymathera.colony.agents.blackboard.EnhancedBlackboard(app_name, scope=BlackboardScope.LOCAL, scope_id='default', access_policy=None, eviction_policy=None, validation_policy=None, backend=None, backend_type=None, enable_events=True, max_event_queue_size=1000, max_entries=None)

Production-grade blackboard with all the bells and whistles.

Features:

- Pluggable backends (memory, distributed, Redis)
- Event-driven notifications via Redis pub-sub
- Policy-based customization (access, eviction, validation)
- Transactions with optimistic locking
- Rich metadata (TTL, tags, versioning)
- Efficient backend-specific queries

Example
# NOTE: this snippet uses `await` / `async with` at the top level, so it must
# run inside a coroutine (or an async-aware REPL), not as a plain module.

# Create blackboard with custom policies
# (policies that are not passed fall back to the constructor defaults)
board = EnhancedBlackboard(
    app_name="my-app",
    scope=BlackboardScope.SHARED,
    scope_id="team-1",
    access_policy=MyAccessPolicy(),
    validation_policy=SchemaValidator(MySchema),
)
await board.initialize()

# Write with metadata
await board.write(
    "analysis_results",
    my_results,
    created_by="agent-123",
    tags={"analysis", "final"},
    ttl_seconds=3600,  # one hour
)

# Subscribe to changes
# (the callback is a coroutine function taking the event as its only argument)
async def on_result_updated(event: BlackboardEvent):
    print(f"Result updated: {event.value}")

board.subscribe(on_result_updated, filter=KeyPatternFilter("*_results"))

# Atomic transaction
async with board.transaction() as txn:
    # `await` binds tighter than `or`: a falsy read result (presumably None
    # for a key that has not been written yet) is replaced by 0.
    counter = await txn.read("counter") or 0
    await txn.write("counter", counter + 1)

# Query by namespace
# (query() returns a list of BlackboardEntry objects, not bare values)
results = await board.query(namespace="agent:*:results")

# Introspection
stats = await board.get_statistics()
print(f"Total entries: {stats['entry_count']}")

Initialize enhanced blackboard.

Parameters:

Name Type Description Default
app_name str

Application name for namespacing

required
scope BlackboardScope

Visibility scope (LOCAL, SHARED, GLOBAL)

LOCAL
scope_id str

Scope identifier for SHARED/GLOBAL scopes

'default'
access_policy AccessPolicy | None

Policy for access control

None
eviction_policy EvictionPolicy | None

Policy for evicting entries when memory is constrained

None
validation_policy ValidationPolicy | None

Policy for validating values before write

None
backend BlackboardBackend | None

Backend instance (if None, created based on backend_type)

None
backend_type str | None

Type of backend to create ("memory", "distributed", "redis")

None
enable_events bool

Whether to enable event system

True
max_event_queue_size int

Maximum size of event queue

1000
max_entries int | None

Maximum number of entries (None = unlimited)

None
Source code in src/polymathera/colony/agents/blackboard/blackboard.py
def __init__(
    self,
    app_name: str,
    scope: BlackboardScope = BlackboardScope.LOCAL,
    scope_id: str = "default",
    # Policy customization points
    access_policy: AccessPolicy | None = None,
    eviction_policy: EvictionPolicy | None = None,
    validation_policy: ValidationPolicy | None = None,
    # Backend selection
    backend: BlackboardBackend | None = None,
    backend_type: str | None = None,  # "memory", "distributed", "redis"
    # Event system
    enable_events: bool = True,
    max_event_queue_size: int = 1000,
    # Resource limits
    max_entries: int | None = None,
):
    """Set up an enhanced blackboard; no backend work is done here.

    The constructor only records configuration and marks the instance as
    not yet initialized.

    Args:
        app_name: Application name for namespacing
        scope: Visibility scope (LOCAL, SHARED, GLOBAL)
        scope_id: Scope identifier for SHARED/GLOBAL scopes
        access_policy: Policy for access control (falsy -> NoOpAccessPolicy)
        eviction_policy: Policy for evicting entries when memory is
            constrained (falsy -> LRUEvictionPolicy)
        validation_policy: Policy for validating values before write
            (falsy -> NoOpValidationPolicy)
        backend: Backend instance (if None, created based on backend_type)
        backend_type: Type of backend to create ("memory", "distributed", "redis")
        enable_events: Whether to enable event system
        max_event_queue_size: Maximum size of event queue
        max_entries: Maximum number of entries (None = unlimited)
    """
    # Identity / namespacing
    self.app_name = app_name
    self.scope = scope
    self.scope_id = scope_id

    # Policies: substitute the built-in default for anything not supplied.
    self.access_policy = access_policy or NoOpAccessPolicy()
    self.eviction_policy = eviction_policy or LRUEvictionPolicy()
    self.validation_policy = validation_policy or NoOpValidationPolicy()

    # Storage backend: stored as given.
    self.backend = backend
    self.backend_type = backend_type

    # Event system: an EventBus only exists when events are enabled; it is
    # flagged as distributed for every scope other than LOCAL.
    if enable_events:
        self.event_bus = EventBus(
            app_name=app_name,
            scope=scope.value,
            scope_id=scope_id,
            max_queue_size=max_event_queue_size,
            distributed=(scope != BlackboardScope.LOCAL),
        )
    else:
        self.event_bus = None

    # Resource limits
    self.max_entries = max_entries

    # Lifecycle bookkeeping
    self._initialized = False
    self._subscribed_callbacks: list[Callable[[BlackboardEvent], None]] = []

write(key, value, created_by=None, tags=None, ttl_seconds=None, metadata=None) async

Write to blackboard with rich metadata.

Parameters:

Name Type Description Default
key str

Key to write

required
value Any

Value to write

required
created_by str | None

Agent ID that created this entry

None
tags set[str] | None

Tags for querying

None
ttl_seconds float | None

Time-to-live in seconds

None
metadata dict[str, Any] | None

Additional metadata

None

Raises:

Type Description
PermissionError

If access policy denies write

ValidationError

If validation policy rejects value

Source code in src/polymathera/colony/agents/blackboard/blackboard.py
async def write(
    self,
    key: str,
    value: Any,
    created_by: str | None = None,
    tags: set[str] | None = None,
    ttl_seconds: float | None = None,
    metadata: dict[str, Any] | None = None,
) -> None:
    """Store ``value`` under ``key`` together with its bookkeeping metadata.

    When an ambient transaction is active the call is forwarded to
    ``write_tx`` and nothing else happens here. Otherwise the write is
    checked against the access and validation policies and the resource
    limits, stored through the backend, and announced with a "write" event.

    An existing entry keeps its ``created_at`` / ``created_by`` and has its
    version incremented; a brand-new entry starts at version 0. If a session
    id is available (from ``metadata`` or the current session context) it is
    added to the stored metadata and as a ``session:<id>`` tag.

    Args:
        key: Key to write
        value: Value to write
        created_by: Agent ID that created this entry
        tags: Tags for querying
        ttl_seconds: Time-to-live in seconds
        metadata: Additional metadata

    Raises:
        PermissionError: If access policy denies write
        ValidationError: If validation policy rejects value
    """
    await self.initialize()

    # Ambient transaction routing: hand the write to the transactional path.
    if _get_ambient_tx() is not None:
        await self.write_tx(
            key=key,
            value=value,
            created_by=created_by,
            tags=tags,
            ttl_seconds=ttl_seconds,
            metadata=metadata,
        )
        return

    # Access control
    if self.access_policy:
        writer = created_by or "unknown"
        permitted = await self.access_policy.can_write(
            writer, key, value, self.scope_id
        )
        if not permitted:
            raise PermissionError(f"Access denied for write: {key}")

    # Validation
    if self.validation_policy:
        await self.validation_policy.validate(key, value, metadata or {})

    # Check resource limits
    await self._check_resource_limits()

    # Auto-include session_id for distributed traceability, so blackboard
    # writes carry the session id whenever one is available.
    from ..sessions.context import get_current_session_id

    entry_metadata = metadata or {}
    session_id = entry_metadata.get("session_id") or get_current_session_id()
    if session_id and "session_id" not in entry_metadata:
        # Build a new dict rather than mutating the caller's metadata.
        entry_metadata = {**entry_metadata, "session_id": session_id}

    entry_tags = tags or set()
    if session_id:
        entry_tags = entry_tags | {f"session:{session_id}"}

    # Look up what is currently stored so history fields can be carried over.
    previous = await self.backend.read(key)
    now = time.time()
    if previous:
        old_value = previous.value
        next_version = previous.version + 1
        first_created_at = previous.created_at
        first_created_by = previous.created_by
    else:
        old_value = None
        next_version = 0
        first_created_at = now
        first_created_by = created_by

    entry = BlackboardEntry(
        key=key,
        value=value,
        version=next_version,
        created_at=first_created_at,
        updated_at=now,
        created_by=first_created_by,
        updated_by=created_by,
        ttl_seconds=ttl_seconds,
        tags=entry_tags,
        metadata=entry_metadata,
    )

    # Store in backend
    await self.backend.write(key, entry)

    # Emit event (metadata and tags already include the session id, if any)
    write_event = BlackboardEvent(
        event_type="write",
        key=key,
        value=value,
        version=entry.version,
        old_value=old_value,
        agent_id=created_by,
        tags=entry_tags,
        metadata=entry_metadata,
    )
    await self.emit_event(write_event)

    logger.debug(f"Wrote to blackboard: {key} (version={entry.version})")

read(key, agent_id=None) async

Read from blackboard.

Parameters:

Name Type Description Default
key str

Key to read

required
agent_id str | None

Agent ID requesting read (for access control)

None

Returns:

Type Description
Any | None

Value if exists and not expired, None otherwise

Raises:

Type Description
PermissionError

If access policy denies read

Source code in src/polymathera/colony/agents/blackboard/blackboard.py
async def read(self, key: str, agent_id: str | None = None) -> Any | None:
    """Read from blackboard.

    When an ambient transaction is active the call is forwarded to
    ``read_tx``. Otherwise the access policy is consulted, the entry is
    fetched from the backend and its TTL is double-checked here.

    The TTL is measured from the entry's last write (``updated_at``), not
    from ``created_at``: ``write()`` keeps the original ``created_at`` when a
    key is overwritten while replacing ``ttl_seconds``, so measuring from
    creation would expire a freshly re-written entry immediately.

    Args:
        key: Key to read
        agent_id: Agent ID requesting read (for access control)

    Returns:
        Value if the entry exists and has not expired, None otherwise

    Raises:
        PermissionError: If access policy denies read
    """
    await self.initialize()

    # Ambient transaction routing: read from transaction buffers when active.
    if _get_ambient_tx() is not None:
        return await self.read_tx(key, agent_id=agent_id)

    # Access control
    if self.access_policy and not await self.access_policy.can_read(
        agent_id or "unknown", key, self.scope_id
    ):
        raise PermissionError(f"Access denied for read: {key}")

    entry = await self.backend.read(key)
    if entry is None:
        return None

    # Check TTL (backend-specific, but we double-check here)
    if entry.ttl_seconds is not None:
        # The TTL applies to the most recent write; fall back to the creation
        # time only if the entry carries no update timestamp.
        last_written = (
            entry.updated_at if entry.updated_at is not None else entry.created_at
        )
        if time.time() - last_written > entry.ttl_seconds:
            # Expired: clean up lazily. A reader that is allowed to read but
            # not to delete must still just see "no value", so a denied
            # cleanup is logged instead of surfacing as a delete error.
            try:
                await self.delete(key, agent_id=agent_id)
            except PermissionError:
                logger.debug(
                    f"Expired entry left in place (delete denied): {key}"
                )
            return None

    return entry.value

query(namespace=None, tags=None, limit=100, offset=0) async

Query blackboard entries using backend-specific efficient implementation.

Parameters:

Name Type Description Default
namespace str | None

Namespace pattern (glob-style, e.g., "agent:*:results")

None
tags set[str] | None

Tags to filter by (must match ALL tags)

None
limit int

Maximum number of entries to return

100
offset int

Offset for pagination

0

Returns:

Type Description
list[BlackboardEntry]

List of matching entries

Source code in src/polymathera/colony/agents/blackboard/blackboard.py
async def query(
    self,
    namespace: str | None = None,
    tags: set[str] | None = None,  # TODO: Tags should be a dict with keys and values
    limit: int = 100,
    offset: int = 0,
) -> list[BlackboardEntry]:
    """Look up entries by namespace pattern and/or tags via the backend.

    The blackboard does no filtering of its own: after making sure it is
    initialized it forwards all criteria to the backend's ``query``.

    Args:
        namespace: Namespace pattern (glob-style, e.g., "agent:*:results")
        tags: Tags to filter by (must match ALL tags)
        limit: Maximum number of entries to return
        offset: Offset for pagination

    Returns:
        List of matching entries
    """
    await self.initialize()

    # Forward the criteria unchanged, by keyword, to the backend.
    criteria = {
        "namespace": namespace,
        "tags": tags,
        "limit": limit,
        "offset": offset,
    }
    matches = await self.backend.query(**criteria)
    return matches

delete(key, agent_id=None) async

Delete from blackboard.

Parameters:

Name Type Description Default
key str

Key to delete

required
agent_id str | None

Agent ID requesting delete (for access control)

None

Raises:

Type Description
PermissionError

If access policy denies delete

Source code in src/polymathera/colony/agents/blackboard/blackboard.py
async def delete(self, key: str, agent_id: str | None = None) -> None:
    """Remove ``key`` from the blackboard and announce the removal.

    When an ambient transaction is active the call is forwarded to
    ``delete_tx``. Otherwise the access policy is consulted first; deleting
    a key that is not present in the backend is a silent no-op (no event is
    emitted).

    Args:
        key: Key to delete
        agent_id: Agent ID requesting delete (for access control)

    Raises:
        PermissionError: If access policy denies delete
    """
    await self.initialize()

    # Ambient transaction routing: hand the delete to the transactional path.
    if _get_ambient_tx() is not None:
        await self.delete_tx(key, agent_id=agent_id)
        return

    # Access control
    if self.access_policy:
        requester = agent_id or "unknown"
        permitted = await self.access_policy.can_delete(
            requester, key, self.scope_id
        )
        if not permitted:
            raise PermissionError(f"Access denied for delete: {key}")

    # Fetch the entry first: its contents are needed for the event payload.
    existing = await self.backend.read(key)
    if existing is None:
        return

    # Delete from backend
    await self.backend.delete(key)

    # Emit event describing what was removed
    removal_event = BlackboardEvent(
        event_type="delete",
        key=key,
        value=None,
        version=existing.version,
        old_value=existing.value,
        tags=existing.tags,
        metadata=existing.metadata or {},
        agent_id=agent_id,
    )
    await self.emit_event(removal_event)

    logger.debug(f"Deleted from blackboard: {key}")

subscribe(callback, filter=None)

Subscribe to blackboard events.

Parameters:

Name Type Description Default
callback Callable[[BlackboardEvent], Awaitable[None]]

Async function to call when event occurs

required
filter EventFilter | None

Optional filter for events

None
Example

async def on_complete(event: BlackboardEvent):
    print(f"Agent completed: {event.key}")

blackboard.subscribe(on_complete, filter=KeyPatternFilter("*:complete"))

Source code in src/polymathera/colony/agents/blackboard/blackboard.py
def subscribe(
    self,
    callback: Callable[[BlackboardEvent], Awaitable[None]],
    filter: EventFilter | None = None,
) -> None:
    """Register ``callback`` with the blackboard's event bus.

    Args:
        callback: Async function to call when event occurs
        filter: Optional filter for events

    Raises:
        RuntimeError: If the blackboard was created without an event bus
            (``enable_events=False``)

    Example:
        async def on_complete(event: BlackboardEvent):
            print(f"Agent completed: {event.key}")

        blackboard.subscribe(on_complete, filter=KeyPatternFilter("*:complete"))
    """
    bus = self.event_bus
    if bus is not None:
        # Registration itself is delegated to the event bus.
        bus.subscribe(callback, filter)
        return

    raise RuntimeError(
        "Event system not enabled. Set enable_events=True when creating blackboard."
    )