Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
15 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions ravendb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,11 @@
AiAgentConfiguration,
AiAgentConfigurationResult,
AiAgentParameter,
AiAgentParameterPolicy,
AiAgentParameterValueType,
AiAgentToolAction,
AiAgentToolQuery,
AiAgentToolSubAgent,
AiAgentPersistenceConfiguration,
AiAgentChatTrimmingConfiguration,
AiAgentSummarizationByTokens,
Expand All @@ -101,10 +104,13 @@
RunConversationOperation,
ConversationResult,
AiAgentActionRequest,
AiAgentActionRequestType,
AiAgentActionResponse,
AiAgentArtificialActionResponse,
AiUsage,
AiConversationCreationOptions,
AiConversationParameter,
AiConversationParameterOptions,
GetAiAgentOperation,
GetAiAgentsResponse,
AddOrUpdateAiAgentOperation,
Expand Down Expand Up @@ -195,6 +201,7 @@
from ravendb.documents.queries.highlighting import HighlightingOptions, QueryHighlightings
from ravendb.documents.queries.index_query import IndexQuery
from ravendb.documents.queries.misc import SearchOperator
from ravendb.documents.queries.raven_document_query import RavenDocumentQuery
from ravendb.documents.queries.more_like_this import (
MoreLikeThisOperations,
MoreLikeThisBase,
Expand Down Expand Up @@ -238,6 +245,7 @@
DocumentsChanges,
ForceRevisionStrategy,
MethodCall,
OptimisticConcurrencyMode,
OrderingType,
JavaScriptMap,
DocumentQueryCustomization,
Expand Down
153 changes: 37 additions & 116 deletions ravendb/documents/ai/ai_conversation.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import json
import traceback
from typing import List, Dict, Any, Optional, TypeVar, TYPE_CHECKING, Callable
from typing import List, Dict, Any, IO, Optional, TypeVar, TYPE_CHECKING, Callable, Union
from datetime import timedelta

from ravendb.documents.ai.ai_answer import AiAnswer, AiConversationStatus
Expand Down Expand Up @@ -33,14 +33,7 @@ def __init__(self, sender: AiConversation, action: AiAgentActionRequest):


class AiConversation:
"""
Implementation of AI conversation operations for managing conversations with AI agents.

Can be used as a context manager for automatic cleanup:
with store.ai.conversation(agent_id) as conversation:
conversation.set_user_prompt("Hello!")
result = conversation.run()
"""
# Usable as a context manager: `with store.ai.conversation(agent_id) as c:`.

def __init__(
self,
Expand All @@ -60,35 +53,43 @@ def __init__(
self._action_responses: Dict[str, AiAgentActionResponse] = {}
self._artificial_actions: List[AiAgentArtificialActionResponse] = []
self._action_requests: Optional[List[AiAgentActionRequest]] = None
self._attachments_commands: List = []

# Action handlers
self._invocations: Dict[str, Callable[[AiAgentActionRequest], None]] = {}

self.on_unhandled_action: Optional[Callable[[UnhandledActionEventArgs], None]] = None

def add_attachment(self, name: str, stream: Union[bytes, IO[bytes]], content_type: str) -> None:
# `stream` is raw bytes or any binary file-like; each stream may only
# be used once per turn (SingleNodeBatchCommand enforces uniqueness).
if stream is None:
raise ValueError("stream cannot be None")
from ravendb.documents.commands.batches import PutAttachmentCommandData

self._attachments_commands.append(
PutAttachmentCommandData("__this__", name, stream, content_type, change_vector=None)
)

def copy_attachment_from(self, source_document_id: str, file_name: str) -> None:
if not source_document_id or (isinstance(source_document_id, str) and source_document_id.isspace()):
raise ValueError("source_document_id cannot be None or empty")
if not file_name or (isinstance(file_name, str) and file_name.isspace()):
raise ValueError("file_name cannot be None or empty")
from ravendb.documents.commands.batches import CopyAttachmentCommandData

self._attachments_commands.append(
CopyAttachmentCommandData(source_document_id, file_name, "__this__", file_name, change_vector=None)
)

def __enter__(self) -> AiConversation:
"""Context manager entry."""
return self

def __exit__(self, exc_type, exc_val, exc_tb) -> None:
"""Context manager exit - cleanup resources."""
pass

@classmethod
def with_conversation_id(
cls, store: DocumentStore, conversation_id: str, change_vector: str = None
) -> AiConversation:
"""
Creates a conversation instance for continuing an existing conversation.

Args:
store: The document store
conversation_id: The ID of the existing conversation
change_vector: Optional change vector for optimistic concurrency

Returns:
A new conversation instance
"""
return cls(
store=store,
conversation_id=conversation_id,
Expand All @@ -97,28 +98,11 @@ def with_conversation_id(

@property
def required_actions(self) -> List[AiAgentActionRequest]:
"""
Gets the list of action requests that need to be fulfilled before
the conversation can continue.

Raises:
RuntimeError: If run() hasn't been called yet
"""
if self._action_requests is None:
raise RuntimeError("You have to call run() first")
return self._action_requests

def add_action_response(self, action_id: str, action_response: str) -> None:
"""
Adds a response for a given action request.

Args:
action_id: The ID of the action to respond to
action_response: The response content

Raises:
InvalidOperationException: If a response for the given tool-id was already added
"""
from ravendb.documents.operations.ai.agents import AiAgentActionResponse

if action_id in self._action_responses:
Expand All @@ -136,16 +120,8 @@ def add_action_response(self, action_id: str, action_response: str) -> None:
self._action_responses[action_id] = response

def add_artificial_action_with_response(self, tool_id: str, action_response) -> None:
"""
Injects an artificial action (tool call) and a response into the model's conversation context.
This is an advanced mechanism to programmatically prompt the agent, causing it to "believe"
it successfully executed a tool and received the specified action_response.

Args:
tool_id: The name of the tool to simulate the agent called.
action_response: The response to supply to the agent as the result of the simulated action.
Can be a string or any object that will be serialized to JSON.
"""
# Injects a synthetic tool-call + response so the agent "believes" it
# already executed `tool_id` and got `action_response` back.
if not tool_id or (isinstance(tool_id, str) and tool_id.isspace()):
raise ValueError("tool_id cannot be None or empty")
if action_response is None:
Expand All @@ -159,22 +135,12 @@ def add_artificial_action_with_response(self, tool_id: str, action_response) ->
self._artificial_actions.append(AiAgentArtificialActionResponse(tool_id=tool_id, content=content))

def run(self) -> AiAnswer:
"""
Executes the conversation loop, automatically handling action requests
until the conversation is complete or no handlers are available.

Returns:
AiAnswer with the final response, status, usage, and elapsed time
"""
while True:
r = self._run_internal()
if self._handle_server_reply(r):
return r

def stream(self, stream_property_path: str = None, on_chunk: Optional[Callable[[str], None]] = None) -> AiAnswer:
"""
Stream the LLM response for the given property and return the final AiAnswer when done.
"""
while True:
r = self._run_internal(stream_property_path=stream_property_path, streamed_chunks_callback=on_chunk)
if self._handle_server_reply(r):
Expand All @@ -185,21 +151,16 @@ def _run_internal(
stream_property_path: Optional[str] = None,
streamed_chunks_callback: Optional[Callable[[str], None]] = None,
) -> AiAnswer:
"""
Internal method that executes a single server call.

Returns:
AiAnswer from this single turn
"""
from ravendb.documents.operations.ai.agents import RunConversationOperation
import time

# If we already went to the server and have nothing new to tell it, we're done
# Already round-tripped and nothing new to send.
if (
self._action_requests is not None
and len(self._prompt_parts) == 0
and len(self._action_responses) == 0
and len(self._artificial_actions) == 0
and len(self._attachments_commands) == 0
):
return AiAnswer(
answer=None,
Expand All @@ -208,40 +169,35 @@ def _run_internal(
elapsed=None,
)

# Build the operation
if not self._agent_id:
raise ValueError("Agent ID is required")

# If we don't have a conversation ID yet, generate one with the prefix
# The server will complete it with a unique ID
# Trailing "/" tells the server to assign a unique id.
if not self._conversation_id:
self._conversation_id = "conversations/"

# Create operation with all required parameters
operation = RunConversationOperation(
agent_id=self._agent_id,
conversation_id=self._conversation_id,
prompt_parts=self._prompt_parts, # Always send list, even if empty
action_responses=list(self._action_responses.values()), # Always send list, even if empty
artificial_actions=self._artificial_actions, # Always send list, even if empty
prompt_parts=self._prompt_parts,
action_responses=list(self._action_responses.values()),
artificial_actions=self._artificial_actions,
options=self._options,
change_vector=self._change_vector,
stream_property_path=stream_property_path,
streamed_chunks_callback=streamed_chunks_callback,
attachments_commands=self._attachments_commands,
)

try:
# Track elapsed time
start_time = time.time()
result = self._store.maintenance.send(operation)
elapsed = timedelta(seconds=time.time() - start_time)

# Update conversation state
self._change_vector = result.change_vector
self._conversation_id = result.conversation_id
self._action_requests = result.action_requests or []

# Build AiAnswer
return AiAnswer(
answer=result.response,
status=(
Expand All @@ -252,25 +208,14 @@ def _run_internal(
usage=result.usage,
elapsed=elapsed,
)
# except ConcurrencyException as e:
# self._change_vector = e.actual_change_vector
# raise
finally:
# Clear the user prompt and tool responses after running the conversation
self._prompt_parts.clear()
self._action_responses.clear()
self._artificial_actions.clear()
self._attachments_commands.clear()

def _handle_server_reply(self, answer: AiAnswer) -> bool:
"""
Handles the server reply by invoking registered action handlers.

Args:
answer: The answer from the server

Returns:
True if the conversation is done, False if it should continue
"""
# Returns True when the conversation is done.
if answer.status == AiConversationStatus.DONE:
return True

Expand All @@ -279,52 +224,28 @@ def _handle_server_reply(self, answer: AiAnswer) -> bool:
f"There are no action requests to process, but Status was {answer.status}, should not be possible."
)

# Process each action request
for action in self._action_requests:
if action.name in self._invocations:
# Invoke the registered handler
# Error handling is done by the invocation based on the error strategy
self._invocations[action.name](action)
elif self.on_unhandled_action is not None:
self.on_unhandled_action(UnhandledActionEventArgs(self, action))
else:
# No handler registered for this action
raise RuntimeError(
f"There is no action defined for action '{action.name}' on agent '{self._agent_id}' "
f"({self._conversation_id}), but it was invoked by the model with: {action.arguments}. "
f"Did you forget to call {self.receive.__name__}() or {self.handle.__name__}()? You can also handle unexpected action invocations using the 'on_unhandled_action' event."
)

# If we have nothing to tell the server (no action responses), we're done
# Otherwise, continue the loop to send the responses
# No responses to deliver => nothing more to tell the server.
return len(self._action_responses) == 0

def set_user_prompt(self, user_prompt: str) -> None:
"""
Sets the user prompt to send to the AI agent.
Clears any existing prompt parts and adds the new prompt.

Args:
user_prompt: The prompt text to send to the agent

Raises:
ValueError: If user_prompt is empty or whitespace-only
"""
if not user_prompt or user_prompt.isspace():
raise ValueError("User prompt cannot be empty or whitespace-only")
self._prompt_parts.clear()
self.add_user_prompt(user_prompt)

def add_user_prompt(self, *prompts: str) -> None:
"""
Adds one or more user prompts to the conversation.

Args:
*prompts: One or more prompt strings to add

Raises:
ValueError: If any prompt is empty or whitespace-only
"""
for prompt in prompts:
if not prompt or prompt.isspace():
raise ValueError("User prompt cannot be empty or whitespace-only")
Expand Down
18 changes: 1 addition & 17 deletions ravendb/documents/ai/content_part.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,16 @@


class AiMessagePromptFields:
"""Constants for AI message prompt field names."""

TEXT = "text"
TYPE = "type"
IMAGE = "image"


class AiMessagePromptTypes:
"""Constants for AI message prompt types."""

TEXT = "text"


class ContentPart:
"""
Base class for content parts in AI prompts.
Content parts allow structured prompt content with different types (text, etc.).
"""

def __init__(self, content_type: str):
self._type = content_type

Expand All @@ -29,18 +21,10 @@ def type(self) -> str:
return self._type

def to_json(self) -> Dict[str, Any]:
"""
Converts the content part to a JSON-serializable dictionary.
Subclasses should override this method to include their specific fields.
"""
return {AiMessagePromptFields.TYPE: self._type}


class TextPart(ContentPart):
"""
Represents a text content part in AI prompts.
"""

def __init__(self, text: str):
super().__init__(AiMessagePromptTypes.TEXT)
self._text = text
Expand Down
Loading
Loading