Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/benchmark.yml
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,10 @@ jobs:
key: tpcds-${{ hashFiles('.github/workflows/benchmark.yml', 'sql/core/src/test/scala/org/apache/spark/sql/TPCDSSchema.scala') }}
- name: Run benchmarks
run: |
# Raise RLIMIT_MEMLOCK so the Netty io_uring transport (used by AUTO
# on Linux 5.10+) can allocate its submission/completion queue rings.
# Without this, NettyTransportBenchmark silently falls back to EPOLL.
sudo prlimit --pid $$ --memlock=unlimited:unlimited 2>/dev/null || true
./build/sbt -Pscala-${{ inputs.scala }} -Pyarn -Pkubernetes -Phive -Phive-thriftserver -Phadoop-cloud -Pkinesis-asl -Pspark-ganglia-lgpl Test/package
# Make less noisy
cp conf/log4j2.properties.template conf/log4j2.properties
Expand Down
5 changes: 5 additions & 0 deletions .github/workflows/build_and_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,11 @@ jobs:
run: |
# Fix for TTY related issues when launching the Ammonite REPL in tests.
export TERM=vt100
# Raise RLIMIT_MEMLOCK so the Netty io_uring transport can allocate
# its submission/completion queue rings. Without this, AUTO transport
# selection silently falls back to EPOLL on the default 64KB memlock,
# leaving io_uring code paths untested.
sudo prlimit --pid $$ --memlock=unlimited:unlimited 2>/dev/null || true
# Hive "other tests" test needs larger metaspace size based on experiment.
if [[ "$MODULES_TO_TEST" == "hive" ]] && [[ "$EXCLUDED_TAGS" == "org.apache.spark.tags.SlowHiveTest" ]]; then export METASPACE_SIZE=2g; fi
# SPARK-46283: should delete the following env replacement after SPARK 3.x EOL
Expand Down
10 changes: 10 additions & 0 deletions common/network-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,16 @@
<artifactId>netty-transport-native-epoll</artifactId>
<classifier>linux-aarch_64</classifier>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-io_uring</artifactId>
<classifier>linux-x86_64</classifier>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-io_uring</artifactId>
<classifier>linux-aarch_64</classifier>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-kqueue</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,15 @@ public enum IOMode {
*/
KQUEUE,
/**
* Prefer to use native EPOLL on Linux (or KQUEUE on MacOS) if available. Then, fallback to NIO.
* Native io_uring via JNI, Linux only. Requires kernel 5.10+.
*/
IO_URING,
/**
* Prefer to use a native transport when available. On Linux, io_uring is preferred over EPOLL
* when the running kernel supports it AND the JVM can actually allocate an io_uring ring
* (probed once via {@link NettyUtils#isIoUringUsable()}; environments with low
* {@code RLIMIT_MEMLOCK} fall through to EPOLL). On MacOS/BSD, KQUEUE is used. Falls back to
* NIO when no native transport is available.
*/
AUTO
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.network.util;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.*;
Expand All @@ -32,15 +33,24 @@
import io.netty.channel.nio.NioIoHandler;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.channel.uring.IoUring;
import io.netty.channel.uring.IoUringIoHandler;
import io.netty.channel.uring.IoUringServerSocketChannel;
import io.netty.channel.uring.IoUringSocketChannel;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.internal.PlatformDependent;

import org.apache.spark.internal.SparkLogger;
import org.apache.spark.internal.SparkLoggerFactory;

/**
* Utilities for creating various Netty constructs based on whether we're using NIO, EPOLL,
* , KQUEUE, or AUTO.
* KQUEUE, IO_URING, or AUTO.
*/
public class NettyUtils {

private static final SparkLogger logger = SparkLoggerFactory.getLogger(NettyUtils.class);

/**
* Specifies an upper bound on the number of Netty threads that Spark requires by default.
* In practice, only 2-4 cores should be required to transfer roughly 10 Gb/s, and each core
Expand All @@ -56,10 +66,70 @@ public class NettyUtils {
private static final PooledByteBufAllocator[] _sharedPooledByteBufAllocator =
new PooledByteBufAllocator[2];

/**
* Cached result of probing whether io_uring can actually allocate the rings Spark needs on
* this JVM. `null` means not yet probed; non-null is the probed value.
*
* <p>{@link IoUring#isAvailable()} only checks that the JNI library loaded and basic syscalls
* work; it does not detect environments where the kernel allows io_uring but
* {@code RLIMIT_MEMLOCK} is too low for the submission/completion queue rings (common in
* containers, GitHub Actions runners, and other restricted environments). The probe creates
* a {@link MultiThreadIoEventLoopGroup} sized to {@link #MAX_DEFAULT_NETTY_THREADS} -- which
* is the worst case Spark allocates by default for a single event loop group -- and shuts it
* down to verify ring allocation actually succeeds. Probing with one thread is insufficient
* because some restricted environments allow a single ring to fit in memlock but not the eight
* rings Spark needs in practice.
*/
private static volatile Boolean ioUringUsable = null;

public static long freeDirectMemory() {
return PlatformDependent.maxDirectMemory() - PlatformDependent.usedDirectMemory();
}

/**
* Returns true if io_uring can actually be used on the running JVM. Probes once (with the
* result cached) by attempting a real ring allocation, which catches environments where
* {@link IoUring#isAvailable()} returns true but {@code RLIMIT_MEMLOCK} is too low to allocate
* the submission/completion queues (common in containers, GitHub Actions runners, and other
* restricted environments).
*
* <p>Used by AUTO mode and by tests that gate execution on io_uring being usable. An explicit
* {@link IOMode#IO_URING} mode does not consult this and surfaces the underlying error.
*/
public static boolean isIoUringUsable() {
Boolean cached = ioUringUsable;
if (cached != null) {
return cached;
}
synchronized (NettyUtils.class) {
if (ioUringUsable != null) {
return ioUringUsable;
}
if (!JavaUtils.isLinux || !IoUring.isAvailable()) {
ioUringUsable = false;
return false;
}
MultiThreadIoEventLoopGroup probe = null;
try {
probe = new MultiThreadIoEventLoopGroup(
MAX_DEFAULT_NETTY_THREADS, IoUringIoHandler.newFactory());
ioUringUsable = true;
} catch (Throwable t) {
logger.warn("io_uring is reported as available but allocation of " +
MAX_DEFAULT_NETTY_THREADS + " rings failed; " +
"AUTO will fall back to EPOLL on this JVM. " +
"Common cause: RLIMIT_MEMLOCK too low (containers, restricted environments). " +
"To force io_uring, set spark.shuffle.io.mode=IO_URING explicitly.", t);
ioUringUsable = false;
} finally {
if (probe != null) {
probe.shutdownGracefully(0, 100, TimeUnit.MILLISECONDS);
}
}
return ioUringUsable;
}
}

/** Creates a new ThreadFactory which prefixes each thread with the given name. */
public static ThreadFactory createThreadFactory(String threadPoolPrefix) {
return new DefaultThreadFactory(threadPoolPrefix, true);
Expand All @@ -73,8 +143,11 @@ public static EventLoopGroup createEventLoop(IOMode mode, int numThreads, String
case NIO -> NioIoHandler.newFactory();
case EPOLL -> EpollIoHandler.newFactory();
case KQUEUE -> KQueueIoHandler.newFactory();
case IO_URING -> IoUringIoHandler.newFactory();
case AUTO -> {
if (JavaUtils.isLinux && Epoll.isAvailable()) {
if (isIoUringUsable()) {
yield IoUringIoHandler.newFactory();
} else if (JavaUtils.isLinux && Epoll.isAvailable()) {
yield EpollIoHandler.newFactory();
} else if (JavaUtils.isMac && KQueue.isAvailable()) {
yield KQueueIoHandler.newFactory();
Expand All @@ -92,8 +165,11 @@ public static Class<? extends Channel> getClientChannelClass(IOMode mode) {
case NIO -> NioSocketChannel.class;
case EPOLL -> EpollSocketChannel.class;
case KQUEUE -> KQueueSocketChannel.class;
case IO_URING -> IoUringSocketChannel.class;
case AUTO -> {
if (JavaUtils.isLinux && Epoll.isAvailable()) {
if (isIoUringUsable()) {
yield IoUringSocketChannel.class;
} else if (JavaUtils.isLinux && Epoll.isAvailable()) {
yield EpollSocketChannel.class;
} else if (JavaUtils.isMac && KQueue.isAvailable()) {
yield KQueueSocketChannel.class;
Expand All @@ -110,8 +186,11 @@ public static Class<? extends ServerChannel> getServerChannelClass(IOMode mode)
case NIO -> NioServerSocketChannel.class;
case EPOLL -> EpollServerSocketChannel.class;
case KQUEUE -> KQueueServerSocketChannel.class;
case IO_URING -> IoUringServerSocketChannel.class;
case AUTO -> {
if (JavaUtils.isLinux && Epoll.isAvailable()) {
if (isIoUringUsable()) {
yield IoUringServerSocketChannel.class;
} else if (JavaUtils.isLinux && Epoll.isAvailable()) {
yield EpollServerSocketChannel.class;
} else if (JavaUtils.isMac && KQueue.isAvailable()) {
yield KQueueServerSocketChannel.class;
Expand Down
6 changes: 6 additions & 0 deletions common/network-yarn/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,12 @@
tofile="${project.build.directory}/exploded/META-INF/native/lib${spark.shade.native.packageName}_netty_transport_native_kqueue_aarch_64.jnilib" />
<move file="${project.build.directory}/exploded/META-INF/native/libnetty_transport_native_epoll_riscv64.so"
tofile="${project.build.directory}/exploded/META-INF/native/lib${spark.shade.native.packageName}_netty_transport_native_epoll_riscv64.so" />
<move file="${project.build.directory}/exploded/META-INF/native/libnetty_transport_native_io_uring42_x86_64.so"
tofile="${project.build.directory}/exploded/META-INF/native/lib${spark.shade.native.packageName}_netty_transport_native_io_uring42_x86_64.so" />
<move file="${project.build.directory}/exploded/META-INF/native/libnetty_transport_native_io_uring42_aarch_64.so"
tofile="${project.build.directory}/exploded/META-INF/native/lib${spark.shade.native.packageName}_netty_transport_native_io_uring42_aarch_64.so" />
<move file="${project.build.directory}/exploded/META-INF/native/libnetty_transport_native_io_uring42_riscv64.so"
tofile="${project.build.directory}/exploded/META-INF/native/lib${spark.shade.native.packageName}_netty_transport_native_io_uring42_riscv64.so" />
<move file="${project.build.directory}/exploded/META-INF/native/libnetty_tcnative_linux_x86_64.so"
tofile="${project.build.directory}/exploded/META-INF/native/lib${spark.shade.native.packageName}_netty_tcnative_linux_x86_64.so" />
<move file="${project.build.directory}/exploded/META-INF/native/libnetty_tcnative_linux_aarch_64.so"
Expand Down
10 changes: 10 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,16 @@
<artifactId>netty-transport-native-epoll</artifactId>
<classifier>linux-aarch_64</classifier>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-io_uring</artifactId>
<classifier>linux-x86_64</classifier>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-io_uring</artifactId>
<classifier>linux-aarch_64</classifier>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-kqueue</artifactId>
Expand Down
7 changes: 6 additions & 1 deletion core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.spark
import org.scalactic.source.Position
import org.scalatest.Tag

import org.apache.spark.network.util.IOMode
import org.apache.spark.network.util.{IOMode, NettyUtils}
import org.apache.spark.util.Utils

abstract class ShuffleNettySuite extends ShuffleSuite {
Expand Down Expand Up @@ -56,6 +56,11 @@ class ShuffleNettyKQueueSuite extends ShuffleNettySuite {
override def ioMode: IOMode = IOMode.KQUEUE
}

class ShuffleNettyIoUringSuite extends ShuffleNettySuite {
override def shouldRunTests: Boolean = Utils.isLinux && NettyUtils.isIoUringUsable
override def ioMode: IOMode = IOMode.IO_URING
}

class ShuffleNettyAutoSuite extends ShuffleNettySuite {
override def ioMode: IOMode = IOMode.AUTO
}
Original file line number Diff line number Diff line change
Expand Up @@ -281,9 +281,10 @@ object NettyTransportBenchmark extends BenchmarkBase {
/**
* Suite 3: IOMode Comparison (NIO vs AUTO).
* AUTO selects the best native transport via NettyUtils.createEventLoop
* (EPOLL on Linux, KQUEUE on macOS, NIO fallback), so comparing NIO vs AUTO
* shows the benefit of native transport without needing manual probing.
* Uses concurrent load (8 clients) to amplify transport-level differences.
* (IO_URING on Linux 5.10+, then EPOLL on Linux, KQUEUE on macOS, NIO fallback),
* so comparing NIO vs AUTO shows the benefit of native transport without needing
* manual probing. Uses concurrent load (8 clients) to amplify transport-level
* differences.
*/
private def ioModeComparisonBenchmark(): Unit = {
val payload = new Array[Byte](MEDIUM_PAYLOAD)
Expand Down Expand Up @@ -562,8 +563,9 @@ object NettyTransportBenchmark extends BenchmarkBase {
* and fetches them using client.fetchChunk(). This exercises the DefaultFileRegion
* zero-copy sendfile/splice path.
*
* Compares NIO vs AUTO to verify that native transports (EPOLL/KQUEUE) use sendfile()
* for file-backed transfers. AUTO should be equal to or faster than NIO.
* Compares NIO vs AUTO to verify that native transports use the kernel zero-copy
* path for file-backed transfers (sendfile() for EPOLL/KQUEUE, splice() via the
* io_uring submission queue for IO_URING). AUTO should be equal to or faster than NIO.
*/
private def fileBackedShuffleBenchmark(): Unit = {
val numFiles = 100
Expand Down
4 changes: 4 additions & 0 deletions dev/deps/spark-deps-hadoop-3-hive-2.3
Original file line number Diff line number Diff line change
Expand Up @@ -217,10 +217,14 @@ netty-tcnative-boringssl-static/2.0.77.Final/osx-x86_64/netty-tcnative-boringssl
netty-tcnative-boringssl-static/2.0.77.Final/windows-x86_64/netty-tcnative-boringssl-static-2.0.77.Final-windows-x86_64.jar
netty-tcnative-classes/2.0.77.Final//netty-tcnative-classes-2.0.77.Final.jar
netty-transport-classes-epoll/4.2.13.Final//netty-transport-classes-epoll-4.2.13.Final.jar
netty-transport-classes-io_uring/4.2.13.Final//netty-transport-classes-io_uring-4.2.13.Final.jar
netty-transport-classes-kqueue/4.2.13.Final//netty-transport-classes-kqueue-4.2.13.Final.jar
netty-transport-native-epoll/4.2.13.Final/linux-aarch_64/netty-transport-native-epoll-4.2.13.Final-linux-aarch_64.jar
netty-transport-native-epoll/4.2.13.Final/linux-riscv64/netty-transport-native-epoll-4.2.13.Final-linux-riscv64.jar
netty-transport-native-epoll/4.2.13.Final/linux-x86_64/netty-transport-native-epoll-4.2.13.Final-linux-x86_64.jar
netty-transport-native-io_uring/4.2.13.Final/linux-aarch_64/netty-transport-native-io_uring-4.2.13.Final-linux-aarch_64.jar
netty-transport-native-io_uring/4.2.13.Final/linux-riscv64/netty-transport-native-io_uring-4.2.13.Final-linux-riscv64.jar
netty-transport-native-io_uring/4.2.13.Final/linux-x86_64/netty-transport-native-io_uring-4.2.13.Final-linux-x86_64.jar
netty-transport-native-kqueue/4.2.13.Final/osx-aarch_64/netty-transport-native-kqueue-4.2.13.Final-osx-aarch_64.jar
netty-transport-native-kqueue/4.2.13.Final/osx-x86_64/netty-transport-native-kqueue-4.2.13.Final-osx-x86_64.jar
netty-transport-native-unix-common/4.2.13.Final//netty-transport-native-unix-common-4.2.13.Final.jar
Expand Down
20 changes: 12 additions & 8 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1004,14 +1004,6 @@
<groupId>io.netty</groupId>
<artifactId>netty-transport-udt</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-transport-classes-io_uring</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-io_uring</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-handler-ssl-ocsp</artifactId>
Expand All @@ -1037,6 +1029,18 @@
<version>${netty.version}</version>
<classifier>linux-aarch_64</classifier>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-io_uring</artifactId>
<version>${netty.version}</version>
<classifier>linux-x86_64</classifier>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-io_uring</artifactId>
<version>${netty.version}</version>
<classifier>linux-aarch_64</classifier>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-kqueue</artifactId>
Expand Down