## Bug Report

### Description
`RetryPolicy.from_proto()` assigns `proto.non_retryable_error_types` directly as a protobuf `RepeatedScalarContainer` instead of converting it to a Python list. This makes `activity.info().retry_policy` non-picklable, which crashes any sync activity running in a `ProcessPoolExecutor` when the workflow specifies `non_retryable_error_types` in the retry policy.
### Root Cause

In `temporalio/common.py`, `RetryPolicy.from_proto()` does:

```python
non_retryable_error_types=proto.non_retryable_error_types or None,
```

`proto.non_retryable_error_types` is a `google._upb._message.RepeatedScalarContainer`, which is not picklable. It should be:

```python
non_retryable_error_types=list(proto.non_retryable_error_types) or None,
```

### Introduced
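As a side note, the `list(...) or None` idiom in the fix preserves the existing behavior of mapping an empty container to `None`. A quick stand-in sketch, using plain Python sequences in place of the protobuf container:

```python
# Stand-in for the protobuf container: any iterable works here, since
# list() copies it into a plain, picklable Python list.
def convert(container):
    # Empty container -> None (empty list is falsy); non-empty -> list copy.
    return list(container) or None

print(convert(()))              # empty -> None
print(convert(("SomeError",)))  # -> ['SomeError']
```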
v1.18.0 (PR #1055, "Added retry policy to activity info"), which populates `activity.info().retry_policy` via `RetryPolicy.from_proto()`. The `from_proto()` method has had this issue since it was written, but it only became user-facing once `activity.info()` started including the retry policy, because `activity.info()` is pickled when sent to subprocess workers.
### Version Matrix
| SDK Version | Result |
|---|---|
| 1.8.0 | PASS (activity.Info has no retry_policy field) |
| 1.17.0 | PASS (same) |
| 1.18.0 | FAIL — _pickle.PickleError: can't pickle repeated message fields, convert to list first |
| 1.23.0 | FAIL — same error |
### Minimal Reproduction

Requires only `pip install temporalio`. No external Temporal server needed (uses the built-in test server).
```python
"""Repro: sync activity in ProcessPoolExecutor crashes when
retry_policy has non_retryable_error_types."""
import asyncio
import multiprocessing
from concurrent.futures import ProcessPoolExecutor
from datetime import timedelta

from temporalio import activity, workflow
from temporalio.common import RetryPolicy
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import SharedStateManager, Worker


@activity.defn
def my_sync_activity() -> str:
    return "ok"


@workflow.defn
class MyWorkflow:
    @workflow.run
    async def run(self) -> str:
        return await workflow.execute_activity(
            my_sync_activity,
            start_to_close_timeout=timedelta(seconds=10),
            retry_policy=RetryPolicy(
                maximum_attempts=1,
                non_retryable_error_types=["SomeError"],
            ),
        )


async def main():
    async with await WorkflowEnvironment.start_local() as env:
        with ProcessPoolExecutor(
            max_workers=1,
            mp_context=multiprocessing.get_context("spawn"),
        ) as executor:
            mgr = SharedStateManager.create_from_multiprocessing(
                multiprocessing.Manager()
            )
            async with Worker(
                env.client,
                task_queue="repro-queue",
                workflows=[MyWorkflow],
                activities=[my_sync_activity],
                activity_executor=executor,
                shared_state_manager=mgr,
            ):
                result = await env.client.execute_workflow(
                    MyWorkflow.run,
                    id="repro-pickle-bug",
                    task_queue="repro-queue",
                    task_timeout=timedelta(seconds=15),
                )
                print(f"Result: {result}")


if __name__ == "__main__":
    asyncio.run(main())
```

### Error
```
_pickle.PickleError: can't pickle repeated message fields, convert to list first
```

Full traceback:

```
concurrent.futures.process._RemoteTraceback:
"""
Traceback (most recent call last):
  File ".../multiprocessing/queues.py", line 244, in _feed
    obj = _ForkingPickler.dumps(obj)
  File ".../multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
_pickle.PickleError: can't pickle repeated message fields, convert to list first
"""
```
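The failure mode can be illustrated without Temporal or protobuf at all: pickling an object fails as soon as any nested field is unpicklable, so one bad field poisons the whole `activity.info()` payload. A minimal stand-in sketch (`Unpicklable` and `FakeInfo` are illustrative names, not SDK types; `Unpicklable` mimics the protobuf container's refusal to pickle):

```python
import pickle
from dataclasses import dataclass, field


class Unpicklable:
    """Mimics RepeatedScalarContainer: refuses to be pickled."""
    def __reduce__(self):
        raise pickle.PickleError(
            "can't pickle repeated message fields, convert to list first"
        )


@dataclass
class FakeInfo:
    """Stand-in for activity.Info carrying a retry-policy field."""
    activity_id: str = "1"
    non_retryable_error_types: object = field(default_factory=list)


broken = FakeInfo(non_retryable_error_types=Unpicklable())
try:
    pickle.dumps(broken)  # the whole Info fails, not just the one field
except pickle.PickleError as e:
    print(f"PickleError: {e}")

fixed = FakeInfo(non_retryable_error_types=["SomeError"])
pickle.dumps(fixed)       # a plain list round-trips fine
print("fixed Info pickles OK")
```

This is why the bug only surfaces with a `ProcessPoolExecutor`: a thread-based executor never pickles the info struct.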
### Suggested Fix

In `RetryPolicy.from_proto()`, wrap with `list()`:

```python
non_retryable_error_types=list(proto.non_retryable_error_types) or None,
```

### Workaround
Until a fix lands, monkey-patch `RetryPolicy.from_proto` at import time:

```python
from temporalio.common import RetryPolicy

_original_from_proto = RetryPolicy.from_proto


def _patched_from_proto(proto) -> RetryPolicy:
    rp = _original_from_proto(proto)
    if rp.non_retryable_error_types is not None:
        # Copy the protobuf container into a plain, picklable list.
        rp.non_retryable_error_types = list(rp.non_retryable_error_types)
    return rp


RetryPolicy.from_proto = staticmethod(_patched_from_proto)
```
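The same wrap-and-convert pattern can be exercised end to end without the SDK. A toy sketch under stated assumptions: `FakeRepeated`, `FakePolicy`, and `from_proto` below are stand-ins modeling the buggy behavior, not temporalio code:

```python
import pickle
from dataclasses import dataclass


class FakeRepeated:
    """Stand-in for the protobuf container: iterable but unpicklable."""
    def __init__(self, items):
        self._items = list(items)

    def __iter__(self):
        return iter(self._items)

    def __reduce__(self):
        raise pickle.PickleError(
            "can't pickle repeated message fields, convert to list first"
        )


@dataclass
class FakePolicy:
    non_retryable_error_types: object = None


def from_proto(container):
    """Models the buggy from_proto: stores the raw container."""
    return FakePolicy(non_retryable_error_types=container or None)


_original = from_proto


def patched(container):
    """The workaround: wrap the original and convert to a plain list."""
    rp = _original(container)
    if rp.non_retryable_error_types is not None:
        rp.non_retryable_error_types = list(rp.non_retryable_error_types)
    return rp


container = FakeRepeated(["SomeError"])
try:
    pickle.dumps(from_proto(container))  # buggy path fails
except pickle.PickleError:
    print("unpatched: PickleError")
pickle.dumps(patched(container))         # patched path holds a plain list
print("patched: OK")
```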