
fix: Fix qqofficial image upload timeout by switching to chunked upload #7176

Open
bunwen wants to merge 2 commits into AstrBotDevs:master from bunwen:fix/qqofficial-image-upload

Conversation

@bunwen

@bunwen bunwen commented Mar 30, 2026

Fix the qqofficial image upload timeout by switching to chunked upload

Problem description

When sending images through the qqofficial platform adapter, timeout errors occur frequently:

OSError: [WinError 121] 信号灯超时时间已到 (The semaphore timeout period has expired)
aiohttp.client_exceptions.ClientOSError: [WinError 121] 信号灯超时时间已到

Problem analysis

Comparing against openclaw-qqbot, another project built on the official QQ API, shows that:

  1. openclaw-qqbot never uses base64 upload - all images go through chunked upload
  2. The current qqofficial adapter uses base64 upload for small files (<1MB) and chunked upload for large files (>1MB)
  3. The base64 upload endpoint may be affected by network/firewall issues

Changes

1. Always use chunked upload (qqofficial_message_event.py)

```python
# Before: base64 for files under 1MB, chunked upload above 1MB
if file_size > self.CHUNKED_UPLOAD_THRESHOLD:
    return await self._chunked_upload(...)
else:
    return await self._base64_upload(...)

# After: chunked upload for all files
return await self._chunked_upload(...)
```
2. Add a retry mechanism to post_c2c_message (qqofficial_message_event.py)

Retry logic is added around botpy's HTTP request, catching network timeout errors and retrying automatically:

```python
max_retries = 3
for attempt in range(max_retries):
    try:
        result = await bot.api._http.request(route, json=payload)
        break
    except (aiohttp.ClientOSError, asyncio.TimeoutError, OSError):
        if attempt < max_retries - 1:
            await asyncio.sleep((attempt + 1) * 2)  # back off 2s, then 4s
        else:
            raise  # retries exhausted, propagate the last error
```
3. Other improvements

  • Add an __init__.py file
  • Fix QQOfficialMessageEvent.new attribute initialization
  • Fix the expires_in type conversion (string → int)
  • Tune the HTTP client timeout configuration

Remaining issues

While these changes eliminate the vast majority of image and file transfer failures, the WinError 121 semaphore timeout still occurs in some cases. A possible cause:

botpy's default timeout configuration is too short
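If the root cause is indeed botpy's short default timeouts, one workaround (independent of this PR's code) is to wrap the request in an explicit, generous timeout. A minimal stdlib sketch, assuming a zero-argument factory for the request coroutine; the names here are illustrative, not botpy's API:

```python
import asyncio

async def request_with_timeout(coro_factory, timeout_s: float = 120.0):
    # coro_factory returns a fresh coroutine per call, e.g. the hypothetical
    # lambda: bot.api._http.request(route, json=payload)
    return await asyncio.wait_for(coro_factory(), timeout=timeout_s)

# Demo with a stand-in coroutine instead of a real HTTP call:
async def slow_op():
    await asyncio.sleep(0.01)
    return "ok"

print(asyncio.run(request_with_timeout(slow_op, timeout_s=1.0)))  # prints "ok"
```

With aiohttp specifically, the equivalent knob is `aiohttp.ClientTimeout` (its `total`, `connect`, and `sock_read` fields) passed to the `ClientSession`, since per-socket read stalls are what surface as WinError 121.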

Below is an error that still appears in some cases after the fix:

[2026-03-29 20:25:13.158] [Core] [ERRO] [v4.22.2] [meme_manager.main:1046]: 发送表情图片失败: [WinError 121] 信号灯超时时间已到
[2026-03-29 20:25:13.163] [Core] [ERRO] [v4.22.2] [meme_manager.main:1047]: Traceback (most recent call last):
  File "C:\Users\bunwen\AppData\Local\AstrBot\backend\python\Lib\asyncio\proactor_events.py", line 286, in _loop_reading
    length = fut.result()
             ^^^^^^^^^^^^
  File "C:\Users\bunwen\AppData\Local\AstrBot\backend\python\Lib\asyncio\windows_events.py", line 803, in _poll
    value = callback(transferred, key, ov)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\bunwen\AppData\Local\AstrBot\backend\python\Lib\asyncio\windows_events.py", line 462, in finish_socket_func
    return ov.getresult()
           ^^^^^^^^^^^^^^
OSError: [WinError 121] 信号灯超时时间已到

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "C:\Users\bunwen\.astrbot\data\plugins\meme_manager\main.py", line 1042, in after_message_sent
    await self.context.send_message(
  File "\\?\C:\Users\bunwen\AppData\Local\AstrBot\backend\app\astrbot\core\star\context.py", line 456, in send_message
    await platform.send_by_session(session, message_chain)
  File "\\?\C:\Users\bunwen\AppData\Local\AstrBot\backend\app\astrbot\core\platform\sources\qqofficial\qqofficial_platform_adapter.py", line 152, in send_by_session
    await self._send_by_session_common(session, message_chain)
  File "\\?\C:\Users\bunwen\AppData\Local\AstrBot\backend\app\astrbot\core\platform\sources\qqofficial\qqofficial_platform_adapter.py", line 294, in _send_by_session_common
    ret = await QQOfficialMessageEvent.post_c2c_message(
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "\\?\C:\Users\bunwen\AppData\Local\AstrBot\backend\app\astrbot\core\platform\sources\qqofficial\qqofficial_message_event.py", line 1147, in post_c2c_message
    result = await bot.api._http.request(route, json=payload)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\bunwen\AppData\Local\AstrBot\backend\python\Lib\site-packages\botpy\http.py", line 182, in request
    async with self._session.request(
               ^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\bunwen\AppData\Local\AstrBot\backend\python\Lib\site-packages\aiohttp\client.py", line 1510, in __aenter__
    self._resp: _RetType = await self._coro
                           ^^^^^^^^^^^^^^^^
  File "C:\Users\bunwen\AppData\Local\AstrBot\backend\python\Lib\site-packages\aiohttp\client.py", line 779, in _request
    resp = await handler(req)
           ^^^^^^^^^^^^^^^^^^
  File "C:\Users\bunwen\AppData\Local\AstrBot\backend\python\Lib\site-packages\aiohttp\client.py", line 757, in _connect_and_send_request
    await resp.start(conn)
  File "C:\Users\bunwen\AppData\Local\AstrBot\backend\python\Lib\site-packages\aiohttp\client_reqrep.py", line 539, in start
    message, payload = await protocol.read()  # type: ignore[union-attr]
                       ^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\bunwen\AppData\Local\AstrBot\backend\python\Lib\site-packages\aiohttp\streams.py", line 703, in read
    await self._waiter
aiohttp.client_exceptions.ClientOSError: [WinError 121] 信号灯超时时间已到

Better solutions from the community are welcome.
It may be worth discussing whether a unified HTTP retry mechanism should be added at the core layer.

Summary by Sourcery

Improve QQ Official platform adapter reliability for media and text sending by adopting chunked upload, adding reply rate limiting, and enhancing message handling.

New Features:

  • Add chunked upload support for QQ Official C2C and group media with robust retry logic and shared HTTP client management.
  • Introduce a per-message reply rate limiter to cap passive replies and automatically fall back to proactive messages when limits or TTL are exceeded.
  • Implement long text chunking for QQ Official messages to split oversized content into multiple sends within platform limits.
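The per-message reply rate limiter described above can be sketched roughly as follows; the class name, cap, and TTL are illustrative assumptions, not the PR's actual implementation:

```python
import time

class ReplyLimiter:
    """Cap passive replies per inbound message id, with a TTL.

    max_replies and ttl_s are illustrative defaults, not the PR's values.
    """

    def __init__(self, max_replies: int = 4, ttl_s: float = 300.0):
        self.max_replies = max_replies
        self.ttl_s = ttl_s
        self._seen: dict[str, tuple[int, float]] = {}  # id -> (count, first_seen)

    def allow_passive(self, message_id: str) -> bool:
        now = time.monotonic()
        count, first = self._seen.get(message_id, (0, now))
        # Over the cap or past the TTL: signal the caller to fall back
        # to a proactive message instead of a passive reply.
        if count >= self.max_replies or now - first > self.ttl_s:
            return False
        self._seen[message_id] = (count + 1, first)
        return True

limiter = ReplyLimiter(max_replies=2)
print([limiter.allow_passive("m1") for _ in range(3)])  # [True, True, False]
```

When `allow_passive` returns False, the caller switches to the proactive-message path.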

Bug Fixes:

  • Resolve QQ Official image and media upload timeouts by routing all uploads through chunked transfer with higher timeouts and better error handling.
  • Fix QQOfficialMessageEvent initialization and C2C sending so credentials, message IDs, and media payloads are handled correctly across message types.

Enhancements:

  • Unify QQ Official message parsing and sending to work with a single image source abstraction (path, URL, or base64) and preserve accompanying text when media is present or fails to upload.
  • Improve streaming (partial) message sending for C2C by refining state handling, throttling, and fallback behavior when switching between streaming and non-streaming paths.
  • Add file utility helpers for size limits, type detection, and formatting to enforce QQ upload constraints and provide clearer logging.
  • Ensure temporary media files are tracked and cleaned up after QQ Official sends to avoid disk buildup.
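The file utility helpers for size limits, type detection, and formatting might look like this sketch; the size cap and function names are assumptions for illustration, since QQ's real per-type limits are not stated here:

```python
import mimetypes
import os

# Illustrative cap; QQ's real per-type upload limits may differ.
MAX_UPLOAD_BYTES = 30 * 1024 * 1024

def format_size(n: float) -> str:
    """Human-readable byte count, e.g. 1536 -> '1.5 KB'."""
    for unit in ("B", "KB", "MB", "GB"):
        if n < 1024 or unit == "GB":
            return f"{n:.0f} {unit}" if unit == "B" else f"{n:.1f} {unit}"
        n /= 1024

def check_upload(path: str) -> str:
    """Reject oversized files and return a best-effort MIME type."""
    size = os.path.getsize(path)
    if size > MAX_UPLOAD_BYTES:
        raise ValueError(f"file too large for upload: {format_size(size)}")
    mime, _ = mimetypes.guess_type(path)
    return mime or "application/octet-stream"

print(format_size(1536))  # 1.5 KB
```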

@dosubot dosubot bot added size:XXL This PR changes 1000+ lines, ignoring generated files. area:platform The bug / feature is about IM platform adapter, such as QQ, Lark, Telegram, WebChat and so on. labels Mar 30, 2026

@sourcery-ai sourcery-ai bot left a comment


Hey - I've found 5 issues, and left some high level feedback:

  • The new chunk_text implementation has fairly complex overlap logic (start = breakpoint - overlap if breakpoint > overlap else start with an extra if start <= chunks[-1].find(...) check) that looks error‑prone and may cause incorrect or even infinite loops; consider simplifying this function and adding explicit bounds checks so start always strictly increases.
  • MessageReplyLimiter currently uses print for logging and a module‑level _global_limiter without any locking; it would be more consistent and safer to use the shared logger and avoid bare globals (e.g. by encapsulating the singleton or making it explicitly per‑process).
  • Some of the new upload helpers (e.g. _base64_upload, the upload cache in chunked_upload.py) appear unused or only partially integrated after switching to always chunked uploads; consider trimming or clearly marking unused paths to keep the upload logic easier to reason about.
## Individual Comments

### Comment 1
<location path="astrbot/core/platform/sources/qqofficial/qqofficial_message_event.py" line_range="75-84" />
<code_context>
+def chunk_text(text: str, limit: int = TEXT_CHUNK_LIMIT, overlap: int = TEXT_CHUNK_OVERLAP) -> List[str]:
</code_context>
<issue_to_address>
**issue (bug_risk):** The cursor-update logic in chunk_text risks infinite loops and duplicate chunks

The logic at the end of the current while loop:

```python
start = breakpoint - overlap if breakpoint > overlap else start
if start <= chunks[-1].find(text[breakpoint - 1] if breakpoint > 0 else ''):
    start = breakpoint
```

When `chunks[-1].find(...)` returns -1 (among other cases), `start` can remain unchanged, so the next iteration starts from the same position, producing duplicate chunks or even an infinite loop; using `find` to decide whether to advance the cursor is also hard to reason about.

Consider always advancing the cursor by a deterministic rule, for example:

```python
start = max(breakpoint - overlap, start + 1)
```

or simply `start = breakpoint` if no overlap is needed. Re-check the boundary conditions to ensure `start` advances monotonically on every iteration, avoiding infinite loops and duplicate chunks.
</issue_to_address>

### Comment 2
<location path="astrbot/core/platform/sources/qqofficial/qqofficial_message_event.py" line_range="370-371" />
<code_context>
-        # C2C 流式仅用于文本分片,富媒体时降级为普通发送,避免平台侧流式校验报错。
-        if stream and (image_base64 or record_file_path):
+        # C2C 流式仅用于文本分片,富媒体时降级为普通发送
+        if stream and image_source:
             logger.debug("[QQOfficial] 检测到富媒体,降级为非流式发送。")
             stream = None

</code_context>
<issue_to_address>
**issue (bug_risk):** The streaming C2C fallback condition only covers images, not voice/video and other rich media

This was previously `if stream and (image_base64 or record_file_path):` and is now `if stream and image_source:`, which only considers images. With voice, video, or file attachments the C2C streaming path is still taken, contradicting the comment about falling back to a normal send for rich media. If QQ still does not support rich media combined with streaming, this may cause platform errors or undefined behavior.

Consider restoring coverage for all rich-media types, for example:

```python
if stream and (image_source or record_file_path or video_file_source or file_source):
    logger.debug("[QQOfficial] 检测到富媒体,降级为非流式发送。")
    stream = None
```

to stay consistent with the comment and the previous behavior.
</issue_to_address>

### Comment 3
<location path="astrbot/core/platform/sources/qqofficial/qqofficial_platform_adapter.py" line_range="240-241" />
<code_context>
-                if image_path:
-                    payload["file_image"] = image_path
-                # Guild text-channel send API (/channels/{channel_id}/messages) does not use v2 msg_type.
+                if image_source and os.path.exists(image_source):
+                    payload["file_image"] = image_source
                 payload.pop("msg_type", None)
                 ret = await self._send_with_markdown_fallback(
</code_context>
<issue_to_address>
**issue (bug_risk):** Guild messages only send images that exist locally; URL images are silently dropped

In `_parse_to_qqofficial`, `image_source` is a URL string rather than a local path for remote images, so `os.path.exists(image_source)` is always False in the URL case and guild messages no longer send the image - a regression from the previous behavior of always using `image_path`.

If guild messages should keep supporting URL images, consider branching on local path vs. URL, for example:

```python
if image_source:
    if os.path.exists(image_source):
        payload["file_image"] = image_source
    elif image_source.startswith("http"):
        payload["image_url"] = image_source  # depending on what the QQ SDK supports
```

Alternatively, guarantee upstream that the guild path always receives a local path rather than a URL.
</issue_to_address>

### Comment 4
<location path="astrbot/core/platform/sources/qqofficial/rate_limiter.py" line_range="138" />
<code_context>
+        
+        record = self._tracker.get(message_id)
+        if record:
+            print(f"[QQOfficial] recordReply: {message_id}, count={record.count}")
+    
+    def get_stats(self) -> Dict[str, int]:
</code_context>
<issue_to_address>
**nitpick:** Using print instead of logger in the rate limiter can interfere with the logging setup

The direct `print` in `MessageReplyLimiter.record_reply`:

```python
print(f"[QQOfficial] recordReply: {message_id}, count={record.count}")
```

bypasses the unified logging configuration and can pollute stdout under high concurrency. Consider switching to the project's shared logger (e.g. `logger.debug(...)`), or removing this debug output once it is no longer needed.
</issue_to_address>

### Comment 5
<location path="astrbot/core/platform/sources/qqofficial/chunked_upload.py" line_range="723" />
<code_context>
+    raise last_error or RuntimeError("Upload failed")
+
+
+async def chunked_upload_c2c(
+    http_client: QQBotHttpClient,
+    user_id: str,
</code_context>
<issue_to_address>
**issue (complexity):** Consider refactoring the chunked upload and retry logic into shared helpers to remove duplicated code and make the flow easier to understand and maintain.

You can keep the current functionality but significantly reduce complexity and duplication with a couple of small refactors.

### 1. Collapse `chunked_upload_c2c` / `chunked_upload_group` into a shared core

Both functions are almost identical aside from which HTTP methods they call and how they log. You can hide that behind a small strategy object or callbacks and keep the public API unchanged:

```python
async def _chunked_upload(
    *,
    http_client: QQBotHttpClient,
    target_id: str,
    file_path: str,
    file_type: int,
    prepare_fn: Callable[..., Awaitable[UploadPrepareResponse]],
    part_finish_fn: Callable[..., Awaitable[None]],
    complete_fn: Callable[..., Awaitable[MediaUploadResponse]],
    on_progress: Optional[Callable[[ChunkedUploadProgress], None]] = None,
    log_prefix: str = "[chunked]",
) -> MediaUploadResponse:
    prefix = log_prefix
    file_size = os.path.getsize(file_path)
    file_name = os.path.basename(file_path)

    logger.info(f"{prefix} Starting chunked upload: file={file_name}, size={file_size}, type={file_type}")
    hashes = await compute_file_hashes(file_path, file_size)

    try:
        prepare_resp = await prepare_fn(target_id, file_type, file_name, file_size, hashes)
    except ApiError as e:
        if e.biz_code == UPLOAD_PREPARE_FALLBACK_CODE:
            raise UploadDailyLimitExceededError(file_path, file_size, str(e))
        raise

    upload_id = prepare_resp.upload_id
    block_size = prepare_resp.block_size
    parts = prepare_resp.parts
    concurrency = min(prepare_resp.concurrency or DEFAULT_CONCURRENT_PARTS, MAX_CONCURRENT_PARTS)
    retry_timeout_ms = prepare_resp.retry_timeout * 1000 if prepare_resp.retry_timeout else None

    completed_parts = 0
    uploaded_bytes = 0

    async def upload_part(part: UploadPart) -> None:
        nonlocal completed_parts, uploaded_bytes
        part_index = part.index
        offset = (part_index - 1) * block_size
        length = min(block_size, file_size - offset)

        part_data = read_file_chunk(file_path, offset, length)
        part_md5 = hashlib.md5(part_data).hexdigest()

        await put_to_presigned_url(part.presigned_url, part_data, prefix, part_index, len(parts))
        await part_finish_fn(target_id, upload_id, part_index, length, part_md5, retry_timeout_ms)

        completed_parts += 1
        uploaded_bytes += length
        if on_progress:
            on_progress(ChunkedUploadProgress(
                completed_parts=completed_parts,
                total_parts=len(parts),
                uploaded_bytes=uploaded_bytes,
                total_bytes=file_size,
            ))

    for i in range(0, len(parts), concurrency):
        batch = parts[i:i + concurrency]
        await asyncio.gather(*(upload_part(p) for p in batch))

    result = await complete_fn(target_id, upload_id)
    logger.info(f"{prefix} Upload completed: file_uuid={result.file_uuid}, ttl={result.ttl}s")
    return result
```

Then your existing functions become thin wrappers that preserve the current API:

```python
async def chunked_upload_c2c(...):
    return await _chunked_upload(
        http_client=http_client,
        target_id=user_id,
        file_path=file_path,
        file_type=file_type,
        prepare_fn=http_client.c2c_upload_prepare,
        part_finish_fn=http_client.c2c_upload_part_finish,
        complete_fn=http_client.c2c_complete_upload,
        on_progress=on_progress,
        log_prefix=log_prefix,
    )

async def chunked_upload_group(...):
    return await _chunked_upload(
        http_client=http_client,
        target_id=group_id,
        file_path=file_path,
        file_type=file_type,
        prepare_fn=http_client.group_upload_prepare,
        part_finish_fn=http_client.group_upload_part_finish,
        complete_fn=http_client.group_complete_upload,
        on_progress=on_progress,
        log_prefix=log_prefix,
    )
```

This eliminates the copy‑paste logic (hashing, part scheduling, progress tracking) while keeping behavior and signatures intact.

### 2. Centralize retry behavior instead of duplicating loops

You already have multiple retry loops with very similar structure (`_part_finish_with_retry`, `_part_finish_persistent_retry`, `_complete_upload_with_retry`, `put_to_presigned_url`). A small, parameterized helper can collapse this:

```python
async def retry_async(
    op: Callable[[], Awaitable[T]],
    *,
    max_retries: int,
    base_delay_ms: int,
    is_retryable: Optional[Callable[[Exception], bool]] = None,
) -> T:
    last_error: Optional[Exception] = None
    for attempt in range(max_retries + 1):
        try:
            return await op()
        except Exception as err:
            last_error = err
            if is_retryable and not is_retryable(err):
                raise
            if attempt < max_retries:
                delay = base_delay_ms * (2 ** attempt) / 1000
                await asyncio.sleep(delay)
    raise last_error or RuntimeError("retry_async: exhausted retries")
```

Then `_complete_upload_with_retry` becomes simpler and easier to reason about:

```python
async def _complete_upload_with_retry(self, method: str, path: str, body: dict) -> MediaUploadResponse:
    async def op() -> dict:
        return await self.api_request(method, path, body, timeout=120.0)

    data = await retry_async(
        op,
        max_retries=COMPLETE_UPLOAD_MAX_RETRIES,
        base_delay_ms=COMPLETE_UPLOAD_BASE_DELAY_MS,
    )
    return MediaUploadResponse(
        file_uuid=data["file_uuid"],
        file_info=data["file_info"],
        ttl=data.get("ttl", 0),
    )
```

And `_part_finish_with_retry` can delegate to either this helper or a specialized “persistent” helper, instead of duplicating the loop and redeclaring constants inside the method.

### 3. Avoid redefining retry constants inside methods

In `_part_finish_with_retry` and `_part_finish_persistent_retry` you redeclare constants that are already defined at the module level, which makes behavior harder to track and tune:

```python
async def _part_finish_with_retry(...):
    # These shadow the module constants above:
    PART_FINISH_MAX_RETRIES = 2
    PART_FINISH_BASE_DELAY_MS = 1000
    PART_FINISH_RETRYABLE_DEFAULT_TIMEOUT_MS = 2 * 60 * 1000
    PART_FINISH_RETRYABLE_INTERVAL_MS = 1000
```

You can instead rely on the top-level constants so there’s a single source of truth:

```python
async def _part_finish_with_retry(...):
    last_error: Optional[Exception] = None
    for attempt in range(PART_FINISH_MAX_RETRIES + 1):
        ...
        if attempt < PART_FINISH_MAX_RETRIES:
            delay = PART_FINISH_BASE_DELAY_MS * (2 ** attempt) / 1000
            await asyncio.sleep(delay)
```

Same for `_part_finish_persistent_retry` and `_complete_upload_with_retry`, using `PART_FINISH_RETRYABLE_INTERVAL_MS`, `COMPLETE_UPLOAD_MAX_RETRIES`, `COMPLETE_UPLOAD_BASE_DELAY_MS` from the module constants.

These changes keep all existing functionality (chunked upload, retries, detailed logging) but reduce cognitive load and make the retry and upload flows easier to maintain and test.
</issue_to_address>



@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request implements a chunked upload system, file utilities, and a message reply rate limiter for the QQ Official platform. It also enhances media handling and text segmentation to improve reliability and compliance with platform limits. Feedback highlights the need to avoid blocking the event loop with synchronous file operations, remove redundant constant definitions, fix a logic error in text chunking that causes content duplication, and add missing retry logic for C2C messages as described in the pull request documentation. Additionally, improvements to logging and import practices were suggested to align with standard Python conventions.

- Fix chunk_text cursor-update logic to avoid infinite loops
- Extend the streaming C2C fallback condition to cover all rich-media types
- Fix guild message image sending to support URL images
- Improve MessageReplyLimiter to use logger and add concurrency safety
- Remove the unused upload cache
@bunwen

bunwen commented Mar 30, 2026

All code review issues have been addressed:

  1. Fixed the chunk_text cursor-update logic to avoid infinite loops
  2. Extended the streaming C2C fallback condition to cover all rich-media types
  3. Fixed guild message image sending to support URL images
  4. Improved MessageReplyLimiter to use logger and added concurrency safety
  5. Removed the unused upload cache

All changes have been pushed to the branch; please review.

