Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 37 additions & 13 deletions src/agentex/lib/core/temporal/workers/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,16 +89,17 @@ def _validate_interceptors(interceptors: list) -> None:
)


async def get_temporal_client(temporal_address: str, metrics_url: str | None = None, plugins: list = []) -> Client:
async def get_temporal_client(
temporal_address: str, metrics_url: str | None = None, plugins: list = []
) -> Client:
if plugins != []: # We don't need to validate the plugins if they are empty
_validate_plugins(plugins)

# Check if OpenAI plugin is present - it needs to configure its own data converter
# Lazy import to avoid pulling in opentelemetry.sdk for non-Temporal agents
from temporalio.contrib.openai_agents import OpenAIAgentsPlugin
has_openai_plugin = any(
isinstance(p, OpenAIAgentsPlugin) for p in (plugins or [])
)

has_openai_plugin = any(isinstance(p, OpenAIAgentsPlugin) for p in (plugins or []))

# Build connection kwargs
connect_kwargs = {
Expand All @@ -113,7 +114,9 @@ async def get_temporal_client(temporal_address: str, metrics_url: str | None = N
if not metrics_url:
client = await Client.connect(**connect_kwargs)
else:
runtime = Runtime(telemetry=TelemetryConfig(metrics=OpenTelemetryConfig(url=metrics_url)))
runtime = Runtime(
telemetry=TelemetryConfig(metrics=OpenTelemetryConfig(url=metrics_url))
)
connect_kwargs["runtime"] = runtime
client = await Client.connect(**connect_kwargs)
return client
Expand All @@ -128,16 +131,22 @@ def __init__(
health_check_port: int | None = None,
plugins: list = [],
interceptors: list = [],
metrics_url: str | None = None,
):
self.task_queue = task_queue
self.activity_handles = []
self.max_workers = max_workers
self.max_concurrent_activities = max_concurrent_activities
self.health_check_server_running = False
self.healthy = False
self.health_check_port = health_check_port if health_check_port is not None else EnvironmentVariables.refresh().HEALTH_CHECK_PORT
self.health_check_port = (
health_check_port
if health_check_port is not None
else EnvironmentVariables.refresh().HEALTH_CHECK_PORT
)
self.plugins = plugins
self.interceptors = interceptors
self.metrics_url = metrics_url

@overload
async def run(
Expand Down Expand Up @@ -172,12 +181,17 @@ async def run(
temporal_client = await get_temporal_client(
temporal_address=os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"),
plugins=self.plugins,
metrics_url=self.metrics_url,
)

# Enable debug mode if AgentEx debug is enabled (disables deadlock detection)
debug_enabled = os.environ.get("AGENTEX_DEBUG_ENABLED", "false").lower() == "true"
debug_enabled = (
os.environ.get("AGENTEX_DEBUG_ENABLED", "false").lower() == "true"
)
if debug_enabled:
logger.info("🐛 [WORKER] Temporal debug mode enabled - deadlock detection disabled")
logger.info(
"🐛 [WORKER] Temporal debug mode enabled - deadlock detection disabled"
)

if workflow is None and workflows is None:
raise ValueError("Either workflow or workflows must be provided")
Expand Down Expand Up @@ -207,7 +221,9 @@ async def _health_check(self):
async def start_health_check_server(self):
if not self.health_check_server_running:
app = web.Application()
app.router.add_get("/readyz", lambda request: self._health_check()) # noqa: ARG005
app.router.add_get(
"/readyz", lambda request: self._health_check()
) # noqa: ARG005

# Disable access logging
runner = web.AppRunner(app, access_log=None)
Expand All @@ -216,19 +232,27 @@ async def start_health_check_server(self):
try:
site = web.TCPSite(runner, "0.0.0.0", self.health_check_port)
await site.start()
logger.info(f"Health check server running on http://0.0.0.0:{self.health_check_port}/readyz")
logger.info(
f"Health check server running on http://0.0.0.0:{self.health_check_port}/readyz"
)
self.health_check_server_running = True
except OSError as e:
logger.error(f"Failed to start health check server on port {self.health_check_port}: {e}")
logger.error(
f"Failed to start health check server on port {self.health_check_port}: {e}"
)
# Try alternative port if default fails
try:
alt_port = self.health_check_port + 1
site = web.TCPSite(runner, "0.0.0.0", alt_port)
await site.start()
logger.info(f"Health check server running on alternative port http://0.0.0.0:{alt_port}/readyz")
logger.info(
f"Health check server running on alternative port http://0.0.0.0:{alt_port}/readyz"
)
self.health_check_server_running = True
except OSError as e:
logger.error(f"Failed to start health check server on alternative port {alt_port}: {e}")
logger.error(
f"Failed to start health check server on alternative port {alt_port}: {e}"
)
raise

"""
Expand Down
Loading