{@link IoUring#isAvailable()} only checks that the JNI library loaded and basic syscalls + * work; it does not detect environments where the kernel allows io_uring but + * {@code RLIMIT_MEMLOCK} is too low for the submission/completion queue rings (common in + * containers, GitHub Actions runners, and other restricted environments). The probe creates + * a {@link MultiThreadIoEventLoopGroup} sized to {@link #MAX_DEFAULT_NETTY_THREADS} -- which + * is the worst case Spark allocates by default for a single event loop group -- and shuts it + * down to verify ring allocation actually succeeds. Probing with one thread is insufficient + * because some restricted environments allow a single ring to fit in memlock but not the eight + * rings Spark needs in practice. + */ + private static volatile Boolean ioUringUsable = null; + public static long freeDirectMemory() { return PlatformDependent.maxDirectMemory() - PlatformDependent.usedDirectMemory(); } + /** + * Returns true if io_uring can actually be used on the running JVM. Probes once (with the + * result cached) by attempting a real ring allocation, which catches environments where + * {@link IoUring#isAvailable()} returns true but {@code RLIMIT_MEMLOCK} is too low to allocate + * the submission/completion queues (common in containers, GitHub Actions runners, and other + * restricted environments). + * + *
Used by AUTO mode and by tests that gate execution on io_uring being usable. An explicit
+ * {@link IOMode#IO_URING} mode does not consult this and surfaces the underlying error.
+ */
+ public static boolean isIoUringUsable() {
+ Boolean cached = ioUringUsable;
+ if (cached != null) {
+ return cached;
+ }
+ synchronized (NettyUtils.class) {
+ if (ioUringUsable != null) {
+ return ioUringUsable;
+ }
+ if (!JavaUtils.isLinux || !IoUring.isAvailable()) {
+ ioUringUsable = false;
+ return false;
+ }
+ MultiThreadIoEventLoopGroup probe = null;
+ try {
+ probe = new MultiThreadIoEventLoopGroup(
+ MAX_DEFAULT_NETTY_THREADS, IoUringIoHandler.newFactory());
+ ioUringUsable = true;
+ } catch (Throwable t) {
+ logger.warn("io_uring is reported as available but allocation of " +
+ MAX_DEFAULT_NETTY_THREADS + " rings failed; " +
+ "AUTO will fall back to EPOLL on this JVM. " +
+ "Common cause: RLIMIT_MEMLOCK too low (containers, restricted environments). " +
+ "To force io_uring, set spark.shuffle.io.mode=IO_URING explicitly.", t);
+ ioUringUsable = false;
+ } finally {
+ if (probe != null) {
+ probe.shutdownGracefully(0, 100, TimeUnit.MILLISECONDS);
+ }
+ }
+ return ioUringUsable;
+ }
+ }
+
/** Creates a new ThreadFactory which prefixes each thread with the given name. */
public static ThreadFactory createThreadFactory(String threadPoolPrefix) {
return new DefaultThreadFactory(threadPoolPrefix, true);
@@ -73,8 +143,11 @@ public static EventLoopGroup createEventLoop(IOMode mode, int numThreads, String
case NIO -> NioIoHandler.newFactory();
case EPOLL -> EpollIoHandler.newFactory();
case KQUEUE -> KQueueIoHandler.newFactory();
+ case IO_URING -> IoUringIoHandler.newFactory();
case AUTO -> {
- if (JavaUtils.isLinux && Epoll.isAvailable()) {
+ if (isIoUringUsable()) {
+ yield IoUringIoHandler.newFactory();
+ } else if (JavaUtils.isLinux && Epoll.isAvailable()) {
yield EpollIoHandler.newFactory();
} else if (JavaUtils.isMac && KQueue.isAvailable()) {
yield KQueueIoHandler.newFactory();
@@ -92,8 +165,11 @@ public static Class extends Channel> getClientChannelClass(IOMode mode) {
case NIO -> NioSocketChannel.class;
case EPOLL -> EpollSocketChannel.class;
case KQUEUE -> KQueueSocketChannel.class;
+ case IO_URING -> IoUringSocketChannel.class;
case AUTO -> {
- if (JavaUtils.isLinux && Epoll.isAvailable()) {
+ if (isIoUringUsable()) {
+ yield IoUringSocketChannel.class;
+ } else if (JavaUtils.isLinux && Epoll.isAvailable()) {
yield EpollSocketChannel.class;
} else if (JavaUtils.isMac && KQueue.isAvailable()) {
yield KQueueSocketChannel.class;
@@ -110,8 +186,11 @@ public static Class extends ServerChannel> getServerChannelClass(IOMode mode)
case NIO -> NioServerSocketChannel.class;
case EPOLL -> EpollServerSocketChannel.class;
case KQUEUE -> KQueueServerSocketChannel.class;
+ case IO_URING -> IoUringServerSocketChannel.class;
case AUTO -> {
- if (JavaUtils.isLinux && Epoll.isAvailable()) {
+ if (isIoUringUsable()) {
+ yield IoUringServerSocketChannel.class;
+ } else if (JavaUtils.isLinux && Epoll.isAvailable()) {
yield EpollServerSocketChannel.class;
} else if (JavaUtils.isMac && KQueue.isAvailable()) {
yield KQueueServerSocketChannel.class;
diff --git a/common/network-yarn/pom.xml b/common/network-yarn/pom.xml
index cf00d806ba835..606b49758b750 100644
--- a/common/network-yarn/pom.xml
+++ b/common/network-yarn/pom.xml
@@ -184,6 +184,12 @@
tofile="${project.build.directory}/exploded/META-INF/native/lib${spark.shade.native.packageName}_netty_transport_native_kqueue_aarch_64.jnilib" />