Skip to content

Commit

Permalink
[JENKINS-25938] Introduced AsynchronousExecutable.
Browse files Browse the repository at this point in the history
  • Loading branch information
jglick committed Mar 20, 2015
1 parent a88fb9d commit 46a900f
Show file tree
Hide file tree
Showing 8 changed files with 176 additions and 46 deletions.
6 changes: 2 additions & 4 deletions core/src/main/java/hudson/model/AbstractBuild.java
Expand Up @@ -30,8 +30,6 @@
import hudson.FilePath;
import hudson.Functions;
import hudson.Launcher;
import hudson.console.AnnotatedLargeText;
import hudson.console.ExpandableDetailsNote;
import hudson.console.ModelHyperlinkNote;
import hudson.model.Fingerprint.BuildPtr;
import hudson.model.Fingerprint.RangeSet;
Expand Down Expand Up @@ -70,7 +68,6 @@
import java.io.File;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.StringWriter;
import java.lang.ref.WeakReference;
import java.util.AbstractSet;
import java.util.ArrayList;
Expand All @@ -93,6 +90,7 @@

import jenkins.model.lazy.BuildReference;
import jenkins.model.lazy.LazyBuildMixIn;
import jenkins.model.queue.Executable2;
import org.kohsuke.accmod.Restricted;
import org.kohsuke.accmod.restrictions.DoNotUse;

