Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ concurrency:
group: ${{ github.workflow }}-${{ github.ref }}-${{ github.ref != 'refs/heads/main' || startsWith(github.ref, 'refs/heads/release-') || github.run_number }}
cancel-in-progress: ${{ startsWith(github.ref, 'refs/pull/') }}

permissions:
contents: read

jobs:
test:
name: Julia ${{ matrix.version }} - ${{ matrix.os }} - ${{ matrix.arch }} - ${{ github.event_name }}
Expand Down Expand Up @@ -45,6 +48,8 @@ jobs:

steps:
- uses: actions/checkout@v6
with:
persist-credentials: false
- uses: julia-actions/setup-julia@v2
with:
version: ${{ matrix.version }}
Expand Down Expand Up @@ -73,6 +78,8 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v6
with:
persist-credentials: false
- uses: julia-actions/setup-julia@latest
with:
version: '1'
Expand All @@ -82,3 +89,17 @@ jobs:
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: julia --project=docs/ docs/make.jl

jet:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v6
with:
persist-credentials: false
- uses: julia-actions/setup-julia@latest
with:
version: '1.12'
# version: 'nightly'
- run: julia --color=yes --project=ci/jet -e 'import Pkg; Pkg.instantiate()'
- name: Run the JET tests
run: julia --color=yes --project=ci/jet ci/jet/check.jl
11 changes: 11 additions & 0 deletions ci/jet/Project.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[deps]
DistributedNext = "fab6aee4-877b-4bac-a744-3eca44acbb6f"
JET = "c3a54625-cd67-489e-a8e7-0a5a0ff4e31b"
Serialization = "9e88b42a-f829-5b0c-bbe9-9e923198166b"
Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"

[sources]
DistributedNext = {path = "../.."}

[compat]
JET = "0.11"
20 changes: 20 additions & 0 deletions ci/jet/check.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using DistributedNext: DistributedNext

using JET: JET
using Serialization: Serialization
using Test: Test, @testset

# We don't want PkgEval to fail because of a JET failure.
# Therefore, the JET tests are not part of the regular DistributedNext test suite.
# Instead, they live in a separate CI job, which runs on the DistributedNext repo.

@testset "JET" begin
    # Ignore Base and the Serialization stdlib when testing the package.
    ignored_modules = (Base, Serialization)
    JET.test_package(DistributedNext; ignored_modules=ignored_modules)
end
15 changes: 11 additions & 4 deletions src/cluster.jl
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@ Cluster managers implement how workers can be added, removed and communicated wi
"""
abstract type ClusterManager end

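# `cluster_manager` is a global constant `Ref`; it is created unassigned and
# only holds a manager once one has been stored in it (see `init_worker`).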
const cluster_manager = Ref{ClusterManager}()

# Guard for code that is about to read `cluster_manager[]`: raise an error if
# the `cluster_manager` Ref has not been assigned, otherwise return `nothing`.
function throw_if_cluster_manager_unassigned()
    if !isassigned(cluster_manager)
        error("cluster_manager is unassigned")
    end
    return nothing
end

"""
WorkerConfig

Expand Down Expand Up @@ -390,8 +398,7 @@ function init_worker(cookie::AbstractString, manager::ClusterManager=DefaultClus

# On workers, the default cluster manager connects via TCP sockets. Custom
# transports will need to call this function with their own manager.
global cluster_manager
cluster_manager = manager
cluster_manager[] = manager

# Since our pid has yet to be set, ensure no RemoteChannel / Future have been created or addprocs() called.
@assert nprocs() <= 1
Expand Down Expand Up @@ -569,7 +576,7 @@ function setup_launched_worker(manager, wconfig, launched_q)
# same type. This is done by setting an appropriate value to `WorkerConfig.cnt`.
cnt = something(wconfig.count, 1)
if cnt === :auto
cnt = wconfig.environ[:cpu_threads]
cnt = (wconfig.environ::AbstractDict)[:cpu_threads]
end
cnt = cnt - 1 # Removing self from the requested number

Expand Down Expand Up @@ -607,7 +614,7 @@ function launch_n_additional_processes(manager, frompid, fromconfig, cnt, launch
end
end

function create_worker(manager, wconfig)
function create_worker(manager::ClusterManager, wconfig::WorkerConfig)
# only node 1 can add new nodes, since nobody else has the full list of address:port
@assert LPROC.id == 1
timeout = worker_timeout()
Expand Down
1 change: 1 addition & 0 deletions src/macros.jl
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@ completion. To wait for completion, prefix the call with [`@sync`](@ref), like :
macro distributed(args...)
na = length(args)
if na==1
reducer = identity
loop = args[1]
elseif na==2
reducer = args[1]
Expand Down
17 changes: 9 additions & 8 deletions src/managers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ function launch_on_machine(manager::SSHManager, machine::AbstractString, cnt, pa

any(c -> c == '"', exename) && throw(ArgumentError("invalid exename"))

remotecmd = shell_escape_wincmd(escape_microsoft_c_args(exename, exeflags...))
remotecmd = shell_escape_wincmd(escape_microsoft_c_args(exename, exeflags...)::AbstractString)
# change working directory
if dir !== nothing && dir != ""
any(c -> c == '"', dir) && throw(ArgumentError("invalid dir"))
Expand Down Expand Up @@ -553,7 +553,7 @@ end

function manage(manager::LocalManager, id::Integer, config::WorkerConfig, op::Symbol)
if op === :interrupt
kill(config.process, 2)
kill(config.process::Process, 2)
end
end

Expand Down Expand Up @@ -606,7 +606,7 @@ function connect(manager::ClusterManager, pid::Int, config::WorkerConfig)

# master connecting to workers
if config.io !== nothing
(bind_addr, port::Int) = read_worker_host_port(config.io)
(bind_addr, port::Int) = read_worker_host_port(config.io::IO)
pubhost = something(config.host, bind_addr)
config.host = pubhost
config.port = port
Expand Down Expand Up @@ -776,21 +776,22 @@ function kill(manager::LocalManager, pid::Int, config::WorkerConfig; profile_wai
sleep(exit_timeout)

# Check to see if our child exited, and if not, send an actual kill signal
if !process_exited(config.process)
process = config.process::Process
if !process_exited(process)
@warn "Failed to gracefully kill worker $(pid)"
profile_sig = Sys.iswindows() ? nothing : Sys.isbsd() ? ("SIGINFO", 29) : ("SIGUSR1" , 10)
if profile_sig !== nothing
@warn("Sending profile $(profile_sig[1]) to worker $(pid)")
kill(config.process, profile_sig[2])
kill(process, profile_sig[2])
sleep(profile_wait)
end
@warn("Sending SIGQUIT to worker $(pid)")
kill(config.process, Base.SIGQUIT)
kill(process, Base.SIGQUIT)

sleep(term_timeout)
if !process_exited(config.process)
if !process_exited(process)
@warn("Worker $(pid) ignored SIGQUIT, sending SIGKILL")
kill(config.process, Base.SIGKILL)
kill(process, Base.SIGKILL)
end
end
end
Expand Down
4 changes: 2 additions & 2 deletions src/messages.jl
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,15 @@ end
function send_msg(s::IO, header, msg)
id = worker_id_from_socket(s)
if id > -1
return send_msg(worker_from_id(id), header, msg)
return send_msg(worker_from_id(id)::Worker, header, msg)
end
send_msg_unknown(s, header, msg)
end

function send_msg_now(s::IO, header, msg::AbstractMsg)
id = worker_id_from_socket(s)
if id > -1
return send_msg_now(worker_from_id(id), header, msg)
return send_msg_now(worker_from_id(id)::Worker, header, msg)
end
send_msg_unknown(s, header, msg)
end
Expand Down
20 changes: 12 additions & 8 deletions src/process_messages.jl
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ function run_work_thunk(thunk::Function, print_error::Bool)
end
return result
end
function run_work_thunk(rv::RemoteValue, thunk)
function run_work_thunk_remotevalue(rv::RemoteValue, thunk)
put!(rv, run_work_thunk(thunk, false))
nothing
end
Expand All @@ -85,7 +85,7 @@ function schedule_call(rid, thunk)
rv = RemoteValue(def_rv_channel())
(PGRP::ProcessGroup).refs[rid] = rv
push!(rv.clientset, rid.whence)
errormonitor(@async run_work_thunk(rv, thunk))
errormonitor(@async run_work_thunk_remotevalue(rv, thunk))
return rv
end
end
Expand Down Expand Up @@ -289,7 +289,7 @@ function handle_msg(msg::CallMsg{:call_fetch}, header, r_stream, w_stream, versi
try
deliver_result(w_stream, :call_fetch, header.notify_oid, v.v)
finally
unlock(v.rv.synctake)
unlock(v.rv.synctake::ReentrantLock)
end
else
deliver_result(w_stream, :call_fetch, header.notify_oid, v)
Expand All @@ -315,8 +315,10 @@ function handle_msg(msg::ResultMsg, header, r_stream, w_stream, version)
end

function handle_msg(msg::IdentifySocketMsg, header, r_stream, w_stream, version)
throw_if_cluster_manager_unassigned()

# register a new peer worker connection
w = Worker(msg.from_pid, r_stream, w_stream, cluster_manager; version=version)
w = Worker(msg.from_pid, r_stream, w_stream, cluster_manager[]; version=version)::Worker
send_connection_hdr(w, false)
send_msg_now(w, MsgHeader(), IdentifySocketAckMsg())
notify(w.initialized)
Expand All @@ -328,8 +330,10 @@ function handle_msg(msg::IdentifySocketAckMsg, header, r_stream, w_stream, versi
end

function handle_msg(msg::JoinPGRPMsg, header, r_stream, w_stream, version)
throw_if_cluster_manager_unassigned()

LPROC.id = msg.self_pid
controller = Worker(1, r_stream, w_stream, cluster_manager; version=version)
controller = Worker(1, r_stream, w_stream, cluster_manager[]; version=version)::Worker
notify(controller.initialized)
register_worker(LPROC)
topology(msg.topology)
Expand All @@ -348,9 +352,9 @@ function handle_msg(msg::JoinPGRPMsg, header, r_stream, w_stream, version)
let rpid=rpid, wconfig=wconfig
if lazy
# The constructor registers the object with a global registry.
Worker(rpid, ()->connect_to_peer(cluster_manager, rpid, wconfig))
Worker(rpid, ()->connect_to_peer(cluster_manager[], rpid, wconfig))
else
@async connect_to_peer(cluster_manager, rpid, wconfig)
@async connect_to_peer(cluster_manager[], rpid, wconfig)
end
end
end
Expand All @@ -362,7 +366,7 @@ end
function connect_to_peer(manager::ClusterManager, rpid::Int, wconfig::WorkerConfig)
try
(r_s, w_s) = connect(manager, rpid, wconfig)
w = Worker(rpid, r_s, w_s, manager; config=wconfig)
w = Worker(rpid, r_s, w_s, manager; config=wconfig)::Worker
process_messages(w.r_stream, w.w_stream, false)
send_connection_hdr(w, true)
send_msg_now(w, MsgHeader(), IdentifySocketMsg(myid()))
Expand Down
8 changes: 4 additions & 4 deletions src/remotecall.jl
Original file line number Diff line number Diff line change
Expand Up @@ -706,8 +706,8 @@ function put_ref(rid, caller, args...)
put!(rv, args...)
if myid() == caller && rv.synctake !== nothing
# Wait till a "taken" value is serialized out - github issue #29932
lock(rv.synctake)
unlock(rv.synctake)
lock(rv.synctake::ReentrantLock)
unlock(rv.synctake::ReentrantLock)
end
nothing
end
Expand All @@ -731,15 +731,15 @@ function take_ref(rid, caller, args...)
# special handling for local put! / remote take! on unbuffered channel
# github issue #29932
synctake = true
lock(rv.synctake)
lock(rv.synctake::ReentrantLock)
end

v = try
take!(rv, args...)
catch e
# avoid unmatched unlock when exception occurs
# github issue #33972
synctake && unlock(rv.synctake)
synctake && unlock(rv.synctake::ReentrantLock)
rethrow(e)
end

Expand Down
4 changes: 2 additions & 2 deletions src/workerpool.jl
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ julia> default_worker_pool()
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:3), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 4))
```
"""
function default_worker_pool()
function default_worker_pool()::AbstractWorkerPool
# On workers retrieve the default worker pool from the master when accessed
# for the first time
if _default_worker_pool[] === nothing
Expand All @@ -299,7 +299,7 @@ function default_worker_pool()
_default_worker_pool[] = remotecall_fetch(()->default_worker_pool(), 1)
end
end
return _default_worker_pool[]
return _default_worker_pool[]::AbstractWorkerPool
end

"""
Expand Down
Loading