Skip to content

Commit

Permalink
[FIXED JENKINS-28840] Deadlock between Queue.maintain and Executor.in…
Browse files Browse the repository at this point in the history
…terrupt

More fun here:

- All this originates from Executor extending Thread.
- There is funky logic in the lock handling code of the JVM that makes assumptions
  about how it might proceed with the lock when the thread holding the lock has its
  interrupt flag set.
- Really it would be better if Executor did not extend Thread as that way we wouldn't
  have to deal with some of that complexity. But OTOH we are where we are and backwards
  compatibility may make such a change not possible without a lot of breakage.
- Fixing the issue at hand, firstly requires that interrupting a Computer happens with the
  Queue lock held (to speed up tests we have Jenkins.cleanup get the lock for all Computers)
  That prevents the Queue maintain thread from getting caught
- Secondly, when removing an executor from a computer we process the removal while
  holding the Queue lock, but we move the removal itself to a separate thread if we cannot
  get the Queue lock in order to avoid deadlock.
- Also add helper methods to wrap tasks to be performed while holding the lock
  and a helper method for Runnables that exposes the tryLock functionality
  • Loading branch information
stephenc committed Jun 15, 2015
1 parent 7e1a921 commit 6f343dc
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 15 deletions.
22 changes: 15 additions & 7 deletions core/src/main/java/hudson/model/Computer.java
Expand Up @@ -990,22 +990,25 @@ public final long getDemandStartMilliseconds() {
* Called by {@link Executor} to kill excessive executors from this computer.
*/
/*package*/ void removeExecutor(final Executor e) {
Queue.withLock(new Runnable() {
final Runnable task = new Runnable() {
@Override
public void run() {
synchronized (Computer.this) {
executors.remove(e);
addNewExecutorIfNecessary();
if (!isAlive()) // TODO except from interrupt/doYank this is called while the executor still isActive(), so how could !this.isAlive()?
{
if (!isAlive()) {
AbstractCIBase ciBase = Jenkins.getInstance();
if (ciBase != null) {
ciBase.removeComputer(Computer.this);
}
}
}
}
});
};
if (!Queue.tryWithLock(task)) {
// JENKINS-28840 if we couldn't get the lock push the operation to a separate thread to avoid deadlocks
threadPoolForRemoting.submit(Queue.wrapWithLock(task));
}
}

/**
Expand All @@ -1028,9 +1031,14 @@ protected boolean isAlive() {
* Called from {@link Jenkins#cleanUp}.
*/
public void interrupt() {
for (Executor e : executors) {
e.interruptForShutdown();
}
Queue.withLock(new Runnable() {
@Override
public void run() {
for (Executor e : executors) {
e.interruptForShutdown();
}
}
});
}

public String getSearchUrl() {
Expand Down
129 changes: 127 additions & 2 deletions core/src/main/java/hudson/model/Queue.java
Expand Up @@ -99,7 +99,6 @@
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand All @@ -113,6 +112,7 @@
import org.acegisecurity.AccessDeniedException;
import org.acegisecurity.Authentication;
import org.jenkinsci.bytecode.AdaptField;
import org.jenkinsci.remoting.RoleChecker;
import org.kohsuke.stapler.HttpResponse;
import org.kohsuke.stapler.HttpResponses;
import org.kohsuke.stapler.export.Exported;
Expand Down Expand Up @@ -337,7 +337,7 @@ public Void call() throws Exception {
}
});

private transient final Lock lock = new ReentrantLock();
private transient final ReentrantLock lock = new ReentrantLock();

private transient final Condition condition = lock.newCondition();

Expand Down Expand Up @@ -1219,6 +1219,59 @@ public static <V> V withLock(java.util.concurrent.Callable<V> callable) throws E
}
}

/**
* Some operations require to be performed with the {@link Queue} lock held. Use one of these methods rather
* than locking directly on Queue in order to allow for future refactoring.
* @param runnable the operation to perform.
* @return {@code true} if the lock available and the operation performed.
* @since 1.FIXME
*/
public static boolean tryWithLock(Runnable runnable) {
final Jenkins jenkins = Jenkins.getInstance();
final Queue queue = jenkins == null ? null : jenkins.getQueue();
if (queue == null) {
runnable.run();
return true;
} else {
return queue._tryWithLock(runnable);
}
}
/**
* Wraps a {@link Runnable} with the {@link Queue} lock held.
*
* @param runnable the operation to wrap.
* @since 1.FIXME
*/
public static Runnable wrapWithLock(Runnable runnable) {
final Jenkins jenkins = Jenkins.getInstance();
final Queue queue = jenkins == null ? null : jenkins.getQueue();
return queue == null ? runnable : new LockedRunnable(runnable);
}

