Skip to content

Commit

Permalink
Merge pull request #1738 from stephenc/jenkins-28840
Browse files Browse the repository at this point in the history
[FIXED JENKINS-28840] Deadlock between Queue.maintain and Executor.interrupt
  • Loading branch information
stephenc committed Jun 15, 2015
2 parents fe83963 + e385231 commit 71e684a
Show file tree
Hide file tree
Showing 5 changed files with 168 additions and 18 deletions.
3 changes: 3 additions & 0 deletions changelog.html
Expand Up @@ -77,6 +77,9 @@
<li class=bug>
Revert fix for issue <a href="https://issues.jenkins-ci.org/browse/JENKINS-17290">17290</a> due to the regressions it caused.
(<a href="https://issues.jenkins-ci.org/browse/JENKINS-28601">issue 28601</a>)
<li class=bug>
Fix deadlock between hudson.model.Queue and hudson.model.Computer.
(<a href="https://issues.jenkins-ci.org/browse/JENKINS-288040">issue 28840</a>)
</ul>
</div><!--=TRUNK-END=-->
<h3><a name=v1.617>What's new in 1.617</a> (2015/06/07)</h3>
Expand Down
22 changes: 15 additions & 7 deletions core/src/main/java/hudson/model/Computer.java
Expand Up @@ -990,22 +990,25 @@ public final long getDemandStartMilliseconds() {
* Called by {@link Executor} to kill excessive executors from this computer.
*/
/*package*/ void removeExecutor(final Executor e) {
Queue.withLock(new Runnable() {
final Runnable task = new Runnable() {
@Override
public void run() {
synchronized (Computer.this) {
executors.remove(e);
addNewExecutorIfNecessary();
if (!isAlive()) // TODO except from interrupt/doYank this is called while the executor still isActive(), so how could !this.isAlive()?
{
if (!isAlive()) {
AbstractCIBase ciBase = Jenkins.getInstance();
if (ciBase != null) {
ciBase.removeComputer(Computer.this);
}
}
}
}
});
};
if (!Queue.tryWithLock(task)) {
// JENKINS-28840 if we couldn't get the lock push the operation to a separate thread to avoid deadlocks
threadPoolForRemoting.submit(Queue.wrapWithLock(task));
}
}

/**
Expand All @@ -1028,9 +1031,14 @@ protected boolean isAlive() {
* Called from {@link Jenkins#cleanUp}.
*/
public void interrupt() {
for (Executor e : executors) {
e.interruptForShutdown();
}
Queue.withLock(new Runnable() {
@Override
public void run() {
for (Executor e : executors) {
e.interruptForShutdown();
}
}
});
}

public String getSearchUrl() {
Expand Down
131 changes: 128 additions & 3 deletions core/src/main/java/hudson/model/Queue.java
Expand Up @@ -99,7 +99,6 @@
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand All @@ -113,6 +112,7 @@
import org.acegisecurity.AccessDeniedException;
import org.acegisecurity.Authentication;
import org.jenkinsci.bytecode.AdaptField;
import org.jenkinsci.remoting.RoleChecker;
import org.kohsuke.stapler.HttpResponse;
import org.kohsuke.stapler.HttpResponses;
import org.kohsuke.stapler.export.Exported;
Expand Down Expand Up @@ -337,7 +337,7 @@ public Void call() throws Exception {
}
});

private transient final Lock lock = new ReentrantLock();
private transient final ReentrantLock lock = new ReentrantLock();

private transient final Condition condition = lock.newCondition();

Expand Down Expand Up @@ -1179,7 +1179,7 @@ public static void withLock(Runnable runnable) {
}

/**
* Some operations require to be performed with the {@link Queue} lock held. Use one of these methods rather
* Some operations require the {@link Queue} lock held. Use one of these methods rather
* than locking directly on Queue in order to allow for future refactoring.
*
* @param callable the operation to perform.
Expand Down Expand Up @@ -1219,6 +1219,59 @@ public static <V> V withLock(java.util.concurrent.Callable<V> callable) throws E
}
}

/**
* Invokes the supplied {@link Runnable} if the {@link Queue} lock was obtained without blocking.
*
* @param runnable the operation to perform.
* @return {@code true} if the lock was available and the operation was performed.
* @since 1.618
*/
public static boolean tryWithLock(Runnable runnable) {
final Jenkins jenkins = Jenkins.getInstance();
final Queue queue = jenkins == null ? null : jenkins.getQueue();
if (queue == null) {
runnable.run();
return true;
} else {
return queue._tryWithLock(runnable);
}
}
/**
* Wraps a {@link Runnable} with the {@link Queue} lock held.
*
* @param runnable the operation to wrap.
* @since 1.618
*/
public static Runnable wrapWithLock(Runnable runnable) {
final Jenkins jenkins = Jenkins.getInstance();
final Queue queue = jenkins == null ? null : jenkins.getQueue();
return queue == null ? runnable : new LockedRunnable(runnable);
}

/**
* Wraps a {@link hudson.remoting.Callable} with the {@link Queue} lock held.
*
* @param callable the operation to wrap.
* @since 1.618
*/
public static <V, T extends Throwable> hudson.remoting.Callable<V, T> wrapWithLock(hudson.remoting.Callable<V, T> callable) {
final Jenkins jenkins = Jenkins.getInstance();
final Queue queue = jenkins == null ? null : jenkins.getQueue();
return queue == null ? callable : new LockedHRCallable<>(callable);
}

/**
* Wraps a {@link java.util.concurrent.Callable} with the {@link Queue} lock held.
*
* @param callable the operation to wrap.
* @since 1.618
*/
public static <V> java.util.concurrent.Callable<V> wrapWithLock(java.util.concurrent.Callable<V> callable) {
final Jenkins jenkins = Jenkins.getInstance();
final Queue queue = jenkins == null ? null : jenkins.getQueue();
return queue == null ? callable : new LockedJUCCallable<V>(callable);
}

@Override
protected void _await() throws InterruptedException {
condition.await();
Expand All @@ -1244,6 +1297,26 @@ protected void _withLock(Runnable runnable) {
}
}

/**
* Invokes the supplied {@link Runnable} if the {@link Queue} lock was obtained without blocking.
*
* @param runnable the operation to perform.
* @return {@code true} if the lock was available and the operation was performed.
* @since 1.618
*/
protected boolean _tryWithLock(Runnable runnable) {
if (lock.tryLock()) {
try {
runnable.run();
} finally {
lock.unlock();
}
return true;
} else {
return false;
}
}

/**
* Some operations require to be performed with the {@link Queue} lock held. Use one of these methods rather
* than locking directly on Queue in order to allow for future refactoring.
Expand Down Expand Up @@ -1308,6 +1381,13 @@ public void maintain() {
List<BuildableItem> lostPendings = new ArrayList<BuildableItem>(pendings);
for (Computer c : Jenkins.getInstance().getComputers()) {
for (Executor e : c.getExecutors()) {
if (e.isInterrupted()) {
// JENKINS-28840 we will deadlock if we try to touch this executor while interrupt flag set
// we need to clear lost pendings as we cannot know what work unit was on this executor
// while it is interrupted. (All this dancing is a result of Executor extending Thread)
lostPendings.clear(); // we'll get them next time around when the flag is cleared.
continue;
}
if (e.isParking()) {
parked.put(e, new JobOffer(e));
}
Expand Down Expand Up @@ -2571,6 +2651,51 @@ public Snapshot(Set<WaitingItem> waitingList, List<BlockedItem> blockedProjects,
this.pendings = new ArrayList<BuildableItem>(pendings);
}
}

private static class LockedRunnable implements Runnable {
private final Runnable delegate;

private LockedRunnable(Runnable delegate) {
this.delegate = delegate;
}

@Override
public void run() {
withLock(delegate);
}
}

private static class LockedJUCCallable<V> implements java.util.concurrent.Callable<V> {
private final java.util.concurrent.Callable<V> delegate;

private LockedJUCCallable(java.util.concurrent.Callable<V> delegate) {
this.delegate = delegate;
}

@Override
public V call() throws Exception {
return withLock(delegate);
}
}

private static class LockedHRCallable<V,T extends Throwable> implements hudson.remoting.Callable<V,T> {
private static final long serialVersionUID = 1L;
private final hudson.remoting.Callable<V,T> delegate;

private LockedHRCallable(hudson.remoting.Callable<V,T> delegate) {
this.delegate = delegate;
}

@Override
public V call() throws T {
return withLock(delegate);
}

@Override
public void checkRoles(RoleChecker checker) throws SecurityException {
delegate.checkRoles(checker);
}
}

@CLIResolver
public static Queue getInstance() {
Expand Down
18 changes: 12 additions & 6 deletions core/src/main/java/jenkins/model/Jenkins.java
Expand Up @@ -2778,13 +2778,19 @@ public void execute(Runnable command) {
LOGGER.log(SEVERE, "Failed to execute termination",e);
}

Set<Future<?>> pending = new HashSet<Future<?>>();
final Set<Future<?>> pending = new HashSet<Future<?>>();
terminating = true;
for( Computer c : computers.values() ) {
c.interrupt();
killComputer(c);
pending.add(c.disconnect(null));
}
// JENKINS-28840 we know we will be interrupting all the Computers so get the Queue lock once for all
Queue.withLock(new Runnable() {
@Override
public void run() {
for( Computer c : computers.values() ) {
c.interrupt();
killComputer(c);
pending.add(c.disconnect(null));
}
}
});
if(udpBroadcastThread!=null)
udpBroadcastThread.shutdown();
if(dnsMultiCast!=null)
Expand Down
12 changes: 10 additions & 2 deletions test/src/test/java/hudson/slaves/CommandLauncherTest.java
Expand Up @@ -28,11 +28,14 @@
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeTrue;
import hudson.Functions;
import hudson.model.Node;

import java.util.Collections;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import org.junit.Rule;
import org.junit.Test;
Expand Down Expand Up @@ -66,7 +69,7 @@ public void commandSuceedsWithoutChannel() throws Exception {

public DumbSlave createSlave(String command) throws Exception {
DumbSlave slave;
synchronized (j.jenkins) {
synchronized (j.jenkins) { // TODO this lock smells like a bug post 1.607
slave = new DumbSlave(
"dummy",
"dummy",
Expand All @@ -81,7 +84,12 @@ public DumbSlave createSlave(String command) throws Exception {
j.jenkins.addNode(slave);
}

Thread.sleep(100);
try {
slave.toComputer().connect(false).get(1, TimeUnit.SECONDS);
fail("the slave was not supposed to connect successfully");
} catch (ExecutionException e) {
// ignore, we just want to
}

return slave;
}
Expand Down

0 comments on commit 71e684a

Please sign in to comment.