diff --git a/memory/memory-core/src/main/java/org/apache/arrow/memory/Accountant.java b/memory/memory-core/src/main/java/org/apache/arrow/memory/Accountant.java index 5d052c2cde..d4d76f57f4 100644 --- a/memory/memory-core/src/main/java/org/apache/arrow/memory/Accountant.java +++ b/memory/memory-core/src/main/java/org/apache/arrow/memory/Accountant.java @@ -16,7 +16,7 @@ */ package org.apache.arrow.memory; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; import org.apache.arrow.util.Preconditions; import org.checkerframework.checker.nullness.qual.Nullable; @@ -37,16 +37,24 @@ class Accountant implements AutoCloseable { */ protected final long reservation; - private final AtomicLong peakAllocation = new AtomicLong(); + // AtomicLongFieldUpdaters for memory accounting fields to reduce memory overhead + private static final AtomicLongFieldUpdater PEAK_ALLOCATION_UPDATER = + AtomicLongFieldUpdater.newUpdater(Accountant.class, "peakAllocation"); + private static final AtomicLongFieldUpdater ALLOCATION_LIMIT_UPDATER = + AtomicLongFieldUpdater.newUpdater(Accountant.class, "allocationLimit"); + private static final AtomicLongFieldUpdater LOCALLY_HELD_MEMORY_UPDATER = + AtomicLongFieldUpdater.newUpdater(Accountant.class, "locallyHeldMemory"); + + private volatile long peakAllocation = 0; /** * Maximum local memory that can be held. This can be externally updated. Changing it won't cause * past memory to change but will change responses to future allocation efforts */ - private final AtomicLong allocationLimit = new AtomicLong(); + private volatile long allocationLimit = 0; /** Currently allocated amount of memory. */ - private final AtomicLong locallyHeldMemory = new AtomicLong(); + private volatile long locallyHeldMemory = 0; public Accountant( @Nullable Accountant parent, String name, long reservation, long maxAllocation) { @@ -64,7 +72,7 @@ public Accountant( this.parent = parent; this.name = name; this.reservation = reservation; - this.allocationLimit.set(maxAllocation); + ALLOCATION_LIMIT_UPDATER.set(this, maxAllocation); if (reservation != 0) { Preconditions.checkArgument(parent != null, "parent must not be null"); @@ -117,12 +125,12 @@ private AllocationOutcome.Status allocateBytesInternal(long size) { } private void updatePeak() { - final long currentMemory = locallyHeldMemory.get(); + final long currentMemory = locallyHeldMemory; while (true) { - final long previousPeak = peakAllocation.get(); + final long previousPeak = peakAllocation; if (currentMemory > previousPeak) { - if (!peakAllocation.compareAndSet(previousPeak, currentMemory)) { + if (!PEAK_ALLOCATION_UPDATER.compareAndSet(this, previousPeak, currentMemory)) { // peak allocation changed underneath us. try again. continue; } @@ -166,7 +174,7 @@ private AllocationOutcome.Status allocate( final boolean incomingUpdatePeak, final boolean forceAllocation, @Nullable AllocationOutcomeDetails details) { - final long oldLocal = locallyHeldMemory.getAndAdd(size); + final long oldLocal = LOCALLY_HELD_MEMORY_UPDATER.getAndAdd(this, size); final long newLocal = oldLocal + size; // Borrowed from Math.addExact (but avoid exception here) // Overflow if result has opposite sign of both arguments @@ -174,7 +182,7 @@ private AllocationOutcome.Status allocate( // failure final boolean overflow = ((oldLocal ^ newLocal) & (size ^ newLocal)) < 0; final long beyondReservation = newLocal - reservation; - final boolean beyondLimit = overflow || newLocal > allocationLimit.get(); + final boolean beyondLimit = overflow || newLocal > allocationLimit; final boolean updatePeak = forceAllocation || (incomingUpdatePeak && !beyondLimit); if (details != null) { @@ -214,7 +222,7 @@ private AllocationOutcome.Status allocate( public void releaseBytes(long size) { // reduce local memory. all memory released above reservation should be released up the tree. - final long newSize = locallyHeldMemory.addAndGet(-size); + final long newSize = LOCALLY_HELD_MEMORY_UPDATER.addAndGet(this, -size); Preconditions.checkArgument(newSize >= 0, "Accounted size went negative."); @@ -255,7 +263,7 @@ public String getName() { * @return Limit in bytes. */ public long getLimit() { - return allocationLimit.get(); + return allocationLimit; } /** @@ -274,7 +282,7 @@ public long getInitReservation() { * @param newLimit The limit in bytes. */ public void setLimit(long newLimit) { - allocationLimit.set(newLimit); + ALLOCATION_LIMIT_UPDATER.set(this, newLimit); } /** @@ -284,7 +292,7 @@ public void setLimit(long newLimit) { * @return Currently allocate memory in bytes. */ public long getAllocatedMemory() { - return locallyHeldMemory.get(); + return locallyHeldMemory; } /** @@ -293,17 +301,17 @@ public long getAllocatedMemory() { * @return The peak allocated memory in bytes. */ public long getPeakMemoryAllocation() { - return peakAllocation.get(); + return peakAllocation; } public long getHeadroom() { - long localHeadroom = allocationLimit.get() - locallyHeldMemory.get(); + long localHeadroom = allocationLimit - locallyHeldMemory; if (parent == null) { return localHeadroom; } // Amount of reserved memory left on top of what parent has - long reservedHeadroom = Math.max(0, reservation - locallyHeldMemory.get()); + long reservedHeadroom = Math.max(0, reservation - locallyHeldMemory); return Math.min(localHeadroom, parent.getHeadroom() + reservedHeadroom); } } diff --git a/memory/memory-core/src/main/java/org/apache/arrow/memory/ArrowBuf.java b/memory/memory-core/src/main/java/org/apache/arrow/memory/ArrowBuf.java index b8012fe643..9712be34d7 100644 --- a/memory/memory-core/src/main/java/org/apache/arrow/memory/ArrowBuf.java +++ b/memory/memory-core/src/main/java/org/apache/arrow/memory/ArrowBuf.java @@ -24,7 +24,6 @@ import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.ReadOnlyBufferException; -import java.util.concurrent.atomic.AtomicLong; import org.apache.arrow.memory.BaseAllocator.Verbosity; import org.apache.arrow.memory.util.CommonUtil; import org.apache.arrow.memory.util.HistoricalLog; @@ -57,9 +56,8 @@ public final class ArrowBuf implements AutoCloseable { private static final int DOUBLE_SIZE = Double.BYTES; private static final int LONG_SIZE = Long.BYTES; - private static final AtomicLong idGenerator = new AtomicLong(0); private static final int LOG_BYTES_PER_ROW = 10; - private final long id = idGenerator.incrementAndGet(); + private final ReferenceManager referenceManager; private final @Nullable BufferManager bufferManager; private final long addr; @@ -67,7 +65,8 @@ public final class ArrowBuf implements AutoCloseable { private long writerIndex; private final @Nullable HistoricalLog historicalLog = BaseAllocator.DEBUG - ? new HistoricalLog(BaseAllocator.DEBUG_LOG_LENGTH, "ArrowBuf[%d]", id) + ? new HistoricalLog( + BaseAllocator.DEBUG_LOG_LENGTH, "ArrowBuf[%d]", System.identityHashCode(this)) : null; private volatile long capacity; @@ -218,7 +217,8 @@ public long memoryAddress() { @Override public String toString() { - return String.format("ArrowBuf[%d], address:%d, capacity:%d", id, memoryAddress(), capacity); + return String.format( + "ArrowBuf[%d], address:%d, capacity:%d", getId(), memoryAddress(), capacity); } @Override @@ -1080,12 +1080,15 @@ public String toHexString(final long start, final int length) { } /** - * Get the integer id assigned to this ArrowBuf for debugging purposes. + * Get the id assigned to this ArrowBuf for debugging purposes. + * + *

Returns {@link System#identityHashCode(Object)} which provides a unique identifier for this + * buffer without any per-instance memory overhead. * - * @return integer id + * @return the identity hash code for this buffer */ public long getId() { - return id; + return System.identityHashCode(this); } /** diff --git a/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferLedger.java b/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferLedger.java index b562a421e7..eb90efcbb5 100644 --- a/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferLedger.java +++ b/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferLedger.java @@ -17,8 +17,7 @@ package org.apache.arrow.memory; import java.util.IdentityHashMap; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import org.apache.arrow.memory.util.CommonUtil; import org.apache.arrow.memory.util.HistoricalLog; import org.apache.arrow.util.Preconditions; @@ -32,12 +31,13 @@ public class BufferLedger implements ValueWithKeyIncluded, ReferenceManager { private final @Nullable IdentityHashMap buffers = BaseAllocator.DEBUG ? new IdentityHashMap<>() : null; - private static final AtomicLong LEDGER_ID_GENERATOR = new AtomicLong(0); - // unique ID assigned to each ledger - private final long ledgerId = LEDGER_ID_GENERATOR.incrementAndGet(); - private final AtomicInteger bufRefCnt = new AtomicInteger(0); // start at zero so we can - // manage request for retain - // correctly + + // AtomicIntegerFieldUpdater for bufRefCnt to reduce memory overhead + private static final AtomicIntegerFieldUpdater BUF_REF_CNT_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(BufferLedger.class, "bufRefCnt"); + // start at zero so we can manage request for retain correctly + private volatile int bufRefCnt = 0; + private final long lCreationTime = System.nanoTime(); private final BufferAllocator allocator; private final AllocationManager allocationManager; @@ -78,7 +78,7 @@ public BufferAllocator getAllocator() { */ @Override public int getRefCount() { - return bufRefCnt.get(); + return bufRefCnt; } /** @@ -86,7 +86,7 @@ public int getRefCount() { * ArrowBufs managed by this ledger will share the ref count. */ void increment() { - bufRefCnt.incrementAndGet(); + BUF_REF_CNT_UPDATER.incrementAndGet(this); } /** @@ -144,7 +144,7 @@ private int decrement(int decrement) { allocator.assertOpen(); final int outcome; synchronized (allocationManager) { - outcome = bufRefCnt.addAndGet(-decrement); + outcome = BUF_REF_CNT_UPDATER.addAndGet(this, -decrement); if (outcome == 0) { lDestructionTime = System.nanoTime(); // refcount of this reference manager has dropped to 0 @@ -174,7 +174,7 @@ public void retain(int increment) { if (historicalLog != null) { historicalLog.recordEvent("retain(%d)", increment); } - final int originalReferenceCount = bufRefCnt.getAndAdd(increment); + final int originalReferenceCount = BUF_REF_CNT_UPDATER.getAndAdd(this, increment); Preconditions.checkArgument(originalReferenceCount > 0); } @@ -472,13 +472,13 @@ public long getAccountedSize() { void print(StringBuilder sb, int indent, BaseAllocator.Verbosity verbosity) { CommonUtil.indent(sb, indent) .append("ledger[") - .append(ledgerId) + .append(System.identityHashCode(this)) .append("] allocator: ") .append(allocator.getName()) .append("), isOwning: ") .append(", size: ") .append(", references: ") - .append(bufRefCnt.get()) + .append(bufRefCnt) .append(", life: ") .append(lCreationTime) .append("..") diff --git a/performance/src/main/java/org/apache/arrow/memory/MemoryFootprintBenchmarks.java b/performance/src/main/java/org/apache/arrow/memory/MemoryFootprintBenchmarks.java new file mode 100644 index 0000000000..395ba13b9d --- /dev/null +++ b/performance/src/main/java/org/apache/arrow/memory/MemoryFootprintBenchmarks.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.arrow.memory; + +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryMXBean; +import java.lang.management.MemoryUsage; +import java.util.concurrent.TimeUnit; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; + +/** + * Benchmarks for memory footprint of Arrow memory objects. + * + *

This benchmark measures the heap memory overhead of creating many ArrowBuf instances. The + * optimizations using AtomicFieldUpdater instead of AtomicLong/AtomicInteger objects should reduce + * memory overhead significantly. + * + *

Expected savings per instance: - ArrowBuf: 8 bytes (id field removed) - BufferLedger: 28 bytes + * (20 from AtomicInteger + 8 from ledgerId) - Accountant: 48 bytes (3 × 16 bytes from AtomicLong + * objects) + * + *

For 1M ArrowBuf instances, this should save approximately 8 MB of heap memory. + */ +@State(Scope.Benchmark) +@Fork( + value = 1, + jvmArgs = {"-Xms2g", "-Xmx2g"}) +@Warmup(iterations = 3, time = 1, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +public class MemoryFootprintBenchmarks { + + /** Number of ArrowBuf instances to create for memory footprint measurement. */ + private static final int NUM_BUFFERS = 100_000; + + /** Size in bytes of each buffer allocation. */ + private static final int BUFFER_SIZE = 1024; + + /** Root allocator used for all buffer allocations in the benchmark. */ + private RootAllocator allocator; + + /** Array to hold references to allocated buffers, preventing garbage collection. */ + private ArrowBuf[] buffers; + + /** JMX bean for querying heap memory usage statistics. */ + private MemoryMXBean memoryBean; + + /** + * Sets up the benchmark state before each trial. + * + *

Initializes the memory monitoring bean, creates a root allocator with sufficient capacity, + * and allocates the buffer reference array. + */ + @Setup(Level.Trial) + public void setup() { + memoryBean = ManagementFactory.getMemoryMXBean(); + allocator = new RootAllocator((long) NUM_BUFFERS * BUFFER_SIZE); + buffers = new ArrowBuf[NUM_BUFFERS]; + } + + /** + * Cleans up buffers after each benchmark invocation. + * + *

Closes all allocated buffers to prevent memory leaks and ensure each iteration starts with a + * clean slate. This is critical for the memory footprint benchmark which allocates many buffers + * that would otherwise accumulate across warmup and measurement iterations. + */ + @TearDown(Level.Invocation) + public void tearDown() { + for (int i = 0; i < NUM_BUFFERS; i++) { + if (buffers[i] != null) { + buffers[i].close(); + buffers[i] = null; + } + } + } + + /** + * Cleans up the allocator after the trial completes. + * + *

Closes the root allocator to release all resources after all warmup and measurement + * iterations are complete. + */ + @TearDown(Level.Trial) + public void tearDownTrial() { + allocator.close(); + } + + /** + * Benchmark that measures heap memory usage when creating many ArrowBuf instances. + * + *

This benchmark creates {@value #NUM_BUFFERS} ArrowBuf instances and measures the heap memory + * used. With the AtomicFieldUpdater optimizations, we expect to save approximately 800 KB of heap + * memory (8 bytes × 100,000 instances) just from removing the id field in ArrowBuf. + * + *

The benchmark performs garbage collection before and after allocation to ensure accurate + * measurement of heap memory delta. Results are printed to stdout for analysis. + * + * @return the total heap memory used by the allocated buffers in bytes + */ + @Benchmark + @BenchmarkMode(Mode.SingleShotTime) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + public long measureArrowBufMemoryFootprint() { + // Force GC before measurement + System.gc(); + System.gc(); + System.gc(); + + MemoryUsage heapBefore = memoryBean.getHeapMemoryUsage(); + long usedBefore = heapBefore.getUsed(); + + // Allocate buffers + for (int i = 0; i < NUM_BUFFERS; i++) { + buffers[i] = allocator.buffer(BUFFER_SIZE); + } + + // Force GC to get accurate measurement + System.gc(); + System.gc(); + System.gc(); + + MemoryUsage heapAfter = memoryBean.getHeapMemoryUsage(); + long usedAfter = heapAfter.getUsed(); + + long memoryUsed = usedAfter - usedBefore; + + // Print memory usage for analysis + System.out.printf( + "Created %d ArrowBuf instances. Heap memory used: %d bytes (%.2f MB)%n", + NUM_BUFFERS, memoryUsed, memoryUsed / (1024.0 * 1024.0)); + System.out.printf( + "Average memory per ArrowBuf: %.2f bytes%n", (double) memoryUsed / NUM_BUFFERS); + + return memoryUsed; + } + + /** + * Benchmark that measures allocation and deallocation performance. + * + *

This complements the memory footprint benchmark by measuring the time it takes to allocate + * and deallocate 1,000 buffers in a tight loop. This helps identify any performance regressions + * introduced by memory optimizations. + * + *

Uses a local buffer array to avoid interference with the shared {@link #buffers} array used + * by other benchmarks. + */ + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MICROSECONDS) + public void measureAllocationPerformance() { + ArrowBuf[] localBuffers = new ArrowBuf[1000]; + + for (int i = 0; i < 1000; i++) { + localBuffers[i] = allocator.buffer(BUFFER_SIZE); + } + + for (int i = 0; i < 1000; i++) { + localBuffers[i].close(); + } + } + + /** + * Main entry point for running the benchmarks standalone. + * + *

This allows running the benchmarks directly from the command line or IDE without using the + * Maven JMH plugin. Example usage: + * + *

{@code
+   * java -cp target/benchmarks.jar org.apache.arrow.memory.MemoryFootprintBenchmarks
+   * }
+ * + * @param args command line arguments (not used) + * @throws RunnerException if the benchmark runner encounters an error + */ + public static void main(String[] args) throws RunnerException { + Options opt = + new OptionsBuilder() + .include(MemoryFootprintBenchmarks.class.getSimpleName()) + .forks(1) + .build(); + + new Runner(opt).run(); + } +}