diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 30580b1..3ad9f8f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -15,6 +15,9 @@ concurrency: group: ${{ github.workflow }}-${{ github.ref }}-${{ github.ref != 'refs/heads/main' || startsWith(github.ref, 'refs/heads/release-') || github.run_number }} cancel-in-progress: ${{ startsWith(github.ref, 'refs/pull/') }} +permissions: + contents: read + jobs: test: name: Julia ${{ matrix.version }} - ${{ matrix.os }} - ${{ matrix.arch }} - ${{ github.event_name }} @@ -45,6 +48,8 @@ jobs: steps: - uses: actions/checkout@v6 + with: + persist-credentials: false - uses: julia-actions/setup-julia@v2 with: version: ${{ matrix.version }} @@ -73,6 +78,8 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v6 + with: + persist-credentials: false - uses: julia-actions/setup-julia@latest with: version: '1' @@ -82,3 +89,17 @@ jobs: env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} run: julia --project=docs/ docs/make.jl + + jet: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v6 + with: + persist-credentials: false + - uses: julia-actions/setup-julia@latest + with: + version: '1.12' + # version: 'nightly' + - run: julia --color=yes --project=ci/jet -e 'import Pkg; Pkg.instantiate()' + - name: Run the JET tests + run: julia --color=yes --project=ci/jet ci/jet/check.jl diff --git a/ci/jet/Project.toml b/ci/jet/Project.toml new file mode 100644 index 0000000..15df17d --- /dev/null +++ b/ci/jet/Project.toml @@ -0,0 +1,11 @@ +[deps] +DistributedNext = "fab6aee4-877b-4bac-a744-3eca44acbb6f" +JET = "c3a54625-cd67-489e-a8e7-0a5a0ff4e31b" +Serialization = "9e88b42a-f829-5b0c-bbe9-9e923198166b" +Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40" + +[sources] +DistributedNext = {path = "../.."} + +[compat] +JET = "0.11" diff --git a/ci/jet/check.jl b/ci/jet/check.jl new file mode 100644 index 0000000..11e615c --- /dev/null +++ b/ci/jet/check.jl @@ -0,0 +1,20 @@ +using DistributedNext: DistributedNext + +using JET: JET +using Serialization: Serialization +using Test: Test, @testset + +# We don't want to fail PkgEval because of a JET failure +# Therefore, we don't put the JET tests in the regular DistributedNext test suite +# Instead, we put it in a separate CI job, which runs on the DistributedNext repo + +@testset "JET" begin + ignored_modules = ( + # We will ignore Base: + Base, + + # We'll ignore the Serialization stdlib: + Serialization, + ) + JET.test_package(DistributedNext; ignored_modules) +end diff --git a/src/cluster.jl b/src/cluster.jl index 82a355f..d3a903a 100644 --- a/src/cluster.jl +++ b/src/cluster.jl @@ -9,6 +9,14 @@ Cluster managers implement how workers can be added, removed and communicated wi """ abstract type ClusterManager end +# cluster_manager is a global constant +const cluster_manager = Ref{ClusterManager}() + +function throw_if_cluster_manager_unassigned() + isassigned(cluster_manager) || error("cluster_manager is unassigned") + return nothing +end + """ WorkerConfig @@ -390,8 +398,7 @@ function init_worker(cookie::AbstractString, manager::ClusterManager=DefaultClus # On workers, the default cluster manager connects via TCP sockets. Custom # transports will need to call this function with their own manager. - global cluster_manager - cluster_manager = manager + cluster_manager[] = manager # Since our pid has yet to be set, ensure no RemoteChannel / Future have been created or addprocs() called. @assert nprocs() <= 1 @@ -569,7 +576,7 @@ function setup_launched_worker(manager, wconfig, launched_q) # same type. This is done by setting an appropriate value to `WorkerConfig.cnt`. cnt = something(wconfig.count, 1) if cnt === :auto - cnt = wconfig.environ[:cpu_threads] + cnt = (wconfig.environ::AbstractDict)[:cpu_threads] end cnt = cnt - 1 # Removing self from the requested number @@ -607,7 +614,7 @@ function launch_n_additional_processes(manager, frompid, fromconfig, cnt, launch end end -function create_worker(manager, wconfig) +function create_worker(manager::ClusterManager, wconfig::WorkerConfig) # only node 1 can add new nodes, since nobody else has the full list of address:port @assert LPROC.id == 1 timeout = worker_timeout() diff --git a/src/macros.jl b/src/macros.jl index c58faf6..ea98c60 100644 --- a/src/macros.jl +++ b/src/macros.jl @@ -333,6 +333,7 @@ completion. To wait for completion, prefix the call with [`@sync`](@ref), like : macro distributed(args...) na = length(args) if na==1 + reducer = identity loop = args[1] elseif na==2 reducer = args[1] diff --git a/src/managers.jl b/src/managers.jl index ab79abe..2ce1a3b 100644 --- a/src/managers.jl +++ b/src/managers.jl @@ -347,7 +347,7 @@ function launch_on_machine(manager::SSHManager, machine::AbstractString, cnt, pa any(c -> c == '"', exename) && throw(ArgumentError("invalid exename")) - remotecmd = shell_escape_wincmd(escape_microsoft_c_args(exename, exeflags...)) + remotecmd = shell_escape_wincmd(escape_microsoft_c_args(exename, exeflags...)::AbstractString) # change working directory if dir !== nothing && dir != "" any(c -> c == '"', dir) && throw(ArgumentError("invalid dir")) @@ -553,7 +553,7 @@ end function manage(manager::LocalManager, id::Integer, config::WorkerConfig, op::Symbol) if op === :interrupt - kill(config.process, 2) + kill(config.process::Process, 2) end end @@ -606,7 +606,7 @@ function connect(manager::ClusterManager, pid::Int, config::WorkerConfig) # master connecting to workers if config.io !== nothing - (bind_addr, port::Int) = read_worker_host_port(config.io) + (bind_addr, port::Int) = read_worker_host_port(config.io::IO) pubhost = something(config.host, bind_addr) config.host = pubhost config.port = port @@ -776,21 +776,22 @@ function kill(manager::LocalManager, pid::Int, config::WorkerConfig; profile_wai sleep(exit_timeout) # Check to see if our child exited, and if not, send an actual kill signal - if !process_exited(config.process) + process = config.process::Process + if !process_exited(process) @warn "Failed to gracefully kill worker $(pid)" profile_sig = Sys.iswindows() ? nothing : Sys.isbsd() ? ("SIGINFO", 29) : ("SIGUSR1" , 10) if profile_sig !== nothing @warn("Sending profile $(profile_sig[1]) to worker $(pid)") - kill(config.process, profile_sig[2]) + kill(process, profile_sig[2]) sleep(profile_wait) end @warn("Sending SIGQUIT to worker $(pid)") - kill(config.process, Base.SIGQUIT) + kill(process, Base.SIGQUIT) sleep(term_timeout) - if !process_exited(config.process) + if !process_exited(process) @warn("Worker $(pid) ignored SIGQUIT, sending SIGKILL") - kill(config.process, Base.SIGKILL) + kill(process, Base.SIGKILL) end end end diff --git a/src/messages.jl b/src/messages.jl index 6e895f0..1a5dd82 100644 --- a/src/messages.jl +++ b/src/messages.jl @@ -102,7 +102,7 @@ end function send_msg(s::IO, header, msg) id = worker_id_from_socket(s) if id > -1 - return send_msg(worker_from_id(id), header, msg) + return send_msg(worker_from_id(id)::Worker, header, msg) end send_msg_unknown(s, header, msg) end @@ -110,7 +110,7 @@ end function send_msg_now(s::IO, header, msg::AbstractMsg) id = worker_id_from_socket(s) if id > -1 - return send_msg_now(worker_from_id(id), header, msg) + return send_msg_now(worker_from_id(id)::Worker, header, msg) end send_msg_unknown(s, header, msg) end diff --git a/src/process_messages.jl b/src/process_messages.jl index a444651..211c225 100644 --- a/src/process_messages.jl +++ b/src/process_messages.jl @@ -75,7 +75,7 @@ function run_work_thunk(thunk::Function, print_error::Bool) end return result end -function run_work_thunk(rv::RemoteValue, thunk) +function run_work_thunk_remotevalue(rv::RemoteValue, thunk) put!(rv, run_work_thunk(thunk, false)) nothing end @@ -85,7 +85,7 @@ function schedule_call(rid, thunk) rv = RemoteValue(def_rv_channel()) (PGRP::ProcessGroup).refs[rid] = rv push!(rv.clientset, rid.whence) - errormonitor(@async run_work_thunk(rv, thunk)) + errormonitor(@async run_work_thunk_remotevalue(rv, thunk)) return rv end end @@ -289,7 +289,7 @@ function handle_msg(msg::CallMsg{:call_fetch}, header, r_stream, w_stream, versi try deliver_result(w_stream, :call_fetch, header.notify_oid, v.v) finally - unlock(v.rv.synctake) + unlock(v.rv.synctake::ReentrantLock) end else deliver_result(w_stream, :call_fetch, header.notify_oid, v) @@ -315,8 +315,10 @@ function handle_msg(msg::ResultMsg, header, r_stream, w_stream, version) end function handle_msg(msg::IdentifySocketMsg, header, r_stream, w_stream, version) + throw_if_cluster_manager_unassigned() + # register a new peer worker connection - w = Worker(msg.from_pid, r_stream, w_stream, cluster_manager; version=version) + w = Worker(msg.from_pid, r_stream, w_stream, cluster_manager[]; version=version)::Worker send_connection_hdr(w, false) send_msg_now(w, MsgHeader(), IdentifySocketAckMsg()) notify(w.initialized) @@ -328,8 +330,10 @@ function handle_msg(msg::IdentifySocketAckMsg, header, r_stream, w_stream, versi end function handle_msg(msg::JoinPGRPMsg, header, r_stream, w_stream, version) + throw_if_cluster_manager_unassigned() + LPROC.id = msg.self_pid - controller = Worker(1, r_stream, w_stream, cluster_manager; version=version) + controller = Worker(1, r_stream, w_stream, cluster_manager[]; version=version)::Worker notify(controller.initialized) register_worker(LPROC) topology(msg.topology) @@ -348,9 +352,9 @@ function handle_msg(msg::JoinPGRPMsg, header, r_stream, w_stream, version) let rpid=rpid, wconfig=wconfig if lazy # The constructor registers the object with a global registry. - Worker(rpid, ()->connect_to_peer(cluster_manager, rpid, wconfig)) + Worker(rpid, ()->connect_to_peer(cluster_manager[], rpid, wconfig)) else - @async connect_to_peer(cluster_manager, rpid, wconfig) + @async connect_to_peer(cluster_manager[], rpid, wconfig) end end end @@ -362,7 +366,7 @@ end function connect_to_peer(manager::ClusterManager, rpid::Int, wconfig::WorkerConfig) try (r_s, w_s) = connect(manager, rpid, wconfig) - w = Worker(rpid, r_s, w_s, manager; config=wconfig) + w = Worker(rpid, r_s, w_s, manager; config=wconfig)::Worker process_messages(w.r_stream, w.w_stream, false) send_connection_hdr(w, true) send_msg_now(w, MsgHeader(), IdentifySocketMsg(myid())) diff --git a/src/remotecall.jl b/src/remotecall.jl index 644ff04..d0ac719 100644 --- a/src/remotecall.jl +++ b/src/remotecall.jl @@ -706,8 +706,8 @@ function put_ref(rid, caller, args...) put!(rv, args...) if myid() == caller && rv.synctake !== nothing # Wait till a "taken" value is serialized out - github issue #29932 - lock(rv.synctake) - unlock(rv.synctake) + lock(rv.synctake::ReentrantLock) + unlock(rv.synctake::ReentrantLock) end nothing end @@ -731,7 +731,7 @@ function take_ref(rid, caller, args...) # special handling for local put! / remote take! on unbuffered channel # github issue #29932 synctake = true - lock(rv.synctake) + lock(rv.synctake::ReentrantLock) end v = try @@ -739,7 +739,7 @@ function take_ref(rid, caller, args...) catch e # avoid unmatched unlock when exception occurs # github issue #33972 - synctake && unlock(rv.synctake) + synctake && unlock(rv.synctake::ReentrantLock) rethrow(e) end diff --git a/src/workerpool.jl b/src/workerpool.jl index b28c726..aa73a61 100644 --- a/src/workerpool.jl +++ b/src/workerpool.jl @@ -289,7 +289,7 @@ julia> default_worker_pool() WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:3), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 4)) ``` """ -function default_worker_pool() +function default_worker_pool()::AbstractWorkerPool # On workers retrieve the default worker pool from the master when accessed # for the first time if _default_worker_pool[] === nothing @@ -299,7 +299,7 @@ function default_worker_pool() _default_worker_pool[] = remotecall_fetch(()->default_worker_pool(), 1) end end - return _default_worker_pool[] + return _default_worker_pool[]::AbstractWorkerPool end """