-
Notifications
You must be signed in to change notification settings - Fork 216
Update sever. (Add threading.Condition for create_task) #1144
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -3,6 +3,7 @@ | |||||||||||||||
| import re | ||||||||||||||||
| import time | ||||||||||||||||
| import uuid | ||||||||||||||||
| from datetime import datetime | ||||||||||||||||
| from pathlib import Path | ||||||||||||||||
| from typing import Literal, Optional | ||||||||||||||||
|
|
||||||||||||||||
|
|
@@ -17,6 +18,7 @@ | |||||||||||||||
| router = APIRouter() | ||||||||||||||||
|
|
||||||||||||||||
| _SIZE_PATTERN = re.compile(r"^\s*(\d+)\s*x\s*(\d+)\s*$", re.IGNORECASE) | ||||||||||||||||
| OPENAI_IMAGE_RESULT_POLL_INTERVAL_SECONDS = 0.2 | ||||||||||||||||
|
|
||||||||||||||||
|
|
||||||||||||||||
| class OpenAIImageGenerationRequest(BaseModel): | ||||||||||||||||
|
|
@@ -52,7 +54,9 @@ def _shape_from_size(size: str) -> tuple[int, int]: | |||||||||||||||
|
|
||||||||||||||||
| async def _wait_task_result_png(task_id: str, timeout_seconds: int, poll_interval_seconds: float) -> bytes: | ||||||||||||||||
| start_time = time.monotonic() | ||||||||||||||||
| status_checks = 0 | ||||||||||||||||
| while True: | ||||||||||||||||
| status_checks += 1 | ||||||||||||||||
| task_status = task_manager.get_task_status(task_id) | ||||||||||||||||
| if not task_status: | ||||||||||||||||
| raise HTTPException(status_code=500, detail=f"Task status not found: {task_id}") | ||||||||||||||||
|
|
@@ -61,6 +65,16 @@ async def _wait_task_result_png(task_id: str, timeout_seconds: int, poll_interva | |||||||||||||||
| if status == TaskStatus.COMPLETED.value: | ||||||||||||||||
| result_png = task_manager.get_task_result_png(task_id) | ||||||||||||||||
| if result_png: | ||||||||||||||||
| wait_elapsed_ms = (time.monotonic() - start_time) * 1000 | ||||||||||||||||
| completion_observe_lag_ms = 0.0 | ||||||||||||||||
| end_time = task_status.get("end_time") | ||||||||||||||||
| if end_time: | ||||||||||||||||
| completion_observe_lag_ms = (datetime.now() - end_time).total_seconds() * 1000 | ||||||||||||||||
| logger.info( | ||||||||||||||||
| f"Task {task_id} OpenAI image wait_task_result cost total={wait_elapsed_ms:.2f} ms " | ||||||||||||||||
| f"completion_observe_lag={completion_observe_lag_ms:.2f} ms " | ||||||||||||||||
| f"poll_interval={poll_interval_seconds:.2f} s status_checks={status_checks}" | ||||||||||||||||
| ) | ||||||||||||||||
| return result_png | ||||||||||||||||
| raise HTTPException(status_code=500, detail=f"Task completed but no in-memory image found: {task_id}") | ||||||||||||||||
|
|
||||||||||||||||
|
|
@@ -89,28 +103,39 @@ async def _watch_client_disconnect(request: Request, task_id: str, poll_interval | |||||||||||||||
| async def _run_sync_image_task(request: Request, message: ImageTaskRequest) -> bytes: | ||||||||||||||||
| task_id = None | ||||||||||||||||
| timeout_seconds = 600 | ||||||||||||||||
| poll_interval_seconds = 0.5 | ||||||||||||||||
| poll_interval_seconds = OPENAI_IMAGE_RESULT_POLL_INTERVAL_SECONDS | ||||||||||||||||
|
|
||||||||||||||||
| try: | ||||||||||||||||
| message.prefer_memory_result = True | ||||||||||||||||
| create_task_start = time.perf_counter() | ||||||||||||||||
| task_id = task_manager.create_task(message) | ||||||||||||||||
| create_task_elapsed_ms = (time.perf_counter() - create_task_start) * 1000 | ||||||||||||||||
| message.task_id = task_id | ||||||||||||||||
| logger.info(f"Task {task_id} OpenAI image create_task cost {create_task_elapsed_ms:.2f} ms prompt_chars={len(message.prompt)} target_shape={message.target_shape}") | ||||||||||||||||
|
|
||||||||||||||||
| wait_task = asyncio.create_task(_wait_task_result_png(task_id, timeout_seconds, poll_interval_seconds)) | ||||||||||||||||
| disconnect_task = asyncio.create_task(_watch_client_disconnect(request, task_id)) | ||||||||||||||||
|
|
||||||||||||||||
| done, pending = await asyncio.wait({wait_task, disconnect_task}, return_when=asyncio.FIRST_COMPLETED) | ||||||||||||||||
| for pending_task in pending: | ||||||||||||||||
| pending_task.cancel() | ||||||||||||||||
| await asyncio.gather(*pending, return_exceptions=True) | ||||||||||||||||
|
|
||||||||||||||||
| if wait_task in done: | ||||||||||||||||
| result_png = wait_task.result() | ||||||||||||||||
| for pending_task in pending: | ||||||||||||||||
| pending_task.cancel() | ||||||||||||||||
| if pending: | ||||||||||||||||
| _, still_pending = await asyncio.wait(pending, timeout=0) | ||||||||||||||||
| if still_pending: | ||||||||||||||||
| logger.debug(f"Task {task_id} disconnect watcher cancellation is still pending") | ||||||||||||||||
| logger.info(f"Task {task_id} OpenAI image task result ready, building response") | ||||||||||||||||
| return result_png | ||||||||||||||||
|
|
||||||||||||||||
| if disconnect_task in done and disconnect_task.result(): | ||||||||||||||||
| if not wait_task.done(): | ||||||||||||||||
| wait_task.cancel() | ||||||||||||||||
| await asyncio.gather(wait_task, return_exceptions=True) | ||||||||||||||||
| await asyncio.wait({wait_task}, timeout=0) | ||||||||||||||||
|
Comment on lines
133
to
+135
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Similarly, using
Suggested change
|
||||||||||||||||
| raise HTTPException(status_code=499, detail=f"Client disconnected, task {task_id} cancelled") | ||||||||||||||||
|
|
||||||||||||||||
| return wait_task.result() | ||||||||||||||||
| raise HTTPException(status_code=500, detail=f"Task {task_id} ended without image result") | ||||||||||||||||
| except RuntimeError as e: | ||||||||||||||||
| raise HTTPException(status_code=503, detail=str(e)) | ||||||||||||||||
| except HTTPException: | ||||||||||||||||
|
|
@@ -138,10 +163,25 @@ def _build_url_response(request: Request, task_id: str, image_bytes: bytes) -> s | |||||||||||||||
|
|
||||||||||||||||
|
|
||||||||||||||||
| def _build_openai_response(request: Request, task_id: str, image_bytes: bytes, response_format: Literal["url", "b64_json"]): | ||||||||||||||||
| total_start = time.perf_counter() | ||||||||||||||||
| if response_format == "b64_json": | ||||||||||||||||
| return OpenAIImageResponse(created=int(time.time()), data=[{"b64_json": base64.b64encode(image_bytes).decode("utf-8")}]) | ||||||||||||||||
|
|
||||||||||||||||
| return OpenAIImageResponse(created=int(time.time()), data=[{"url": _build_url_response(request, task_id, image_bytes)}]) | ||||||||||||||||
| base64_start = time.perf_counter() | ||||||||||||||||
| b64_json = base64.b64encode(image_bytes).decode("utf-8") | ||||||||||||||||
| base64_elapsed_ms = (time.perf_counter() - base64_start) * 1000 | ||||||||||||||||
| response = OpenAIImageResponse(created=int(time.time()), data=[{"b64_json": b64_json}]) | ||||||||||||||||
| total_elapsed_ms = (time.perf_counter() - total_start) * 1000 | ||||||||||||||||
| logger.info( | ||||||||||||||||
| f"Task {task_id} OpenAI image response build cost total={total_elapsed_ms:.2f} ms base64={base64_elapsed_ms:.2f} ms format=b64_json png_bytes={len(image_bytes)} b64_chars={len(b64_json)}" | ||||||||||||||||
| ) | ||||||||||||||||
| return response | ||||||||||||||||
|
|
||||||||||||||||
| url_start = time.perf_counter() | ||||||||||||||||
| url = _build_url_response(request, task_id, image_bytes) | ||||||||||||||||
| url_elapsed_ms = (time.perf_counter() - url_start) * 1000 | ||||||||||||||||
| response = OpenAIImageResponse(created=int(time.time()), data=[{"url": url}]) | ||||||||||||||||
| total_elapsed_ms = (time.perf_counter() - total_start) * 1000 | ||||||||||||||||
| logger.info(f"Task {task_id} OpenAI image response build cost total={total_elapsed_ms:.2f} ms url_write={url_elapsed_ms:.2f} ms format=url png_bytes={len(image_bytes)}") | ||||||||||||||||
| return response | ||||||||||||||||
|
|
||||||||||||||||
|
|
||||||||||||||||
| def _build_image_task_request( | ||||||||||||||||
|
|
||||||||||||||||
| Original file line number | Diff line number | Diff line change | ||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,20 @@ | ||||||||||||
| #!/bin/bash | ||||||||||||
|
|
||||||||||||
| lightx2v_path=/root/yongyang3/LightX2V | ||||||||||||
| test_json=/root/test.json | ||||||||||||
|
Comment on lines
+3
to
+4
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The script hardcodes absolute paths specific to a local environment ( Consider resolving the paths dynamically relative to the script's directory, or using environment variables with sensible defaults.
Suggested change
|
||||||||||||
| base_url=http://127.0.0.1:8000/v1 | ||||||||||||
| output_dir=${lightx2v_path}/save_results/hidream_o1_image_openai_test | ||||||||||||
|
|
||||||||||||
| export PYTHONPATH="${lightx2v_path}" | ||||||||||||
|
|
||||||||||||
| python "${lightx2v_path}/scripts/hidream_o1_image/test_openai_images_client.py" \ | ||||||||||||
| --base_url "${base_url}" \ | ||||||||||||
| --api_key "dummy-key" \ | ||||||||||||
| --model "gpt-image-1" \ | ||||||||||||
| --mode generate \ | ||||||||||||
| --prompt_json "${test_json}" \ | ||||||||||||
| --seed 42 \ | ||||||||||||
| --size "2048x2048" \ | ||||||||||||
| --response_format "b64_json" \ | ||||||||||||
| --output_dir "${output_dir}" \ | ||||||||||||
| --output_prefix "hidream_o1_image_openai" | ||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using
asyncio.wait(pending, timeout=0)does not actually await the cancellation of the pending tasks, which can lead to task leaks and "Task was destroyed but it is pending!" warnings in the event loop. Since_watch_client_disconnectis a simple loop that yields control viaasyncio.sleep, awaiting its cancellation is extremely fast and safe.It is highly recommended to use the established pattern of
await asyncio.gather(*pending, return_exceptions=True)to ensure proper cleanup of cancelled tasks.