From edf389e7f1fd60a72c41b8b30d2cde937e1e4617 Mon Sep 17 00:00:00 2001 From: tjc66666666 <3428979959@qq.com> Date: Wed, 3 Jun 2026 00:52:07 +0800 Subject: [PATCH] Enhance file source resolution and decoding methods Refactor file handling to resolve sources more effectively, including decoding file URIs and handling base64 data. Update methods to ensure compatibility with different file formats and paths. --- astrbot/core/message/components.py | 91 +++++++++++++++++++++++------- 1 file changed, 71 insertions(+), 20 deletions(-) diff --git a/astrbot/core/message/components.py b/astrbot/core/message/components.py index 872b0cc27a..5f2616d8ca 100644 --- a/astrbot/core/message/components.py +++ b/astrbot/core/message/components.py @@ -26,6 +26,7 @@ import json import os import sys +import urllib.parse import uuid from enum import Enum from pathlib import Path, PurePosixPath @@ -140,6 +141,55 @@ def fromURL(url: str, **_): def fromBase64(bs64_data: str, **_): return Record(file=f"base64://{bs64_data}", **_) + @staticmethod + def _decode_file_uri(uri: str) -> str: + """解码 file:/// URI 为本地文件路径。 + + file:///C:/Users/... → C:/Users/... (Windows) + file:///home/user/... → /home/user/... (Linux) + 其中的 URL 编码(如 %20 空格)也会被解码。 + """ + path = uri.removeprefix("file:///") + path = urllib.parse.unquote(path) + return path + + async def _resolve_file_source(self) -> str: + """选择可用的文件源。 + + NapCat 在 Windows 上可能只给 file 字段一个裸文件名(如 0d2bb1468a87d64414f8e563cc61c33c.amr), + 而真实路径在 url(如 file:///C:/Users/...)或 path(如 C:\\Users\\...)中。 + Image.convert_to_file_path 使用 self.url or self.file,Record 同样需要 fallback。 + """ + # 1) 优先尝试 file:如果它已包含完整 URI 或已知格式,直接使用 + if self.file: + if ( + self.file.startswith("file:///") + or self.file.startswith("http") + or self.file.startswith("base64://") + or os.path.exists(self.file) + ): + return self.file + + # 2) 尝试 url(可能是 file:/// 或 http 链接) + if self.url: + if ( + self.url.startswith("file:///") + or self.url.startswith("http") + or os.path.exists(self.url) + or ( + self.url.startswith("file:///") + and os.path.exists(self._decode_file_uri(self.url)) + ) + ): + return self.url + + # 3) 尝试 path(可能是 Windows 绝对路径如 C:\Users\...) + if self.path and os.path.exists(self.path): + return self.path + + # 4) 最后裸返回 file(即使不行也要让调用方看到原始内容) + return self.file or self.url or "" + async def convert_to_file_path(self) -> str: """将这个语音统一转换为本地文件路径。这个方法避免了手动判断语音数据类型,直接返回语音数据的本地路径(如果是网络 URL, 则会自动进行下载)。 @@ -147,24 +197,25 @@ async def convert_to_file_path(self) -> str: str: 语音的本地路径,以绝对路径表示。 """ - if not self.file: + file_source = await self._resolve_file_source() + if not file_source: raise Exception(f"not a valid file: {self.file}") - if self.file.startswith("file:///"): - return self.file[8:] - if self.file.startswith("http"): - file_path = await download_image_by_url(self.file) + if file_source.startswith("file:///"): + return self._decode_file_uri(file_source) + if file_source.startswith("http"): + file_path = await download_image_by_url(file_source) return os.path.abspath(file_path) - if self.file.startswith("base64://"): - bs64_data = self.file.removeprefix("base64://") + if file_source.startswith("base64://"): + bs64_data = file_source.removeprefix("base64://") image_bytes = base64.b64decode(bs64_data) file_path = os.path.join( - get_astrbot_temp_path(), f"recordseg_{uuid.uuid4()}.jpg" + get_astrbot_temp_path(), f"recordseg_{uuid.uuid4()}.wav" ) with open(file_path, "wb") as f: f.write(image_bytes) return os.path.abspath(file_path) - if os.path.exists(self.file): - return os.path.abspath(self.file) + if os.path.exists(file_source): + return os.path.abspath(file_source) raise Exception(f"not a valid file: {self.file}") async def convert_to_base64(self) -> str: @@ -174,18 +225,18 @@ async def convert_to_base64(self) -> str: str: 语音的 base64 编码,不以 base64:// 或者 data:image/jpeg;base64, 开头。 """ - # convert to base64 - if not self.file: + file_source = await self._resolve_file_source() + if not file_source: raise Exception(f"not a valid file: {self.file}") - if self.file.startswith("file:///"): - bs64_data = file_to_base64(self.file[8:]) - elif self.file.startswith("http"): - file_path = await download_image_by_url(self.file) + if file_source.startswith("file:///"): + bs64_data = file_to_base64(self._decode_file_uri(file_source)) + elif file_source.startswith("http"): + file_path = await download_image_by_url(file_source) bs64_data = file_to_base64(file_path) - elif self.file.startswith("base64://"): - bs64_data = self.file - elif os.path.exists(self.file): - bs64_data = file_to_base64(self.file) + elif file_source.startswith("base64://"): + bs64_data = file_source + elif os.path.exists(file_source): + bs64_data = file_to_base64(file_source) else: raise Exception(f"not a valid file: {self.file}") bs64_data = bs64_data.removeprefix("base64://")