Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.arrow.memory;

import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.apache.arrow.util.Preconditions;
import org.checkerframework.checker.nullness.qual.Nullable;

Expand All @@ -37,16 +37,24 @@ class Accountant implements AutoCloseable {
*/
protected final long reservation;

private final AtomicLong peakAllocation = new AtomicLong();
// AtomicLongFieldUpdaters for memory accounting fields to reduce memory overhead
private static final AtomicLongFieldUpdater<Accountant> PEAK_ALLOCATION_UPDATER =
AtomicLongFieldUpdater.newUpdater(Accountant.class, "peakAllocation");
private static final AtomicLongFieldUpdater<Accountant> ALLOCATION_LIMIT_UPDATER =
AtomicLongFieldUpdater.newUpdater(Accountant.class, "allocationLimit");
private static final AtomicLongFieldUpdater<Accountant> LOCALLY_HELD_MEMORY_UPDATER =
AtomicLongFieldUpdater.newUpdater(Accountant.class, "locallyHeldMemory");

private volatile long peakAllocation = 0;

/**
* Maximum local memory that can be held. This can be externally updated. Changing it won't cause
* past memory to change but will change responses to future allocation efforts
*/
private final AtomicLong allocationLimit = new AtomicLong();
private volatile long allocationLimit = 0;

/** Currently allocated amount of memory. */
private final AtomicLong locallyHeldMemory = new AtomicLong();
private volatile long locallyHeldMemory = 0;

public Accountant(
@Nullable Accountant parent, String name, long reservation, long maxAllocation) {
Expand All @@ -64,7 +72,7 @@ public Accountant(
this.parent = parent;
this.name = name;
this.reservation = reservation;
this.allocationLimit.set(maxAllocation);
ALLOCATION_LIMIT_UPDATER.set(this, maxAllocation);

if (reservation != 0) {
Preconditions.checkArgument(parent != null, "parent must not be null");
Expand Down Expand Up @@ -117,12 +125,12 @@ private AllocationOutcome.Status allocateBytesInternal(long size) {
}

private void updatePeak() {
final long currentMemory = locallyHeldMemory.get();
final long currentMemory = locallyHeldMemory;
while (true) {

final long previousPeak = peakAllocation.get();
final long previousPeak = peakAllocation;
if (currentMemory > previousPeak) {
if (!peakAllocation.compareAndSet(previousPeak, currentMemory)) {
if (!PEAK_ALLOCATION_UPDATER.compareAndSet(this, previousPeak, currentMemory)) {
// peak allocation changed underneath us. try again.
continue;
}
Expand Down Expand Up @@ -166,15 +174,15 @@ private AllocationOutcome.Status allocate(
final boolean incomingUpdatePeak,
final boolean forceAllocation,
@Nullable AllocationOutcomeDetails details) {
final long oldLocal = locallyHeldMemory.getAndAdd(size);
final long oldLocal = LOCALLY_HELD_MEMORY_UPDATER.getAndAdd(this, size);
final long newLocal = oldLocal + size;
// Borrowed from Math.addExact (but avoid exception here)
// Overflow if result has opposite sign of both arguments
// No need to reset locallyHeldMemory on overflow; allocateBytesInternal will releaseBytes on
// failure
final boolean overflow = ((oldLocal ^ newLocal) & (size ^ newLocal)) < 0;
final long beyondReservation = newLocal - reservation;
final boolean beyondLimit = overflow || newLocal > allocationLimit.get();
final boolean beyondLimit = overflow || newLocal > allocationLimit;
final boolean updatePeak = forceAllocation || (incomingUpdatePeak && !beyondLimit);

if (details != null) {
Expand Down Expand Up @@ -214,7 +222,7 @@ private AllocationOutcome.Status allocate(

public void releaseBytes(long size) {
// reduce local memory. all memory released above reservation should be released up the tree.
final long newSize = locallyHeldMemory.addAndGet(-size);
final long newSize = LOCALLY_HELD_MEMORY_UPDATER.addAndGet(this, -size);

Preconditions.checkArgument(newSize >= 0, "Accounted size went negative.");

Expand Down Expand Up @@ -255,7 +263,7 @@ public String getName() {
* @return Limit in bytes.
*/
public long getLimit() {
return allocationLimit.get();
return allocationLimit;
}

/**
Expand All @@ -274,7 +282,7 @@ public long getInitReservation() {
* @param newLimit The limit in bytes.
*/
public void setLimit(long newLimit) {
allocationLimit.set(newLimit);
ALLOCATION_LIMIT_UPDATER.set(this, newLimit);
}

/**
Expand All @@ -284,7 +292,7 @@ public void setLimit(long newLimit) {
* @return Currently allocate memory in bytes.
*/
public long getAllocatedMemory() {
return locallyHeldMemory.get();
return locallyHeldMemory;
}

/**
Expand All @@ -293,17 +301,17 @@ public long getAllocatedMemory() {
* @return The peak allocated memory in bytes.
*/
public long getPeakMemoryAllocation() {
return peakAllocation.get();
return peakAllocation;
}

public long getHeadroom() {
long localHeadroom = allocationLimit.get() - locallyHeldMemory.get();
long localHeadroom = allocationLimit - locallyHeldMemory;
if (parent == null) {
return localHeadroom;
}

// Amount of reserved memory left on top of what parent has
long reservedHeadroom = Math.max(0, reservation - locallyHeldMemory.get());
long reservedHeadroom = Math.max(0, reservation - locallyHeldMemory);
return Math.min(localHeadroom, parent.getHeadroom() + reservedHeadroom);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.ReadOnlyBufferException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.arrow.memory.BaseAllocator.Verbosity;
import org.apache.arrow.memory.util.CommonUtil;
import org.apache.arrow.memory.util.HistoricalLog;
Expand Down Expand Up @@ -57,17 +56,17 @@ public final class ArrowBuf implements AutoCloseable {
private static final int DOUBLE_SIZE = Double.BYTES;
private static final int LONG_SIZE = Long.BYTES;

private static final AtomicLong idGenerator = new AtomicLong(0);
private static final int LOG_BYTES_PER_ROW = 10;
private final long id = idGenerator.incrementAndGet();

private final ReferenceManager referenceManager;
private final @Nullable BufferManager bufferManager;
private final long addr;
private long readerIndex;
private long writerIndex;
private final @Nullable HistoricalLog historicalLog =
BaseAllocator.DEBUG
? new HistoricalLog(BaseAllocator.DEBUG_LOG_LENGTH, "ArrowBuf[%d]", id)
? new HistoricalLog(
BaseAllocator.DEBUG_LOG_LENGTH, "ArrowBuf[%d]", System.identityHashCode(this))
: null;
private volatile long capacity;

Expand Down Expand Up @@ -218,7 +217,8 @@ public long memoryAddress() {

@Override
public String toString() {
return String.format("ArrowBuf[%d], address:%d, capacity:%d", id, memoryAddress(), capacity);
return String.format(
"ArrowBuf[%d], address:%d, capacity:%d", getId(), memoryAddress(), capacity);
}

@Override
Expand Down Expand Up @@ -1080,12 +1080,15 @@ public String toHexString(final long start, final int length) {
}

/**
* Get the integer id assigned to this ArrowBuf for debugging purposes.
* Get the id assigned to this ArrowBuf for debugging purposes.
*
* <p>Returns {@link System#identityHashCode(Object)} which provides a unique identifier for this
* buffer without any per-instance memory overhead.
*
* @return integer id
* @return the identity hash code for this buffer
*/
public long getId() {
return id;
return System.identityHashCode(this);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@
package org.apache.arrow.memory;

import java.util.IdentityHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.arrow.memory.util.CommonUtil;
import org.apache.arrow.memory.util.HistoricalLog;
import org.apache.arrow.util.Preconditions;
Expand All @@ -32,12 +31,13 @@
public class BufferLedger implements ValueWithKeyIncluded<BufferAllocator>, ReferenceManager {
private final @Nullable IdentityHashMap<ArrowBuf, @Nullable Object> buffers =
BaseAllocator.DEBUG ? new IdentityHashMap<>() : null;
private static final AtomicLong LEDGER_ID_GENERATOR = new AtomicLong(0);
// unique ID assigned to each ledger
private final long ledgerId = LEDGER_ID_GENERATOR.incrementAndGet();
private final AtomicInteger bufRefCnt = new AtomicInteger(0); // start at zero so we can
// manage request for retain
// correctly

// AtomicIntegerFieldUpdater for bufRefCnt to reduce memory overhead
private static final AtomicIntegerFieldUpdater<BufferLedger> BUF_REF_CNT_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(BufferLedger.class, "bufRefCnt");
// start at zero so we can manage request for retain correctly
private volatile int bufRefCnt = 0;

private final long lCreationTime = System.nanoTime();
private final BufferAllocator allocator;
private final AllocationManager allocationManager;
Expand Down Expand Up @@ -78,15 +78,15 @@ public BufferAllocator getAllocator() {
*/
@Override
public int getRefCount() {
return bufRefCnt.get();
return bufRefCnt;
}

/**
* Increment the ledger's reference count for the associated underlying memory chunk. All
* ArrowBufs managed by this ledger will share the ref count.
*/
void increment() {
bufRefCnt.incrementAndGet();
BUF_REF_CNT_UPDATER.incrementAndGet(this);
}

/**
Expand Down Expand Up @@ -144,7 +144,7 @@ private int decrement(int decrement) {
allocator.assertOpen();
final int outcome;
synchronized (allocationManager) {
outcome = bufRefCnt.addAndGet(-decrement);
outcome = BUF_REF_CNT_UPDATER.addAndGet(this, -decrement);
if (outcome == 0) {
lDestructionTime = System.nanoTime();
// refcount of this reference manager has dropped to 0
Expand Down Expand Up @@ -174,7 +174,7 @@ public void retain(int increment) {
if (historicalLog != null) {
historicalLog.recordEvent("retain(%d)", increment);
}
final int originalReferenceCount = bufRefCnt.getAndAdd(increment);
final int originalReferenceCount = BUF_REF_CNT_UPDATER.getAndAdd(this, increment);
Preconditions.checkArgument(originalReferenceCount > 0);
}

Expand Down Expand Up @@ -472,13 +472,13 @@ public long getAccountedSize() {
void print(StringBuilder sb, int indent, BaseAllocator.Verbosity verbosity) {
CommonUtil.indent(sb, indent)
.append("ledger[")
.append(ledgerId)
.append(System.identityHashCode(this))
.append("] allocator: ")
.append(allocator.getName())
.append("), isOwning: ")
.append(", size: ")
.append(", references: ")
.append(bufRefCnt.get())
.append(bufRefCnt)
.append(", life: ")
.append(lCreationTime)
.append("..")
Expand Down
Loading
Loading