-
Notifications
You must be signed in to change notification settings - Fork 108
feat: add async support to MemorySessionManager #478
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
nborges-aws
wants to merge
1
commit into
main
Choose a base branch
from
fireAndForget
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+218
−4
Open
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,6 @@ | ||
| """AgentCore Memory-based session manager for Bedrock AgentCore Memory integration.""" | ||
|
|
||
| import asyncio | ||
| import json | ||
| import logging | ||
| import threading | ||
|
|
@@ -10,7 +11,13 @@ | |
|
|
||
| import boto3 | ||
| from botocore.config import Config as BotocoreConfig | ||
| from strands.experimental.hooks.multiagent.events import ( | ||
| AfterMultiAgentInvocationEvent, | ||
| AfterNodeCallEvent, | ||
| MultiAgentInitializedEvent, | ||
| ) | ||
| from strands.hooks import AfterInvocationEvent, MessageAddedEvent | ||
| from strands.hooks.events import AgentInitializedEvent | ||
| from strands.hooks.registry import HookRegistry | ||
| from strands.session.repository_session_manager import RepositorySessionManager | ||
| from strands.session.session_repository import SessionRepository | ||
|
|
@@ -906,16 +913,79 @@ def retrieve_for_namespace(namespace: str, retrieval_config: RetrievalConfig): | |
| def register_hooks(self, registry: HookRegistry, **kwargs) -> None: | ||
| """Register additional hooks. | ||
|
|
||
| In sync mode (the default), delegates to the base class and adds the | ||
| retrieve_customer_context + batching callbacks synchronously, preserving | ||
| existing behavior exactly. | ||
|
|
||
| In async mode, registers async callbacks that wrap every per-turn | ||
| boto3-backed operation (append_message, sync_agent, buffer flushes, | ||
| customer-context retrieval) with asyncio.to_thread, so the asyncio | ||
| event loop stays free while boto3 is blocking on the network. | ||
|
|
||
| Note: AgentInitializedEvent cannot be async per Strands' HookRegistry, | ||
| so agent restoration (read_session / read_agent / list_messages) still | ||
| blocks the calling thread in async mode — see AgentCoreMemoryConfig | ||
| docstring for mitigations. | ||
|
|
||
| Args: | ||
| registry (HookRegistry): The hook registry to register callbacks with. | ||
| **kwargs: Additional keyword arguments. | ||
| """ | ||
| RepositorySessionManager.register_hooks(self, registry, **kwargs) | ||
| registry.add_callback(MessageAddedEvent, lambda event: self.retrieve_customer_context(event)) | ||
| if not self.config.async_mode: | ||
| RepositorySessionManager.register_hooks(self, registry, **kwargs) | ||
| registry.add_callback(MessageAddedEvent, lambda event: self.retrieve_customer_context(event)) | ||
|
|
||
| # Only register AfterInvocationEvent hook when batching is enabled | ||
| if self.config.batch_size > 1: | ||
| registry.add_callback(AfterInvocationEvent, lambda event: self._flush_messages()) | ||
| return | ||
|
|
||
| # Async mode: register async callbacks that offload the existing sync | ||
| # methods to a worker thread via asyncio.to_thread. AgentInitializedEvent | ||
| # must stay sync (Strands disallows async callbacks on this event; see | ||
| # strands/hooks/registry.py:174). | ||
| logger.warning( | ||
| "AgentCoreMemorySessionManager async_mode=True: the agent must be invoked " | ||
| "via the async path (e.g. agent.stream_async(...) or agent.invoke_async(...)). " | ||
| "Sync invocation will raise RuntimeError from Strands' hook registry." | ||
| ) | ||
|
|
||
| registry.add_callback(AgentInitializedEvent, lambda event: self.initialize(event.agent)) | ||
|
|
||
| async def _on_message_added_persist(event: MessageAddedEvent) -> None: | ||
| await asyncio.to_thread(self.append_message, event.message, event.agent) | ||
| await asyncio.to_thread(self.sync_agent, event.agent) | ||
|
|
||
| async def _on_message_added_retrieve(event: MessageAddedEvent) -> None: | ||
| await asyncio.to_thread(self.retrieve_customer_context, event) | ||
|
|
||
| async def _on_after_invocation_sync(event: AfterInvocationEvent) -> None: | ||
| await asyncio.to_thread(self.sync_agent, event.agent) | ||
|
|
||
| registry.add_callback(MessageAddedEvent, _on_message_added_persist) | ||
| registry.add_callback(AfterInvocationEvent, _on_after_invocation_sync) | ||
| registry.add_callback(MessageAddedEvent, _on_message_added_retrieve) | ||
|
|
||
| # Only register AfterInvocationEvent hook when batching is enabled | ||
| if self.config.batch_size > 1: | ||
| registry.add_callback(AfterInvocationEvent, lambda event: self._flush_messages()) | ||
|
|
||
| async def _on_after_invocation_flush(event: AfterInvocationEvent) -> None: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: could we add a small helper to reduce boilerplate here? |
||
| await asyncio.to_thread(self._flush_messages) | ||
|
|
||
| registry.add_callback(AfterInvocationEvent, _on_after_invocation_flush) | ||
|
|
||
| # Register multi-agent callbacks so async-mode parity matches sync-mode | ||
| async def _on_multi_agent_initialized(event: MultiAgentInitializedEvent) -> None: | ||
| await asyncio.to_thread(self.initialize_multi_agent, event.source) | ||
|
|
||
| async def _on_after_node_call(event: AfterNodeCallEvent) -> None: | ||
| await asyncio.to_thread(self.sync_multi_agent, event.source) | ||
|
|
||
| async def _on_after_multi_agent_invocation(event: AfterMultiAgentInvocationEvent) -> None: | ||
| await asyncio.to_thread(self.sync_multi_agent, event.source) | ||
|
|
||
| registry.add_callback(MultiAgentInitializedEvent, _on_multi_agent_initialized) | ||
| registry.add_callback(AfterNodeCallEvent, _on_after_node_call) | ||
| registry.add_callback(AfterMultiAgentInvocationEvent, _on_after_multi_agent_invocation) | ||
|
|
||
| @override | ||
| def initialize(self, agent: "Agent", **kwargs: Any) -> None: | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
note (for my own understanding): because this path doesn't call
RepositorySessionManager.register_hooks, we must manually register the initialize hook: https://github.com/strands-agents/sdk-python/blob/main/src/strands/session/session_manager.py#L43.In other words, we pick this synchronous hook from that implementation and leave out the rest to overwrite them with our own async hooks.