Sync activities in ProcessPoolExecutor crash with PickleError when retry_policy has non_retryable_error_types #1350

@lambyqq

Bug Report

Description

RetryPolicy.from_proto() assigns proto.non_retryable_error_types directly as a protobuf RepeatedScalarContainer instead of converting it to a Python list. This makes activity.info().retry_policy non-picklable, which crashes any sync activity running in a ProcessPoolExecutor when the workflow specifies non_retryable_error_types in the retry policy.

Root Cause

In temporalio/common.py, RetryPolicy.from_proto() does:

non_retryable_error_types=proto.non_retryable_error_types or None,

proto.non_retryable_error_types is a google._upb._message.RepeatedScalarContainer, which is not picklable. It should be:

non_retryable_error_types=list(proto.non_retryable_error_types) or None,
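The difference between the two lines can be sketched without installing protobuf. In the snippet below, FakeRepeatedContainer and Policy are hypothetical stand-ins (not SDK or protobuf classes): the first mimics a repeated-field container that refuses to pickle, the second mimics a dataclass with a non_retryable_error_types field. It is an illustration under those assumptions, not the SDK's actual code.

```python
import pickle
from dataclasses import dataclass
from typing import Iterable, Iterator, Optional, Sequence


class FakeRepeatedContainer:
    """Hypothetical stand-in for protobuf's RepeatedScalarContainer."""

    def __init__(self, items: Iterable[str]) -> None:
        self._items = list(items)

    def __iter__(self) -> Iterator[str]:
        return iter(self._items)

    def __len__(self) -> int:
        # An empty container is falsy, so "container or None" yields None.
        return len(self._items)

    def __reduce__(self):
        # Mimic the refusal to pickle reported in this issue.
        raise pickle.PickleError(
            "can't pickle repeated message fields, convert to list first"
        )


@dataclass
class Policy:
    """Hypothetical stand-in for a retry policy dataclass."""

    non_retryable_error_types: Optional[Sequence[str]] = None


def from_proto_buggy(field: FakeRepeatedContainer) -> Policy:
    # Stores the container itself, so the policy cannot be pickled.
    return Policy(non_retryable_error_types=field or None)


def from_proto_fixed(field: FakeRepeatedContainer) -> Policy:
    # Copies into a plain list, which pickles normally.
    return Policy(non_retryable_error_types=list(field) or None)


container = FakeRepeatedContainer(["SomeError"])
try:
    pickle.dumps(from_proto_buggy(container))
except pickle.PickleError as exc:
    print(f"buggy variant failed to pickle: {exc}")

restored = pickle.loads(pickle.dumps(from_proto_fixed(container)))
print(f"fixed variant round-trips: {restored}")
```

Both variants still map an empty field to None, so the list() wrapper should not change behavior for policies without non-retryable error types.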

Introduced

v1.18.0 (PR #1055 "Added retry policy to activity info"), which populates activity.info().retry_policy via RetryPolicy.from_proto(). from_proto() has behaved this way since it was written, but the problem only became user-facing once activity.info() started including the retry policy, because activity.info() is pickled when it is sent to subprocess workers.

Version Matrix

SDK Version | Result
1.8.0 | PASS (activity.Info has no retry_policy field)
1.17.0 | PASS (same)
1.18.0 | FAIL: _pickle.PickleError: can't pickle repeated message fields, convert to list first
1.23.0 | FAIL (same error)

Minimal Reproduction

Requires only pip install temporalio. No external Temporal server needed (uses built-in test server).

"""Repro: sync activity in ProcessPoolExecutor crashes when
retry_policy has non_retryable_error_types."""
import asyncio
import multiprocessing
from concurrent.futures import ProcessPoolExecutor
from datetime import timedelta

from temporalio import activity, workflow
from temporalio.common import RetryPolicy
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import SharedStateManager, Worker


@activity.defn
def my_sync_activity() -> str:
    return "ok"


@workflow.defn
class MyWorkflow:
    @workflow.run
    async def run(self) -> str:
        return await workflow.execute_activity(
            my_sync_activity,
            start_to_close_timeout=timedelta(seconds=10),
            retry_policy=RetryPolicy(
                maximum_attempts=1,
                non_retryable_error_types=["SomeError"],
            ),
        )


async def main():
    async with await WorkflowEnvironment.start_local() as env:
        with ProcessPoolExecutor(
            max_workers=1,
            mp_context=multiprocessing.get_context("spawn"),
        ) as executor:
            mgr = SharedStateManager.create_from_multiprocessing(
                multiprocessing.Manager())
            async with Worker(
                env.client,
                task_queue="repro-queue",
                workflows=[MyWorkflow],
                activities=[my_sync_activity],
                activity_executor=executor,
                shared_state_manager=mgr,
            ):
                result = await env.client.execute_workflow(
                    MyWorkflow.run,
                    id="repro-pickle-bug",
                    task_queue="repro-queue",
                    task_timeout=timedelta(seconds=15),
                )
                print(f"Result: {result}")


if __name__ == "__main__":
    asyncio.run(main())

Error

_pickle.PickleError: can't pickle repeated message fields, convert to list first

Full traceback:

concurrent.futures.process._RemoteTraceback:
"""
Traceback (most recent call last):
  File ".../multiprocessing/queues.py", line 244, in _feed
    obj = _ForkingPickler.dumps(obj)
  File ".../multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
_pickle.PickleError: can't pickle repeated message fields, convert to list first
"""
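The traceback shows where pickling failed but not which field caused it. A rough debugging aid, assuming the object being sent is a dataclass, is to pickle each field separately with the same ForkingPickler that multiprocessing uses. The helper unpicklable_fields and the DemoInfo class below are hypothetical names for illustration; a threading.Lock stands in for any unpicklable value.

```python
import dataclasses
import threading
from multiprocessing.reduction import ForkingPickler
from typing import Any, List


def unpicklable_fields(obj: Any) -> List[str]:
    """Return the names of dataclass fields on obj that fail to pickle."""
    bad: List[str] = []
    for field in dataclasses.fields(obj):
        try:
            # Same pickler the multiprocessing queue feeder uses.
            ForkingPickler.dumps(getattr(obj, field.name))
        except Exception:
            bad.append(field.name)
    return bad


@dataclasses.dataclass
class DemoInfo:
    """Hypothetical stand-in for an info object passed to a subprocess."""

    activity_type: str
    retry_policy: Any


# A lock cannot be pickled, so only retry_policy is reported.
info = DemoInfo(activity_type="my_sync_activity", retry_policy=threading.Lock())
print(unpicklable_fields(info))
```

Calling the helper on the object returned by activity.info() inside the parent process should point at retry_policy on affected versions, assuming that object is a dataclass.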

Suggested Fix

In RetryPolicy.from_proto(), wrap with list():

non_retryable_error_types=list(proto.non_retryable_error_types) or None,

Workaround

Monkey-patch RetryPolicy.from_proto at import time:

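from temporalio.common import RetryPolicy

# Keep a reference to the SDK's implementation so the patch can delegate to it.
_original_from_proto = RetryPolicy.from_proto

@staticmethod
def _patched_from_proto(proto) -> RetryPolicy:
    rp = _original_from_proto(proto)
    # Replace the protobuf repeated container with a plain, picklable list.
    if rp.non_retryable_error_types is not None:
        rp.non_retryable_error_types = list(rp.non_retryable_error_types)
    return rp

# Install the patch at import time, before the worker starts.
RetryPolicy.from_proto = _patched_from_proto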
