fix: Clean shutdown for multithreaded unary Map #329

Open
BulkBeing wants to merge 6 commits into main from clean-shutdown-unarymap

@BulkBeing BulkBeing commented Mar 6, 2026

Similar to #323

Also avoids invoking the UDF once shutdown has been initiated.
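A minimal sketch of that guard, assuming a shared `threading.Event` that is set when shutdown starts. The names `shutdown_event`, `invoke_udf`, and `handler` are hypothetical and are not taken from pynumaflow; the actual change in `_sync_servicer.py` may be structured differently.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Hypothetical names: none of these identifiers come from pynumaflow.
shutdown_event = threading.Event()


def handler(keys, payload):
    """Stand-in for a user-defined map function."""
    return payload.upper()


def invoke_udf(udf, keys, payload):
    """Call the UDF unless shutdown has already been initiated."""
    if shutdown_event.is_set():
        # Shutdown is in progress: do not start new UDF work.
        return None
    return udf(keys, payload)


with ThreadPoolExecutor(max_workers=4) as pool:
    # Request handled while the server is healthy.
    before = pool.submit(invoke_udf, handler, ["k"], "a").result()
    # Shutdown is initiated (for example, the gRPC stream closed).
    shutdown_event.set()
    # Request arriving afterwards never reaches the UDF.
    after = pool.submit(invoke_udf, handler, ["k"], "b").result()
```

Checking the flag inside the worker, rather than only where requests are read, also covers requests that were already queued in the pool when shutdown began.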

Normal shutdown (Pod deletion):

2026-03-06 03:59:46 INFO     Sync GRPC Server listening on: unix:///var/run/numaflow/map.sock with max threads: 4
INFO:pynumaflow._constants:Sync GRPC Server listening on: unix:///var/run/numaflow/map.sock with max threads: 4
INFO:pynumaflow._constants:GRPC Server listening on: unix:///var/run/numaflow/map.sock 32
2026-03-06 03:59:46 INFO     GRPC Server listening on: unix:///var/run/numaflow/map.sock 32
2026-03-06 04:00:10 WARNING  gRPC stream closed in reader thread, shutting down the server.
WARNING:pynumaflow._constants:gRPC stream closed in reader thread, shutting down the server.
2026-03-06 04:00:10 INFO     Shutdown signal received, stopping server gracefully...
INFO:pynumaflow._constants:Shutdown signal received, stopping server gracefully...

With uncaught UDF exception:

2026-03-06 12:35:07,067 - root - INFO - ----> Received request
2026-03-06 12:35:17,059 - root - INFO - <--- Returning response
2026-03-06 12:35:17,062 - root - INFO - ----> Received request
2026-03-06 12:35:17 CRITICAL MapFn handler error
Traceback (most recent call last):
  File "/app/.venv/lib/python3.13/site-packages/pynumaflow/mapper/_servicer/_sync_servicer.py", line 147, in _invoke_map
    responses = self.__map_handler(list(request.request.keys), d)
  File "/app/main.py", line 33, in my_handler
    raise RuntimeError("Simulated UDF exception")
RuntimeError: Simulated UDF exception
2026-03-06 12:35:17,063 - pynumaflow._constants - CRITICAL - MapFn handler error
Traceback (most recent call last):
  File "/app/.venv/lib/python3.13/site-packages/pynumaflow/mapper/_servicer/_sync_servicer.py", line 147, in _invoke_map
    responses = self.__map_handler(list(request.request.keys), d)
  File "/app/main.py", line 33, in my_handler
    raise RuntimeError("Simulated UDF exception")
RuntimeError: Simulated UDF exception
2026-03-06 12:35:17 CRITICAL Traceback (most recent call last):
  File "/app/.venv/lib/python3.13/site-packages/pynumaflow/mapper/_servicer/_sync_servicer.py", line 147, in _invoke_map
    responses = self.__map_handler(list(request.request.keys), d)
  File "/app/main.py", line 33, in my_handler
    raise RuntimeError("Simulated UDF exception")
RuntimeError: Simulated UDF exception
2026-03-06 12:35:17,066 - pynumaflow._constants - CRITICAL - Traceback (most recent call last):
  File "/app/.venv/lib/python3.13/site-packages/pynumaflow/mapper/_servicer/_sync_servicer.py", line 147, in _invoke_map
    responses = self.__map_handler(list(request.request.keys), d)
  File "/app/main.py", line 33, in my_handler
    raise RuntimeError("Simulated UDF exception")
RuntimeError: Simulated UDF exception
2026-03-06 12:35:17 CRITICAL UDF_EXECUTION_ERROR(udf): RuntimeError('Simulated UDF exception')
2026-03-06 12:35:17,066 - pynumaflow._constants - CRITICAL - UDF_EXECUTION_ERROR(udf): RuntimeError('Simulated UDF exception')
2026-03-06 12:35:17,067 - pynumaflow._constants - INFO - Shutdown signal received, stopping server gracefully...
2026-03-06 12:35:17,067 - root - INFO - <--- Returning response
2026-03-06 12:35:17 INFO     Shutdown signal received, stopping server gracefully...
2026-03-06 12:35:17,073 - root - INFO - <--- Returning response
2026-03-06 12:35:17 CRITICAL Server exiting due to UDF error: Simulated UDF exception
2026-03-06 12:35:17,081 - pynumaflow._constants - CRITICAL - Server exiting due to UDF error: Simulated UDF exception
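The sequence in the log above (handler error, shutdown signal, then "Server exiting due to UDF error") can be approximated as follows. This is an illustrative sketch under assumptions, not the code in this PR: `run_request`, `first_error`, and `exit_code` are hypothetical names, and the real servicer reports the error over gRPC as well.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Hypothetical names throughout; this is not the pynumaflow API.
shutdown_event = threading.Event()
error_lock = threading.Lock()
first_error = []  # holds at most one exception: the first UDF failure


def my_handler(payload):
    """Stand-in UDF that fails on one specific input."""
    if payload == "bad":
        raise RuntimeError("Simulated UDF exception")
    return payload


def run_request(udf, payload):
    """Run one request, recording the first UDF error and signalling shutdown."""
    if shutdown_event.is_set():
        return "skipped"
    try:
        return udf(payload)
    except Exception as exc:
        with error_lock:
            if not first_error:
                first_error.append(exc)
        shutdown_event.set()
        return "failed"


with ThreadPoolExecutor(max_workers=4) as pool:
    # Each request is awaited before the next is submitted, so the order is fixed.
    results = [
        pool.submit(run_request, my_handler, p).result()
        for p in ["ok", "bad", "late"]
    ]

# A non-zero exit code lets the container runtime see the failure.
exit_code = 1 if first_error else 0
```

Keeping only the first exception gives a single, stable reason to report on exit even when several worker threads fail at nearly the same time.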

Numa:

{"timestamp":"2026-03-06T12:35:17.078708Z","level":"WARN","message":"Map component is shutting down because of an error, not accepting the message","offset":"Int(IntOffset { offset: 1032, partition_idx: 0 })","error":"Err(Grpc(Status { code: Internal, message: \"UDF_EXECUTION_ERROR(udf): RuntimeError('Simulated UDF exception')\", details: b\"\\x08\\r\\x12AUDF_EXECUTION_ERROR(udf): RuntimeError('Simulated UDF exception')\\x1a\\x93\\x03\\n(type.googleapis.com/google.rpc.DebugInfo\\x12\\xe6\\x02\\x12\\xe3\\x02Traceback (most recent call last):\\n  File \\\"/app/.venv/lib/python3.13/site-packages/pynumaflow/mapper/_servicer/_sync_servicer.py\\\", line 147, in _invoke_map\\n    responses = self.__map_handler(list(request.request.keys), d)\\n  File \\\"/app/main.py\\\", line 33, in my_handler\\n    raise RuntimeError(\\\"Simulated UDF exception\\\")\\nRuntimeError: Simulated UDF exception\", source: None }))","target":"numaflow_core::mapper::map"}
{"timestamp":"2026-03-06T12:35:17.078762Z","level":"WARN","message":"Map component is shutting down because of an error, not accepting the message","offset":"Int(IntOffset { offset: 1033, partition_idx: 0 })","error":"Err(Grpc(Status { code: Internal, message: \"UDF_EXECUTION_ERROR(udf): RuntimeError('Simulated UDF exception')\", details: b\"\\x08\\r\\x12AUDF_EXECUTION_ERROR(udf): RuntimeError('Simulated UDF exception')\\x1a\\x93\\x03\\n(type.googleapis.com/google.rpc.DebugInfo\\x12\\xe6\\x02\\x12\\xe3\\x02Traceback (most recent call last):\\n  File \\\"/app/.venv/lib/python3.13/site-packages/pynumaflow/mapper/_servicer/_sync_servicer.py\\\", line 147, in _invoke_map\\n    responses = self.__map_handler(list(request.request.keys), d)\\n  File \\\"/app/main.py\\\", line 33, in my_handler\\n    raise RuntimeError(\\\"Simulated UDF exception\\\")\\nRuntimeError: Simulated UDF exception\", source: None }))","target":"numaflow_core::mapper::map"}
{"timestamp":"2026-03-06T12:35:17.078795Z","level":"INFO","message":"Nak received for offset","params.offset":"Int(IntOffset { offset: 1033, partition_idx: 0 })","target":"numaflow_core::pipeline::isb::reader"}
{"timestamp":"2026-03-06T12:35:17.078810Z","level":"INFO","message":"Nak sent for offset","offset":"Int(IntOffset { offset: 1033, partition_idx: 0 })","target":"numaflow_core::pipeline::isb::reader"}
{"timestamp":"2026-03-06T12:35:17.078846Z","level":"INFO","message":"Nak received for offset","params.offset":"Int(IntOffset { offset: 1032, partition_idx: 0 })","target":"numaflow_core::pipeline::isb::reader"}
{"timestamp":"2026-03-06T12:35:17.078896Z","level":"INFO","message":"Nak sent for offset","offset":"Int(IntOffset { offset: 1032, partition_idx: 0 })","target":"numaflow_core::pipeline::isb::reader"}
{"timestamp":"2026-03-06T12:35:17.078912Z","level":"INFO","message":"ISBReaderOrchestrator cleanup on shutdown completed.","target":"numaflow_core::pipeline::isb::reader"}
{"timestamp":"2026-03-06T12:35:17.079467Z","level":"INFO","message":"Map input stream ended, waiting for inflight messages to finish","target":"numaflow_core::mapper::map"}
{"timestamp":"2026-03-06T12:35:17.079530Z","level":"INFO","message":"Map component is completed","status":"Err(Grpc(Status { code: Internal, message: \"UDF_EXECUTION_ERROR(udf): RuntimeError('Simulated UDF exception')\", details: b\"\\x08\\r\\x12AUDF_EXECUTION_ERROR(udf): RuntimeError('Simulated UDF exception')\\x1a\\x93\\x03\\n(type.googleapis.com/google.rpc.DebugInfo\\x12\\xe6\\x02\\x12\\xe3\\x02Traceback (most recent call last):\\n  File \\\"/app/.venv/lib/python3.13/site-packages/pynumaflow/mapper/_servicer/_sync_servicer.py\\\", line 147, in _invoke_map\\n    responses = self.__map_handler(list(request.request.keys), d)\\n  File \\\"/app/main.py\\\", line 33, in my_handler\\n    raise RuntimeError(\\\"Simulated UDF exception\\\")\\nRuntimeError: Simulated UDF exception\", source: None }))","target":"numaflow_core::mapper::map"}
{"timestamp":"2026-03-06T12:35:17.079613Z","level":"ERROR","message":"Error while mapping messages","e":"Grpc(Status { code: Internal, message: \"UDF_EXECUTION_ERROR(udf): RuntimeError('Simulated UDF exception')\", details: b\"\\x08\\r\\x12AUDF_EXECUTION_ERROR(udf): RuntimeError('Simulated UDF exception')\\x1a\\x93\\x03\\n(type.googleapis.com/google.rpc.DebugInfo\\x12\\xe6\\x02\\x12\\xe3\\x02Traceback (most recent call last):\\n  File \\\"/app/.venv/lib/python3.13/site-packages/pynumaflow/mapper/_servicer/_sync_servicer.py\\\", line 147, in _invoke_map\\n    responses = self.__map_handler(list(request.request.keys), d)\\n  File \\\"/app/main.py\\\", line 33, in my_handler\\n    raise RuntimeError(\\\"Simulated UDF exception\\\")\\nRuntimeError: Simulated UDF exception\", source: None })","target":"numaflow_core::pipeline::forwarder::map_forwarder"}
{"timestamp":"2026-03-06T12:35:17.079786Z","level":"INFO","message":"Forwarder task completed","result":"Err(Grpc(Status { code: Internal, message: \"UDF_EXECUTION_ERROR(udf): RuntimeError('Simulated UDF exception')\", details: b\"\\x08\\r\\x12AUDF_EXECUTION_ERROR(udf): RuntimeError('Simulated UDF exception')\\x1a\\x93\\x03\\n(type.googleapis.com/google.rpc.DebugInfo\\x12\\xe6\\x02\\x12\\xe3\\x02Traceback (most recent call last):\\n  File \\\"/app/.venv/lib/python3.13/site-packages/pynumaflow/mapper/_servicer/_sync_servicer.py\\\", line 147, in _invoke_map\\n    responses = self.__map_handler(list(request.request.keys), d)\\n  File \\\"/app/main.py\\\", line 33, in my_handler\\n    raise RuntimeError(\\\"Simulated UDF exception\\\")\\nRuntimeError: Simulated UDF exception\", source: None }))","target":"numaflow_core::pipeline::forwarder::map_forwarder"}
{"timestamp":"2026-03-06T12:35:17.079994Z","level":"INFO","message":"Stopped the Lag-Reader Expose tasks","target":"numaflow_core::metrics"}
{"timestamp":"2026-03-06T12:35:17.080895Z","level":"ERROR","message":"Pipeline failed because of UDF failure","error":"Status { code: Internal, message: \"UDF_EXECUTION_ERROR(udf): RuntimeError('Simulated UDF exception')\", details: b\"\\x08\\r\\x12AUDF_EXECUTION_ERROR(udf): RuntimeError('Simulated UDF exception')\\x1a\\x93\\x03\\n(type.googleapis.com/google.rpc.DebugInfo\\x12\\xe6\\x02\\x12\\xe3\\x02Traceback (most recent call last):\\n  File \\\"/app/.venv/lib/python3.13/site-packages/pynumaflow/mapper/_servicer/_sync_servicer.py\\\", line 147, in _invoke_map\\n    responses = self.__map_handler(list(request.request.keys), d)\\n  File \\\"/app/main.py\\\", line 33, in my_handler\\n    raise RuntimeError(\\\"Simulated UDF exception\\\")\\nRuntimeError: Simulated UDF exception\", source: None }","target":"numaflow_core"}
{"timestamp":"2026-03-06T12:35:17.081332Z","level":"INFO","message":"Gracefully Exiting...","target":"numaflow_core"}
{"timestamp":"2026-03-06T12:35:17.086559Z","level":"INFO","message":"Exited.","target":"numaflow"}

Signed-off-by: Sreekanth <prsreekanth920@gmail.com>

codecov bot commented Mar 6, 2026

Codecov Report

❌ Patch coverage is 72.97297% with 10 lines in your changes missing coverage. Please review.
✅ Project coverage is 93.97%. Comparing base (a66cdda) to head (072be7e).

| Files with missing lines | Patch % | Lines |
|---|---|---|
| ...flow/pynumaflow/mapper/_servicer/_sync_servicer.py | 78.78% | 4 Missing and 3 partials ⚠️ |
| ...ckages/pynumaflow/pynumaflow/mapper/sync_server.py | 25.00% | 3 Missing ⚠️ |
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #329      +/-   ##
==========================================
- Coverage   94.17%   93.97%   -0.21%     
==========================================
  Files          66       66              
  Lines        3092     3122      +30     
  Branches      162      166       +4     
==========================================
+ Hits         2912     2934      +22     
- Misses        149      155       +6     
- Partials       31       33       +2     

@BulkBeing BulkBeing marked this pull request as ready for review March 6, 2026 13:01
@BulkBeing BulkBeing requested a review from yhl25 March 6, 2026 13:01