/**
* Wraps a {@link hudson.remoting.Callable} with the {@link Queue} lock held.
*
* @param callable the operation to wrap.
* @since 1.FIXME
*/
public static <V, T extends Throwable> hudson.remoting.Callable<V, T> wrapWithLock(hudson.remoting.Callable<V, T> callable) {
final Jenkins jenkins = Jenkins.getInstance();
final Queue queue = jenkins == null ? null : jenkins.getQueue();
return queue == null ? callable : new LockedHRCallable<>(callable);
}

/**
* Wraps a {@link java.util.concurrent.Callable} with the {@link Queue} lock held.
*
* @param callable the operation to wrap.
* @since 1.FIXME
*/
public static <V> java.util.concurrent.Callable<V> wrapWithLock(java.util.concurrent.Callable<V> callable) {
final Jenkins jenkins = Jenkins.getInstance();
final Queue queue = jenkins == null ? null : jenkins.getQueue();
return queue == null ? callable : new LockedJUCCallable<V>(callable);
}

@Override
protected void _await() throws InterruptedException {
condition.await();
Expand All @@ -1244,6 +1297,26 @@ protected void _withLock(Runnable runnable) {
}
}

/**
* Some operations require to be performed with the {@link Queue} lock held. Use one of these methods rather
* than locking directly on Queue in order to allow for future refactoring.
* @param runnable the operation to perform.
* @return {@code true} if the lock available and the operation performed.
* @since 1.FIXME
*/
protected boolean _tryWithLock(Runnable runnable) {
if (lock.tryLock()) {
try {
runnable.run();
} finally {
lock.unlock();
}
return true;
} else {
return false;
}
}

/**
* Some operations require to be performed with the {@link Queue} lock held. Use one of these methods rather
* than locking directly on Queue in order to allow for future refactoring.
Expand Down Expand Up @@ -1308,6 +1381,13 @@ public void maintain() {
List<BuildableItem> lostPendings = new ArrayList<BuildableItem>(pendings);
for (Computer c : Jenkins.getInstance().getComputers()) {
for (Executor e : c.getExecutors()) {
if (e.isInterrupted()) {
// JENKINS-28840 we will deadlock if we try to touch this executor while interrupt flag set
// we need to clear lost pendings as we cannot know what work unit was on this executor
// while it is interrupted. (All this dancing is a result of Executor extending Thread)
lostPendings.clear(); // we'll get them next time around when the flag is cleared.
continue;
}
if (e.isParking()) {
parked.put(e, new JobOffer(e));
}
Expand Down Expand Up @@ -2571,6 +2651,51 @@ public Snapshot(Set<WaitingItem> waitingList, List<BlockedItem> blockedProjects,
this.pendings = new ArrayList<BuildableItem>(pendings);
}
}

private static class LockedRunnable implements Runnable {
private final Runnable delegate;

private LockedRunnable(Runnable delegate) {
this.delegate = delegate;
}

@Override
public void run() {
withLock(delegate);
}
}

private static class LockedJUCCallable<V> implements java.util.concurrent.Callable<V> {
private final java.util.concurrent.Callable<V> delegate;

private LockedJUCCallable(java.util.concurrent.Callable<V> delegate) {
this.delegate = delegate;
}

@Override
public V call() throws Exception {
return withLock(delegate);
}
}

private static class LockedHRCallable<V,T extends Throwable> implements hudson.remoting.Callable<V,T> {
private static final long serialVersionUID = 1L;
private final hudson.remoting.Callable<V,T> delegate;

private LockedHRCallable(hudson.remoting.Callable<V,T> delegate) {
this.delegate = delegate;
}

@Override
public V call() throws T {
return withLock(delegate);
}

@Override
public void checkRoles(RoleChecker checker) throws SecurityException {
delegate.checkRoles(checker);
}
}

@CLIResolver
public static Queue getInstance() {
Expand Down
18 changes: 12 additions & 6 deletions core/src/main/java/jenkins/model/Jenkins.java
Expand Up @@ -2778,13 +2778,19 @@ public void execute(Runnable command) {
LOGGER.log(SEVERE, "Failed to execute termination",e);
}

Set<Future<?>> pending = new HashSet<Future<?>>();
final Set<Future<?>> pending = new HashSet<Future<?>>();
terminating = true;
for( Computer c : computers.values() ) {
c.interrupt();
killComputer(c);
pending.add(c.disconnect(null));
}
// JENKINS-28840 we know we will be interrupting all the Computers so get the Queue lock once for all
Queue.withLock(new Runnable() {
@Override
public void run() {
for( Computer c : computers.values() ) {
c.interrupt();
killComputer(c);
pending.add(c.disconnect(null));
}
}
});
if(udpBroadcastThread!=null)
udpBroadcastThread.shutdown();
if(dnsMultiCast!=null)
Expand Down

0 comments on commit 6f343dc

Please sign in to comment.