diff --git a/astrbot/core/message/components.py b/astrbot/core/message/components.py index 872b0cc27a..5f2616d8ca 100644 --- a/astrbot/core/message/components.py +++ b/astrbot/core/message/components.py @@ -26,6 +26,7 @@ import json import os import sys +import urllib.parse import uuid from enum import Enum from pathlib import Path, PurePosixPath @@ -140,6 +141,55 @@ def fromURL(url: str, **_): def fromBase64(bs64_data: str, **_): return Record(file=f"base64://{bs64_data}", **_) + @staticmethod + def _decode_file_uri(uri: str) -> str: + """解码 file:/// URI 为本地文件路径。 + + file:///C:/Users/... → C:/Users/... (Windows) + file:///home/user/... → /home/user/... (Linux) + 其中的 URL 编码(如 %20 空格)也会被解码。 + """ + path = uri.removeprefix("file:///") + path = urllib.parse.unquote(path) + return path + + async def _resolve_file_source(self) -> str: + """选择可用的文件源。 + + NapCat 在 Windows 上可能只给 file 字段一个裸文件名(如 0d2bb1468a87d64414f8e563cc61c33c.amr), + 而真实路径在 url(如 file:///C:/Users/...)或 path(如 C:\\Users\\...)中。 + Image.convert_to_file_path 使用 self.url or self.file,Record 同样需要 fallback。 + """ + # 1) 优先尝试 file:如果它已包含完整 URI 或已知格式,直接使用 + if self.file: + if ( + self.file.startswith("file:///") + or self.file.startswith("http") + or self.file.startswith("base64://") + or os.path.exists(self.file) + ): + return self.file + + # 2) 尝试 url(可能是 file:/// 或 http 链接) + if self.url: + if ( + self.url.startswith("file:///") + or self.url.startswith("http") + or os.path.exists(self.url) + or ( + self.url.startswith("file:///") + and os.path.exists(self._decode_file_uri(self.url)) + ) + ): + return self.url + + # 3) 尝试 path(可能是 Windows 绝对路径如 C:\Users\...) + if self.path and os.path.exists(self.path): + return self.path + + # 4) 最后裸返回 file(即使不行也要让调用方看到原始内容) + return self.file or self.url or "" + async def convert_to_file_path(self) -> str: """将这个语音统一转换为本地文件路径。这个方法避免了手动判断语音数据类型,直接返回语音数据的本地路径(如果是网络 URL, 则会自动进行下载)。 @@ -147,24 +197,25 @@ async def convert_to_file_path(self) -> str: str: 语音的本地路径,以绝对路径表示。 """ - if not self.file: + file_source = await self._resolve_file_source() + if not file_source: raise Exception(f"not a valid file: {self.file}") - if self.file.startswith("file:///"): - return self.file[8:] - if self.file.startswith("http"): - file_path = await download_image_by_url(self.file) + if file_source.startswith("file:///"): + return self._decode_file_uri(file_source) + if file_source.startswith("http"): + file_path = await download_image_by_url(file_source) return os.path.abspath(file_path) - if self.file.startswith("base64://"): - bs64_data = self.file.removeprefix("base64://") + if file_source.startswith("base64://"): + bs64_data = file_source.removeprefix("base64://") image_bytes = base64.b64decode(bs64_data) file_path = os.path.join( - get_astrbot_temp_path(), f"recordseg_{uuid.uuid4()}.jpg" + get_astrbot_temp_path(), f"recordseg_{uuid.uuid4()}.wav" ) with open(file_path, "wb") as f: f.write(image_bytes) return os.path.abspath(file_path) - if os.path.exists(self.file): - return os.path.abspath(self.file) + if os.path.exists(file_source): + return os.path.abspath(file_source) raise Exception(f"not a valid file: {self.file}") async def convert_to_base64(self) -> str: @@ -174,18 +225,18 @@ async def convert_to_base64(self) -> str: str: 语音的 base64 编码,不以 base64:// 或者 data:image/jpeg;base64, 开头。 """ - # convert to base64 - if not self.file: + file_source = await self._resolve_file_source() + if not file_source: raise Exception(f"not a valid file: {self.file}") - if self.file.startswith("file:///"): - bs64_data = file_to_base64(self.file[8:]) - elif self.file.startswith("http"): - file_path = await download_image_by_url(self.file) + if file_source.startswith("file:///"): + bs64_data = file_to_base64(self._decode_file_uri(file_source)) + elif file_source.startswith("http"): + file_path = await download_image_by_url(file_source) bs64_data = file_to_base64(file_path) - elif self.file.startswith("base64://"): - bs64_data = self.file - elif os.path.exists(self.file): - bs64_data = file_to_base64(self.file) + elif file_source.startswith("base64://"): + bs64_data = file_source + elif os.path.exists(file_source): + bs64_data = file_to_base64(file_source) else: raise Exception(f"not a valid file: {self.file}") bs64_data = bs64_data.removeprefix("base64://")