fix: Fix qqofficial image upload timeout by switching to chunked upload #7176
Conversation
Hey - I've found 5 issues, and left some high-level feedback:
- The new `chunk_text` implementation has fairly complex overlap logic (`start = breakpoint - overlap if breakpoint > overlap else start` with an extra `if start <= chunks[-1].find(...)` check) that looks error‑prone and may cause incorrect or even infinite loops; consider simplifying this function and adding explicit bounds checks so `start` always strictly increases.
- `MessageReplyLimiter` currently uses `print` for logging and a module‑level `_global_limiter` without any locking; it would be more consistent and safer to use the shared `logger` and avoid bare globals (e.g. by encapsulating the singleton or making it explicitly per‑process).
- Some of the new upload helpers (e.g. `_base64_upload`, the upload cache in `chunked_upload.py`) appear unused or only partially integrated after switching to always chunked uploads; consider trimming or clearly marking unused paths to keep the upload logic easier to reason about.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The new `chunk_text` implementation has fairly complex overlap logic (`start = breakpoint - overlap if breakpoint > overlap else start` with an extra `if start <= chunks[-1].find(...)` check) that looks error‑prone and may cause incorrect or even infinite loops; consider simplifying this function and adding explicit bounds checks so `start` always strictly increases.
- `MessageReplyLimiter` currently uses `print` for logging and a module‑level `_global_limiter` without any locking; it would be more consistent and safer to use the shared `logger` and avoid bare globals (e.g. by encapsulating the singleton or making it explicitly per‑process).
- Some of the new upload helpers (e.g. `_base64_upload`, the upload cache in `chunked_upload.py`) appear unused or only partially integrated after switching to always chunked uploads; consider trimming or clearly marking unused paths to keep the upload logic easier to reason about.
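One way to address the bare `_global_limiter` global is a lock-guarded accessor. This is a minimal sketch for discussion: the accessor name `get_limiter` is hypothetical, and the `MessageReplyLimiter` body is reduced to a stub since its real fields are not shown here.

```python
# Hypothetical lock-guarded singleton accessor, replacing a bare module-level
# _global_limiter; the MessageReplyLimiter body is a stub for illustration.
import threading
from typing import Optional


class MessageReplyLimiter:
    def __init__(self) -> None:
        self._tracker: dict = {}


_limiter_lock = threading.Lock()
_limiter_instance: Optional[MessageReplyLimiter] = None


def get_limiter() -> MessageReplyLimiter:
    """Return the process-wide limiter, creating it under a lock on first use."""
    global _limiter_instance
    with _limiter_lock:
        if _limiter_instance is None:
            _limiter_instance = MessageReplyLimiter()
        return _limiter_instance
```

Callers then go through `get_limiter()` instead of touching the module global directly, which keeps initialization race-free and makes the singleton's lifecycle explicit.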
## Individual Comments
### Comment 1
<location path="astrbot/core/platform/sources/qqofficial/qqofficial_message_event.py" line_range="75-84" />
<code_context>
+def chunk_text(text: str, limit: int = TEXT_CHUNK_LIMIT, overlap: int = TEXT_CHUNK_OVERLAP) -> List[str]:
</code_context>
<issue_to_address>
**issue (bug_risk):** The cursor update in chunk_text risks infinite loops and duplicate chunks
The logic at the end of the current while loop:
```python
start = breakpoint - overlap if breakpoint > overlap else start
if start <= chunks[-1].find(text[breakpoint - 1] if breakpoint > 0 else ''):
    start = breakpoint
```
When `chunks[-1].find(...)` returns -1 (among other cases), `start` can stay unchanged, so the next iteration restarts from the same position, producing duplicate chunks or even an infinite loop; using `find` to decide whether to advance the cursor is also hard to reason about.
Consider always advancing the cursor by one deterministic rule, for example:
```python
start = max(breakpoint - overlap, start + 1)
```
or simply `start = breakpoint` if no overlap is needed. Please re-check the boundary conditions and make sure `start` advances monotonically on every iteration, avoiding infinite loops and duplicate chunks.
</issue_to_address>
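The monotonic-cursor rule suggested above can be sketched as a complete function. The constant values below are assumptions for illustration, not the project's actual limits:

```python
# Hypothetical simplified chunk_text: the cursor `start` strictly increases
# each iteration, so the loop always terminates. Constant values are assumed.
from typing import List

TEXT_CHUNK_LIMIT = 2000    # assumed per-message text limit
TEXT_CHUNK_OVERLAP = 50    # assumed overlap carried between chunks


def chunk_text(text: str, limit: int = TEXT_CHUNK_LIMIT,
               overlap: int = TEXT_CHUNK_OVERLAP) -> List[str]:
    if not text:
        return []
    chunks: List[str] = []
    start = 0
    while start < len(text):
        end = min(start + limit, len(text))
        chunks.append(text[start:end])
        if end == len(text):
            break
        # Advance at least one character past the previous start, so the
        # cursor is monotonic even if overlap >= limit.
        start = max(end - overlap, start + 1)
    return chunks
```

Because `start` increases by at least one each pass, the loop cannot revisit a position, which rules out both the duplicate-chunk and infinite-loop cases by construction.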
### Comment 2
<location path="astrbot/core/platform/sources/qqofficial/qqofficial_message_event.py" line_range="370-371" />
<code_context>
- # C2C streaming is only used for text chunking; for rich media, downgrade to a plain send to avoid platform-side streaming validation errors.
- if stream and (image_base64 or record_file_path):
+ # C2C streaming is only used for text chunking; for rich media, downgrade to a plain send
+ if stream and image_source:
logger.debug("[QQOfficial] 检测到富媒体,降级为非流式发送。")
stream = None
</code_context>
<issue_to_address>
**issue (bug_risk):** The streaming C2C downgrade condition only covers images, not voice/video and other rich media
This was previously `if stream and (image_base64 or record_file_path):` and is now `if stream and image_source:`, which only considers images. With voice, video, or file attachments the code still takes the streaming C2C path, contradicting the comment about downgrading to a plain send for rich media. If QQ still does not support combining rich media with streaming, this may cause platform errors or undefined behavior.
Consider restoring coverage of all rich media types, for example:
```python
if stream and (image_source or record_file_path or video_file_source or file_source):
    logger.debug("[QQOfficial] 检测到富媒体,降级为非流式发送。")
    stream = None
```
to stay consistent with the comment and the previous behavior.
</issue_to_address>
### Comment 3
<location path="astrbot/core/platform/sources/qqofficial/qqofficial_platform_adapter.py" line_range="240-241" />
<code_context>
- if image_path:
- payload["file_image"] = image_path
- # Guild text-channel send API (/channels/{channel_id}/messages) does not use v2 msg_type.
+ if image_source and os.path.exists(image_source):
+ payload["file_image"] = image_source
payload.pop("msg_type", None)
ret = await self._send_with_markdown_fallback(
</code_context>
<issue_to_address>
**issue (bug_risk):** Channel messages only send images that exist as local files; URL images are silently dropped
For remote images, `image_source` from `_parse_to_qqofficial` is a URL string rather than a local path, so `os.path.exists(image_source)` is always False in the URL case and channel messages no longer send the image, a regression compared to the previous code that always used `image_path`.
If channel messages should keep supporting URL images, consider branching on local path vs. URL, for example:
```python
if image_source:
    if os.path.exists(image_source):
        payload["file_image"] = image_source
    elif image_source.startswith("http"):
        payload["image_url"] = image_source  # depends on what the QQ SDK supports
```
Alternatively, guarantee upstream that the channel path always receives a local path rather than a URL.
</issue_to_address>
### Comment 4
<location path="astrbot/core/platform/sources/qqofficial/rate_limiter.py" line_range="138" />
<code_context>
+
+ record = self._tracker.get(message_id)
+ if record:
+ print(f"[QQOfficial] recordReply: {message_id}, count={record.count}")
+
+ def get_stats(self) -> Dict[str, int]:
</code_context>
<issue_to_address>
**nitpick:** Using print instead of logger in the rate limiter bypasses the logging system
The direct `print` in `MessageReplyLimiter.record_reply`:
```python
print(f"[QQOfficial] recordReply: {message_id}, count={record.count}")
```
bypasses the unified logging configuration and pollutes standard output under high concurrency. Consider switching to the project's shared logger (e.g. `logger.debug(...)`), or remove this debug output if it is no longer needed.
</issue_to_address>
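A hedged sketch of the logger-based replacement follows; the logger name and the standalone helper are illustrative, since the project's shared `logger` import path is not shown in this review:

```python
# Hypothetical logger-based replacement for the bare print in
# MessageReplyLimiter.record_reply; the logger name "astrbot" is assumed.
import logging

logger = logging.getLogger("astrbot")


def record_reply_log(message_id: str, count: int) -> None:
    # Debug level, so high-frequency reply tracking does not flood stdout
    # and the output respects the project's logging configuration.
    logger.debug("[QQOfficial] recordReply: %s, count=%d", message_id, count)
```

Using `%s`/`%d` placeholders instead of an f-string also defers string formatting until the record is actually emitted, which is the idiomatic pattern for hot paths.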
### Comment 5
<location path="astrbot/core/platform/sources/qqofficial/chunked_upload.py" line_range="723" />
<code_context>
+ raise last_error or RuntimeError("Upload failed")
+
+
+async def chunked_upload_c2c(
+ http_client: QQBotHttpClient,
+ user_id: str,
</code_context>
<issue_to_address>
**issue (complexity):** Consider refactoring the chunked upload and retry logic into shared helpers to remove duplicated code and make the flow easier to understand and maintain.
You can keep the current functionality but significantly reduce complexity and duplication with a couple of small refactors.
### 1. Collapse `chunked_upload_c2c` / `chunked_upload_group` into a shared core
Both functions are almost identical aside from which HTTP methods they call and how they log. You can hide that behind a small strategy object or callbacks and keep the public API unchanged:
```python
async def _chunked_upload(
*,
http_client: QQBotHttpClient,
target_id: str,
file_path: str,
file_type: int,
prepare_fn: Callable[..., Awaitable[UploadPrepareResponse]],
part_finish_fn: Callable[..., Awaitable[None]],
complete_fn: Callable[..., Awaitable[MediaUploadResponse]],
on_progress: Optional[Callable[[ChunkedUploadProgress], None]] = None,
log_prefix: str = "[chunked]",
) -> MediaUploadResponse:
prefix = log_prefix
file_size = os.path.getsize(file_path)
file_name = os.path.basename(file_path)
logger.info(f"{prefix} Starting chunked upload: file={file_name}, size={file_size}, type={file_type}")
hashes = await compute_file_hashes(file_path, file_size)
try:
prepare_resp = await prepare_fn(target_id, file_type, file_name, file_size, hashes)
except ApiError as e:
if e.biz_code == UPLOAD_PREPARE_FALLBACK_CODE:
raise UploadDailyLimitExceededError(file_path, file_size, str(e))
raise
upload_id = prepare_resp.upload_id
block_size = prepare_resp.block_size
parts = prepare_resp.parts
concurrency = min(prepare_resp.concurrency or DEFAULT_CONCURRENT_PARTS, MAX_CONCURRENT_PARTS)
retry_timeout_ms = prepare_resp.retry_timeout * 1000 if prepare_resp.retry_timeout else None
completed_parts = 0
uploaded_bytes = 0
async def upload_part(part: UploadPart) -> None:
nonlocal completed_parts, uploaded_bytes
part_index = part.index
offset = (part_index - 1) * block_size
length = min(block_size, file_size - offset)
part_data = read_file_chunk(file_path, offset, length)
part_md5 = hashlib.md5(part_data).hexdigest()
await put_to_presigned_url(part.presigned_url, part_data, prefix, part_index, len(parts))
await part_finish_fn(target_id, upload_id, part_index, length, part_md5, retry_timeout_ms)
completed_parts += 1
uploaded_bytes += length
if on_progress:
on_progress(ChunkedUploadProgress(
completed_parts=completed_parts,
total_parts=len(parts),
uploaded_bytes=uploaded_bytes,
total_bytes=file_size,
))
for i in range(0, len(parts), concurrency):
batch = parts[i:i + concurrency]
await asyncio.gather(*(upload_part(p) for p in batch))
result = await complete_fn(target_id, upload_id)
logger.info(f"{prefix} Upload completed: file_uuid={result.file_uuid}, ttl={result.ttl}s")
return result
```
Then your existing functions become thin wrappers that preserve the current API:
```python
async def chunked_upload_c2c(...):
return await _chunked_upload(
http_client=http_client,
target_id=user_id,
file_path=file_path,
file_type=file_type,
prepare_fn=http_client.c2c_upload_prepare,
part_finish_fn=http_client.c2c_upload_part_finish,
complete_fn=http_client.c2c_complete_upload,
on_progress=on_progress,
log_prefix=log_prefix,
)
async def chunked_upload_group(...):
return await _chunked_upload(
http_client=http_client,
target_id=group_id,
file_path=file_path,
file_type=file_type,
prepare_fn=http_client.group_upload_prepare,
part_finish_fn=http_client.group_upload_part_finish,
complete_fn=http_client.group_complete_upload,
on_progress=on_progress,
log_prefix=log_prefix,
)
```
This eliminates the copy‑paste logic (hashing, part scheduling, progress tracking) while keeping behavior and signatures intact.
### 2. Centralize retry behavior instead of duplicating loops
You already have multiple retry loops with very similar structure (`_part_finish_with_retry`, `_part_finish_persistent_retry`, `_complete_upload_with_retry`, `put_to_presigned_url`). A small, parameterized helper can collapse this:
```python
import asyncio
from typing import Awaitable, Callable, Optional, TypeVar

T = TypeVar("T")  # return type of the retried operation

async def retry_async(
op: Callable[[], Awaitable[T]],
*,
max_retries: int,
base_delay_ms: int,
is_retryable: Optional[Callable[[Exception], bool]] = None,
) -> T:
last_error: Optional[Exception] = None
for attempt in range(max_retries + 1):
try:
return await op()
except Exception as err:
last_error = err
if is_retryable and not is_retryable(err):
raise
if attempt < max_retries:
delay = base_delay_ms * (2 ** attempt) / 1000
await asyncio.sleep(delay)
raise last_error or RuntimeError("retry_async: exhausted retries")
```
Then `_complete_upload_with_retry` becomes simpler and easier to reason about:
```python
async def _complete_upload_with_retry(self, method: str, path: str, body: dict) -> MediaUploadResponse:
async def op() -> dict:
return await self.api_request(method, path, body, timeout=120.0)
data = await retry_async(
op,
max_retries=COMPLETE_UPLOAD_MAX_RETRIES,
base_delay_ms=COMPLETE_UPLOAD_BASE_DELAY_MS,
)
return MediaUploadResponse(
file_uuid=data["file_uuid"],
file_info=data["file_info"],
ttl=data.get("ttl", 0),
)
```
And `_part_finish_with_retry` can delegate to either this helper or a specialized “persistent” helper, instead of duplicating the loop and redeclaring constants inside the method.
### 3. Avoid redefining retry constants inside methods
In `_part_finish_with_retry` and `_part_finish_persistent_retry` you redeclare constants that are already defined at the module level, which makes behavior harder to track and tune:
```python
async def _part_finish_with_retry(...):
# These shadow the module constants above:
PART_FINISH_MAX_RETRIES = 2
PART_FINISH_BASE_DELAY_MS = 1000
PART_FINISH_RETRYABLE_DEFAULT_TIMEOUT_MS = 2 * 60 * 1000
PART_FINISH_RETRYABLE_INTERVAL_MS = 1000
```
You can instead rely on the top-level constants so there’s a single source of truth:
```python
async def _part_finish_with_retry(...):
last_error: Optional[Exception] = None
for attempt in range(PART_FINISH_MAX_RETRIES + 1):
...
if attempt < PART_FINISH_MAX_RETRIES:
delay = PART_FINISH_BASE_DELAY_MS * (2 ** attempt) / 1000
await asyncio.sleep(delay)
```
Same for `_part_finish_persistent_retry` and `_complete_upload_with_retry`, using `PART_FINISH_RETRYABLE_INTERVAL_MS`, `COMPLETE_UPLOAD_MAX_RETRIES`, `COMPLETE_UPLOAD_BASE_DELAY_MS` from the module constants.
These changes keep all existing functionality (chunked upload, retries, detailed logging) but reduce cognitive load and make the retry and upload flows easier to maintain and test.
</issue_to_address>
Code Review
This pull request implements a chunked upload system, file utilities, and a message reply rate limiter for the QQ Official platform. It also enhances media handling and text segmentation to improve reliability and compliance with platform limits. Feedback highlights the need to avoid blocking the event loop with synchronous file operations, remove redundant constant definitions, fix a logic error in text chunking that causes content duplication, and add missing retry logic for C2C messages as described in the pull request documentation. Additionally, improvements to logging and import practices were suggested to align with standard Python conventions.
- Fix the chunk_text cursor update logic to avoid infinite loops
- Extend the streaming C2C downgrade condition to cover all rich media types
- Fix channel message image sending to support URL images
- Improve MessageReplyLimiter to use the logger and add concurrency safety
- Remove the unused upload cache
All code review issues have been addressed:
All changes have been committed to the branch; please review.
Fix qqofficial image upload timeout
Problem description
When sending images with the qqofficial platform adapter, timeout errors frequently occur:
Problem analysis
Comparing against openclaw-qqbot, a neighboring QQ official project, we found:
Changes
1. Always use chunked upload (qqofficial_message_event.py)
Remaining issues
Although this change eliminates the vast majority of image and file transfer failures, the WinError 121 semaphore timeout still occurs in some cases. A possible cause:
The botpy library's default timeout configuration is too short
The following is an error that still appears in some situations after the fix
If the community has a better solution, discussion is welcome.
Suggest discussing whether a unified HTTP request retry mechanism should be added at the core layer
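Such a core-layer retry mechanism could look like the following sketch; the helper name `with_retries`, the retryable exception set, and the default parameters are assumptions for discussion, not part of this PR:

```python
# Hypothetical core-layer retry helper for transient network failures
# (e.g. timeouts); names and defaults are illustrative only.
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def with_retries(op: Callable[[], Awaitable[T]], *,
                       attempts: int = 3, base_delay: float = 0.5) -> T:
    for attempt in range(attempts):
        try:
            return await op()
        except (asyncio.TimeoutError, OSError):
            if attempt == attempts - 1:
                raise  # out of attempts: surface the original error
            # Exponential backoff between attempts: base_delay, 2x, 4x, ...
            await asyncio.sleep(base_delay * (2 ** attempt))
    raise RuntimeError("with_retries called with attempts < 1")
```

Wrapping the adapter's HTTP calls in a helper like this would let every platform source share one retry policy instead of each reimplementing its own loop.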
Summary by Sourcery
Improve QQ Official platform adapter reliability for media and text sending by adopting chunked upload, adding reply rate limiting, and enhancing message handling.
New Features:
Bug Fixes:
Enhancements: