Reviewer notes. Text ahead of the first "diff --git" header is skipped by
`git apply` and `patch`, so these notes do not affect how the patch applies.

What the patch does:
  * hidream_o1_image_runner.py: run_pipeline returns its result under the key
    "images" (a one-element list, or None) instead of "image".
  * server/api/openai_images.py: the result poll interval drops from 0.5 s to
    0.2 s (OPENAI_IMAGE_RESULT_POLL_INTERVAL_SECONDS) and timing logs are added
    around task creation, result waiting and response building.
  * server/api/server.py and server/task_manager.py: the processing loop waits
    on a threading.Condition with a 0.2 s timeout instead of polling with
    time.sleep(1); create_task() notifies the condition.
  * scripts/hidream_o1_image/: a new shell wrapper and an OpenAI-compatible
    image test client.

Amendments made during review (hunk line counts are unchanged):
  1. _run_sync_image_task: the pending disconnect watcher is cancelled BEFORE
     wait_task.result() is read. result() re-raises whatever the wait task
     failed with; in the submitted order that skipped the cancel loop and left
     the watcher running.
  2. _summarize_response_item: the base64 payload is replaced by a short
     placeholder that reports its length. The submitted line assigned f"", an
     f-string without fields that silently blanked the value.

Caveats:
  * This file was recovered from a whitespace-collapsed copy. Indentation and
    blank context lines were rebuilt from the hunk line counts; compare with
    the target tree before applying (or use `git apply --ignore-whitespace`).
  * The "index" blob ids of openai_images.py and test_openai_images_client.py
    describe the submitted contents, not the amended ones.

diff --git a/lightx2v/models/runners/hidream_o1_image/hidream_o1_image_runner.py b/lightx2v/models/runners/hidream_o1_image/hidream_o1_image_runner.py
index 260012b1b..06ef32fa8 100644
--- a/lightx2v/models/runners/hidream_o1_image/hidream_o1_image_runner.py
+++ b/lightx2v/models/runners/hidream_o1_image/hidream_o1_image_runner.py
@@ -262,7 +262,7 @@ def run_pipeline(self, input_info):
         save_result_path = self.inputs.get("save_result_path")
         if self.input_info.return_result_tensor:
             self.end_run()
-            return {"image": image}
+            return {"images": [image]}
         if save_result_path:
             os.makedirs(os.path.dirname(os.path.abspath(save_result_path)), exist_ok=True)
             image.save(save_result_path)
@@ -271,7 +271,7 @@ def run_pipeline(self, input_info):
         if GET_RECORDER_MODE():
             monitor_cli.lightx2v_worker_request_success.inc()
         self.end_run()
-        return {"image": None}
+        return {"images": None}
 
     def end_run(self):
         if hasattr(self, "scheduler") and self.scheduler is not None:
diff --git a/lightx2v/server/api/openai_images.py b/lightx2v/server/api/openai_images.py
index a4bd2b6f8..e11f2864d 100644
--- a/lightx2v/server/api/openai_images.py
+++ b/lightx2v/server/api/openai_images.py
@@ -3,6 +3,7 @@
 import re
 import time
 import uuid
+from datetime import datetime
 from pathlib import Path
 from typing import Literal, Optional
 
@@ -17,6 +18,7 @@
 router = APIRouter()
 
 _SIZE_PATTERN = re.compile(r"^\s*(\d+)\s*x\s*(\d+)\s*$", re.IGNORECASE)
+OPENAI_IMAGE_RESULT_POLL_INTERVAL_SECONDS = 0.2
 
 
 class OpenAIImageGenerationRequest(BaseModel):
@@ -52,7 +54,9 @@ def _shape_from_size(size: str) -> tuple[int, int]:
 
 async def _wait_task_result_png(task_id: str, timeout_seconds: int, poll_interval_seconds: float) -> bytes:
     start_time = time.monotonic()
+    status_checks = 0
     while True:
+        status_checks += 1
         task_status = task_manager.get_task_status(task_id)
         if not task_status:
             raise HTTPException(status_code=500, detail=f"Task status not found: {task_id}")
