Skip to content

Commit

Permalink
[JENKINS-36871] Tests had missed some critical bugs in this ByteBuffe…
Browse files Browse the repository at this point in the history
…rQueueInputStream
  • Loading branch information
stephenc committed Aug 2, 2016
1 parent 124439e commit 707bfe8
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 25 deletions.
Expand Up @@ -74,7 +74,7 @@ public ByteBufferQueueInputStream(ByteBufferQueue queue, int length) {
*/
@Override
public int read() throws IOException {
if (length != -1 && pos > length) {
if (length != -1 && pos >= length) {
return -1;
}
try {
Expand All @@ -88,7 +88,7 @@ public int read() throws IOException {
mark = null;
}
}
return b;
return b & 0xff;
} catch (BufferUnderflowException e) {
return -1;
}
Expand All @@ -99,57 +99,73 @@ public int read() throws IOException {
*/
@Override
public int read(byte[] b) throws IOException {
if (length != -1 && pos > length) {
return -1;
}
ByteBuffer buffer = ByteBuffer.wrap(b);
if (length != -1 && buffer.limit() > length - pos) {
buffer.limit(length - pos + 1);
ByteBuffer buffer;
if (length != -1 && pos >= length) {
int rem = length - pos;
if (rem <= 0) {
return -1;
} else if (b.length > rem) {
buffer = ByteBuffer.wrap(b, 0, rem);
} else {
buffer = ByteBuffer.wrap(b);
}
} else {
buffer = ByteBuffer.wrap(b);
}
queue.get(buffer);
if (buffer.position() == 0) {
int read = buffer.position();
if (read <= 0) {
return -1;
}
pos += buffer.position();
pos += read;
if (mark != null) {
if (mark.remaining() > buffer.position()) {
buffer.flip();
if (mark.remaining() > read) {
int oldLimit = buffer.limit();
buffer.limit(buffer.position());
buffer.position(0);
mark.put(buffer);
buffer.limit(oldLimit);
} else {
// mark was invalidated as there was more data than reserved
mark = null;
}
}
return buffer.position();
return read;
}

/**
* {@inheritDoc}
*/
@Override
public int read(byte[] b, int off, int len) throws IOException {
if (length != -1 && pos >= length) {
return -1;
if (length != -1) {
int rem = length - pos;
if (rem <= 0) {
return -1;
} else if (len > rem) {
len = rem;
}
}
ByteBuffer buffer = ByteBuffer.wrap(b, off, len);
if (length != -1 && buffer.limit() > length - pos) {
buffer.limit(length - pos + 1);
}
queue.get(buffer);
if (buffer.position() == 0) {
int read = buffer.position() - off;
if (read <= 0) {
return -1;
}
pos += buffer.position();
pos += read;
if (mark != null) {
if (mark.remaining() > buffer.position()) {
buffer.flip();
if (mark.remaining() > read) {
int oldLimit = buffer.limit();
buffer.limit(buffer.position());
buffer.position(off);
mark.put(buffer);
buffer.limit(oldLimit);
} else {
// mark was invalidated as there was more data than reserved
mark = null;
}
}
return buffer.position();
return read;
}

/**
Expand All @@ -161,7 +177,7 @@ public long skip(long n) throws IOException {
if (pos >= length) {
return -1;
}
if (pos + n > length) {
if (pos + n >= length) {
n = length - pos;
}
}
Expand Down Expand Up @@ -209,7 +225,7 @@ public synchronized void reset() throws IOException {
mark.flip();
pos -= mark.remaining();
queue.unget(mark);
mark.position(0);
mark.clear();
}

/**
Expand Down
Expand Up @@ -67,6 +67,44 @@ public void readSome() throws Exception {
assertThat(read(instance, 10), is("AbCdEfGhIj"));
}

@Test
public void readBytes() throws Exception {
String str = "AbCdEfGhIjKlMnOpQrStUvWxYz";

ByteBufferQueue queue = new ByteBufferQueue(10);
queue.put(ByteBuffer.wrap(str.getBytes(Charsets.UTF_8)));
ByteBufferQueueInputStream instance = new ByteBufferQueueInputStream(queue);

byte[] bytes = new byte[10];
assertThat(instance.read(bytes), is(10));
assertThat(new String(bytes, Charsets.UTF_8), is("AbCdEfGhIj"));
assertThat(instance.read(bytes), is(10));
assertThat(new String(bytes, Charsets.UTF_8), is("KlMnOpQrSt"));
assertThat(instance.read(bytes), is(6));
assertThat(new String(bytes, Charsets.UTF_8), is("UvWxYzQrSt"));
}

@Test
public void readBytesOffset() throws Exception {
String str = "AbCdEfGhIjKlMnOpQrStUvWxYz";

ByteBufferQueue queue = new ByteBufferQueue(10);
queue.put(ByteBuffer.wrap(str.getBytes(Charsets.UTF_8)));
ByteBufferQueueInputStream instance = new ByteBufferQueueInputStream(queue);

byte[] bytes = new byte[10];
assertThat(instance.read(bytes,5,3), is(3));
assertThat(new String(bytes, Charsets.UTF_8), is("\u0000\u0000\u0000\u0000\u0000AbC\u0000\u0000"));
assertThat(instance.read(bytes, 0, 2), is(2));
assertThat(new String(bytes, Charsets.UTF_8), is("dE\u0000\u0000\u0000AbC\u0000\u0000"));
assertThat(instance.read(bytes, 2, 8), is(8));
assertThat(new String(bytes, Charsets.UTF_8), is("dEfGhIjKlM"));
assertThat(instance.read(bytes, 2, 8), is(8));
assertThat(new String(bytes, Charsets.UTF_8), is("dEnOpQrStU"));
assertThat(instance.read(bytes, 2, 8), is(5));
assertThat(new String(bytes, Charsets.UTF_8), is("dEvWxYzStU"));
}

@Test
public void skipRead() throws Exception {
String str = "AbCdEfGhIjKlMnOpQrStUvWxYz";
Expand Down

0 comments on commit 707bfe8

Please sign in to comment.