Blackboard
EnhancedBlackboard
polymathera.colony.agents.blackboard.EnhancedBlackboard(app_name, scope=BlackboardScope.LOCAL, scope_id='default', access_policy=None, eviction_policy=None, validation_policy=None, backend=None, backend_type=None, enable_events=True, max_event_queue_size=1000, max_entries=None)
Production-grade blackboard with all the bells and whistles.
Features:
- Pluggable backends (memory, distributed, Redis)
- Event-driven notifications via Redis pub-sub
- Policy-based customization (access, eviction, validation)
- Transactions with optimistic locking
- Rich metadata (TTL, tags, versioning)
- Efficient backend-specific queries
Example

    # Create blackboard with custom policies
    board = EnhancedBlackboard(
        app_name="my-app",
        scope=BlackboardScope.SHARED,
        scope_id="team-1",
        access_policy=MyAccessPolicy(),
        validation_policy=SchemaValidator(MySchema),
    )
    await board.initialize()

    # Write with metadata
    await board.write(
        "analysis_results",
        my_results,
        created_by="agent-123",
        tags={"analysis", "final"},
        ttl_seconds=3600,
    )

    # Subscribe to changes
    async def on_result_updated(event: BlackboardEvent):
        print(f"Result updated: {event.value}")

    board.subscribe(on_result_updated, filter=KeyPatternFilter("*_results"))

    # Atomic transaction
    async with board.transaction() as txn:
        counter = await txn.read("counter") or 0
        await txn.write("counter", counter + 1)

    # Query by namespace
    results = await board.query(namespace="agent:*:results")

    # Introspection
    stats = await board.get_statistics()
    print(f"Total entries: {stats['entry_count']}")

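The transaction in the example relies on optimistic locking. The source does not show how `transaction()` is implemented, so the following is only a minimal, self-contained sketch of the general technique. `VersionedStore`, `ConflictError`, and `increment` are hypothetical names, not part of the library: each key carries a version, a commit succeeds only if the version is unchanged since the read, and the caller retries on conflict.

```python
import asyncio
from typing import Any


class ConflictError(Exception):
    """Raised when a key changed between read and commit (hypothetical)."""


class VersionedStore:
    """Toy in-memory store; NOT the library's backend, just an illustration."""

    def __init__(self) -> None:
        self._data: dict[str, tuple[int, Any]] = {}  # key -> (version, value)

    def read(self, key: str) -> tuple[int, Any]:
        # Missing keys read as version 0 with value None.
        return self._data.get(key, (0, None))

    def compare_and_set(self, key: str, expected_version: int, value: Any) -> None:
        current_version, _ = self.read(key)
        if current_version != expected_version:
            raise ConflictError(key)
        self._data[key] = (current_version + 1, value)


async def increment(store: VersionedStore, key: str, max_retries: int = 10) -> int:
    """Read-modify-write that retries when another task commits first."""
    for _ in range(max_retries):
        version, value = store.read(key)
        new_value = (value or 0) + 1
        await asyncio.sleep(0)  # yield, so other tasks can interleave here
        try:
            store.compare_and_set(key, version, new_value)
            return new_value
        except ConflictError:
            continue  # stale read; loop to re-read and try again
    raise ConflictError(f"gave up on {key!r} after {max_retries} attempts")


async def run_demo() -> int:
    store = VersionedStore()
    # Five concurrent increments race on the same key.
    await asyncio.gather(*(increment(store, "counter") for _ in range(5)))
    return store.read("counter")[1]
```

Because every conflicting task re-reads before retrying, no increment is lost even though all five tasks start from the same stale value.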
Initialize enhanced blackboard.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `app_name` | `str` | Application name for namespacing | required |
| `scope` | `BlackboardScope` | Visibility scope (LOCAL, SHARED, GLOBAL) | `LOCAL` |
| `scope_id` | `str` | Scope identifier for SHARED/GLOBAL scopes | `'default'` |
| `access_policy` | `AccessPolicy \| None` | Policy for access control | `None` |
| `eviction_policy` | `EvictionPolicy \| None` | Policy for evicting entries when memory is constrained | `None` |
| `validation_policy` | `ValidationPolicy \| None` | Policy for validating values before write | `None` |
| `backend` | `BlackboardBackend \| None` | Backend instance (if None, created based on backend_type) | `None` |
| `backend_type` | `str \| None` | Type of backend to create ("memory", "distributed", "redis") | `None` |
| `enable_events` | `bool` | Whether to enable event system | `True` |
| `max_event_queue_size` | `int` | Maximum size of event queue | `1000` |
| `max_entries` | `int \| None` | Maximum number of entries (None = unlimited) | `None` |
Source code in src/polymathera/colony/agents/blackboard/blackboard.py
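The source does not say which strategy an `EvictionPolicy` applies once `max_entries` is reached. As one common possibility, the sketch below shows least-recently-used eviction with a hypothetical `LRUStore` class. The real `EvictionPolicy` interface is not documented here and may look quite different.

```python
from collections import OrderedDict
from typing import Any, Optional


class LRUStore:
    """Hypothetical bounded store that evicts the least recently used key."""

    def __init__(self, max_entries: Optional[int] = None) -> None:
        self.max_entries = max_entries  # None means unlimited
        self._entries: "OrderedDict[str, Any]" = OrderedDict()

    def write(self, key: str, value: Any) -> list:
        """Store a value and return the keys evicted to make room."""
        self._entries[key] = value
        self._entries.move_to_end(key)  # mark as most recently used
        evicted = []
        while self.max_entries is not None and len(self._entries) > self.max_entries:
            oldest, _ = self._entries.popitem(last=False)  # pop the oldest entry
            evicted.append(oldest)
        return evicted

    def read(self, key: str) -> Optional[Any]:
        if key not in self._entries:
            return None
        self._entries.move_to_end(key)  # reads also count as use
        return self._entries[key]


store = LRUStore(max_entries=2)
store.write("a", 1)
store.write("b", 2)
store.read("a")                # "a" is now more recent than "b"
evicted = store.write("c", 3)  # over capacity, so "b" is evicted
```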
async write(key, value, created_by=None, tags=None, ttl_seconds=None, metadata=None)
Write to blackboard with rich metadata.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `key` | `str` | Key to write | required |
| `value` | `Any` | Value to write | required |
| `created_by` | `str \| None` | Agent ID that created this entry | `None` |
| `tags` | `set[str] \| None` | Tags for querying | `None` |
| `ttl_seconds` | `float \| None` | Time-to-live in seconds | `None` |
| `metadata` | `dict[str, Any] \| None` | Additional metadata | `None` |
Raises:
| Type | Description |
|---|---|
| `PermissionError` | If access policy denies write |
| `ValidationError` | If validation policy rejects value |
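The order in which `write` consults its policies is not stated in the source. A plausible flow, sketched here with hypothetical stand-in classes rather than the real `AccessPolicy` and `ValidationPolicy` interfaces, is to check access first, then validate, and only then store the value. `ValidationError` below is a local placeholder, since the source does not say which module the real exception comes from.

```python
import asyncio
from typing import Any, Optional


class ValidationError(Exception):
    """Local placeholder for the library's ValidationError."""


class OwnerOnlyAccess:
    """Hypothetical policy: only listed agents may write."""

    def __init__(self, writers: set) -> None:
        self.writers = writers

    def can_write(self, agent_id: Optional[str], key: str) -> bool:
        return agent_id in self.writers


class TypeValidator:
    """Hypothetical policy: values must be instances of one type."""

    def __init__(self, expected: type) -> None:
        self.expected = expected

    def validate(self, key: str, value: Any) -> None:
        if not isinstance(value, self.expected):
            raise ValidationError(f"{key!r}: expected {self.expected.__name__}")


class TinyBoard:
    """Toy board showing the assumed check order: access, validation, store."""

    def __init__(self, access: OwnerOnlyAccess, validator: TypeValidator) -> None:
        self.access = access
        self.validator = validator
        self.entries: dict = {}

    async def write(self, key: str, value: Any, created_by: Optional[str] = None) -> None:
        if not self.access.can_write(created_by, key):
            raise PermissionError(f"{created_by!r} may not write {key!r}")
        self.validator.validate(key, value)  # raises ValidationError on bad input
        self.entries[key] = value


async def try_write(board: TinyBoard, key: str, value: Any, agent: str) -> str:
    """Return "ok" or the name of the exception that write raised."""
    try:
        await board.write(key, value, created_by=agent)
        return "ok"
    except (PermissionError, ValidationError) as exc:
        return type(exc).__name__
```

Callers that need to distinguish the two failure modes can catch the exceptions separately, as `try_write` does by reporting the exception name.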
async read(key, agent_id=None)
Read from blackboard.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `key` | `str` | Key to read | required |
| `agent_id` | `str \| None` | Agent ID requesting read (for access control) | `None` |
Returns:
| Type | Description |
|---|---|
| `Any \| None` | Value if exists and not expired, None otherwise |
Raises:
| Type | Description |
|---|---|
| `PermissionError` | If access policy denies read |
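`read` returns `None` for entries whose TTL has elapsed. How the library tracks expiry is not shown in the source. A minimal sketch, assuming each entry stores an absolute expiry timestamp that is checked lazily on read, uses a hypothetical `TTLStore` with an injectable clock so the behaviour can be checked without sleeping:

```python
import time
from typing import Any, Callable, Optional


class TTLStore:
    """Hypothetical store illustrating lazy expiry on read."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._entries: dict = {}  # key -> (value, expires_at or None)

    def write(self, key: str, value: Any, ttl_seconds: Optional[float] = None) -> None:
        # No TTL means the entry never expires.
        expires_at = None if ttl_seconds is None else self._clock() + ttl_seconds
        self._entries[key] = (value, expires_at)

    def read(self, key: str) -> Optional[Any]:
        entry = self._entries.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if expires_at is not None and self._clock() >= expires_at:
            del self._entries[key]  # drop the stale entry lazily
            return None
        return value


now = [0.0]  # fake clock so the example is deterministic
store = TTLStore(clock=lambda: now[0])
store.write("analysis_results", {"score": 0.9}, ttl_seconds=3600)
fresh = store.read("analysis_results")    # still within the TTL
now[0] = 3600.0
expired = store.read("analysis_results")  # TTL has elapsed
```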
async query(namespace=None, tags=None, limit=100, offset=0)
Query blackboard entries using backend-specific efficient implementation.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `namespace` | `str \| None` | Namespace pattern (glob-style, e.g., `"agent:*:results"`) | `None` |
| `tags` | `set[str] \| None` | Tags to filter by (must match ALL tags) | `None` |
| `limit` | `int` | Maximum number of entries to return | `100` |
| `offset` | `int` | Offset for pagination | `0` |
Returns:
| Type | Description |
|---|---|
| `list[BlackboardEntry]` | List of matching entries |
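The parameter descriptions for `query` imply three steps: match the namespace glob, keep entries carrying all requested tags, then paginate. The backend-specific implementations are not shown in the source, so this is only an in-memory sketch using the standard library's `fnmatch`. The `Entry` dataclass is a hypothetical stand-in for `BlackboardEntry`, and real backends may match patterns differently.

```python
from dataclasses import dataclass, field
from fnmatch import fnmatchcase
from typing import Any, Iterable, Optional


@dataclass
class Entry:
    """Hypothetical stand-in for BlackboardEntry."""
    key: str
    value: Any
    tags: set = field(default_factory=set)


def query(
    entries: Iterable[Entry],
    namespace: Optional[str] = None,
    tags: Optional[set] = None,
    limit: int = 100,
    offset: int = 0,
) -> list:
    matches = [
        e for e in entries
        if (namespace is None or fnmatchcase(e.key, namespace))
        and (not tags or tags <= e.tags)  # subset test: ALL tags must be present
    ]
    matches.sort(key=lambda e: e.key)  # fixed order so pagination is repeatable
    return matches[offset:offset + limit]


entries = [
    Entry("agent:1:results", 10, {"analysis", "final"}),
    Entry("agent:2:results", 20, {"analysis"}),
    Entry("agent:2:status", "idle"),
]
all_results = query(entries, namespace="agent:*:results")
final_only = query(entries, namespace="agent:*:results", tags={"analysis", "final"})
second_page = query(entries, namespace="agent:*", limit=2, offset=2)
```

Sorting before slicing is a design choice of this sketch: without a stable order, successive pages fetched with `limit` and `offset` could overlap or skip entries.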
async delete(key, agent_id=None)
Delete from blackboard.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `key` | `str` | Key to delete | required |
| `agent_id` | `str \| None` | Agent ID requesting delete (for access control) | `None` |
Raises:
| Type | Description |
|---|---|
| `PermissionError` | If access policy denies delete |
subscribe(callback, filter=None)
Subscribe to blackboard events.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `callback` | `Callable[[BlackboardEvent], Awaitable[None]]` | Async function to call when event occurs | required |
| `filter` | `EventFilter \| None` | Optional filter for events | `None` |
Example

    async def on_complete(event: BlackboardEvent):
        print(f"Agent completed: {event.key}")

    blackboard.subscribe(on_complete, filter=KeyPatternFilter("*:complete"))

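How events reach subscribers is not detailed in the source beyond the mention of Redis pub-sub. The sketch below illustrates only the filtering and callback step, in-process. The `Event`, `KeyGlob`, and `Dispatcher` classes are hypothetical stand-ins for `BlackboardEvent`, `KeyPatternFilter`, and the board's event system, and the glob semantics are an assumption.

```python
import asyncio
from dataclasses import dataclass
from fnmatch import fnmatchcase
from typing import Any, Awaitable, Callable, Optional


@dataclass
class Event:
    """Hypothetical stand-in for BlackboardEvent."""
    key: str
    value: Any


class KeyGlob:
    """Hypothetical stand-in for KeyPatternFilter (glob match on the key)."""

    def __init__(self, pattern: str) -> None:
        self.pattern = pattern

    def matches(self, event: Event) -> bool:
        return fnmatchcase(event.key, self.pattern)


Callback = Callable[[Event], Awaitable[None]]


class Dispatcher:
    """Toy in-process event bus; the real system may queue or use pub-sub."""

    def __init__(self) -> None:
        self._subs: list = []  # (callback, optional filter) pairs

    def subscribe(self, callback: Callback, filter: Optional[KeyGlob] = None) -> None:
        self._subs.append((callback, filter))

    async def publish(self, event: Event) -> None:
        # Call every subscriber whose filter accepts the event (no filter = all events).
        for callback, flt in self._subs:
            if flt is None or flt.matches(event):
                await callback(event)


async def run_demo() -> list:
    seen = []

    async def on_complete(event: Event) -> None:
        seen.append(event.key)

    bus = Dispatcher()
    bus.subscribe(on_complete, filter=KeyGlob("*:complete"))
    await bus.publish(Event("agent-1:complete", True))
    await bus.publish(Event("agent-1:progress", 0.5))  # filtered out
    return seen
```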