Navigation Menu

Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Added additional diagnostic code that will try to read ahead and provide
what's ahead in the input stream.
  • Loading branch information
kohsuke committed Jun 21, 2013
1 parent 2c47b1c commit 68be3b1
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 7 deletions.
6 changes: 6 additions & 0 deletions pom.xml
Expand Up @@ -140,6 +140,12 @@ THE SOFTWARE.
<artifactId>constant-pool-scanner</artifactId>
<version>1.1</version>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
<version>2.0.1</version>
<scope>provided</scope>
</dependency>
</dependencies>

<build>
Expand Down
73 changes: 66 additions & 7 deletions src/main/java/hudson/remoting/ClassicCommandTransport.java
Expand Up @@ -2,12 +2,15 @@

import hudson.remoting.Channel.Mode;

import javax.annotation.Nullable;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.StreamCorruptedException;

/**
* The default {@link CommandTransport} that has been used historically.
Expand All @@ -23,12 +26,21 @@
private final ObjectInputStream ois;
private final ObjectOutputStream oos;
private final Capability remoteCapability;
private final OutputStream underlyingStream;

private ClassicCommandTransport(ObjectInputStream ois, ObjectOutputStream oos, OutputStream underlyingStream, Capability remoteCapability) {
/**
* Transport level {@link InputStream} that we use only for diagnostics in case we detect stream
* corruption. Can be null.
*/
private final @Nullable InputStream rawIn;
/**
* See {@link CommandTransport#getUnderlyingStream()}
*/
private final OutputStream rawOut;

private ClassicCommandTransport(ObjectInputStream ois, ObjectOutputStream oos, InputStream rawIn, OutputStream rawOut, Capability remoteCapability) {
this.ois = ois;
this.oos = oos;
this.underlyingStream = underlyingStream;
this.rawIn= rawIn;
this.rawOut = rawOut;
this.remoteCapability = remoteCapability;
}

Expand Down Expand Up @@ -56,7 +68,54 @@ public void closeWrite() throws IOException {
}

public final Command read() throws IOException, ClassNotFoundException {
return Command.readFrom(channel,ois);
try {
return Command.readFrom(channel,ois);
} catch (StreamCorruptedException e) {
throw diagnoseStreamCorruption(e);
}
}

/**
* To diagnose stream corruption, we'll try to read ahead the data.
* This operation can block, so we'll use another thread to do this.
*/
private StreamCorruptedException diagnoseStreamCorruption(StreamCorruptedException e) throws StreamCorruptedException {
if (rawIn==null)
return e; // no source of diagnostics information. can't diagnose.


final ByteArrayOutputStream readAhead = new ByteArrayOutputStream();
final IOException[] error = new IOException[1];

Thread diagnosisThread = new Thread(channel+" stream corruption diagnosis thread") {
public void run() {
int b;
try {
// not all InputStream will look for the thread interrupt flag, so check that explicitly to be defensive
while (!Thread.interrupted() && (b=rawIn.read())!=-1) {
readAhead.write(b);
}
} catch (IOException e) {
error[0] = e;
}
}
};

// wait up to 1 sec to grab as much data as possible
diagnosisThread.start();
try {
diagnosisThread.join(1000);
} catch (InterruptedException _) {
// we are only waiting for a fixed amount of time, so we'll pretend like we were in a busy loop
Thread.currentThread().interrupt();
// fall through
}

IOException diagnosisProblem = error[0]; // capture the error, if any, before we kill the thread
if (diagnosisThread.isAlive())
diagnosisThread.interrupt(); // if it's not dead, kill

return new DiagnosedStreamCorruptionException(e,diagnosisProblem,readAhead.toByteArray());
}

public void closeRead() throws IOException {
Expand All @@ -65,7 +124,7 @@ public void closeRead() throws IOException {

@Override
OutputStream getUnderlyingStream() {
return underlyingStream;
return rawOut;
}

public static CommandTransport create(Mode mode, InputStream is, OutputStream os, OutputStream header, ClassLoader base, Capability capability) throws IOException {
Expand Down Expand Up @@ -120,7 +179,7 @@ public static CommandTransport create(Mode mode, InputStream is, OutputStream os

return new ClassicCommandTransport(
new ObjectInputStreamEx(mode.wrap(is),base),
oos, os, cap);
oos, is, os, cap);
case 2:
cap = Capability.read(is);
break;
Expand Down
@@ -0,0 +1,38 @@
package hudson.remoting;

import java.io.PrintWriter;
import java.io.StreamCorruptedException;
import java.io.StringWriter;

/**
* Signals a {@link StreamCorruptedException} with some additional diagnostic information.
*
* @author Kohsuke Kawaguchi
*/
public class DiagnosedStreamCorruptionException extends StreamCorruptedException {
private final Exception diagnoseFailure;
private final byte[] readAhead;

public DiagnosedStreamCorruptionException(StreamCorruptedException cause, Exception diagnoseFailure, byte[] readAhead) {
initCause(cause);
this.diagnoseFailure = diagnoseFailure;
this.readAhead = readAhead;
}

@Override
public String toString() {
StringBuilder buf = new StringBuilder();
buf.append(super.toString()).append("\n");
buf.append("Read ahead: "+HexDump.toHex(readAhead));
if (diagnoseFailure!=null) {
StringWriter w = new StringWriter();
PrintWriter p = new PrintWriter(w);
diagnoseFailure.printStackTrace(p);
p.flush();

buf.append("Diagnosis problem:\n ");
buf.append(w.toString().trim().replace("\n","\n "));
}
return buf.toString();
}
}
@@ -0,0 +1,43 @@
package hudson.remoting;

import hudson.remoting.Channel.Mode;
import org.apache.commons.io.output.NullOutputStream;
import org.junit.Test;

import java.io.ByteArrayInputStream;
import java.io.PrintWriter;
import java.io.StreamCorruptedException;
import java.io.StringWriter;

import static junit.framework.Assert.*;

/**
* @author Kohsuke Kawaguchi
*/
public class DiagnosedStreamCorruptionExceptionTest {
@Test
public void exercise() throws Exception {
byte[] payload = {
0,0,0,0, /* binary stream preamble*/
(byte)0xAC, (byte)0xED, 0x00, 0x05, /* object input stream header */
1, 2, 3, 4, 5 /* bogus data */
};

ClassicCommandTransport ct = (ClassicCommandTransport) ClassicCommandTransport.create(Mode.BINARY, new ByteArrayInputStream(payload), new NullOutputStream(), new NullOutputStream(), getClass().getClassLoader(), new Capability());

try {
ct.read();
fail();
} catch (DiagnosedStreamCorruptionException e) {
StringWriter s = new StringWriter();
PrintWriter w = new PrintWriter(s);
e.printStackTrace(w);
w.close();

String msg = s.toString();
assertTrue(msg.contains("Read ahead: 02030405"));
assertTrue(msg.contains("invalid type code: 01"));
assertSame(StreamCorruptedException.class, e.getCause().getClass());
}
}
}

0 comments on commit 68be3b1

Please sign in to comment.