Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ def run_pipeline(self, input_info):
save_result_path = self.inputs.get("save_result_path")
if self.input_info.return_result_tensor:
self.end_run()
return {"image": image}
return {"images": [image]}
if save_result_path:
os.makedirs(os.path.dirname(os.path.abspath(save_result_path)), exist_ok=True)
image.save(save_result_path)
Expand All @@ -271,7 +271,7 @@ def run_pipeline(self, input_info):
if GET_RECORDER_MODE():
monitor_cli.lightx2v_worker_request_success.inc()
self.end_run()
return {"image": None}
return {"images": None}

def end_run(self):
if hasattr(self, "scheduler") and self.scheduler is not None:
Expand Down
58 changes: 49 additions & 9 deletions lightx2v/server/api/openai_images.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import re
import time
import uuid
from datetime import datetime
from pathlib import Path
from typing import Literal, Optional

Expand All @@ -17,6 +18,7 @@
router = APIRouter()

_SIZE_PATTERN = re.compile(r"^\s*(\d+)\s*x\s*(\d+)\s*$", re.IGNORECASE)
OPENAI_IMAGE_RESULT_POLL_INTERVAL_SECONDS = 0.2


class OpenAIImageGenerationRequest(BaseModel):
Expand Down Expand Up @@ -52,7 +54,9 @@ def _shape_from_size(size: str) -> tuple[int, int]:

