Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
[JENKINS-48130] - Improve handling and diagnostics of RejectedExecuti…
…onException in the code (#156)

* Introduce the reliable ExecutorServiceUtils#submitAsync() method

* Rework existing callers of ExecutorService#submit() to ExecutorServiceUtils, process errors

* Introduce the FatalRejectedExecutionException and proper handling in ExecutorServiceUtils

* RejectedExecutionException in SingleLaneService should refer to the base exec service

* Polish the PipeTest implementation for a better diagnosability

* Fix the double submission issue in the SingleLaneExecutor
  • Loading branch information
oleg-nenashev committed Nov 29, 2017
1 parent c186279 commit eee8030
Show file tree
Hide file tree
Showing 6 changed files with 305 additions and 51 deletions.
4 changes: 3 additions & 1 deletion src/main/java/hudson/remoting/AtmostOneThreadExecutor.java
Expand Up @@ -8,6 +8,7 @@
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.jenkinsci.remoting.util.ExecutorServiceUtils.FatalRejectedExecutionException;

/**
* {@link ExecutorService} that uses at most one executor.
Expand Down Expand Up @@ -86,7 +87,8 @@ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedE
public void execute(Runnable command) {
synchronized (q) {
if (isShutdown()) {
throw new RejectedExecutionException("This executor has been shutdown.");
// No way this executor service can be recovered
throw new FatalRejectedExecutionException("This executor has been shutdown.");
}
q.add(command);
if (!isAlive()) {
Expand Down
113 changes: 78 additions & 35 deletions src/main/java/hudson/remoting/JarCacheSupport.java
Expand Up @@ -7,6 +7,7 @@
import java.util.concurrent.ExecutorService;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.jenkinsci.remoting.util.ExecutorServiceUtils;

/**
* Default partial implementation of {@link JarCache}.
Expand Down Expand Up @@ -50,50 +51,92 @@ public Future<URL> resolve(final Channel channel, final long sum1, final long su

while (true) {// might have to try a few times before we get successfully resolve

final Checksum key = new Checksum(sum1,sum2);
final AsyncFutureImpl<URL> promise = new AsyncFutureImpl<URL>();
Future<URL> cur = inprogress.putIfAbsent(key, promise);
final Checksum key = new Checksum(sum1,sum2);
Future<URL> cur = inprogress.get(key);
if (cur!=null) {
// this computation is already in progress. piggy back on that one
return cur;
} else {
// we are going to resolve this ourselves and publish the result in 'promise' for others
downloader.submit(new Runnable() {
public void run() {
try {
URL url = retrieve(channel,sum1,sum2);
inprogress.remove(key);
promise.set(url);
} catch (ChannelClosedException e) {
// the connection was killed while we were still resolving the file
bailout(e);
} catch (RequestAbortedException e) {
// the connection was killed while we were still resolving the file
bailout(e);
} catch (InterruptedException e) {
// we are bailing out, but we need to allow another thread to retry later.
bailout(e);
try {
final AsyncFutureImpl<URL> promise = new AsyncFutureImpl<URL>();
ExecutorServiceUtils.submitAsync(downloader, new DownloadRunnable(channel, sum1, sum2, key, promise));
// Now we are sure that the task has been accepted to the queue, hence we cache the promise
// if nobody else caches it before.
inprogress.putIfAbsent(key, promise);
} catch (ExecutorServiceUtils.ExecutionRejectedException ex) {
final String message = "Downloader executor service has rejected the download command for checksum " + key;
LOGGER.log(Level.SEVERE, message, ex);
// Retry the submission after 100 ms if the error is not fatal
if (ex.isFatal()) {
// downloader won't accept anything else, do not even try
throw new IOException(message, ex);
} else {
//TODO: should we just fail? unrealistic case for the current AtmostOneThreadExecutor implementation anyway
Thread.sleep(100);
}
}
}
}
}

private class DownloadRunnable implements Runnable {

final Channel channel;
final long sum1;
final long sum2;
final Checksum key;
final AsyncFutureImpl<URL> promise;

LOGGER.log(Level.WARNING, String.format("Interrupted while resolving a jar %016x%016x",sum1,sum2), e);
} catch (Throwable e) {
// in other general failures, we aren't retrying
// TODO: or should we?
promise.set(e);
public DownloadRunnable(Channel channel, long sum1, long sum2, Checksum key, AsyncFutureImpl<URL> promise) {
this.channel = channel;
this.sum1 = sum1;
this.sum2 = sum2;
this.key = key;
this.promise = promise;
}

@Override
public void run() {
try {
// Deduplication: There is a risk that multiple downloadables get scheduled, hence we check if
// the promise is actually in the queue
Future<URL> inprogressDownload = inprogress.get(key);
if (promise != inprogressDownload) {
// Duplicated entry due to the race condition, do nothing
return;
}

URL url = retrieve(channel, sum1, sum2);
inprogress.remove(key);
promise.set(url);
} catch (ChannelClosedException e) {
// the connection was killed while we were still resolving the file
bailout(e);
} catch (RequestAbortedException e) {
// the connection was killed while we were still resolving the file
bailout(e);
} catch (InterruptedException e) {
// we are bailing out, but we need to allow another thread to retry later.
bailout(e);

LOGGER.log(Level.WARNING, String.format("Failed to resolve a jar %016x%016x",sum1,sum2), e);
}
}
LOGGER.log(Level.WARNING, String.format("Interrupted while resolving a jar %016x%016x", sum1, sum2), e);
} catch (Throwable e) {
// in other general failures, we aren't retrying
// TODO: or should we?
promise.set(e);

/**
* Report a failure of the retrieval and allows another thread to retry.
*/
private void bailout(Exception e) {
inprogress.remove(key); // this lets another thread to retry later
promise.set(e); // then tell those who are waiting that we aborted
}
});
LOGGER.log(Level.WARNING, String.format("Failed to resolve a jar %016x%016x", sum1, sum2), e);
}
}

