Skip to content

Commit

Permalink
[FIXED JENKINS-9189] fixed the race condition between I/O operation a…
Browse files Browse the repository at this point in the history
…nd the return of the Channel.call() execution in more fundamental way.

(cherry picked from commit 9cdd9cc)
  • Loading branch information
kohsuke authored and vjuranek committed Aug 17, 2011
1 parent 8a2209f commit ef2c8a7
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 19 deletions.
40 changes: 34 additions & 6 deletions remoting/src/main/java/hudson/remoting/ProxyOutputStream.java
Expand Up @@ -156,11 +156,37 @@ protected void finalize() throws Throwable {
}
}

/**
* I/O operations in remoting gets executed by a separate pipe thread asynchronously.
* So if a closure performs some I/O (such as writing to the RemoteOutputStream) then returns,
* it is possible that the calling thread unblocks before the I/O actually completes.
* <p>
* This race condition creates a truncation problem like JENKINS-9189 or JENKINS-7871.
* The initial fix for this was to introduce {@link Channel#syncLocalIO()}, but given the
* recurrence in JENKINS-9189, I concluded that it's too error prone to expect the user of the
* remoting to make such a call in the right place.
* <p>
* So the goal of this code is to automatically ensure the proper ordering of the return from
* the {@link Request#call(Channel)} and the I/O operations done during the call. We do this
* by attributing I/O call to a {@link Request}, then keeping track of the last I/O operation
* performed.
*/
private static void markForIoSync(Channel channel, int requestId, java.util.concurrent.Future<?> ioOp) {
Request<?,?> call = channel.pendingCalls.get(requestId);
// call==null if:
// 1) the remote peer uses old version that doesn't set the requestId field
// 2) a bug in the code, but in that case we are being defensive
if (call!=null)
call.lastIo = ioOp;
}


/**
* {@link Command} for sending bytes.
*/
private static final class Chunk extends Command {
private final int oid;
private final int requestId = Request.getCurrentRequestId();
private final byte[] buf;

public Chunk(int oid, byte[] buf, int start, int len) {
Expand All @@ -179,7 +205,7 @@ public Chunk(int oid, byte[] buf, int start, int len) {

protected void execute(final Channel channel) {
final OutputStream os = (OutputStream) channel.getExportedObject(oid);
channel.pipeWriter.submit(new Runnable() {
markForIoSync(channel,requestId,channel.pipeWriter.submit(new Runnable() {
public void run() {
try {
os.write(buf);
Expand Down Expand Up @@ -208,7 +234,7 @@ public void run() {
}
}
}
});
}));
}

public String toString() {
Expand All @@ -223,6 +249,7 @@ public String toString() {
*/
private static final class Flush extends Command {
private final int oid;
private final int requestId = Request.getCurrentRequestId();

public Flush(int oid) {
super(false);
Expand All @@ -231,15 +258,15 @@ public Flush(int oid) {

protected void execute(Channel channel) {
final OutputStream os = (OutputStream) channel.getExportedObject(oid);
channel.pipeWriter.submit(new Runnable() {
markForIoSync(channel,requestId,channel.pipeWriter.submit(new Runnable() {
public void run() {
try {
os.flush();
} catch (IOException e) {
// ignore errors
}
}
});
}));
}

public String toString() {
Expand Down Expand Up @@ -282,6 +309,7 @@ public String toString() {
*/
private static final class EOF extends Command {
private final int oid;
private final int requestId = Request.getCurrentRequestId();

public EOF(int oid) {
this.oid = oid;
Expand All @@ -290,7 +318,7 @@ public EOF(int oid) {

protected void execute(final Channel channel) {
final OutputStream os = (OutputStream) channel.getExportedObject(oid);
channel.pipeWriter.submit(new Runnable() {
markForIoSync(channel,requestId,channel.pipeWriter.submit(new Runnable() {
public void run() {
channel.unexport(oid);
try {
Expand All @@ -299,7 +327,7 @@ public void run() {
// ignore errors
}
}
});
}));
}

public String toString() {
Expand Down
37 changes: 24 additions & 13 deletions remoting/src/main/java/hudson/remoting/Request.java
Expand Up @@ -72,6 +72,11 @@ abstract class Request<RSP extends Serializable,EXC extends Throwable> extends C
*/
protected volatile transient Future<?> future;

/**
* If this request performed some I/O back in the caller side during the remote call execution, set to last such
* operation, so that we can block until its completion.
*/
/*package*/ volatile transient Future<?> lastIo;

protected Request() {
synchronized(Request.class) {
Expand Down Expand Up @@ -128,6 +133,13 @@ public final RSP call(Channel channel) throws EXC, InterruptedException, IOExcep
t.setName(name);
}

if (lastIo!=null)
try {
lastIo.get();
} catch (ExecutionException e) {
// ignore the I/O error
}

Object exc = response.exception;

if (exc!=null) {
Expand Down Expand Up @@ -266,13 +278,16 @@ protected final void execute(final Channel channel) {
public void run() {
try {
Command rsp;
CURRENT.set(Request.this);
try {
RSP r = Request.this.perform(channel);
// normal completion
rsp = new Response<RSP,EXC>(id,r);
} catch (Throwable t) {
// error return
rsp = new Response<RSP,Throwable>(id,t);
} finally {
CURRENT.set(null);
}
if(chainCause)
rsp.createdAt.initCause(createdAt);
Expand Down Expand Up @@ -307,19 +322,15 @@ public void run() {
*/
public static boolean chainCause = Boolean.getBoolean(Request.class.getName()+".chainCause");

//private static final Unsafe unsafe = getUnsafe();

//private static Unsafe getUnsafe() {
// try {
// Field f = Unsafe.class.getDeclaredField("theUnsafe");
// f.setAccessible(true);
// return (Unsafe)f.get(null);
// } catch (NoSuchFieldException e) {
// throw new Error(e);
// } catch (IllegalAccessException e) {
// throw new Error(e);
// }
//}
/**
* Set to the {@link Request} object during {@linkplain #perform(Channel) the execution of the call}.
*/
/*package*/ static ThreadLocal<Request> CURRENT = new ThreadLocal<Request>();

/*package*/ static int getCurrentRequestId() {
Request r = CURRENT.get();
return r!=null ? r.id : 0;
}

/**
* Interrupts the execution of the remote computation.
Expand Down

0 comments on commit ef2c8a7

Please sign in to comment.