async def _wait_task_result_png(task_id: str, timeout_seconds: int, poll_interval_seconds: float) -> bytes:
start_time = time.monotonic()
status_checks = 0
while True:
status_checks += 1
task_status = task_manager.get_task_status(task_id)
if not task_status:
raise HTTPException(status_code=500, detail=f"Task status not found: {task_id}")
Expand All @@ -61,6 +65,16 @@ async def _wait_task_result_png(task_id: str, timeout_seconds: int, poll_interva
if status == TaskStatus.COMPLETED.value:
result_png = task_manager.get_task_result_png(task_id)
if result_png:
wait_elapsed_ms = (time.monotonic() - start_time) * 1000
completion_observe_lag_ms = 0.0
end_time = task_status.get("end_time")
if end_time:
completion_observe_lag_ms = (datetime.now() - end_time).total_seconds() * 1000
logger.info(
f"Task {task_id} OpenAI image wait_task_result cost total={wait_elapsed_ms:.2f} ms "
f"completion_observe_lag={completion_observe_lag_ms:.2f} ms "
f"poll_interval={poll_interval_seconds:.2f} s status_checks={status_checks}"
)
return result_png
raise HTTPException(status_code=500, detail=f"Task completed but no in-memory image found: {task_id}")

Expand Down Expand Up @@ -89,28 +103,39 @@ async def _watch_client_disconnect(request: Request, task_id: str, poll_interval
async def _run_sync_image_task(request: Request, message: ImageTaskRequest) -> bytes:
task_id = None
timeout_seconds = 600
poll_interval_seconds = 0.5
poll_interval_seconds = OPENAI_IMAGE_RESULT_POLL_INTERVAL_SECONDS

try:
message.prefer_memory_result = True
create_task_start = time.perf_counter()
task_id = task_manager.create_task(message)
create_task_elapsed_ms = (time.perf_counter() - create_task_start) * 1000
message.task_id = task_id
logger.info(f"Task {task_id} OpenAI image create_task cost {create_task_elapsed_ms:.2f} ms prompt_chars={len(message.prompt)} target_shape={message.target_shape}")

wait_task = asyncio.create_task(_wait_task_result_png(task_id, timeout_seconds, poll_interval_seconds))
disconnect_task = asyncio.create_task(_watch_client_disconnect(request, task_id))

done, pending = await asyncio.wait({wait_task, disconnect_task}, return_when=asyncio.FIRST_COMPLETED)
for pending_task in pending:
pending_task.cancel()
await asyncio.gather(*pending, return_exceptions=True)

if wait_task in done:
result_png = wait_task.result()
for pending_task in pending:
pending_task.cancel()
if pending:
_, still_pending = await asyncio.wait(pending, timeout=0)
if still_pending:
logger.debug(f"Task {task_id} disconnect watcher cancellation is still pending")
Comment on lines +123 to +128

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Using `asyncio.wait(pending, timeout=0)` does not actually await the cancellation of the pending tasks, which can lead to task leaks and "Task was destroyed but it is pending!" warnings in the event loop. Since `_watch_client_disconnect` is a simple loop that yields control via `asyncio.sleep`, awaiting its cancellation is extremely fast and safe.

It is highly recommended to use the established pattern of `await asyncio.gather(*pending, return_exceptions=True)` to ensure proper cleanup of cancelled tasks.

Suggested change
for pending_task in pending:
pending_task.cancel()
if pending:
_, still_pending = await asyncio.wait(pending, timeout=0)
if still_pending:
logger.debug(f"Task {task_id} disconnect watcher cancellation is still pending")
for pending_task in pending:
pending_task.cancel()
if pending:
await asyncio.gather(*pending, return_exceptions=True)

logger.info(f"Task {task_id} OpenAI image task result ready, building response")
return result_png

if disconnect_task in done and disconnect_task.result():
if not wait_task.done():
wait_task.cancel()
await asyncio.gather(wait_task, return_exceptions=True)
await asyncio.wait({wait_task}, timeout=0)
Comment on lines 133 to +135

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Similarly, using `asyncio.wait({wait_task}, timeout=0)` does not await the cancellation of `wait_task`, which can leave the task pending in the event loop and trigger warnings. Please use `await asyncio.gather(wait_task, return_exceptions=True)` to properly await the cancelled task.

Suggested change
if not wait_task.done():
wait_task.cancel()
await asyncio.gather(wait_task, return_exceptions=True)
await asyncio.wait({wait_task}, timeout=0)
if not wait_task.done():
wait_task.cancel()
await asyncio.gather(wait_task, return_exceptions=True)

raise HTTPException(status_code=499, detail=f"Client disconnected, task {task_id} cancelled")

return wait_task.result()
raise HTTPException(status_code=500, detail=f"Task {task_id} ended without image result")
except RuntimeError as e:
raise HTTPException(status_code=503, detail=str(e))
except HTTPException:
Expand Down Expand Up @@ -138,10 +163,25 @@ def _build_url_response(request: Request, task_id: str, image_bytes: bytes) -> s


def _build_openai_response(request: Request, task_id: str, image_bytes: bytes, response_format: Literal["url", "b64_json"]):
total_start = time.perf_counter()
if response_format == "b64_json":
return OpenAIImageResponse(created=int(time.time()), data=[{"b64_json": base64.b64encode(image_bytes).decode("utf-8")}])

return OpenAIImageResponse(created=int(time.time()), data=[{"url": _build_url_response(request, task_id, image_bytes)}])
base64_start = time.perf_counter()
b64_json = base64.b64encode(image_bytes).decode("utf-8")
base64_elapsed_ms = (time.perf_counter() - base64_start) * 1000
response = OpenAIImageResponse(created=int(time.time()), data=[{"b64_json": b64_json}])
total_elapsed_ms = (time.perf_counter() - total_start) * 1000
logger.info(
f"Task {task_id} OpenAI image response build cost total={total_elapsed_ms:.2f} ms base64={base64_elapsed_ms:.2f} ms format=b64_json png_bytes={len(image_bytes)} b64_chars={len(b64_json)}"
)
return response

url_start = time.perf_counter()
url = _build_url_response(request, task_id, image_bytes)
url_elapsed_ms = (time.perf_counter() - url_start) * 1000
response = OpenAIImageResponse(created=int(time.time()), data=[{"url": url}])
total_elapsed_ms = (time.perf_counter() - total_start) * 1000
logger.info(f"Task {task_id} OpenAI image response build cost total={total_elapsed_ms:.2f} ms url_write={url_elapsed_ms:.2f} ms format=url png_bytes={len(image_bytes)}")
return response


def _build_image_task_request(
Expand Down
8 changes: 6 additions & 2 deletions lightx2v/server/api/server.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
import threading
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Optional

Expand All @@ -14,6 +15,8 @@
from .deps import ServiceContainer, get_services
from .router import create_api_router

TASK_PROCESSING_IDLE_WAIT_TIMEOUT_SECONDS = 0.2


class ApiServer:
def __init__(self, max_queue_size: int = 10, app: Optional[FastAPI] = None):
Expand Down Expand Up @@ -85,10 +88,9 @@ def _task_processing_loop(self):
loop = asyncio.get_event_loop()

while not self.stop_processing.is_set():
task_id = task_manager.get_next_pending_task()
task_id = task_manager.wait_for_next_pending_task(timeout=TASK_PROCESSING_IDLE_WAIT_TIMEOUT_SECONDS)

if task_id is None:
time.sleep(1)
continue

task_info = task_manager.get_task(task_id)
Expand All @@ -112,6 +114,8 @@ async def _process_single_task(self, task_info: Any):
return

try:
pending_elapsed_ms = (datetime.now() - task_info.start_time).total_seconds() * 1000
logger.info(f"Task {task_id} scheduler pending wait {pending_elapsed_ms:.2f} ms")
task_manager.start_task(task_id)

if task_info.stop_event.is_set():
Expand Down
21 changes: 17 additions & 4 deletions lightx2v/server/task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def __init__(self, max_queue_size: int = 100):

self._tasks: OrderedDict[str, TaskInfo] = OrderedDict()
self._lock = threading.RLock()
self._task_available = threading.Condition(self._lock)

self._processing_lock = threading.Lock()
self._current_processing_task: Optional[str] = None
Expand All @@ -50,7 +51,7 @@ def __init__(self, max_queue_size: int = 100):
self._emit_queue_metrics_unlocked()

def create_task(self, message: Any) -> str:
with self._lock:
with self._task_available:
if hasattr(message, "task_id") and message.task_id in self._tasks:
raise RuntimeError(f"Task ID {message.task_id} already exists")

Expand All @@ -66,6 +67,7 @@ def create_task(self, message: Any) -> str:

self._cleanup_old_tasks()
self._emit_queue_metrics_unlocked()
self._task_available.notify()

return task_id

Expand Down Expand Up @@ -202,9 +204,20 @@ def release_processing_lock(self, task_id: str):

def get_next_pending_task(self) -> Optional[str]:
with self._lock:
for task_id, task in self._tasks.items():
if task.status == TaskStatus.PENDING:
return task_id
return self._get_next_pending_task_unlocked()

def wait_for_next_pending_task(self, timeout: Optional[float] = None) -> Optional[str]:
    """Block until a pending task exists, then return its ID.

    The lookup and the wait both happen while holding ``self._task_available``,
    the condition that ``create_task`` notifies after registering a new task.

    Args:
        timeout: Maximum number of seconds to wait. ``None`` waits until a
            pending task shows up; callers that must also observe a stop flag
            should therefore pass a finite timeout and poll in a loop.

    Returns:
        The ID reported by ``self._get_next_pending_task_unlocked()``, or
        ``None`` when the timeout elapsed without any pending task.
    """
    with self._task_available:
        # wait_for() evaluates the predicate before sleeping and again after
        # every wake-up, so a spurious wake-up or a notify whose task is no
        # longer pending does not make us return None ahead of the deadline.
        # It returns the predicate's last value: a task ID, or None on timeout.
        return self._task_available.wait_for(self._get_next_pending_task_unlocked, timeout=timeout)

def _get_next_pending_task_unlocked(self) -> Optional[str]:
    """Return the ID of the first task in ``self._tasks`` whose status is PENDING.

    Tasks are scanned in the iteration order of ``self._tasks``. The method
    takes no lock itself; the callers visible in this diff invoke it while
    already holding the manager's lock or condition.

    Returns:
        The matching task ID, or ``None`` when no task is pending.
    """
    pending_ids = (
        candidate_id
        for candidate_id, candidate in self._tasks.items()
        if candidate.status == TaskStatus.PENDING
    )
    return next(pending_ids, None)

def get_service_status(self) -> Dict[str, Any]:
Expand Down
20 changes: 20 additions & 0 deletions scripts/hidream_o1_image/post_t2i_openai.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#!/bin/bash

lightx2v_path=/root/yongyang3/LightX2V
test_json=/root/test.json
Comment on lines +3 to +4

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The script hardcodes absolute paths specific to a local environment (`/root/yongyang3/LightX2V` and `/root/test.json`). This makes the script non-portable and likely to fail on other machines or for other users.

Consider resolving the paths dynamically relative to the script's directory, or using environment variables with sensible defaults.

Suggested change
lightx2v_path=/root/yongyang3/LightX2V
test_json=/root/test.json
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
lightx2v_path="$(realpath "${SCRIPT_DIR}/../..")"
test_json="${lightx2v_path}/test.json"

base_url=http://127.0.0.1:8000/v1
output_dir=${lightx2v_path}/save_results/hidream_o1_image_openai_test

export PYTHONPATH="${lightx2v_path}"

python "${lightx2v_path}/scripts/hidream_o1_image/test_openai_images_client.py" \
--base_url "${base_url}" \
--api_key "dummy-key" \
--model "gpt-image-1" \
--mode generate \
--prompt_json "${test_json}" \
--seed 42 \
--size "2048x2048" \
--response_format "b64_json" \
--output_dir "${output_dir}" \
--output_prefix "hidream_o1_image_openai"
Loading
Loading