From fff07cb9bc8314abae04ac96ae402b95b0a3d112 Mon Sep 17 00:00:00 2001 From: Dilum Aluthge Date: Sun, 31 Aug 2025 16:16:33 -0400 Subject: [PATCH 01/14] CI: Run CI on all PRs, even if the base (target) branch is not `master` or `release-*` (#144) (cherry picked from commit 9390857d62393e455232bb36d84e70be422e3a7a) --- .github/workflows/ci.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 30580b1..a0c10b3 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1,9 +1,9 @@ name: CI on: pull_request: - branches: - - 'master' - - 'release-*' + # branches: + # - 'master' + # - 'release-*' push: branches: - 'master' From c5e657be16c33f60ef2ebcbca44c7744d05ba4e4 Mon Sep 17 00:00:00 2001 From: Dilum Aluthge Date: Thu, 12 Feb 2026 18:37:57 -0500 Subject: [PATCH 02/14] Fix JET errors around matching methods for `send_msg(...)` (#166) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ``` ┌ send_msg(s::IO, header::Any, msg::Any) @ Distributed /workpath/Distributed.jl/src/messages.jl:105 │ no matching method found `send_msg(::Distributed.LocalProcess, ::Any, ::Any)` (1/2 union split): Distributed.send_msg(Distributed.worker_from_id(id::Int64)::Union{Distributed.LocalProcess, Distributed.Worker}, header::Any, msg::Any) └──────────────────── ``` (cherry picked from commit 7223f4f14dc3749515ae36e76973045e27791606) (cherry picked from commit 56fa9f214afd5f2c487ac4132aee8e50daf19c0e) --- src/messages.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/messages.jl b/src/messages.jl index 6e895f0..a27b9ed 100644 --- a/src/messages.jl +++ b/src/messages.jl @@ -102,7 +102,7 @@ end function send_msg(s::IO, header, msg) id = worker_id_from_socket(s) if id > -1 - return send_msg(worker_from_id(id), header, msg) + return send_msg(worker_from_id(id)::Worker, header, msg) end send_msg_unknown(s, header, msg) end From 650247a4b85cc706476129b09bc7f7974f9654ac Mon Sep 17 00:00:00 2001 From: Dilum Aluthge Date: Thu, 12 Feb 2026 18:38:16 -0500 Subject: [PATCH 03/14] Fix JET errors around matching methods for `send_msg_now(...)` (#165) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ``` ┌ @ Distributed /workpath/Distributed.jl/src/process_messages.jl:321 │ no matching method found `send_msg_now(::Distributed.LocalProcess, ::Distributed.MsgHeader, ::Distributed.IdentifySocketAckMsg)` (1/2 union split): Distributed.send_msg_now(w, Distributed.MsgHeader()::Distributed.MsgHeader, Distributed.IdentifySocketAckMsg()::Distributed.IdentifySocketAckMsg) └──────────────────── ``` ``` ┌ @ Distributed /workpath/Distributed.jl/src/process_messages.jl:359 │ no matching method found `send_msg_now(::Distributed.LocalProcess, ::Distributed.MsgHeader, ::Distributed.JoinCompleteMsg)` (1/2 union split): Distributed.send_msg_now(controller::Union{Distributed.LocalProcess, Distributed.Worker}, Distributed.MsgHeader(Distributed.RRID(0, 0)::Distributed.RRID, (header::Distributed.MsgHeader).notify_oid::Distributed.RRID)::Distributed.MsgHeader, Distributed.JoinCompleteMsg((Distributed.Sys).CPU_THREADS::Int64, Distributed.getpid()::Int32)::Distributed.JoinCompleteMsg) └──────────────────── ``` ``` ┌ connect_to_peer(manager::Distributed.ClusterManager, rpid::Int64, wconfig::Distributed.WorkerConfig) @ Distributed /workpath/Distributed.jl/src/process_messages.jl:368 │ no matching method found `send_msg_now(::Distributed.LocalProcess, ::Distributed.MsgHeader, ::Distributed.IdentifySocketMsg)` (1/2 union split): Distributed.send_msg_now(w, Distributed.MsgHeader()::Distributed.MsgHeader, Distributed.IdentifySocketMsg(myid()::Int64)::Distributed.IdentifySocketMsg) └──────────────────── ``` ``` ┌ send_msg_now(s::Base.PipeEndpoint, header::Distributed.MsgHeader, msg::Distributed.ResultMsg) @ Distributed //workl/clones/JuliaLang/Distributed.jl/src/messages.jl:113 │ no matching method found `send_msg_now(::Distributed.LocalProcess, ::Distributed.MsgHeader, ::Distributed.(1/2 union split): Distributed.send_msg_now(Distributed.worker_from_id(id::Int64)::Union{Distributed.LocalProcess, Worker}, header::Distributed.MsgHeader, msg::Distributed.ResultMsg) └──────────────────── ``` (cherry picked from commit 305d6fc4280de14f015c13a080c785bf68c34785) (cherry picked from commit 871e3d746ab0c7668462f7b88515a362da9c11f1) --- src/messages.jl | 2 +- src/process_messages.jl | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/messages.jl b/src/messages.jl index a27b9ed..1a5dd82 100644 --- a/src/messages.jl +++ b/src/messages.jl @@ -110,7 +110,7 @@ end function send_msg_now(s::IO, header, msg::AbstractMsg) id = worker_id_from_socket(s) if id > -1 - return send_msg_now(worker_from_id(id), header, msg) + return send_msg_now(worker_from_id(id)::Worker, header, msg) end send_msg_unknown(s, header, msg) end diff --git a/src/process_messages.jl b/src/process_messages.jl index a444651..6a4b8c2 100644 --- a/src/process_messages.jl +++ b/src/process_messages.jl @@ -316,7 +316,7 @@ end function handle_msg(msg::IdentifySocketMsg, header, r_stream, w_stream, version) # register a new peer worker connection - w = Worker(msg.from_pid, r_stream, w_stream, cluster_manager; version=version) + w = Worker(msg.from_pid, r_stream, w_stream, cluster_manager; version=version)::Worker send_connection_hdr(w, false) send_msg_now(w, MsgHeader(), IdentifySocketAckMsg()) notify(w.initialized) @@ -329,7 +329,7 @@ end function handle_msg(msg::JoinPGRPMsg, header, r_stream, w_stream, version) LPROC.id = msg.self_pid - controller = Worker(1, r_stream, w_stream, cluster_manager; version=version) + controller = Worker(1, r_stream, w_stream, cluster_manager; version=version)::Worker notify(controller.initialized) register_worker(LPROC) topology(msg.topology) @@ -362,7 +362,7 @@ end function connect_to_peer(manager::ClusterManager, rpid::Int, wconfig::WorkerConfig) try (r_s, w_s) = connect(manager, rpid, wconfig) - w = Worker(rpid, r_s, w_s, manager; config=wconfig) + w = Worker(rpid, r_s, w_s, manager; config=wconfig)::Worker process_messages(w.r_stream, w.w_stream, false) send_connection_hdr(w, true) send_msg_now(w, MsgHeader(), IdentifySocketMsg(myid())) From 2c7c6bc1aee028948c7175f347e08c51ddb94c2c Mon Sep 17 00:00:00 2001 From: Dilum Aluthge Date: Fri, 20 Feb 2026 08:18:19 -0500 Subject: [PATCH 04/14] Change `Distributed.cluster_manager` from a global non-constant `ClusterManager` to a global constant `Ref{ClusterManager}` (#177) (cherry picked from commit 2fe1aa4e267517565e99cd06664550dcd230cfc6) --- src/cluster.jl | 11 +++++++++-- src/process_messages.jl | 12 ++++++++---- 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/src/cluster.jl b/src/cluster.jl index 82a355f..1c958e5 100644 --- a/src/cluster.jl +++ b/src/cluster.jl @@ -9,6 +9,14 @@ Cluster managers implement how workers can be added, removed and communicated wi """ abstract type ClusterManager end +# cluster_manager is a global constant +const cluster_manager = Ref{ClusterManager}() + +function throw_if_cluster_manager_unassigned() + isassigned(cluster_manager) || error("cluster_manager is unassigned") + return nothing +end + """ WorkerConfig @@ -390,8 +398,7 @@ function init_worker(cookie::AbstractString, manager::ClusterManager=DefaultClus # On workers, the default cluster manager connects via TCP sockets. Custom # transports will need to call this function with their own manager. - global cluster_manager - cluster_manager = manager + cluster_manager[] = manager # Since our pid has yet to be set, ensure no RemoteChannel / Future have been created or addprocs() called. @assert nprocs() <= 1 diff --git a/src/process_messages.jl b/src/process_messages.jl index 6a4b8c2..393c764 100644 --- a/src/process_messages.jl +++ b/src/process_messages.jl @@ -315,8 +315,10 @@ function handle_msg(msg::ResultMsg, header, r_stream, w_stream, version) end function handle_msg(msg::IdentifySocketMsg, header, r_stream, w_stream, version) + throw_if_cluster_manager_unassigned() + # register a new peer worker connection - w = Worker(msg.from_pid, r_stream, w_stream, cluster_manager; version=version)::Worker + w = Worker(msg.from_pid, r_stream, w_stream, cluster_manager[]; version=version)::Worker send_connection_hdr(w, false) send_msg_now(w, MsgHeader(), IdentifySocketAckMsg()) notify(w.initialized) @@ -328,8 +330,10 @@ function handle_msg(msg::IdentifySocketAckMsg, header, r_stream, w_stream, versi end function handle_msg(msg::JoinPGRPMsg, header, r_stream, w_stream, version) + throw_if_cluster_manager_unassigned() + LPROC.id = msg.self_pid - controller = Worker(1, r_stream, w_stream, cluster_manager; version=version)::Worker + controller = Worker(1, r_stream, w_stream, cluster_manager[]; version=version)::Worker notify(controller.initialized) register_worker(LPROC) topology(msg.topology) @@ -348,9 +352,9 @@ function handle_msg(msg::JoinPGRPMsg, header, r_stream, w_stream, version) let rpid=rpid, wconfig=wconfig if lazy # The constructor registers the object with a global registry. - Worker(rpid, ()->connect_to_peer(cluster_manager, rpid, wconfig)) + Worker(rpid, ()->connect_to_peer(cluster_manager[], rpid, wconfig)) else - @async connect_to_peer(cluster_manager, rpid, wconfig) + @async connect_to_peer(cluster_manager[], rpid, wconfig) end end end From be78be9e7e392878cd25723327d0e2dff6c4ae69 Mon Sep 17 00:00:00 2001 From: Dilum Aluthge Date: Fri, 20 Feb 2026 23:35:31 -0500 Subject: [PATCH 05/14] Fix a JET error by narrowing the type signature of the `create_worker(x, y)` function (#175) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ``` ┌ create_worker(manager::Any, wconfig::Any) @ Distributed /workpath/Distributed.jl/src/cluster.jl:620 │ no matching method found `kwcall(::NamedTuple{(:config,), <:Tuple{Any}}, ::Type{Distributed.Worker}, ::Any, ::Sockets.TCPSocket, ::Sockets.TCPSocket, ::Base.LibuvStream)` (1/2 union split): Core.kwcall(NamedTuple{(:config,)}(tuple(wconfig::Any)::Tuple{Any})::NamedTuple{(:config,), <:Tuple{Any}}, Distributed.Worker, (getfield(w::Core.Box, :contents)::Any).id::Any, r_s::Sockets.TCPSocket, w_s::Sockets.TCPSocket, manager::Union{Base.LibuvStream, Distributed.ClusterManager}) └──────────────────── ``` (cherry picked from commit 79b4ca8d7b498a728912bf9d66d31890ff0f8667) (cherry picked from commit d65a9960ae358806312c1adf35f45c62f02caf0b) --- src/cluster.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cluster.jl b/src/cluster.jl index 1c958e5..8161b0c 100644 --- a/src/cluster.jl +++ b/src/cluster.jl @@ -614,7 +614,7 @@ function launch_n_additional_processes(manager, frompid, fromconfig, cnt, launch end end -function create_worker(manager, wconfig) +function create_worker(manager::ClusterManager, wconfig::WorkerConfig) # only node 1 can add new nodes, since nobody else has the full list of address:port @assert LPROC.id == 1 timeout = worker_timeout() From f8daeb8eef5e581d16f8e358c1f7c3bed0fa69a6 Mon Sep 17 00:00:00 2001 From: Dilum Aluthge Date: Fri, 20 Feb 2026 23:35:45 -0500 Subject: [PATCH 06/14] Fix a JET error around matching methods for `shell_escape_wincmd(...)` (#174) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ``` ┌ @ Distributed /workpath/Distributed.jl/src/managers.jl:323 │ no matching method found `shell_escape_wincmd(::Nothing)` (1/2 union split): remotecmd = Distributed.shell_escape_wincmd(Distributed.escape_microsoft_c_args(tuple(exename::Any)::Tuple{Any}, exeflags::Cmd...)::Union{Nothing, String}) └──────────────────── ``` (cherry picked from commit 75c559c21fa28cafba9c9c2e6fdc87d2d8b55f57) (cherry picked from commit 9724553b50599d1ea8a6937c0697d6ca36bde597) --- src/managers.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/managers.jl b/src/managers.jl index ab79abe..560648a 100644 --- a/src/managers.jl +++ b/src/managers.jl @@ -347,7 +347,7 @@ function launch_on_machine(manager::SSHManager, machine::AbstractString, cnt, pa any(c -> c == '"', exename) && throw(ArgumentError("invalid exename")) - remotecmd = shell_escape_wincmd(escape_microsoft_c_args(exename, exeflags...)) + remotecmd = shell_escape_wincmd(escape_microsoft_c_args(exename, exeflags...)::AbstractString) # change working directory if dir !== nothing && dir != "" any(c -> c == '"', dir) && throw(ArgumentError("invalid dir")) From 3c73b0044a45e2b5506bb6969302139236a02f33 Mon Sep 17 00:00:00 2001 From: Dilum Aluthge Date: Fri, 20 Feb 2026 23:37:39 -0500 Subject: [PATCH 07/14] Fix JET errors around matching methods for `lock(...)` and `unlock(...)` (#167) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ``` ┌ put_ref(rid::Distributed.RRID, caller::Int64, args::WeakRef) @ Distributed /workpath/Distributed.jl/src/remotecall.jl:709 │ no matching method found `lock(::Nothing)` (1/2 union split): Distributed.lock((rv::Distributed.RemoteValue).synctake::Union{Nothing, ReentrantLock}) └──────────────────── ``` ``` ┌ put_ref(rid::Distributed.RRID, caller::Int64, args::WeakRef) @ Distributed /workpath/Distributed.jl/src/remotecall.jl:710 │ no matching method found `unlock(::Nothing)` (1/2 union split): Distributed.unlock((rv::Distributed.RemoteValue).synctake::Union{Nothing, ReentrantLock}) ││││││││││││││└──────────────────── ``` ``` ┌ take_ref(rid::Any, caller::Any, args::Vararg{Any}) @ Distributed /workpath/Distributed.jl/src/remotecall.jl:734 │ no matching method found `lock(::Nothing)` (1/2 union split): Distributed.lock((rv::Distributed.RemoteValue).synctake::Union{Nothing, ReentrantLock}) └──────────────────── ``` ``` ┌ take_ref(rid::Any, caller::Any, args::Vararg{Any}) @ Distributed /workpath/Distributed.jl/src/remotecall.jl:742 │ no matching method found `unlock(::Nothing)` (1/2 union split): Distributed.unlock((rv::Distributed.RemoteValue).synctake::Union{Nothing, ReentrantLock}) └──────────────────── ``` ``` ┌ (::Distributed.var"#handle_msg##2#handle_msg##3"{Distributed.CallMsg{…}, Distributed.MsgHeader, Base.PipeEndpoint})() @ Distributed /workpath/Distributed.jl/src/process_messages.jl:292 │ no matching method found `unlock(::Nothing)` (1/2 union split): Distributed.unlock(((v::Distributed.SyncTake).rv::Distributed.RemoteValue).synctake::Union{Nothing, ReentrantLock}) └──────────────────── ``` (cherry picked from commit 01027d14d994da11b8962d9a20a2f5a6888dec5f) (cherry picked from commit 9f6459f83f08e2ef147d102afc1ea5d6d8ab1ec1) --- src/process_messages.jl | 2 +- src/remotecall.jl | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/process_messages.jl b/src/process_messages.jl index 393c764..1c5c49c 100644 --- a/src/process_messages.jl +++ b/src/process_messages.jl @@ -289,7 +289,7 @@ function handle_msg(msg::CallMsg{:call_fetch}, header, r_stream, w_stream, versi try deliver_result(w_stream, :call_fetch, header.notify_oid, v.v) finally - unlock(v.rv.synctake) + unlock(v.rv.synctake::ReentrantLock) end else deliver_result(w_stream, :call_fetch, header.notify_oid, v) diff --git a/src/remotecall.jl b/src/remotecall.jl index 644ff04..d0ac719 100644 --- a/src/remotecall.jl +++ b/src/remotecall.jl @@ -706,8 +706,8 @@ function put_ref(rid, caller, args...) put!(rv, args...) if myid() == caller && rv.synctake !== nothing # Wait till a "taken" value is serialized out - github issue #29932 - lock(rv.synctake) - unlock(rv.synctake) + lock(rv.synctake::ReentrantLock) + unlock(rv.synctake::ReentrantLock) end nothing end @@ -731,7 +731,7 @@ function take_ref(rid, caller, args...) # special handling for local put! / remote take! on unbuffered channel # github issue #29932 synctake = true - lock(rv.synctake) + lock(rv.synctake::ReentrantLock) end v = try @@ -739,7 +739,7 @@ function take_ref(rid, caller, args...) catch e # avoid unmatched unlock when exception occurs # github issue #33972 - synctake && unlock(rv.synctake) + synctake && unlock(rv.synctake::ReentrantLock) rethrow(e) end From c4850a0c870b6fae398cc99604471c1923e7248d Mon Sep 17 00:00:00 2001 From: Dilum Aluthge Date: Fri, 20 Feb 2026 23:37:54 -0500 Subject: [PATCH 08/14] Fix a JET error around matching methods for `read_worker_host_port(...)` (#171) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ``` ┌ connect(manager::Distributed.SSHManager, pid::Int64, config::Distributed.WorkerConfig) @ Distributed /workpath/Distributed.jl/src/managers.jl:582 │ no matching method found `read_worker_host_port(::Nothing)` (1/2 union split): Distributed.read_worker_host_port((config::Distributed.WorkerConfig).io::Union{Nothing, IO}) └──────────────────── ``` (cherry picked from commit d259b8d3cc24686430a7285abeed574ed46768e7) (cherry picked from commit 0cf99106072adf5527acd48c668cd95c925e114f) --- src/managers.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/managers.jl b/src/managers.jl index 560648a..0cf79a6 100644 --- a/src/managers.jl +++ b/src/managers.jl @@ -606,7 +606,7 @@ function connect(manager::ClusterManager, pid::Int, config::WorkerConfig) # master connecting to workers if config.io !== nothing - (bind_addr, port::Int) = read_worker_host_port(config.io) + (bind_addr, port::Int) = read_worker_host_port(config.io::IO) pubhost = something(config.host, bind_addr) config.host = pubhost config.port = port From 5430adb4d166f9da04532ba63bd9e556e161b460 Mon Sep 17 00:00:00 2001 From: Dilum Aluthge Date: Fri, 20 Feb 2026 23:38:09 -0500 Subject: [PATCH 09/14] Fix a JET error around matching methods for `getindex(...)` (#170) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ``` ┌ setup_launched_worker(manager::Distributed.SSHManager, wconfig::Distributed.WorkerConfig, launched_q::Vector{Int64}) @ Distributed /workpath/Distributed.jl/src/cluster.jl:563 │ no matching method found `getindex(::Nothing, ::Symbol)` (1/2 union split): cnt = ((wconfig::Distributed.WorkerConfig).environ::Union{Nothing, Dict})[:cpu_threads] └──────────────────── ``` (cherry picked from commit 61ff327e0aa53d25c29780327adab66ab4dcd244) (cherry picked from commit d06aa735f9511ac2618884e77e7caf29fdaeb1b1) --- src/cluster.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cluster.jl b/src/cluster.jl index 8161b0c..d3a903a 100644 --- a/src/cluster.jl +++ b/src/cluster.jl @@ -576,7 +576,7 @@ function setup_launched_worker(manager, wconfig, launched_q) # same type. This is done by setting an appropriate value to `WorkerConfig.cnt`. cnt = something(wconfig.count, 1) if cnt === :auto - cnt = wconfig.environ[:cpu_threads] + cnt = (wconfig.environ::AbstractDict)[:cpu_threads] end cnt = cnt - 1 # Removing self from the requested number From b4a6800e064951ba20310de0df7ecb9768726c0a Mon Sep 17 00:00:00 2001 From: Dilum Aluthge Date: Sat, 21 Feb 2026 03:18:15 -0500 Subject: [PATCH 10/14] Fix a JET error around matching methods for `push!(...)` (#173) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Fix a JET error around matching methods for `push!(...)` ``` ┌ @ Distributed /workpath/Distributed.jl/src/process_messages.jl:387 │ no matching method found `push!(::Nothing, ::Any)` (1/2 union split): Distributed.push!(default_worker_pool()::Union{Nothing, Distributed.AbstractWorkerPool}, w.id::Any) └──────────────────── ``` (cherry picked from commit f15f306adaafbedc2f431c96087da2ea483c9558) * Add a `::AbstractWorkerPool` return type annotation to `default_worker_pool()` * Add a type annotation in an additional place, to fix another JET error (cherry picked from commit b7c43b235a963d19ec1c5e5aa9b8a565b6e49c34) --- src/workerpool.jl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/workerpool.jl b/src/workerpool.jl index b28c726..aa73a61 100644 --- a/src/workerpool.jl +++ b/src/workerpool.jl @@ -289,7 +289,7 @@ julia> default_worker_pool() WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:3), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 4)) ``` """ -function default_worker_pool() +function default_worker_pool()::AbstractWorkerPool # On workers retrieve the default worker pool from the master when accessed # for the first time if _default_worker_pool[] === nothing @@ -299,7 +299,7 @@ function default_worker_pool() _default_worker_pool[] = remotecall_fetch(()->default_worker_pool(), 1) end end - return _default_worker_pool[] + return _default_worker_pool[]::AbstractWorkerPool end """ From e394141de38ed5fc4616f11b7c799725b2a53675 Mon Sep 17 00:00:00 2001 From: Dilum Aluthge Date: Sat, 21 Feb 2026 03:19:51 -0500 Subject: [PATCH 11/14] Fix a JET error regarding the existence of the local variable `reducer` at a certain point (#169) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Fix a JET error regarding the existence of the local variable `reducer` at a certain point ``` ┌ var"@distributed"(__source__::LineNumberNode, __module__::Module, args::Vararg{Any}) @ Distributed /workpath/Distributed.jl/src/macros.jl:363 │ local variable `reducer` may be undefined: reducer::Any └──────────────────── ``` By the time we get to this location in the code, we know that the local variable `reducer` exists, by the following reasoning. If the number of arguments is 1, then we have already returned before this point. If the number of arguments is neither 1 nor 2, then we have already thrown an exception by this point. Thus, if we reach this line, the number of arguments must be 2. Since the number of arguments is 2, we know that we executed the `reducer = args[1]` line above, which means that `reducer` is defined. (cherry picked from commit cdb110658e0e3a89de3739b05679863fefb5b91b) * Define `reducer = identity` in the `na==1` branch (cherry picked from commit 1bc91f97a75c1ac321ed28dfca072cbabfad7b4c) --- src/macros.jl | 1 + 1 file changed, 1 insertion(+) diff --git a/src/macros.jl b/src/macros.jl index c58faf6..ea98c60 100644 --- a/src/macros.jl +++ b/src/macros.jl @@ -333,6 +333,7 @@ completion. To wait for completion, prefix the call with [`@sync`](@ref), like : macro distributed(args...) na = length(args) if na==1 + reducer = identity loop = args[1] elseif na==2 reducer = args[1] From e555a698a03f4fdb91beccd0dfd11400cff56a31 Mon Sep 17 00:00:00 2001 From: Dilum Aluthge Date: Sat, 21 Feb 2026 03:21:28 -0500 Subject: [PATCH 12/14] Rename one method of `run_work_thunk()` to `run_work_thunk_remotevalue()`; this fixes a JET error around matching methods for `run_work_thunk(...)` (#181) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This fixes the following JET error: ``` │┌ run_work_thunk(rv::Distributed.RemoteValue, thunk::Bool) @ Distributed /workpath/Distributed.jl/src/process_messages.jl:79 ││ no matching method found `run_work_thunk(::Bool, ::Bool)`: Distributed.run_work_thunk(thunk::Bool, false) │└──────────────────── ``` (cherry picked from commit 2adcd26e81cec03dbf81fa94071b4c4499e539a1) --- src/process_messages.jl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/process_messages.jl b/src/process_messages.jl index 1c5c49c..211c225 100644 --- a/src/process_messages.jl +++ b/src/process_messages.jl @@ -75,7 +75,7 @@ function run_work_thunk(thunk::Function, print_error::Bool) end return result end -function run_work_thunk(rv::RemoteValue, thunk) +function run_work_thunk_remotevalue(rv::RemoteValue, thunk) put!(rv, run_work_thunk(thunk, false)) nothing end @@ -85,7 +85,7 @@ function schedule_call(rid, thunk) rv = RemoteValue(def_rv_channel()) (PGRP::ProcessGroup).refs[rid] = rv push!(rv.clientset, rid.whence) - errormonitor(@async run_work_thunk(rv, thunk)) + errormonitor(@async run_work_thunk_remotevalue(rv, thunk)) return rv end end From 7b0d59dc23781fba5d30b476c70927ebc5a1f460 Mon Sep 17 00:00:00 2001 From: Andreas Noack Date: Wed, 18 Feb 2026 23:45:26 +0100 Subject: [PATCH 13/14] Ensure correct type parameter of serialized RemoteChannel (#179) This is a forward-port of https://github.com/JuliaLang/Distributed.jl/pull/179 (https://github.com/JuliaLang/Distributed.jl/commit/6649a94075ff6a52ee9558db3ec4491496bf1bea). When serializing RemoteChannels, a new zeroed RemoteChannel is constructed. However, the input type parameter might be part of an outer type parameter which can then lead to a TypeError during deserialization. The fix is just to reuse the type parameter of the input when constructing the zeroed RemoteChannel. (cherry picked from commit 6649a94075ff6a52ee9558db3ec4491496bf1bea) (cherry picked from commit 2e529965ef354af6c7ad19f8dac3d5374cf0a804) --- src/remotecall.jl | 4 ++-- test/distributed_exec.jl | 8 ++++++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/src/remotecall.jl b/src/remotecall.jl index d0ac719..a1cbb16 100644 --- a/src/remotecall.jl +++ b/src/remotecall.jl @@ -413,8 +413,8 @@ function serialize(s::AbstractSerializer, ::Future) invoke(serialize, Tuple{AbstractSerializer, Any}, s, zero_fut) end -function serialize(s::AbstractSerializer, ::RemoteChannel) - zero_rc = RemoteChannel{Channel{Any}}((0,0,0)) +function serialize(s::AbstractSerializer, ::RemoteChannel{T}) where T + zero_rc = RemoteChannel{T}((0,0,0)) invoke(serialize, Tuple{AbstractSerializer, Any}, s, zero_rc) end diff --git a/test/distributed_exec.jl b/test/distributed_exec.jl index a218bf6..19d10eb 100644 --- a/test/distributed_exec.jl +++ b/test/distributed_exec.jl @@ -343,9 +343,12 @@ end @testset "Ser/deser to non-ClusterSerializer objects" begin function test_regular_io_ser(ref::DistributedNext.AbstractRemoteRef) io = IOBuffer() - serialize(io, ref) + # Wrapping the ref in a Dict to exercise the case when the + # type parameter of the RemoteChannel is part of an outer type. + # See https://github.com/JuliaLang/Distributed.jl/issues/178 + serialize(io, Dict("ref" => ref)) seekstart(io) - ref2 = deserialize(io) + ref2 = deserialize(io)["ref"] for fld in fieldnames(typeof(ref)) v = getfield(ref2, fld) if isa(v, Number) @@ -361,6 +364,7 @@ end test_regular_io_ser(Future()) test_regular_io_ser(RemoteChannel()) + test_regular_io_ser(RemoteChannel(() -> Channel{Bool}(1))) end @testset "@distributed and [un]buffered reads" begin From 82874229ac189ccfa2ac2f80ad3704282915cbcb Mon Sep 17 00:00:00 2001 From: Dilum Aluthge Date: Sat, 21 Feb 2026 03:37:22 -0500 Subject: [PATCH 14/14] Fix some JET errors around matching methods for `kill(...)` and `process_exited(...)` (#172) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Fix some JET errors around matching methods for `kill(...)` and `process_exited(...)` ``` │┌ manage(manager::Distributed.LocalManager, id::Int64, config::Distributed.WorkerConfig, op::Symbol) @ Distributed /workpath/Distributed.jl/src/managers.jl:529 ││ no matching method found `kill(::Nothing, ::Int64)` (1/2 union split): Distributed.kill((config::Distributed.WorkerConfig).process::Union{Nothing, Base.Process}, 2) │└──────────────────── ``` ``` │┌ (::Distributed.var"#200#201"{Int64, Int64, Int64, Int64, Distributed.WorkerConfig})() @ Distributed /workpath/Distributed.jl/src/managers.jl:757 │ no matching method found `kill(::Nothing, ::Int64)` (1/2 union split): Distributed.kill((getfield(#self#::Distributed.var"#200#201"{Int64, Int64, Int64, Int64, Distributed.WorkerConfig}, :config)::Distributed.WorkerConfig).process::Union{Nothing, Base.Process}, (profile_sig::Tuple{String, Int64})[2]::Int64) └──────────────────── ``` ``` ┌ (::Distributed.var"#200#201"{Int64, Int64, Int64, Int64, Distributed.WorkerConfig})() @ Distributed /workpath/Distributed.jl/src/managers.jl:761 │ no matching method found `kill(::Nothing, ::Int64)` (1/2 union split): Distributed.kill((getfield(#self#::Distributed.var"#200#201"{Int64, Int64, Int64, Int64, Distributed.WorkerConfig}, :config)::Distributed.WorkerConfig).process::Union{Nothing, Base.Process}, (Distributed.Base).SIGQUIT::Int64) └──────────────────── ``` ``` ┌ (::Distributed.var"#200#201"{Int64, Int64, Int64, Int64, Distributed.WorkerConfig})() @ Distributed /workpath/Distributed.jl/src/managers.jl:766 │ no matching method found `kill(::Nothing, ::Int64)` (1/2 union split): Distributed.kill((getfield(#self#::Distributed.var"#200#201"{Int64, Int64, Int64, Int64, Distributed.WorkerConfig}, :config)::Distributed.WorkerConfig).process::Union{Nothing, Base.Process}, (Distributed.Base).SIGKILL::Int64) └──────────────────── ``` ``` ┌ (::Distributed.var"#200#201"{Int64, Int64, Int64, Int64, Distributed.WorkerConfig})() @ Distributed /workpath/Distributed.jl/src/managers.jl:752 │ no matching method found `process_exited(::Nothing)` (1/2 union split): Distributed.process_exited((getfield(#self#::Distributed.var"#200#201"{Int64, Int64, Int64, Int64, Distributed.WorkerConfig}, :config)::Distributed.WorkerConfig).process::Union{Nothing, Base.Process}) └──────────────────── ``` ``` ┌ (::Distributed.var"#200#201"{Int64, Int64, Int64, Int64, Distributed.WorkerConfig})() @ Distributed /workpath/Distributed.jl/src/managers.jl:764 │ no matching method found `process_exited(::Nothing)` (1/2 union split): Distributed.process_exited((getfield(#self#::Distributed.var"#200#201"{Int64, Int64, Int64, Int64, Distributed.WorkerConfig}, :config)::Distributed.WorkerConfig).process::Union{Nothing, Base.Process}) └──────────────────── ``` (cherry picked from commit e19a4bfe30fd580cc8e8baa5033e2d7a2c1e1dc9) * Apply suggestions from code review Co-authored-by: James Wrigley * Define `process = config.process::Process` in one place, and then re-use the `process` variable when needed --------- Co-authored-by: James Wrigley (cherry picked from commit 231da28ef52953b68280a295419b990cb5c7e3f7) --- src/managers.jl | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/managers.jl b/src/managers.jl index 0cf79a6..2ce1a3b 100644 --- a/src/managers.jl +++ b/src/managers.jl @@ -553,7 +553,7 @@ end function manage(manager::LocalManager, id::Integer, config::WorkerConfig, op::Symbol) if op === :interrupt - kill(config.process, 2) + kill(config.process::Process, 2) end end @@ -776,21 +776,22 @@ function kill(manager::LocalManager, pid::Int, config::WorkerConfig; profile_wai sleep(exit_timeout) # Check to see if our child exited, and if not, send an actual kill signal - if !process_exited(config.process) + process = config.process::Process + if !process_exited(process) @warn "Failed to gracefully kill worker $(pid)" profile_sig = Sys.iswindows() ? nothing : Sys.isbsd() ? ("SIGINFO", 29) : ("SIGUSR1" , 10) if profile_sig !== nothing @warn("Sending profile $(profile_sig[1]) to worker $(pid)") - kill(config.process, profile_sig[2]) + kill(process, profile_sig[2]) sleep(profile_wait) end @warn("Sending SIGQUIT to worker $(pid)") - kill(config.process, Base.SIGQUIT) + kill(process, Base.SIGQUIT) sleep(term_timeout) - if !process_exited(config.process) + if !process_exited(process) @warn("Worker $(pid) ignored SIGQUIT, sending SIGKILL") - kill(config.process, Base.SIGKILL) + kill(process, Base.SIGKILL) end end end