@@ -61,6 +65,16 @@ async def _wait_task_result_png(task_id: str, timeout_seconds: int, poll_interva
         if status == TaskStatus.COMPLETED.value:
             result_png = task_manager.get_task_result_png(task_id)
             if result_png:
+                wait_elapsed_ms = (time.monotonic() - start_time) * 1000
+                completion_observe_lag_ms = 0.0
+                end_time = task_status.get("end_time")
+                if end_time:
+                    completion_observe_lag_ms = (datetime.now() - end_time).total_seconds() * 1000
+                logger.info(
+                    f"Task {task_id} OpenAI image wait_task_result cost total={wait_elapsed_ms:.2f} ms "
+                    f"completion_observe_lag={completion_observe_lag_ms:.2f} ms "
+                    f"poll_interval={poll_interval_seconds:.2f} s status_checks={status_checks}"
+                )
                 return result_png
             raise HTTPException(status_code=500, detail=f"Task completed but no in-memory image found: {task_id}")
 
@@ -89,28 +103,39 @@ async def _watch_client_disconnect(request: Request, task_id: str, poll_interval
 async def _run_sync_image_task(request: Request, message: ImageTaskRequest) -> bytes:
     task_id = None
     timeout_seconds = 600
-    poll_interval_seconds = 0.5
+    poll_interval_seconds = OPENAI_IMAGE_RESULT_POLL_INTERVAL_SECONDS
 
     try:
         message.prefer_memory_result = True
+        create_task_start = time.perf_counter()
         task_id = task_manager.create_task(message)
+        create_task_elapsed_ms = (time.perf_counter() - create_task_start) * 1000
         message.task_id = task_id
+        logger.info(f"Task {task_id} OpenAI image create_task cost {create_task_elapsed_ms:.2f} ms prompt_chars={len(message.prompt)} target_shape={message.target_shape}")
 
         wait_task = asyncio.create_task(_wait_task_result_png(task_id, timeout_seconds, poll_interval_seconds))
         disconnect_task = asyncio.create_task(_watch_client_disconnect(request, task_id))
 
         done, pending = await asyncio.wait({wait_task, disconnect_task}, return_when=asyncio.FIRST_COMPLETED)
-        for pending_task in pending:
-            pending_task.cancel()
-        await asyncio.gather(*pending, return_exceptions=True)
+
+        if wait_task in done:
+            for pending_task in pending:
+                pending_task.cancel()
+            result_png = wait_task.result()
+            if pending:
+                _, still_pending = await asyncio.wait(pending, timeout=0)
+                if still_pending:
+                    logger.debug(f"Task {task_id} disconnect watcher cancellation is still pending")
+            logger.info(f"Task {task_id} OpenAI image task result ready, building response")
+            return result_png
 
         if disconnect_task in done and disconnect_task.result():
             if not wait_task.done():
                 wait_task.cancel()
-                await asyncio.gather(wait_task, return_exceptions=True)
+                await asyncio.wait({wait_task}, timeout=0)
             raise HTTPException(status_code=499, detail=f"Client disconnected, task {task_id} cancelled")
 
-        return wait_task.result()
+        raise HTTPException(status_code=500, detail=f"Task {task_id} ended without image result")
     except RuntimeError as e:
         raise HTTPException(status_code=503, detail=str(e))
     except HTTPException:
@@ -138,10 +163,25 @@ def _build_url_response(request: Request, task_id: str, image_bytes: bytes) -> s
 
 
 def _build_openai_response(request: Request, task_id: str, image_bytes: bytes, response_format: Literal["url", "b64_json"]):
+    total_start = time.perf_counter()
     if response_format == "b64_json":
-        return OpenAIImageResponse(created=int(time.time()), data=[{"b64_json": base64.b64encode(image_bytes).decode("utf-8")}])
-
-    return OpenAIImageResponse(created=int(time.time()), data=[{"url": _build_url_response(request, task_id, image_bytes)}])
+        base64_start = time.perf_counter()
+        b64_json = base64.b64encode(image_bytes).decode("utf-8")
+        base64_elapsed_ms = (time.perf_counter() - base64_start) * 1000
+        response = OpenAIImageResponse(created=int(time.time()), data=[{"b64_json": b64_json}])
+        total_elapsed_ms = (time.perf_counter() - total_start) * 1000
+        logger.info(
+            f"Task {task_id} OpenAI image response build cost total={total_elapsed_ms:.2f} ms base64={base64_elapsed_ms:.2f} ms format=b64_json png_bytes={len(image_bytes)} b64_chars={len(b64_json)}"
+        )
+        return response
+
+    url_start = time.perf_counter()
+    url = _build_url_response(request, task_id, image_bytes)
+    url_elapsed_ms = (time.perf_counter() - url_start) * 1000
+    response = OpenAIImageResponse(created=int(time.time()), data=[{"url": url}])
+    total_elapsed_ms = (time.perf_counter() - total_start) * 1000
+    logger.info(f"Task {task_id} OpenAI image response build cost total={total_elapsed_ms:.2f} ms url_write={url_elapsed_ms:.2f} ms format=url png_bytes={len(image_bytes)}")
+    return response
 
 
 def _build_image_task_request(
diff --git a/lightx2v/server/api/server.py b/lightx2v/server/api/server.py
index 7027b5a1a..af438b855 100644
--- a/lightx2v/server/api/server.py
+++ b/lightx2v/server/api/server.py
@@ -1,6 +1,7 @@
 import asyncio
 import threading
 import time
+from datetime import datetime
 from pathlib import Path
 from typing import Any, Optional
 
@@ -14,6 +15,8 @@
 from .deps import ServiceContainer, get_services
 from .router import create_api_router
 
+TASK_PROCESSING_IDLE_WAIT_TIMEOUT_SECONDS = 0.2
+
 
 class ApiServer:
     def __init__(self, max_queue_size: int = 10, app: Optional[FastAPI] = None):
@@ -85,10 +88,9 @@ def _task_processing_loop(self):
         loop = asyncio.get_event_loop()
 
         while not self.stop_processing.is_set():
-            task_id = task_manager.get_next_pending_task()
+            task_id = task_manager.wait_for_next_pending_task(timeout=TASK_PROCESSING_IDLE_WAIT_TIMEOUT_SECONDS)
 
             if task_id is None:
-                time.sleep(1)
                 continue
 
             task_info = task_manager.get_task(task_id)
@@ -112,6 +114,8 @@ async def _process_single_task(self, task_info: Any):
             return
 
         try:
+            pending_elapsed_ms = (datetime.now() - task_info.start_time).total_seconds() * 1000
+            logger.info(f"Task {task_id} scheduler pending wait {pending_elapsed_ms:.2f} ms")
             task_manager.start_task(task_id)
 
             if task_info.stop_event.is_set():
diff --git a/lightx2v/server/task_manager.py b/lightx2v/server/task_manager.py
index 5c53ea096..b7db45029 100644
--- a/lightx2v/server/task_manager.py
+++ b/lightx2v/server/task_manager.py
@@ -40,6 +40,7 @@ def __init__(self, max_queue_size: int = 100):
 
         self._tasks: OrderedDict[str, TaskInfo] = OrderedDict()
         self._lock = threading.RLock()
+        self._task_available = threading.Condition(self._lock)
         self._processing_lock = threading.Lock()
         self._current_processing_task: Optional[str] = None
 
@@ -50,7 +51,7 @@ def __init__(self, max_queue_size: int = 100):
         self._emit_queue_metrics_unlocked()
 
     def create_task(self, message: Any) -> str:
-        with self._lock:
+        with self._task_available:
             if hasattr(message, "task_id") and message.task_id in self._tasks:
                 raise RuntimeError(f"Task ID {message.task_id} already exists")
 
@@ -66,6 +67,7 @@ def create_task(self, message: Any) -> str:
 
             self._cleanup_old_tasks()
             self._emit_queue_metrics_unlocked()
+            self._task_available.notify()
 
             return task_id
 
@@ -202,9 +204,20 @@ def release_processing_lock(self, task_id: str):
 
     def get_next_pending_task(self) -> Optional[str]:
         with self._lock:
-            for task_id, task in self._tasks.items():
-                if task.status == TaskStatus.PENDING:
-                    return task_id
+            return self._get_next_pending_task_unlocked()
+
+    def wait_for_next_pending_task(self, timeout: Optional[float] = None) -> Optional[str]:
+        with self._task_available:
+            task_id = self._get_next_pending_task_unlocked()
+            if task_id:
+                return task_id
+            self._task_available.wait(timeout=timeout)
+            return self._get_next_pending_task_unlocked()
+
+    def _get_next_pending_task_unlocked(self) -> Optional[str]:
+        for task_id, task in self._tasks.items():
+            if task.status == TaskStatus.PENDING:
+                return task_id
         return None
 
     def get_service_status(self) -> Dict[str, Any]:
diff --git a/scripts/hidream_o1_image/post_t2i_openai.sh b/scripts/hidream_o1_image/post_t2i_openai.sh
new file mode 100755
index 000000000..3c5d3f340
--- /dev/null
+++ b/scripts/hidream_o1_image/post_t2i_openai.sh
@@ -0,0 +1,20 @@
+#!/bin/bash
+
+lightx2v_path=/root/yongyang3/LightX2V
+test_json=/root/test.json
+base_url=http://127.0.0.1:8000/v1
+output_dir=${lightx2v_path}/save_results/hidream_o1_image_openai_test
+
+export PYTHONPATH="${lightx2v_path}"
+
+python "${lightx2v_path}/scripts/hidream_o1_image/test_openai_images_client.py" \
+--base_url "${base_url}" \
+--api_key "dummy-key" \
+--model "gpt-image-1" \
+--mode generate \
+--prompt_json "${test_json}" \
+--seed 42 \
+--size "2048x2048" \
+--response_format "b64_json" \
+--output_dir "${output_dir}" \
+--output_prefix "hidream_o1_image_openai"
diff --git a/scripts/hidream_o1_image/test_openai_images_client.py b/scripts/hidream_o1_image/test_openai_images_client.py
new file mode 100644
index 000000000..b6faa96a4
--- /dev/null
+++ b/scripts/hidream_o1_image/test_openai_images_client.py
@@ -0,0 +1,348 @@
+import argparse
+import base64
+import json
+import os
+import time
+from dataclasses import dataclass
+from datetime import datetime
+from pathlib import Path
+from typing import Any
+from urllib.parse import urlparse
+
+import requests
+
+try:
+    from openai import OpenAI  # pyright: ignore[reportMissingImports]
+except ImportError:
+    OpenAI = None  # type: ignore[assignment]
+
+
+@dataclass
+class SaveImageResult:
+    path: Path
+    source: str
+    bytes_written: int
+    total_seconds: float
+    decode_seconds: float = 0.0
+    download_seconds: float = 0.0
+    write_seconds: float = 0.0
+
+
+@dataclass
+class ImageRequestResult:
+    path: Path
+    http_sdk_parse_seconds: float
+    save: SaveImageResult
+
+
+def _extract_data_item(response: Any) -> dict[str, Any]:
+    if not hasattr(response, "data") or not response.data:
+        raise RuntimeError(f"Invalid OpenAI images response: {response}")
+    item = response.data[0]
+    if hasattr(item, "model_dump"):
+        return item.model_dump()  # openai pydantic object
+    if isinstance(item, dict):
+        return item
+    raise RuntimeError(f"Unsupported data item type: {type(item)!r}")
+
+
+def _save_image_from_item(item: dict[str, Any], output_path: Path) -> SaveImageResult:
+    output_path.parent.mkdir(parents=True, exist_ok=True)
+    total_start = time.perf_counter()
+
+    if "b64_json" in item and item["b64_json"]:
+        decode_start = time.perf_counter()
+        image_bytes = base64.b64decode(item["b64_json"])
+        decode_seconds = time.perf_counter() - decode_start
+
+        write_start = time.perf_counter()
+        output_path.write_bytes(image_bytes)
+        write_seconds = time.perf_counter() - write_start
+
+        return SaveImageResult(
+            path=output_path,
+            source="b64_json",
+            bytes_written=len(image_bytes),
+            total_seconds=time.perf_counter() - total_start,
+            decode_seconds=decode_seconds,
+            write_seconds=write_seconds,
+        )
+
+    if "url" in item and item["url"]:
+        download_start = time.perf_counter()
+        resp = requests.get(item["url"], timeout=120)
+        resp.raise_for_status()
+        download_seconds = time.perf_counter() - download_start
+
+        write_start = time.perf_counter()
+        output_path.write_bytes(resp.content)
+        write_seconds = time.perf_counter() - write_start
+
+        return SaveImageResult(
+            path=output_path,
+            source="url",
+            bytes_written=len(resp.content),
+            total_seconds=time.perf_counter() - total_start,
+            download_seconds=download_seconds,
+            write_seconds=write_seconds,
+        )
+
+    raise RuntimeError(f"Response item has neither b64_json nor url: {item}")
+
+
+def _summarize_response_item(item: dict[str, Any]) -> dict[str, Any]:
+    summary = dict(item)
+    if "b64_json" in summary and summary["b64_json"]:
+        summary["b64_json"] = f"<b64_json omitted: {len(summary['b64_json'])} chars>"
+    return summary
+
+
+def _format_duration(seconds: float) -> str:
+    total_ms = int(round(seconds * 1000))
+    total_seconds, milliseconds = divmod(total_ms, 1000)
+    hours, remainder = divmod(total_seconds, 3600)
+    minutes, seconds = divmod(remainder, 60)
+    return f"{hours:02d}:{minutes:02d}:{seconds:02d}.{milliseconds:03d}"
+
+
+def _format_request_timing(result: ImageRequestResult) -> str:
+    save = result.save
+    parts = [
+        f"http_sdk_parse={result.http_sdk_parse_seconds:.3f}s",
+        f"save_total={save.total_seconds:.3f}s",
+    ]
+    if save.decode_seconds:
+        parts.append(f"base64_decode={save.decode_seconds:.3f}s")
+    if save.download_seconds:
+        parts.append(f"download={save.download_seconds:.3f}s")
+    parts.extend(
+        [
+            f"disk_write={save.write_seconds:.3f}s",
+            f"bytes={save.bytes_written}",
+            f"source={save.source}",
+        ]
+    )
+    return ", ".join(parts)
+
+
+def _ensure_local_no_proxy(base_url: str) -> None:
+    hostname = urlparse(base_url).hostname
+    if hostname not in {"127.0.0.1", "localhost", "0.0.0.0", "::1"}:
+        return
+
+    local_hosts = ["127.0.0.1", "localhost", "0.0.0.0", "::1"]
+    for env_name in ("NO_PROXY", "no_proxy"):
+        existing = [item.strip() for item in os.environ.get(env_name, "").split(",") if item.strip()]
+        merged = local_hosts + [item for item in existing if item not in local_hosts]
+        os.environ[env_name] = ",".join(merged)
+
+
+def _load_prompts(prompt_json: str) -> list[str]:
+    path = Path(prompt_json)
+    data = json.loads(path.read_text(encoding="utf-8"))
+    if not isinstance(data, list):
+        raise ValueError(f"{path} must contain a JSON array")
+
+    prompts = []
+    for index, item in enumerate(data):
+        if isinstance(item, str):
+            prompt = item
+        elif isinstance(item, dict) and isinstance(item.get("prompt"), str):
+            prompt = item["prompt"]
+        else:
+            raise ValueError(f"{path}[{index}] must be a string or an object with a string prompt")
+        prompts.append(prompt)
+    return prompts
+
+
+def _write_summary_line(summary_file: Path, line: str) -> None:
+    with summary_file.open("a", encoding="utf-8") as f:
+        f.write(line)
+        f.write("\n")
+
+
+def _extra_body_from_args(args: argparse.Namespace) -> dict[str, Any] | None:
+    if args.seed is None:
+        return None
+    return {"seed": args.seed}
+
+
+def run_generate(client: Any, args: argparse.Namespace, prompt: str | None = None, output_path: Path | None = None) -> ImageRequestResult:
+    prompt = args.prompt if prompt is None else prompt
+    output_path = Path(args.output_dir) / "generate.png" if output_path is None else output_path
+
+    request_start = time.perf_counter()
+    response = client.images.generate(
+        model=args.model,
+        prompt=prompt,
+        size=args.size,
+        response_format=args.response_format,
+        extra_body=_extra_body_from_args(args),
+    )
+    http_sdk_parse_seconds = time.perf_counter() - request_start
+
+    item = _extract_data_item(response)
+    print(f"[generate] response item: {_summarize_response_item(item)}")
+    save_result = _save_image_from_item(item, output_path)
+    result = ImageRequestResult(path=save_result.path, http_sdk_parse_seconds=http_sdk_parse_seconds, save=save_result)
+    print(f"[generate] timing: {_format_request_timing(result)}")
+    return result
+
+
+def run_edit(client: Any, args: argparse.Namespace) -> ImageRequestResult:
+    if not args.image:
+        raise ValueError("--image is required for edit mode")
+
+    image_path = Path(args.image)
+    if not image_path.exists():
+        raise FileNotFoundError(f"Image file not found: {image_path}")
+
+    with image_path.open("rb") as image_file:
+        kwargs = {
+            "model": args.model,
+            "image": image_file,
+            "prompt": args.edit_prompt or args.prompt,
+            "size": args.size,
+            "response_format": args.response_format,
+            "extra_body": _extra_body_from_args(args),
+        }
+        request_start = time.perf_counter()
+        if args.mask:
+            mask_path = Path(args.mask)
+            if not mask_path.exists():
+                raise FileNotFoundError(f"Mask file not found: {mask_path}")
+            with mask_path.open("rb") as mask_file:
+                response = client.images.edit(mask=mask_file, **kwargs)
+        else:
+            response = client.images.edit(**kwargs)
+        http_sdk_parse_seconds = time.perf_counter() - request_start
+
+    item = _extract_data_item(response)
+    print(f"[edit] response item: {_summarize_response_item(item)}")
+    save_result = _save_image_from_item(item, Path(args.output_dir) / "edit.png")
+    result = ImageRequestResult(path=save_result.path, http_sdk_parse_seconds=http_sdk_parse_seconds, save=save_result)
+    print(f"[edit] timing: {_format_request_timing(result)}")
+    return result
+
+
+def run_generate_batch(client: Any, args: argparse.Namespace) -> int:
+    prompts = _load_prompts(args.prompt_json)
+    output_dir = Path(args.output_dir)
+    output_dir.mkdir(parents=True, exist_ok=True)
+
+    run_stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+    summary_file = Path(args.summary_file) if args.summary_file else output_dir / f"{args.output_prefix}_summary_{run_stamp}.log"
+    summary_file.parent.mkdir(parents=True, exist_ok=True)
+
+    total = len(prompts)
+    completed = 0
+    failed = 0
+    batch_start = time.perf_counter()
+    started_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+
+    summary_file.write_text(
+        "\n".join(
+            [
+                f"Run started at: {started_at}",
+                f"Prompt JSON: {args.prompt_json}",
+                f"Base URL: {args.base_url}",
+                f"Model: {args.model}",
+                f"Seed: {args.seed}",
+                f"Size: {args.size}",
+                f"Response format: {args.response_format}",
+                f"Output directory: {output_dir}",
+                f"Total prompts: {total}",
+                "",
+            ]
+        ),
+        encoding="utf-8",
+    )
+
+    print(f"Posting {total} OpenAI-format image prompts from {args.prompt_json} to {args.base_url}")
+    print(f"Output directory: {output_dir}")
+    print(f"Summary file: {summary_file}")
+
+    for index, prompt in enumerate(prompts, 1):
+        number = f"{index:03d}"
+        output_path = output_dir / f"{args.output_prefix}_{number}.png"
+        case_start = time.perf_counter()
+        case_started_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+
+        print(f"[{index}/{total}] submitting {output_path.name}")
+        _write_summary_line(summary_file, f"Case {number} started at: {case_started_at}")
+
+        try:
+            result = run_generate(client, args, prompt=prompt, output_path=output_path)
+        except Exception as e:
+            failed += 1
+            elapsed = time.perf_counter() - case_start
+            print(f"[{index}/{total}] failed: {output_path.name}: {e}")
+            _write_summary_line(summary_file, f"Case {number} status: failed, elapsed: {_format_duration(elapsed)} ({elapsed:.3f}s), error: {e}")
+            if args.stop_on_error:
+                break
+        else:
+            completed += 1
+            elapsed = time.perf_counter() - case_start
+            print(f"[{index}/{total}] saved {result.path}")
+            print(f"[{index}/{total}] elapsed: {_format_duration(elapsed)} ({elapsed:.3f}s)")
+            _write_summary_line(
+                summary_file,
+                f"Case {number} status: success, elapsed: {_format_duration(elapsed)} ({elapsed:.3f}s), {_format_request_timing(result)}, output: {result.path}",
+            )
+
+    total_elapsed = time.perf_counter() - batch_start
+    ended_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+    _write_summary_line(summary_file, "")
+    _write_summary_line(summary_file, f"Run ended at: {ended_at}")
+    _write_summary_line(summary_file, f"Elapsed seconds: {total_elapsed:.3f}")
+    _write_summary_line(summary_file, f"Elapsed time: {_format_duration(total_elapsed)}")
+    _write_summary_line(summary_file, f"Completed prompts: {completed}")
+    _write_summary_line(summary_file, f"Failed prompts: {failed}")
+
+    print(f"Finished: completed={completed}/{total}, failed={failed}/{total}")
+    print(f"Total elapsed: {_format_duration(total_elapsed)} ({total_elapsed:.3f}s)")
+    print(f"Summary written to: {summary_file}")
+    return 1 if failed else 0
+
+
+def main() -> None:
+    parser = argparse.ArgumentParser(description="Test OpenAI-compatible image APIs on LightX2V server.")
+    parser.add_argument("--base_url", type=str, default="http://127.0.0.1:8000/v1", help="OpenAI-compatible base URL")
+    parser.add_argument("--api_key", type=str, default="dummy-key", help="OpenAI API key placeholder")
+    parser.add_argument("--model", type=str, default="gpt-image-1", help="Model name (for compatibility only)")
+    parser.add_argument("--mode", choices=["generate", "edit", "all"], default="generate", help="Test mode")
+    parser.add_argument("--prompt", type=str, default="a futuristic city at sunset", help="Prompt for generation")
+    parser.add_argument("--prompt_json", "--json", dest="prompt_json", type=str, default="", help="JSON file containing prompts for batch generation")
+    parser.add_argument("--edit_prompt", type=str, default="", help="Prompt for edit (defaults to --prompt)")
+    parser.add_argument("--seed", type=int, default=None, help="Optional generation seed")
+    parser.add_argument("--size", type=str, default="1024x1024", help="Image size, e.g. 1024x1024")
+    parser.add_argument("--response_format", choices=["url", "b64_json"], default="b64_json", help="OpenAI response format")
+    parser.add_argument("--image", type=str, default="", help="Input image path for edit mode")
+    parser.add_argument("--mask", type=str, default="", help="Optional mask image path for edit mode")
+    parser.add_argument("--output_dir", type=str, default="outputs/openai_images_test", help="Directory to save outputs")
+    parser.add_argument("--output_prefix", type=str, default="openai_generate", help="Batch output filename prefix")
+    parser.add_argument("--summary_file", type=str, default="", help="Batch timing summary file")
+    parser.add_argument("--stop_on_error", action="store_true", help="Stop batch mode after the first failed prompt")
+    args = parser.parse_args()
+
+    if OpenAI is None:
+        raise RuntimeError("Missing dependency: openai. Please install it with `pip install openai`.")
+
+    _ensure_local_no_proxy(args.base_url)
+    client = OpenAI(api_key=args.api_key, base_url=args.base_url)
+
+    if args.prompt_json:
+        raise SystemExit(run_generate_batch(client, args))
+
+    output_results: list[ImageRequestResult] = []
+    if args.mode in ("generate", "all"):
+        output_results.append(run_generate(client, args))
+    if args.mode in ("edit", "all"):
+        output_results.append(run_edit(client, args))
+
+    for result in output_results:
+        print(f"[saved] {result.path}")
+
+
+if __name__ == "__main__":
+    main()