Skip to content

Commit

Permalink
apacheGH-43902: [Java] Support for Long memory addresses (apache#43903)
Browse files Browse the repository at this point in the history
### Rationale for this change

The usage of `Integer` instead of `Long` must be encouraged with the usage of memory sizing, indexing and addresses. 

### What changes are included in this PR?

This PR refactors the usage of `Integer` into `Long` along with utilities refactors. 

### Are these changes tested?

Existing test cases. 

### Are there any user-facing changes?

Yes, certain API calls may subject changes. 
* GitHub Issue: apache#43902

Authored-by: Vibhatha Lakmal Abeykoon <vibhatha@gmail.com>
Signed-off-by: David Li <li.davidm96@gmail.com>
  • Loading branch information
vibhatha authored Sep 4, 2024
1 parent c455d6b commit b2e0668
Show file tree
Hide file tree
Showing 8 changed files with 114 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,44 @@ public interface AllocationReservation extends AutoCloseable {
* @param nBytes the number of bytes to add
* @return true if the addition is possible, false otherwise
* @throws IllegalStateException if called after buffer() is used to allocate the reservation
* @deprecated use {@link #add(long)} instead
*/
@Deprecated(forRemoval = true)
boolean add(int nBytes);

/**
* Add to the current reservation.
*
* <p>Adding may fail if the allocator is not allowed to consume any more space.
*
* @param nBytes the number of bytes to add
* @return true if the addition is possible, false otherwise
* @throws IllegalStateException if called after buffer() is used to allocate the reservation
*/
boolean add(long nBytes);

/**
* Requests a reservation of additional space.
*
* <p>The implementation of the allocator's inner class provides this.
*
* @param nBytes the amount to reserve
* @return true if the reservation can be satisfied, false otherwise
* @deprecated use {@link #reserve(long)} instead
*/
@Deprecated(forRemoval = true)
boolean reserve(int nBytes);

/**
* Requests a reservation of additional space.
*
* <p>The implementation of the allocator's inner class provides this.
*
* @param nBytes the amount to reserve
* @return true if the reservation can be satisfied, false otherwise
*/
boolean reserve(long nBytes);

/**
* Allocate a buffer whose size is the total of all the add()s made.
*
Expand All @@ -65,6 +90,13 @@ public interface AllocationReservation extends AutoCloseable {
*/
int getSize();

/**
* Get the current size of the reservation (the sum of all the add()s) as a long value.
*
* @return size of the current reservation
*/
long getSizeLong();

/**
* Return whether or not the reservation has been used.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.arrow.memory.util.AssertionUtil;
import org.apache.arrow.memory.util.CommonUtil;
import org.apache.arrow.memory.util.HistoricalLog;
import org.apache.arrow.memory.util.LargeMemoryUtil;
import org.apache.arrow.util.Preconditions;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.KeyFor;
Expand Down Expand Up @@ -860,7 +861,7 @@ RoundingPolicy getRoundingPolicy() {
public class Reservation implements AllocationReservation {

private final @Nullable HistoricalLog historicalLog;
private int nBytes = 0;
private long nBytes = 0;
private boolean used = false;
private boolean closed = false;

Expand Down Expand Up @@ -888,8 +889,15 @@ public Reservation() {
}
}

@SuppressWarnings({"removal", "InlineMeSuggester"})
@Deprecated(forRemoval = true)
@Override
public boolean add(final int nBytes) {
return add((long) nBytes);
}

@Override
public boolean add(final long nBytes) {
assertOpen();

Preconditions.checkArgument(nBytes >= 0, "nBytes(%d) < 0", nBytes);
Expand All @@ -906,7 +914,7 @@ public boolean add(final int nBytes) {
// modifying this behavior so that we maintain what we reserve and what the user asked for
// and make sure to only
// round to power of two as necessary.
final int nBytesTwo = CommonUtil.nextPowerOfTwo(nBytes);
final long nBytesTwo = CommonUtil.nextPowerOfTwo(nBytes);
if (!reserve(nBytesTwo)) {
return false;
}
Expand All @@ -929,6 +937,11 @@ public ArrowBuf allocateBuffer() {

@Override
public int getSize() {
return LargeMemoryUtil.checkedCastToInt(nBytes);
}

@Override
public long getSizeLong() {
return nBytes;
}

Expand Down Expand Up @@ -978,8 +991,15 @@ public void close() {
closed = true;
}

@SuppressWarnings({"removal", "InlineMeSuggester"})
@Deprecated(forRemoval = true)
@Override
public boolean reserve(int nBytes) {
return reserve((long) nBytes);
}

@Override
public boolean reserve(long nBytes) {
assertOpen();

final AllocationOutcome outcome = BaseAllocator.this.allocateBytes(nBytes);
Expand All @@ -999,7 +1019,7 @@ public boolean reserve(int nBytes) {
* @param nBytes the size of the buffer requested
* @return the buffer, or null, if the request cannot be satisfied
*/
private ArrowBuf allocate(int nBytes) {
private ArrowBuf allocate(long nBytes) {
assertOpen();

boolean success = false;
Expand Down Expand Up @@ -1033,7 +1053,7 @@ private ArrowBuf allocate(int nBytes) {
*
* @param nBytes the size of the reservation
*/
private void releaseReservation(int nBytes) {
private void releaseReservation(long nBytes) {
assertOpen();

releaseBytes(nBytes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
package org.apache.arrow.memory;

/**
* Child allocator class. Only slightly different from the {@see RootAllocator}, in that these can't
* be created directly, but must be obtained from {@link BufferAllocator#newChildAllocator(String,
* AllocationListener, long, long)}.
* Child allocator class. Only slightly different from the {@link RootAllocator}, in that these
* can't be created directly, but must be obtained from {@link
* BufferAllocator#newChildAllocator(String, AllocationListener, long, long)}.
*
* <p>Child allocators can only be created by the root, or other children, so this class is package
* private.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ public class DefaultRoundingPolicy implements RoundingPolicy {
*
* <p>It was copied from {@link io.netty.buffer.PooledByteBufAllocator}.
*/
private static final int MIN_PAGE_SIZE = 4096;
private static final long MIN_PAGE_SIZE = 4096;

private static final int MAX_CHUNK_SIZE = (int) (((long) Integer.MAX_VALUE + 1) / 2);
private static final long MAX_CHUNK_SIZE = ((long) Integer.MAX_VALUE + 1) / 2;
private static final long DEFAULT_CHUNK_SIZE;

static {
int defaultPageSize = Integer.getInteger("org.apache.memory.allocator.pageSize", 8192);
long defaultPageSize = Long.getLong("org.apache.memory.allocator.pageSize", 8192);
try {
validateAndCalculatePageShifts(defaultPageSize);
} catch (Throwable t) {
Expand All @@ -60,7 +60,7 @@ public class DefaultRoundingPolicy implements RoundingPolicy {
}
}

private static int validateAndCalculatePageShifts(int pageSize) {
private static long validateAndCalculatePageShifts(long pageSize) {
if (pageSize < MIN_PAGE_SIZE) {
throw new IllegalArgumentException(
"pageSize: " + pageSize + " (expected: " + MIN_PAGE_SIZE + ")");
Expand All @@ -71,17 +71,17 @@ private static int validateAndCalculatePageShifts(int pageSize) {
}

// Logarithm base 2. At this point we know that pageSize is a power of two.
return Integer.SIZE - 1 - Integer.numberOfLeadingZeros(pageSize);
return Long.SIZE - 1L - Long.numberOfLeadingZeros(pageSize);
}

private static int validateAndCalculateChunkSize(int pageSize, int maxOrder) {
private static long validateAndCalculateChunkSize(long pageSize, int maxOrder) {
if (maxOrder > 14) {
throw new IllegalArgumentException("maxOrder: " + maxOrder + " (expected: 0-14)");
}

// Ensure the resulting chunkSize does not overflow.
int chunkSize = pageSize;
for (int i = maxOrder; i > 0; i--) {
long chunkSize = pageSize;
for (long i = maxOrder; i > 0; i--) {
if (chunkSize > MAX_CHUNK_SIZE / 2) {
throw new IllegalArgumentException(
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.arrow.memory.rounding;

import com.google.errorprone.annotations.InlineMe;
import org.apache.arrow.memory.util.LargeMemoryUtil;
import org.apache.arrow.util.Preconditions;

/** The rounding policy that each buffer size must a multiple of the segment size. */
Expand All @@ -28,16 +30,30 @@ public class SegmentRoundingPolicy implements RoundingPolicy {
* The segment size. It must be at least {@link SegmentRoundingPolicy#MIN_SEGMENT_SIZE}, and be a
* power of 2.
*/
private int segmentSize;
private long segmentSize;

/**
* Constructor for the segment rounding policy.
*
* @param segmentSize the segment size.
* @throws IllegalArgumentException if the segment size is smaller than {@link
* SegmentRoundingPolicy#MIN_SEGMENT_SIZE}, or is not a power of 2.
* @deprecated use {@link SegmentRoundingPolicy#SegmentRoundingPolicy(long)} instead.
*/
@Deprecated(forRemoval = true)
@InlineMe(replacement = "this((long) segmentSize)")
public SegmentRoundingPolicy(int segmentSize) {
this((long) segmentSize);
}

/**
* Constructor for the segment rounding policy.
*
* @param segmentSize the segment size.
* @throws IllegalArgumentException if the segment size is smaller than {@link
* SegmentRoundingPolicy#MIN_SEGMENT_SIZE}, or is not a power of 2.
*/
public SegmentRoundingPolicy(long segmentSize) {
Preconditions.checkArgument(
segmentSize >= MIN_SEGMENT_SIZE,
"The segment size cannot be smaller than %s",
Expand All @@ -52,7 +68,12 @@ public long getRoundedSize(long requestSize) {
return (requestSize + (segmentSize - 1)) / segmentSize * segmentSize;
}

@Deprecated(forRemoval = true)
public int getSegmentSize() {
return LargeMemoryUtil.checkedCastToInt(segmentSize);
}

public long getSegmentSizeAsLong() {
return segmentSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ public void testRootAllocator_createChildDontClose() throws Exception {

@Test
public void testSegmentAllocator() {
RoundingPolicy policy = new SegmentRoundingPolicy(1024);
RoundingPolicy policy = new SegmentRoundingPolicy(1024L);
try (RootAllocator allocator =
new RootAllocator(AllocationListener.NOOP, 1024 * 1024, policy)) {
ArrowBuf buf = allocator.buffer(798);
Expand All @@ -334,7 +334,7 @@ public void testSegmentAllocator() {

@Test
public void testSegmentAllocator_childAllocator() {
RoundingPolicy policy = new SegmentRoundingPolicy(1024);
RoundingPolicy policy = new SegmentRoundingPolicy(1024L);
try (RootAllocator allocator = new RootAllocator(AllocationListener.NOOP, 1024 * 1024, policy);
BufferAllocator childAllocator = allocator.newChildAllocator("child", 0, 512 * 1024)) {

Expand All @@ -357,14 +357,14 @@ public void testSegmentAllocator_childAllocator() {
@Test
public void testSegmentAllocator_smallSegment() {
IllegalArgumentException e =
assertThrows(IllegalArgumentException.class, () -> new SegmentRoundingPolicy(128));
assertThrows(IllegalArgumentException.class, () -> new SegmentRoundingPolicy(128L));
assertEquals("The segment size cannot be smaller than 1024", e.getMessage());
}

@Test
public void testSegmentAllocator_segmentSizeNotPowerOf2() {
IllegalArgumentException e =
assertThrows(IllegalArgumentException.class, () -> new SegmentRoundingPolicy(4097));
assertThrows(IllegalArgumentException.class, () -> new SegmentRoundingPolicy(4097L));
assertEquals("The segment size must be a power of 2", e.getMessage());
}

Expand Down Expand Up @@ -957,7 +957,7 @@ public void testAllocator_unclaimedReservation() throws Exception {
try (final BufferAllocator childAllocator1 =
rootAllocator.newChildAllocator("unclaimedReservation", 0, MAX_ALLOCATION)) {
try (final AllocationReservation reservation = childAllocator1.newReservation()) {
assertTrue(reservation.add(64));
assertTrue(reservation.add(64L));
}
rootAllocator.verify();
}
Expand All @@ -972,8 +972,8 @@ public void testAllocator_claimedReservation() throws Exception {
rootAllocator.newChildAllocator("claimedReservation", 0, MAX_ALLOCATION)) {

try (final AllocationReservation reservation = childAllocator1.newReservation()) {
assertTrue(reservation.add(32));
assertTrue(reservation.add(32));
assertTrue(reservation.add(32L));
assertTrue(reservation.add(32L));

final ArrowBuf arrowBuf = reservation.allocateBuffer();
assertEquals(64, arrowBuf.capacity());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class NettyArrowBuf extends AbstractByteBuf implements AutoCloseable {

private final ArrowBuf arrowBuf;
private final ArrowByteBufAllocator arrowByteBufAllocator;
private int length;
private long length;
private final long address;

/**
Expand All @@ -47,10 +47,24 @@ public class NettyArrowBuf extends AbstractByteBuf implements AutoCloseable {
* @param arrowBuf The buffer to wrap.
* @param bufferAllocator The allocator for the buffer.
* @param length The length of this buffer.
* @deprecated Use {@link #NettyArrowBuf(ArrowBuf, BufferAllocator, long)} instead.
*/
@Deprecated(forRemoval = true)
public NettyArrowBuf(
final ArrowBuf arrowBuf, final BufferAllocator bufferAllocator, final int length) {
super(length);
this(arrowBuf, bufferAllocator, (long) length);
}

/**
* Constructs a new instance.
*
* @param arrowBuf The buffer to wrap.
* @param bufferAllocator The allocator for the buffer.
* @param length The length of this buffer.
*/
public NettyArrowBuf(
final ArrowBuf arrowBuf, final BufferAllocator bufferAllocator, final long length) {
super((int) length);
this.arrowBuf = arrowBuf;
this.arrowByteBufAllocator = new ArrowByteBufAllocator(bufferAllocator);
this.length = length;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ public void defaultAllocatorBenchmark() {
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void segmentRoundingPolicyBenchmark() {
final int bufferSize = 1024;
final long bufferSize = 1024L;
final int numBuffers = 1024;
final int segmentSize = 1024;
final long segmentSize = 1024L;

RoundingPolicy policy = new SegmentRoundingPolicy(segmentSize);
try (RootAllocator allocator =
Expand Down

0 comments on commit b2e0668

Please sign in to comment.