Enhance processing of Record components in Replies#8524
Enhance processing of Record components in Replies#8524tjc6666666666666 wants to merge 1 commit into
Conversation
Added support for processing Record components within Reply chains for WAV conversion and STT.
There was a problem hiding this comment.
Hey - I've found 2 issues, and left some high level feedback:
- The STT logic for
Recordcomponents is now duplicated for top-level messages andReplychains; consider extracting a shared helper that handles retries, logging, and message-chain mutation to reduce repetition and keep behavior consistent. - Similarly, the WAV conversion for
Recordcomponents in the main message list and inReplychains is conceptually the same; factoring this into a reusable function would make it easier to maintain and extend the audio-processing pipeline. - When updating
event.message_strandevent.message_obj.message_strfor reply-chain STT results, double-check whether you need to distinguish between content from the main message and from quoted messages, e.g., by adding separators or metadata, to avoid confusing concatenated text.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The STT logic for `Record` components is now duplicated for top-level messages and `Reply` chains; consider extracting a shared helper that handles retries, logging, and message-chain mutation to reduce repetition and keep behavior consistent.
- Similarly, the WAV conversion for `Record` components in the main message list and in `Reply` chains is conceptually the same; factoring this into a reusable function would make it easier to maintain and extend the audio-processing pipeline.
- When updating `event.message_str` and `event.message_obj.message_str` for reply-chain STT results, double-check whether you need to distinguish between content from the main message and from quoted messages, e.g., by adding separators or metadata, to avoid confusing concatenated text.
## Individual Comments
### Comment 1
<location path="astrbot/core/pipeline/preprocess_stage/stage.py" line_range="134-143" />
<code_context>
+ logger.warning(f"重试中: {i + 1}/{retry}")
+ await asyncio.sleep(0.5)
+ continue
+ except BaseException as e:
+ logger.error(traceback.format_exc())
+ logger.error(f"语音转文本(引用消息)失败: {e}")
+ break
</code_context>
<issue_to_address>
**issue (bug_risk):** Catching BaseException may unintentionally swallow important exceptions like CancelledError; consider narrowing to Exception.
Here you’re catching `BaseException`, which also includes `asyncio.CancelledError`, `KeyboardInterrupt`, and `SystemExit`. In this async context that can break proper task cancellation and shutdown. Narrow this to `except Exception as e:` so fatal conditions can propagate while still handling STT-related runtime errors.
</issue_to_address>
### Comment 2
<location path="astrbot/core/pipeline/preprocess_stage/stage.py" line_range="83" />
<code_context>
except Exception as e:
logger.warning(f"Voice processing failed: {e}")
+ # Also process Record components inside Reply chains (wav conversion)
+ for component in event.get_messages():
+ if isinstance(component, Reply) and component.chain:
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting shared helpers for record normalization, STT with retries, and optional record iteration to remove duplicated logic between top-level and reply-chain handling.
You can significantly reduce duplication by:
1. Extracting a reusable helper for “normalize `Record` to wav”.
2. Extracting a reusable helper for “run STT with retries on a `Record` and write result back”.
3. Optionally, having a small iterator that yields all `Record` components (top-level and inside `Reply.chain`) with their containers and indexes.
### 1. Normalize `Record` to wav (shared for top-level + reply chain)
```python
async def _normalize_record_to_wav(self, event: AstrMessageEvent, record: Record) -> None:
try:
original_path = await record.convert_to_file_path()
record_path = await ensure_wav(original_path)
if record_path != original_path:
event.track_temporary_local_file(record_path)
record.file = record_path
record.path = record_path
except Exception as e:
logger.warning(f"Voice processing failed: {e}")
```
Usage for both places becomes:
```python
# top-level
message_chain = event.get_messages()
for idx, component in enumerate(message_chain):
if isinstance(component, Record):
await self._normalize_record_to_wav(event, component)
message_chain[idx] = component
# reply chain
for component in event.get_messages():
if isinstance(component, Reply) and component.chain:
for idx, reply_comp in enumerate(component.chain):
if isinstance(reply_comp, Record):
await self._normalize_record_to_wav(event, reply_comp)
component.chain[idx] = reply_comp
```
### 2. STT helper with retry (shared for top-level + reply chain)
```python
async def _stt_on_record(
self,
stt_provider,
record: Record,
*,
log_prefix: str,
) -> str | None:
try:
path = await record.convert_to_file_path()
except Exception as e:
logger.warning(f"{log_prefix} 获取语音路径失败: {e}")
return None
retry = 5
for i in range(retry):
try:
result = await stt_provider.get_text(audio_url=path)
if result:
logger.info(f"{log_prefix} 语音转文本结果: {result}")
return result
except FileNotFoundError as e:
logger.warning(e)
logger.warning(f"{log_prefix} 重试中: {i + 1}/{retry}")
await asyncio.sleep(0.5)
continue
except BaseException as e:
logger.error(traceback.format_exc())
logger.error(f"{log_prefix} 语音转文本失败: {e}")
break
return None
```
Then the STT logic shrinks to:
```python
message_chain = event.get_messages()
for idx, component in enumerate(message_chain):
if not isinstance(component, Record):
continue
result = await self._stt_on_record(stt_provider, component, log_prefix="")
if result:
message_chain[idx] = Plain(result)
event.message_str += result
event.message_obj.message_str += result
for component in event.get_messages():
if not isinstance(component, Reply) or not component.chain:
continue
for idx, reply_comp in enumerate(component.chain):
if not isinstance(reply_comp, Record):
continue
result = await self._stt_on_record(stt_provider, reply_comp, log_prefix="引用消息")
if result:
component.chain[idx] = Plain(result)
event.message_str += result
event.message_obj.message_str += result
```
This keeps behavior (including different log messages) but removes duplicated `convert_to_file_path`, retry loops, and error handling.
### 3. Optional: single iterator over all `Record` components
If you want to further simplify traversal:
```python
def _iter_records(self, event: AstrMessageEvent):
# top-level
message_chain = event.get_messages()
for idx, comp in enumerate(message_chain):
if isinstance(comp, Record):
yield ("top", message_chain, idx, comp)
# reply chains
for reply in event.get_messages():
if isinstance(reply, Reply) and reply.chain:
for idx, comp in enumerate(reply.chain):
if isinstance(comp, Record):
yield ("reply", reply.chain, idx, comp)
```
Then both wav normalization and STT can operate over `_iter_records(...)`, deciding behavior based on the `"top"` vs `"reply"` flag, which keeps `PreProcessStage` flatter and easier to reason about.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| except BaseException as e: | ||
| logger.error(traceback.format_exc()) | ||
| logger.error(f"语音转文本失败: {e}") | ||
| break | ||
|
|
||
| # Also STT for Record components inside Reply chains | ||
| for component in event.get_messages(): | ||
| if isinstance(component, Reply) and component.chain: | ||
| for idx, reply_comp in enumerate(component.chain): | ||
| if not isinstance(reply_comp, Record): |
There was a problem hiding this comment.
issue (bug_risk): Catching BaseException may unintentionally swallow important exceptions like CancelledError; consider narrowing to Exception.
Here you’re catching BaseException, which also includes asyncio.CancelledError, KeyboardInterrupt, and SystemExit. In this async context that can break proper task cancellation and shutdown. Narrow this to except Exception as e: so fatal conditions can propagate while still handling STT-related runtime errors.
| except Exception as e: | ||
| logger.warning(f"Voice processing failed: {e}") | ||
|
|
||
| # Also process Record components inside Reply chains (wav conversion) |
There was a problem hiding this comment.
issue (complexity): Consider extracting shared helpers for record normalization, STT with retries, and optional record iteration to remove duplicated logic between top-level and reply-chain handling.
You can significantly reduce duplication by:
- Extracting a reusable helper for “normalize
Recordto wav”. - Extracting a reusable helper for “run STT with retries on a
Recordand write result back”. - Optionally, having a small iterator that yields all
Recordcomponents (top-level and insideReply.chain) with their containers and indexes.
1. Normalize Record to wav (shared for top-level + reply chain)
async def _normalize_record_to_wav(self, event: AstrMessageEvent, record: Record) -> None:
try:
original_path = await record.convert_to_file_path()
record_path = await ensure_wav(original_path)
if record_path != original_path:
event.track_temporary_local_file(record_path)
record.file = record_path
record.path = record_path
except Exception as e:
logger.warning(f"Voice processing failed: {e}")Usage for both places becomes:
# top-level
message_chain = event.get_messages()
for idx, component in enumerate(message_chain):
if isinstance(component, Record):
await self._normalize_record_to_wav(event, component)
message_chain[idx] = component
# reply chain
for component in event.get_messages():
if isinstance(component, Reply) and component.chain:
for idx, reply_comp in enumerate(component.chain):
if isinstance(reply_comp, Record):
await self._normalize_record_to_wav(event, reply_comp)
component.chain[idx] = reply_comp2. STT helper with retry (shared for top-level + reply chain)
async def _stt_on_record(
self,
stt_provider,
record: Record,
*,
log_prefix: str,
) -> str | None:
try:
path = await record.convert_to_file_path()
except Exception as e:
logger.warning(f"{log_prefix} 获取语音路径失败: {e}")
return None
retry = 5
for i in range(retry):
try:
result = await stt_provider.get_text(audio_url=path)
if result:
logger.info(f"{log_prefix} 语音转文本结果: {result}")
return result
except FileNotFoundError as e:
logger.warning(e)
logger.warning(f"{log_prefix} 重试中: {i + 1}/{retry}")
await asyncio.sleep(0.5)
continue
except BaseException as e:
logger.error(traceback.format_exc())
logger.error(f"{log_prefix} 语音转文本失败: {e}")
break
return NoneThen the STT logic shrinks to:
message_chain = event.get_messages()
for idx, component in enumerate(message_chain):
if not isinstance(component, Record):
continue
result = await self._stt_on_record(stt_provider, component, log_prefix="")
if result:
message_chain[idx] = Plain(result)
event.message_str += result
event.message_obj.message_str += result
for component in event.get_messages():
if not isinstance(component, Reply) or not component.chain:
continue
for idx, reply_comp in enumerate(component.chain):
if not isinstance(reply_comp, Record):
continue
result = await self._stt_on_record(stt_provider, reply_comp, log_prefix="引用消息")
if result:
component.chain[idx] = Plain(result)
event.message_str += result
event.message_obj.message_str += resultThis keeps behavior (including different log messages) but removes duplicated convert_to_file_path, retry loops, and error handling.
3. Optional: single iterator over all Record components
If you want to further simplify traversal:
def _iter_records(self, event: AstrMessageEvent):
# top-level
message_chain = event.get_messages()
for idx, comp in enumerate(message_chain):
if isinstance(comp, Record):
yield ("top", message_chain, idx, comp)
# reply chains
for reply in event.get_messages():
if isinstance(reply, Reply) and reply.chain:
for idx, comp in enumerate(reply.chain):
if isinstance(comp, Record):
yield ("reply", reply.chain, idx, comp)Then both wav normalization and STT can operate over _iter_records(...), deciding behavior based on the "top" vs "reply" flag, which keeps PreProcessStage flatter and easier to reason about.
There was a problem hiding this comment.
Code Review
This pull request introduces voice processing (wav conversion) and speech-to-text (STT) support for Record components nested inside Reply chains. The review feedback highlights a correctness bug where STT results of quoted messages are incorrectly appended to the main event's message string instead of updating the reply component itself. Additionally, the feedback recommends refactoring duplicated voice processing and STT logic into shared helper functions to improve maintainability.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| for idx, component in enumerate(message_chain): | ||
| if isinstance(component, Record): | ||
| if not isinstance(component, Record): | ||
| continue | ||
| try: | ||
| path = await component.convert_to_file_path() | ||
| retry = 5 | ||
| for i in range(retry): | ||
| except Exception as e: | ||
| logger.warning(f"获取语音路径失败: {e}") | ||
| continue | ||
| retry = 5 | ||
| for i in range(retry): | ||
| try: | ||
| result = await stt_provider.get_text(audio_url=path) | ||
| if result: | ||
| logger.info("语音转文本结果: " + result) | ||
| message_chain[idx] = Plain(result) | ||
| event.message_str += result | ||
| event.message_obj.message_str += result | ||
| break | ||
| except FileNotFoundError as e: | ||
| # napcat workaround | ||
| logger.warning(e) | ||
| logger.warning(f"重试中: {i + 1}/{retry}") | ||
| await asyncio.sleep(0.5) | ||
| continue | ||
| except BaseException as e: | ||
| logger.error(traceback.format_exc()) | ||
| logger.error(f"语音转文本失败: {e}") | ||
| break | ||
|
|
||
| # Also STT for Record components inside Reply chains | ||
| for component in event.get_messages(): | ||
| if isinstance(component, Reply) and component.chain: | ||
| for idx, reply_comp in enumerate(component.chain): | ||
| if not isinstance(reply_comp, Record): | ||
| continue | ||
| try: | ||
| result = await stt_provider.get_text(audio_url=path) | ||
| if result: | ||
| logger.info("语音转文本结果: " + result) | ||
| message_chain[idx] = Plain(result) | ||
| event.message_str += result | ||
| event.message_obj.message_str += result | ||
| break | ||
| except FileNotFoundError as e: | ||
| # napcat workaround | ||
| logger.warning(e) | ||
| logger.warning(f"重试中: {i + 1}/{retry}") | ||
| await asyncio.sleep(0.5) | ||
| path = await reply_comp.convert_to_file_path() | ||
| except Exception as e: | ||
| logger.warning(f"获取引用消息语音路径失败: {e}") | ||
| continue | ||
| except BaseException as e: | ||
| logger.error(traceback.format_exc()) | ||
| logger.error(f"语音转文本失败: {e}") | ||
| break | ||
| retry = 5 | ||
| for i in range(retry): | ||
| try: | ||
| result = await stt_provider.get_text(audio_url=path) | ||
| if result: | ||
| logger.info("语音转文本(引用消息)结果: " + result) | ||
| component.chain[idx] = Plain(result) | ||
| event.message_str += result | ||
| event.message_obj.message_str += result | ||
| break | ||
| except FileNotFoundError as e: | ||
| logger.warning(f"文件不存在: {path}") | ||
| logger.warning(f"重试中: {i + 1}/{retry}") | ||
| await asyncio.sleep(0.5) | ||
| continue | ||
| except BaseException as e: | ||
| logger.error(traceback.format_exc()) | ||
| logger.error(f"语音转文本(引用消息)失败: {e}") | ||
| break |
There was a problem hiding this comment.
There are two main issues in this block:
- Code Duplication: The speech-to-text (STT) logic for direct records and records inside reply chains is almost identical, violating the general rule of refactoring similar functionality into a shared helper function.
- Correctness Bug: Appending the STT result of a quoted message (inside a
Replychain) toevent.message_strandevent.message_obj.message_strincorrectly pollutes the current message's content with the quoted message's content. Instead, it should update themessage_strof theReplycomponent itself.
We can resolve both issues by extracting a helper function perform_stt and correctly updating component.message_str for reply components.
async def perform_stt(record_component: Record, context_desc: str = "") -> str | None:
try:
path = await record_component.convert_to_file_path()
except Exception as e:
logger.warning(f"获取{context_desc or '语音'}路径失败: {e}")
return None
retry = 5
for i in range(retry):
try:
result = await stt_provider.get_text(audio_url=path)
if result:
logger.info(f"语音转文本{f'({context_desc})' if context_desc else ''}结果: {result}")
return result
return None
except FileNotFoundError as e:
logger.warning(e)
logger.warning(f"重试中: {i + 1}/{retry}")
await asyncio.sleep(0.5)
continue
except BaseException as e:
logger.error(traceback.format_exc())
logger.error(f"语音转文本{f'({context_desc})' if context_desc else ''}失败: {e}")
break
return None
for idx, component in enumerate(message_chain):
if isinstance(component, Record):
result = await perform_stt(component)
if result:
message_chain[idx] = Plain(result)
event.message_str += result
event.message_obj.message_str += result
# Also STT for Record components inside Reply chains
for component in event.get_messages():
if isinstance(component, Reply) and component.chain:
for idx, reply_comp in enumerate(component.chain):
if isinstance(reply_comp, Record):
result = await perform_stt(reply_comp, "引用消息")
if result:
component.chain[idx] = Plain(result)
if component.message_str is None:
component.message_str = ""
component.message_str += resultReferences
- When implementing similar functionality for different cases (e.g., direct vs. quoted attachments), refactor the logic into a shared helper function to avoid code duplication.
| # Also process Record components inside Reply chains (wav conversion) | ||
| for component in event.get_messages(): | ||
| if isinstance(component, Reply) and component.chain: | ||
| for idx, reply_comp in enumerate(component.chain): | ||
| if isinstance(reply_comp, Record): | ||
| try: | ||
| original_path = await reply_comp.convert_to_file_path() | ||
| record_path = await ensure_wav(original_path) | ||
| if record_path != original_path: | ||
| event.track_temporary_local_file(record_path) | ||
| reply_comp.file = record_path | ||
| reply_comp.path = record_path | ||
| component.chain[idx] = reply_comp | ||
| except Exception as e: | ||
| logger.warning(f"Voice processing in reply chain failed: {e}") |
There was a problem hiding this comment.
The voice processing logic (wav conversion) for records inside reply chains is duplicated with the logic for direct records (lines 70-81). Consider refactoring this logic into a shared helper function to adhere to the repository's general rules and improve maintainability.
References
- When implementing similar functionality for different cases (e.g., direct vs. quoted attachments), refactor the logic into a shared helper function to avoid code duplication.
Added support for processing Record components within Reply chains for WAV conversion and STT.
Modifications / 改动点
Screenshots or Test Results / 运行截图或测试结果
Checklist / 检查清单
😊 If there are new features added in the PR, I have discussed it with the authors through issues/emails, etc.
/ 如果 PR 中有新加入的功能,已经通过 Issue / 邮件等方式和作者讨论过。
👀 My changes have been well-tested, and "Verification Steps" and "Screenshots" have been provided above.
/ 我的更改经过了良好的测试,并已在上方提供了“验证步骤”和“运行截图”。
🤓 I have ensured that no new dependencies are introduced, OR if new dependencies are introduced, they have been added to the appropriate locations in
requirements.txtandpyproject.toml./ 我确保没有引入新依赖库,或者引入了新依赖库的同时将其添加到
requirements.txt和pyproject.toml文件相应位置。😮 My changes do not introduce malicious code.
/ 我的更改没有引入恶意代码。
Summary by Sourcery
Handle voice records contained in reply chains during preprocessing, including WAV conversion and speech-to-text, and improve robustness of STT processing for direct records.
New Features:
Bug Fixes: