Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 146 additions & 33 deletions src/google/adk/code_executors/container_code_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
from __future__ import annotations

import atexit
import io
import logging
import os
import tarfile
from typing import Optional

import docker
Expand All @@ -29,9 +31,10 @@
from .base_code_executor import BaseCodeExecutor
from .code_execution_utils import CodeExecutionInput
from .code_execution_utils import CodeExecutionResult
from .code_execution_utils import File

logger = logging.getLogger('google_adk.' + __name__)
DEFAULT_IMAGE_TAG = 'adk-code-executor:latest'
logger = logging.getLogger("google_adk." + __name__)
DEFAULT_IMAGE_TAG = "adk-code-executor:latest"


class ContainerCodeExecutor(BaseCodeExecutor):
Expand All @@ -44,6 +47,9 @@ class ContainerCodeExecutor(BaseCodeExecutor):
docker_path: The path to the directory containing the Dockerfile. If set,
build the image from the dockerfile path instead of using the predefined
image. Either docker_path or image must be set.
input_dir: The directory in the container where input files will be placed.
output_dir: The directory in the container where output files will be
retrieved from.
"""

base_url: Optional[str] = None
Expand All @@ -61,14 +67,21 @@ class ContainerCodeExecutor(BaseCodeExecutor):
"""
The path to the directory containing the Dockerfile.
If set, build the image from the dockerfile path instead of using the
predefined image. Either docker_path or image must be set.
predefined image. Either docker_path or image must be set .
"""

input_dir: str = "/tmp/inputs"
"""
The directory in the container where input files will be placed.
"""

output_dir: str = "/tmp/outputs"
"""
The directory in the container where output files will be retrieved from.
"""

# Overrides the BaseCodeExecutor attribute: this executor cannot be stateful.
stateful: bool = Field(default=False, frozen=True, exclude=True)

# Overrides the BaseCodeExecutor attribute: this executor cannot
# optimize_data_file.
optimize_data_file: bool = Field(default=False, frozen=True, exclude=True)

_client: DockerClient = None
Expand All @@ -79,6 +92,8 @@ def __init__(
base_url: Optional[str] = None,
image: Optional[str] = None,
docker_path: Optional[str] = None,
input_dir: Optional[str] = None,
output_dir: Optional[str] = None,
**data,
):
"""Initializes the ContainerCodeExecutor.
Expand All @@ -90,33 +105,37 @@ def __init__(
docker_path: The path to the directory containing the Dockerfile. If set,
build the image from the dockerfile path instead of using the predefined
image. Either docker_path or image must be set.
input_dir: The directory in the container where input files will be placed.
Defaults to '/tmp/inputs'.
output_dir: The directory in the container where output files will be
retrieved from. Defaults to '/tmp/outputs'.
**data: The data to initialize the ContainerCodeExecutor.
"""
if not image and not docker_path:
raise ValueError(
'Either image or docker_path must be set for ContainerCodeExecutor.'
"Either image or docker_path must be set for ContainerCodeExecutor."
)
if 'stateful' in data and data['stateful']:
raise ValueError('Cannot set `stateful=True` in ContainerCodeExecutor.')
if 'optimize_data_file' in data and data['optimize_data_file']:
if "stateful" in data and data["stateful"]:
raise ValueError("Cannot set `stateful=True` in ContainerCodeExecutor.")
if "optimize_data_file" in data and data["optimize_data_file"]:
raise ValueError(
'Cannot set `optimize_data_file=True` in ContainerCodeExecutor.'
"Cannot set `optimize_data_file=True` in ContainerCodeExecutor."
)

super().__init__(**data)
self.base_url = base_url
self.image = image if image else DEFAULT_IMAGE_TAG
self.docker_path = os.path.abspath(docker_path) if docker_path else None
self.input_dir = input_dir if input_dir else "/tmp/inputs"
self.output_dir = output_dir if output_dir else "/tmp/outputs"

self._client = (
docker.from_env()
if not self.base_url
else docker.DockerClient(base_url=self.base_url)
)
# Initialize the container.
self.__init_container()

# Close the container when the on exit.
atexit.register(self.__cleanup_container)

@override
Expand All @@ -125,76 +144,170 @@ def execute_code(
invocation_context: InvocationContext,
code_execution_input: CodeExecutionInput,
) -> CodeExecutionResult:
output = ''
error = ''
if code_execution_input.input_files:
self._put_input_files(code_execution_input.input_files)

self._create_output_directory()

output = ""
error = ""
exec_result = self._container.exec_run(
['python3', '-c', code_execution_input.code],
["python3", "-c", code_execution_input.code],
demux=True,
)
logger.debug('Executed code:\n```\n%s\n```', code_execution_input.code)
logger.debug("Executed code:\n```\n%s\n```", code_execution_input.code)

if exec_result.output and exec_result.output[0]:
output = exec_result.output[0].decode('utf-8')
output = exec_result.output[0].decode("utf-8")
if (
exec_result.output
and len(exec_result.output) > 1
and exec_result.output[1]
):
error = exec_result.output[1].decode('utf-8')
error = exec_result.output[1].decode("utf-8")

output_files = self._get_output_files()

# Collect the final result.
return CodeExecutionResult(
stdout=output,
stderr=error,
output_files=[],
output_files=output_files,
)

def _build_docker_image(self):
"""Builds the Docker image."""
if not self.docker_path:
raise ValueError('Docker path is not set.')
raise ValueError("Docker path is not set.")
if not os.path.exists(self.docker_path):
raise FileNotFoundError(f'Invalid Docker path: {self.docker_path}')
raise FileNotFoundError(f"Invalid Docker path: {self.docker_path}")

logger.info('Building Docker image...')
logger.info("Building Docker image...")
self._client.images.build(
path=self.docker_path,
tag=self.image,
rm=True,
)
logger.info('Docker image: %s built.', self.image)
logger.info("Docker image: %s built.", self.image)

def _verify_python_installation(self):
"""Verifies the container has python3 installed."""
exec_result = self._container.exec_run(['which', 'python3'])
exec_result = self._container.exec_run(["which", "python3"])
if exec_result.exit_code != 0:
raise ValueError("python3 is not installed in the container.")

def _put_input_files(self, input_files: list[File]) -> None:
"""Puts input files into the container using put_archive.

Args:
input_files: The list of input files to copy into the container.
"""
tar_buffer = io.BytesIO()
with tarfile.open(fileobj=tar_buffer, mode="w") as tar:
for file in input_files:
content = file.content
if isinstance(content, str):
content = content.encode("utf-8")
tarinfo = tarfile.TarInfo(name=file.name)
tarinfo.size = len(content)
tar.addfile(tarinfo, io.BytesIO(content))

tar_buffer.seek(0)
self._container.put_archive(
self.input_dir,
tar_buffer.read(),
)
logger.debug(
"Copied %d input files to %s", len(input_files), self.input_dir
)

def _create_output_directory(self) -> None:
"""Creates the output directory in the container if it doesn't exist."""
exec_result = self._container.exec_run(
["mkdir", "-p", self.output_dir],
)
if exec_result.exit_code != 0:
raise ValueError('python3 is not installed in the container.')
logger.warning(
"Failed to create output directory %s: %s",
self.output_dir,
exec_result.output,
)

def _get_output_files(self) -> list[File]:
"""Gets output files from the container.

Returns:
The list of output files retrieved from the container.
"""
try:
tar_bytes, stat = self._container.get_archive(self.output_dir)
except docker.errors.APIError as e:
if e.response.status_code == 404:
logger.debug("No output files found at %s", self.output_dir)
return []
raise

tar_buffer = io.BytesIO(tar_bytes)
output_files = []

with tarfile.open(fileobj=tar_buffer, mode="r") as tar:
for member in tar.getmembers():
if member.isfile():
file_obj = tar.extractfile(member)
if file_obj:
content = file_obj.read()
file_name = os.path.basename(member.name)
if file_name:
output_files.append(
File(
name=file_name,
content=content,
mime_type=self._guess_mime_type(file_name),
)
)

logger.debug(
"Retrieved %d output files from %s", len(output_files), self.output_dir
)
return output_files

def _guess_mime_type(self, file_name: str) -> str:
"""Guesses the MIME type based on the file extension.

Args:
file_name: The name of the file.

Returns:
The guessed MIME type, or 'application/octet-stream' if unknown.
"""
import mimetypes

mime_type, _ = mimetypes.guess_type(file_name)
return mime_type if mime_type else "application/octet-stream"

def __init_container(self):
"""Initializes the container."""
if not self._client:
raise RuntimeError('Docker client is not initialized.')
raise RuntimeError("Docker client is not initialized.")

if self.docker_path:
self._build_docker_image()

logger.info('Starting container for ContainerCodeExecutor...')
logger.info("Starting container for ContainerCodeExecutor...")
self._container = self._client.containers.run(
image=self.image,
detach=True,
tty=True,
)
logger.info('Container %s started.', self._container.id)
logger.info("Container %s started.", self._container.id)

# Verify the container is able to run python3.
self._verify_python_installation()

def __cleanup_container(self):
"""Closes the container on exit."""
if not self._container:
return

logger.info('[Cleanup] Stopping the container...')
logger.info("[Cleanup] Stopping the container...")
self._container.stop()
self._container.remove()
logger.info('Container %s stopped and removed.', self._container.id)
logger.info("Container %s stopped and removed.", self._container.id)
Loading