/**
* Report a failure of the retrieval and allows another thread to retry.
*/
private void bailout(Exception e) {
inprogress.remove(key); // this lets another thread to retry later
promise.set(e); // then tell those who are waiting that we aborted
}
}

protected JarLoader getJarLoader(Channel channel) throws InterruptedException {
Expand All @@ -104,6 +147,6 @@ protected JarLoader getJarLoader(Channel channel) throws InterruptedException {
}
return jl;
}

private static final Logger LOGGER = Logger.getLogger(JarCacheSupport.class.getName());
}
40 changes: 35 additions & 5 deletions src/main/java/hudson/remoting/SingleLaneExecutorService.java
Expand Up @@ -8,6 +8,10 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.jenkinsci.remoting.util.ExecutorServiceUtils;
import org.jenkinsci.remoting.util.ExecutorServiceUtils.FatalRejectedExecutionException;

/**
* Creates an {@link ExecutorService} that executes submitted tasks sequentially
Expand Down Expand Up @@ -43,6 +47,8 @@ public class SingleLaneExecutorService extends AbstractExecutorService {
* We have finished shutting down. All tasks are fully executed.
*/
private boolean shutDown;

private static final Logger LOGGER = Logger.getLogger(SingleLaneExecutorService.class.getName());

/**
* @param base
Expand Down Expand Up @@ -95,18 +101,31 @@ public synchronized boolean awaitTermination(long timeout, TimeUnit unit) throws
return isTerminated();
}

// TODO: create a new method with non-Runtime exceptions and timeout support
@Override
public synchronized void execute(Runnable command) {
if (shuttingDown)
throw new RejectedExecutionException();

if (shuttingDown) {
throw new FatalRejectedExecutionException("Cannot execute the command " + command +
". The executor service is shutting down");
}

this.tasks.add(command);

// If we haven't been scheduled yet, do so now
if (!scheduled) {
scheduled = true;
base.submit(runner); // if we haven't been scheduled yet, do so now
try {
// Submit task in the async mode
ExecutorServiceUtils.submitAsync(base, runner);
} catch (ExecutorServiceUtils.ExecutionRejectedException ex) {
// Wrap by the runtime exception since there is no other solution here
throw new RejectedExecutionException("Base executor service " + base + " has rejected the task " + command, ex);
}
}
}

private final Runnable runner = new Runnable() {
@Override
public void run() {
try {
tasks.peek().run();
Expand All @@ -117,7 +136,18 @@ public void run() {
assert scheduled;
if (!tasks.isEmpty()) {
// we have still more things to do
base.submit(this);
try {
ExecutorServiceUtils.submitAsync(base, this);
} catch (ExecutorServiceUtils.ExecutionRejectedException ex) {
// It is supposed to be a fatal error, but we cannot propagate it properly
// So the code just logs the error and then throws RuntimeException as it
// used to do before the code migration to ExecutorServiceUtils.
// TODO: so this behavior still implies the BOOM risk, but there will be a log entry at least
LOGGER.log(Level.SEVERE, String.format(
"Base executor service %s has rejected the queue task %s. Propagating the RuntimeException to the caller.",
ex.getExecutorServiceDisplayName(), ex.getRunnableDisplayName()), ex);
throw ExecutorServiceUtils.createRuntimeException("Base executor service has rejected the task from the queue", ex);
}
} else {
scheduled = false;
if (shuttingDown) {
Expand Down
13 changes: 10 additions & 3 deletions src/main/java/org/jenkinsci/remoting/nio/NioChannelHub.java
Expand Up @@ -40,6 +40,7 @@

import static java.nio.channels.SelectionKey.*;
import static java.util.logging.Level.*;
import org.jenkinsci.remoting.util.ExecutorServiceUtils;
import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;

Expand Down Expand Up @@ -631,7 +632,7 @@ public void run() {
} while (!last);
assert packetSize==0;
if (packet.length > 0) {
t.swimLane.submit(new Runnable() {
ExecutorServiceUtils.submitAsync(t.swimLane, new Runnable() {
@Override
public void run() {
final ByteArrayReceiver receiver = t.receiver;
Expand All @@ -653,8 +654,9 @@ public void run() {
t.abort(new IOException(msg));
}
if (t.rb.isClosed()) {
// EOF. process this synchronously with respect to packets
t.swimLane.submit(new Runnable() {
// EOF. process this synchronously with respect to packets waiting for handling in the queue
ExecutorServiceUtils.submitAsync(t.swimLane, new Runnable() {
@Override
public void run() {
// if this EOF is unexpected, report an error.
if (!t.getChannel().isInClosed()) {
Expand All @@ -677,6 +679,11 @@ public void run() {
// It causes the channel failure, hence it is severe
LOGGER.log(SEVERE, "Communication problem in " + t + ". NIO Transport will be aborted.", e);
t.abort(e);
} catch (ExecutorServiceUtils.ExecutionRejectedException e) {
// TODO: should we try to reschedule the task if the issue is not fatal?
// The swimlane has rejected the execution, e.g. due to the "shutting down" state.
LOGGER.log(SEVERE, "The underlying executor service rejected the task in " + t + ". NIO Transport will be aborted.", e);
t.abort(e);
} catch (CancelledKeyException e) {
// see JENKINS-24050. I don't understand how this can happen, given that the selector
// thread is the only thread that cancels keys. So to better understand what's going on,
Expand Down

0 comments on commit eee8030

Please sign in to comment.