Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
[JENKINS-36871] Improve single-byte get() performance
  • Loading branch information
stephenc committed Aug 5, 2016
1 parent 4b291b3 commit 6cd9145
Showing 1 changed file with 80 additions and 30 deletions.
110 changes: 80 additions & 30 deletions src/main/java/org/jenkinsci/remoting/util/ByteBufferQueue.java
Expand Up @@ -64,6 +64,10 @@ public class ByteBufferQueue {
* Our current read index.
*/
private int readIndex;
/**
* Our current read position.
*/
private int readPosition;
/**
* Our current write index.
*/
Expand All @@ -85,6 +89,7 @@ public ByteBufferQueue(int bufferSize) {
this.buffers = new ByteBuffer[INITIAL_CAPACITY];
this.bufferSize = bufferSize;
this.readIndex = 0;
this.readPosition = 0;
this.writeIndex = 0;
this.buffers[writeIndex] = ByteBuffer.allocate(bufferSize);
}
Expand All @@ -94,8 +99,9 @@ public ByteBufferQueue(int bufferSize) {
*/
private void compact() {
// skip over any empty buffers
while (readIndex < writeIndex && buffers[readIndex].position() == 0) {
while (readIndex < writeIndex && buffers[readIndex].position() == readPosition) {
readIndex++;
readPosition = 0;
}
if (readIndex > 0) {
// compact the array start
Expand Down Expand Up @@ -213,7 +219,11 @@ public void put(byte b) {
* remaining in this buffer queue.
*/
public boolean hasRemaining() {
for (int i = readIndex; i <= writeIndex; i++) {
// first buffer is special
if (buffers[readIndex].position() > readPosition) {
return true;
}
for (int i = readIndex + 1; i <= writeIndex; i++) {
if (buffers[i].position() > 0) {
return true;
}
Expand All @@ -230,13 +240,14 @@ public boolean hasRemaining() {
* remaining in this buffer queue.
*/
public boolean hasRemaining(int len) {
for (int i = readIndex; i <= writeIndex; i++) {
len -= buffers[readIndex].position() - readPosition;
for (int i = readIndex + 1; i <= writeIndex; i++) {
len -= buffers[i].position();
if (len <= 0) {
return true;
}
}
return false;
return len <= 0;
}

/**
Expand All @@ -245,8 +256,8 @@ public boolean hasRemaining(int len) {
* @return the total number of bytes remaining in this buffer queue.
*/
public long remaining() {
long total = 0;
for (int i = readIndex; i <= writeIndex; i++) {
long total = buffers[readIndex].position() - readPosition;
for (int i = readIndex + 1; i <= writeIndex; i++) {
total += buffers[i].position();
}
return total;
Expand All @@ -260,8 +271,11 @@ public long remaining() {
* the supplied limit remaining.
*/
public int remaining(int limit) {
int total = 0;
for (int i = readIndex; i <= writeIndex; i++) {
int total = buffers[readIndex].position() - readPosition;
if (total >= limit) {
return limit;
}
for (int i = readIndex + 1; i <= writeIndex; i++) {
total += buffers[i].position();
if (total >= limit) {
return limit;
Expand All @@ -278,30 +292,32 @@ public int remaining(int limit) {
public long skip(long bytes) {
long skipped = 0;
while (bytes > 0) {
if (readIndex >= writeIndex && buffers[readIndex].position() == 0) {
if (readIndex >= writeIndex && buffers[readIndex].position() == readPosition) {
if (writeIndex > 0) {
// this is a cheap compact
buffers[0] = buffers[writeIndex];
buffers[writeIndex] = null;
readIndex = writeIndex = 0;
buffers[0].clear();
readPosition = 0;
}
break;
}
buffers[readIndex].flip();
int remaining = buffers[readIndex].remaining();
int remaining = buffers[readIndex].position() - readPosition;
if (remaining > bytes) {
buffers[readIndex].position((int)bytes);
buffers[readIndex].compact();
readPosition += (int)bytes;
skipped += bytes;
break;
} else {
skipped += remaining;
bytes -= remaining;
if (readIndex < writeIndex) {
buffers[readIndex++] = null;
readPosition = 0;
} else {
assert readIndex == writeIndex;
buffers[readIndex].clear();
readPosition = 0;
}
}
}
Expand All @@ -310,14 +326,15 @@ public long skip(long bytes) {

public void peek(ByteBuffer dst) {
int readIndex = this.readIndex;
int readPosition = this.readPosition;
while (dst.hasRemaining()) {
if (readIndex >= writeIndex && buffers[readIndex].position() == 0) {
if (readIndex >= writeIndex && buffers[readIndex].position() == readPosition) {
break;
}
int p = buffers[readIndex].position();
int l = buffers[readIndex].limit();
try {
buffers[readIndex].position(0);
buffers[readIndex].position(readPosition);
buffers[readIndex].limit(p);
if (buffers[readIndex].remaining() > dst.remaining()) {
buffers[readIndex].limit(dst.remaining());
Expand All @@ -331,6 +348,7 @@ public void peek(ByteBuffer dst) {
buffers[readIndex].position(p);
}
readIndex++;
readPosition = 0;
}
}

Expand All @@ -343,7 +361,7 @@ public void peek(ByteBuffer dst) {
*/
public void get(ByteBuffer dst) {
while (dst.hasRemaining()) {
if (readIndex >= writeIndex && buffers[readIndex].position() == 0) {
if (readIndex >= writeIndex && buffers[readIndex].position() == readPosition) {
if (writeIndex > 0) {
// this is a cheap compact
buffers[0] = buffers[writeIndex];
Expand All @@ -353,15 +371,19 @@ public void get(ByteBuffer dst) {
break;
}
buffers[readIndex].flip();
if (buffers[readIndex].remaining() > dst.remaining()) {
if (buffers[readIndex].remaining() - readPosition > dst.remaining()) {
int limit = buffers[readIndex].limit();
buffers[readIndex].limit(buffers[readIndex].position() + dst.remaining());
buffers[readIndex].limit(readPosition + dst.remaining());
buffers[readIndex].position(readPosition);
dst.put(buffers[readIndex]);
buffers[readIndex].limit(limit);
buffers[readIndex].compact();
readPosition = buffers[readIndex].position();
buffers[readIndex].limit(buffers[readIndex].capacity());
buffers[readIndex].position(limit);
break;
} else {
buffers[readIndex].position(readPosition);
dst.put(buffers[readIndex]);
readPosition = 0;
if (readIndex < writeIndex) {
buffers[readIndex++] = null;
} else {
Expand All @@ -384,7 +406,7 @@ public void get(ByteBuffer dst) {
public int get(byte[] dst, int offset, int len) {
int read = 0;
while (len > 0) {
if (readIndex >= writeIndex && buffers[readIndex].position() == 0) {
if (readIndex >= writeIndex && buffers[readIndex].position() == readPosition) {
if (writeIndex > 0) {
// this is a cheap compact
buffers[0] = buffers[writeIndex];
Expand All @@ -394,17 +416,22 @@ public int get(byte[] dst, int offset, int len) {
break;
}
buffers[readIndex].flip();
buffers[readIndex].position(readPosition);
int count = buffers[readIndex].remaining();
if (count > len) {
int limit = buffers[readIndex].limit();
buffers[readIndex].get(dst, offset, len);
read += len;
buffers[readIndex].compact();
readPosition = buffers[readIndex].position();
buffers[readIndex].limit(buffers[readIndex].capacity());
buffers[readIndex].position(limit);
return read;
} else {
buffers[readIndex].get(dst, offset, count);
offset += count;
read += count;
len -= count;
readPosition = 0;
if (readIndex < writeIndex) {
buffers[readIndex++] = null;
} else {
Expand All @@ -423,14 +450,15 @@ public int get(byte[] dst, int offset, int len) {
* @throws BufferUnderflowException If there are no remaining bytes to be read.
*/
public byte get() {
while (readIndex < writeIndex && buffers[readIndex].position() == 0) {
buffers[readIndex++] = null;
int readLimit = buffers[readIndex].position();
while (readIndex < writeIndex && readLimit == readPosition) {
buffers[readIndex] = null;
readIndex++;
readPosition = 0;
readLimit = buffers[readIndex].position();
}
if (buffers[readIndex].position() > 0) {
buffers[readIndex].flip();
byte result = buffers[readIndex].get();
buffers[readIndex].compact();
return result;
if (readPosition < readLimit) {
return buffers[readIndex].get(readPosition++);
}
throw new BufferUnderflowException();
}
Expand All @@ -444,6 +472,24 @@ public void unget(ByteBuffer src) {
if (!src.hasRemaining()) {
return;
}
if (readPosition >= src.remaining()) {
// very fast unget
int l = buffers[readIndex].limit();
int p = buffers[readIndex].position();
buffers[readIndex].limit(readPosition);
readPosition -= src.remaining();
buffers[readIndex].position(readPosition);
buffers[readIndex].put(src);
buffers[readIndex].limit(l);
buffers[readIndex].position(p);
return;
} else if (readPosition > 0) {
// need to compact early
buffers[readIndex].flip();
buffers[readIndex].position(readPosition);
buffers[readIndex].compact();
readPosition = 0;
}
ByteBuffer[] inject = new ByteBuffer[src.remaining() / bufferSize + 1];
int injectIndex = 0;
while (src.hasRemaining()) {
Expand Down Expand Up @@ -503,13 +549,15 @@ public String toString() {
*/
public byte[] toByteArray() {
if (readIndex == writeIndex) {
if (buffers[readIndex].position() == 0) {
if (buffers[readIndex].position() == readPosition) {
return EMPTY_BYTE_ARRAY;
}
buffers[readIndex].flip();
buffers[readIndex].position(readPosition);
byte[] result = new byte[buffers[readIndex].remaining()];
buffers[readIndex].get(result);
buffers[readIndex].clear();
readPosition = 0;
readIndex = writeIndex = 0;
return result;
}
Expand All @@ -521,9 +569,11 @@ public byte[] toByteArray() {
int pos = 0;
for (int index = readIndex; index <= writeIndex; index++) {
buffers[index].flip();
buffers[index].position(readPosition);
int count = buffers[index].remaining();
buffers[index].get(result, pos, count);
buffers[index].clear();
readPosition = 0;
pos += count;
}
readIndex = writeIndex = 0;
Expand Down

0 comments on commit 6cd9145

Please sign in to comment.