Expand All @@ -104,7 +102,7 @@
* @author Kohsuke Kawaguchi
* @see AbstractProject
*/
public abstract class AbstractBuild<P extends AbstractProject<P,R>,R extends AbstractBuild<P,R>> extends Run<P,R> implements Queue.Executable, LazyBuildMixIn.LazyLoadingRun<P,R> {
public abstract class AbstractBuild<P extends AbstractProject<P,R>,R extends AbstractBuild<P,R>> extends Run<P,R> implements Executable2, LazyBuildMixIn.LazyLoadingRun<P,R> {

/**
* Set if we want the blame information to flow from upstream to downstream build.
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/hudson/model/Computer.java
Expand Up @@ -870,15 +870,15 @@ public final long getDemandStartMilliseconds() {
/*package*/ synchronized void removeExecutor(Executor e) {
executors.remove(e);
addNewExecutorIfNecessary();
if(!isAlive())
if(!isAlive()) // TODO except from interrupt/doYank this is called while the executor still isActive(), so how could !this.isAlive()?
{
AbstractCIBase ciBase = Jenkins.getInstance();
ciBase.removeComputer(this);
}
}

/**
* Returns true if any of the executors are functioning.
* Returns true if any of the executors are {@linkplain Executor#isActive active}.
*
* Note that if an executor dies, we'll leave it in {@link #executors} until
* the administrator yanks it out, so that we can see why it died.
Expand Down
89 changes: 62 additions & 27 deletions core/src/main/java/hudson/model/Executor.java
Expand Up @@ -60,13 +60,15 @@
import static java.util.logging.Level.*;
import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
import jenkins.model.queue.Executable2.AsynchronousExecution;
import org.kohsuke.accmod.Restricted;
import org.kohsuke.accmod.restrictions.NoExternalUse;


/**
* Thread that executes builds.
* Since 1.536, {@link Executor}s start threads on-demand.
* The entire logic should use {@link #isActive()} instead of {@link #isAlive()}
* in order to check if the {@link Executor} it ready to take tasks.
* <p>Callers should use {@link #isActive()} instead of {@link #isAlive()}.
* @author Kohsuke Kawaguchi
*/
@ExportedBean
Expand All @@ -88,6 +90,7 @@ public class Executor extends Thread implements ModelObject {
* {@link hudson.model.Queue.Executable} being executed right now, or null if the executor is idle.
*/
private volatile Queue.Executable executable;
private AsynchronousExecution asynchronousExecution;

/**
* When {@link Queue} allocates a work for this executor, this field is set
Expand Down Expand Up @@ -163,7 +166,11 @@ public void interrupt(Result result, CauseOfInterruption... causes) {
this.causes.add(c);
}
}
super.interrupt();
if (asynchronousExecution != null) {
asynchronousExecution.interrupt();
} else {
super.interrupt();
}
}

public Result abortResult() {
Expand Down Expand Up @@ -238,23 +245,14 @@ public void run() {
if (LOGGER.isLoggable(FINE))
LOGGER.log(FINE, getName()+" is now executing "+executable);
queue.execute(executable, task);
} catch (AsynchronousExecution x) {
x.setExecutor(this);
this.asynchronousExecution = x;
} catch (Throwable e) {
// for some reason the executor died. this is really
// a bug in the code, but we don't want the executor to die,
// so just leave some info and go on to build other things
LOGGER.log(Level.SEVERE, "Executor threw an exception", e);
workUnit.context.abort(e);
problems = e;
} finally {
long time = System.currentTimeMillis()-startTime;
if (LOGGER.isLoggable(FINE))
LOGGER.log(FINE, getName()+" completed "+executable+" in "+time+"ms");
try {
workUnit.context.synchronizeEnd(executable,problems,time);
} catch (InterruptedException e) {
workUnit.context.abort(e);
} finally {
workUnit.setExecutor(null);
if (asynchronousExecution == null) {
finish1(problems);
}
}
} catch (InterruptedException e) {
Expand All @@ -267,12 +265,49 @@ public void run() {
causeOfDeath = e;
LOGGER.log(SEVERE, "Unexpected executor death", e);
} finally {
if (causeOfDeath==null)
// let this thread die and be replaced by a fresh unstarted instance
owner.removeExecutor(this);
if (asynchronousExecution == null) {
finish2();
}
}
}

private void finish1(@CheckForNull Throwable problems) {
if (problems != null) {
// for some reason the executor died. this is really
// a bug in the code, but we don't want the executor to die,
// so just leave some info and go on to build other things
LOGGER.log(Level.SEVERE, "Executor threw an exception", problems);
workUnit.context.abort(problems);
}
long time = System.currentTimeMillis() - startTime;
LOGGER.log(FINE, "{0} completed {1} in {2}ms", new Object[] {getName(), executable, time});
try {
workUnit.context.synchronizeEnd(this, executable, problems, time);
} catch (InterruptedException e) {
workUnit.context.abort(e);
} finally {
workUnit.setExecutor(null);
}
}

private void finish2() {
if (causeOfDeath == null) {// let this thread die and be replaced by a fresh unstarted instance
owner.removeExecutor(this);
}
if (this instanceof OneOffExecutor) {
owner.remove((OneOffExecutor) this);
}
queue.scheduleMaintenance();
}

queue.scheduleMaintenance();
@Restricted(NoExternalUse.class)
public void completedAsynchronous(@CheckForNull Throwable error) {
try {
finish1(error);
} finally {
finish2();
}
asynchronousExecution = null;
}

/**
Expand Down Expand Up @@ -357,14 +392,14 @@ public boolean isBusy() {
/**
* Check if executor is ready to accept tasks.
* This method becomes the critical one since 1.536, which introduces the
* on-demand creation of executor threads. The entire logic should use
* this method instead of {@link #isAlive()}, because it provides wrong
* information for non-started threads.
* on-demand creation of executor threads. Callers should use
* this method instead of {@link #isAlive()}, which would be incorrect for
* non-started threads or running {@link AsynchronousExecution}.
* @return True if the executor is available for tasks
* @since 1.536
*/
public boolean isActive() {
return !started || isAlive();
return !started || asynchronousExecution != null || isAlive();
}

/**
Expand All @@ -377,7 +412,7 @@ public boolean isParking() {
/**
* If this thread dies unexpectedly, obtain the cause of the failure.
*
* @return null if the death is expected death or the thread is {@link #isAlive() still alive}.
* @return null if the death is expected death or the thread {@link #isActive}.
* @since 1.142
*/
public Throwable getCauseOfDeath() {
Expand Down Expand Up @@ -529,7 +564,7 @@ public HttpResponse doStop() {
@RequirePOST
public HttpResponse doYank() {
Jenkins.getInstance().checkPermission(Jenkins.ADMINISTER);
if (isAlive())
if (isActive())
throw new Failure("Can't yank a live executor");
owner.removeExecutor(this);
return HttpResponses.redirectViaContextPath("/");
Expand Down
9 changes: 0 additions & 9 deletions core/src/main/java/hudson/model/OneOffExecutor.java
Expand Up @@ -36,13 +36,4 @@ public class OneOffExecutor extends Executor {
public OneOffExecutor(Computer owner) {
super(owner,-1);
}

@Override
public void run() {
try {
super.run();
} finally {
owner.remove(this);
}
}
}
5 changes: 3 additions & 2 deletions core/src/main/java/hudson/model/Queue.java
Expand Up @@ -95,7 +95,6 @@
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
Expand All @@ -121,6 +120,7 @@
import com.thoughtworks.xstream.XStream;
import com.thoughtworks.xstream.converters.basic.AbstractSingleValueConverter;
import javax.annotation.CheckForNull;
import jenkins.model.queue.Executable2;
import org.kohsuke.stapler.QueryParameter;
import org.kohsuke.stapler.interceptor.RequirePOST;

Expand Down Expand Up @@ -1465,6 +1465,7 @@ public interface Task extends ModelObject, SubTask {
* <p>
* Implementation must have <tt>executorCell.jelly</tt>, which is
* used to render the HTML that indicates this executable is executing.
* @see Executable2
*/
public interface Executable extends Runnable {
/**
Expand All @@ -1480,7 +1481,7 @@ public interface Executable extends Runnable {
/**
* Called by {@link Executor} to perform the task
*/
void run();
@Override void run();

/**
* Estimate of how long will it take to execute this executable.
Expand Down
1 change: 1 addition & 0 deletions core/src/main/java/hudson/model/ResourceController.java
Expand Up @@ -88,6 +88,7 @@ public void execute(@Nonnull Runnable task, ResourceActivity activity ) throws I
try {
task.run();
} finally {
// TODO if AsynchronousExecution, do that later
synchronized(this) {
inProgress.remove(activity);
inUse = ResourceList.union(resourceView);
Expand Down
9 changes: 7 additions & 2 deletions core/src/main/java/hudson/model/queue/WorkUnitContext.java
Expand Up @@ -76,6 +76,7 @@ public WorkUnitContext(BuildableItem item) {
protected void onCriteriaMet() {
// on behalf of the member Executors,
// the one that executes the main thing will send notifications
// Unclear if this will work with AsynchronousExecution; it *seems* this is only called from synchronize which is only called from synchronizeStart which is only called from an executor thread.
Executor e = Executor.currentExecutor();
if (e.getCurrentWorkUnit().isMainWork()) {
e.getOwner().taskAccepted(e,task);
Expand Down Expand Up @@ -121,18 +122,22 @@ public void synchronizeStart() throws InterruptedException {
}
}

@Deprecated
public void synchronizeEnd(Queue.Executable executable, Throwable problems, long duration) throws InterruptedException {
synchronizeEnd(Executor.currentExecutor(), executable, problems, duration);
}

/**
* All the {@link Executor}s that jointly execute a {@link Task} call this method to synchronize on the end of the task.
*
* @throws InterruptedException
* If any of the member thread is interrupted while waiting for other threads to join, all
* the member threads will report {@link InterruptedException}.
*/
public void synchronizeEnd(Queue.Executable executable, Throwable problems, long duration) throws InterruptedException {
public void synchronizeEnd(Executor e, Queue.Executable executable, Throwable problems, long duration) throws InterruptedException {
endLatch.synchronize();

// the main thread will send a notification
Executor e = Executor.currentExecutor();
WorkUnit wu = e.getCurrentWorkUnit();
if (wu.isMainWork()) {
if (problems == null) {
Expand Down
99 changes: 99 additions & 0 deletions core/src/main/java/jenkins/model/queue/Executable2.java
@@ -0,0 +1,99 @@
/*
* The MIT License
*
* Copyright 2015 Jesse Glick.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/

package jenkins.model.queue;

import hudson.model.Executor;
import hudson.model.ExecutorListener;
import hudson.model.OneOffExecutor;
import hudson.model.Queue;
import hudson.model.Resource;
import hudson.model.ResourceActivity;
import hudson.model.ResourceController;
import hudson.model.ResourceList;
import javax.annotation.CheckForNull;
import org.kohsuke.accmod.Restricted;
import org.kohsuke.accmod.restrictions.NoExternalUse;

/**
* Extended interface for running tasks with some additional logic.
* @since TODO
*/
public interface Executable2 extends Queue.Executable {

/**
* {@inheritDoc}
* @throws AsynchronousExecution if you would like to continue without consuming a thread
*/
@Override void run() throws AsynchronousExecution;

/**
* Special means of indicating that an executable will proceed in the background without consuming a native thread ({@link Executor}).
* May be thrown from {@link Executable2#run} after doing any preparatory work synchronously.
* <p>{@link Executor#isActive} will remain true (even though {@link Executor#isAlive} is not) until {@link #completed} is called.
* The thrower will need to hold on to a reference to this instance as a handle to call {@link #completed}.
* <p>The execution may not extend into another Jenkins session; if you wish to model a long-running execution, you must schedule a new task after restart.
* This class is not serializable anyway.
* <p>Mainly intended for use with {@link OneOffExecutor} (from a {@link hudson.model.Queue.FlyweightTask}), of which there could be many,
* but could also be used with a heavyweight executor even though the number of executors is bounded by node configuration.
* <p>{@link ResourceController}/{@link ResourceActivity}/{@link ResourceList}/{@link Resource} are not currently supported.
* Nor are {@link hudson.model.Queue.Task#getSubTasks} other than the primary task.
*/
abstract class AsynchronousExecution extends RuntimeException {

private Executor executor;

/** Constructor for subclasses. */
protected AsynchronousExecution() {}

/**
* Called in lieu of {@link Thread#interrupt} by {@link Executor#interrupt()} and its overloads.
* As with the standard Java method, you are requested to cease work as soon as possible, but there is no enforcement of this.
* You might also want to call {@link Executor#recordCauseOfInterruption} on {@link #getExecutor}.
*/
public abstract void interrupt();

/**
* Obtains the associated executor.
*/
public final Executor getExecutor() {
return executor;
}

@Restricted(NoExternalUse.class)
public final void setExecutor(Executor executor) {
this.executor = executor;
}

/**
* To be called when the task is actually complete.
* @param error normally null (preferable to handle errors yourself), but may be specified to simulate an exception from {@link Executable2#run}, as per {@link ExecutorListener#taskCompletedWithProblems}
*/
public final void completed(@CheckForNull Throwable error) {
executor.completedAsynchronous(error);
}

}

}

0 comments on commit 46a900f

Please sign in to comment.