Skip to content
This repository has been archived by the owner on Dec 15, 2021. It is now read-only.

Commit

Permalink
[JENKINS-25938] Starting to use AsynchronousExecutable.
Browse files Browse the repository at this point in the history
  • Loading branch information
jglick committed Mar 20, 2015
1 parent aa937d3 commit ee2ca39
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 36 deletions.
Expand Up @@ -26,6 +26,7 @@

import com.google.common.base.Function;
import hudson.model.Computer;
import hudson.model.Executor;
import hudson.model.Node;
import hudson.model.ParametersAction;
import hudson.model.ParametersDefinitionProperty;
Expand Down Expand Up @@ -107,7 +108,7 @@ public void evaluate() throws Throwable {
startBuilding();
waitForWorkflowToSuspend();
assertTrue(b.isBuilding());
assertFalse(jenkins().toComputer().isIdle());
liveness();
}
});
story.addStep(new Statement() {
Expand All @@ -118,14 +119,22 @@ public void evaluate() throws Throwable {
for (int i = 0; i < 600 && !Queue.getInstance().isEmpty(); i++) {
Thread.sleep(100);
}
assertFalse(jenkins().toComputer().isIdle());
liveness();
FileUtils.write(new File(jenkins().getRootDir(), "touch"), "I'm here");
watchDescriptor.watchUpdate();
waitForWorkflowToComplete();
assertBuildCompletedSuccessfully();
}
});
}
private void liveness() {
assertFalse(jenkins().toComputer().isIdle());
Executor e = b.getOneOffExecutor();
assertNotNull(e);
assertEquals(e, b.getExecutor());
assertTrue(e.isActive());
assertFalse(e.isAlive());
}

/**
* Workflow captures a stateful object, and we verify that it survives the restart
Expand Down
Expand Up @@ -63,7 +63,10 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.CheckForNull;
Expand All @@ -73,6 +76,8 @@
import jenkins.model.Jenkins;
import jenkins.model.lazy.BuildReference;
import jenkins.model.lazy.LazyBuildMixIn;
import jenkins.model.queue.Executable2;
import jenkins.util.Timer;
import org.jenkinsci.plugins.workflow.actions.LogAction;
import org.jenkinsci.plugins.workflow.flow.FlowDefinition;
import org.jenkinsci.plugins.workflow.flow.FlowExecution;
Expand All @@ -89,7 +94,7 @@

@SuppressWarnings("SynchronizeOnNonFinalField")
@edu.umd.cs.findbugs.annotations.SuppressWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER") // completed is an unusual usage
public final class WorkflowRun extends Run<WorkflowJob,WorkflowRun> implements Queue.Executable, LazyBuildMixIn.LazyLoadingRun<WorkflowJob,WorkflowRun> {
public final class WorkflowRun extends Run<WorkflowJob,WorkflowRun> implements Executable2, LazyBuildMixIn.LazyLoadingRun<WorkflowJob,WorkflowRun> {

private static final Logger LOGGER = Logger.getLogger(WorkflowRun.class.getName());

Expand All @@ -108,8 +113,6 @@ public final class WorkflowRun extends Run<WorkflowJob,WorkflowRun> implements Q
};
private transient StreamBuildListener listener;
private transient AtomicBoolean completed;
/** Jenkins instance in effect when {@link #waitForCompletion} was last called. */
private transient Jenkins jenkins;
/** map from node IDs to log positions from which we should copy text */
private Map<String,Long> logsToCopy;

Expand Down Expand Up @@ -158,8 +161,7 @@ public WorkflowRun(WorkflowJob job, File dir) throws IOException {
*/
@Override public void run() {
if (!firstTime) {
waitForCompletion();
return;
throw sleep();
}
// Some code here copied from execute(RunExecution), but subsequently modified quite a bit.
try {
Expand All @@ -183,7 +185,6 @@ public WorkflowRun(WorkflowJob job, File dir) throws IOException {
logsToCopy = new LinkedHashMap<String,Long>();
execution.start();
executionPromise.set(execution);
waitForCompletion();
} catch (Throwable x) {
if (listener == null) {
LOGGER.log(Level.WARNING, this + " failed to start", x);
Expand All @@ -193,38 +194,42 @@ public WorkflowRun(WorkflowJob job, File dir) throws IOException {
setResult(Result.FAILURE);
executionPromise.setException(x);
}
throw sleep();
}

/**
* Sleeps until the run is finished, updating log messages periodically.
*/
void waitForCompletion() {
jenkins = Jenkins.getInstance();
synchronized (completed) {
while (!completed.get()) {
if (jenkins == null || jenkins.isTerminating()) {
LOGGER.log(Level.FINE, "shutting down, breaking waitForCompletion on {0}", this);
// Stop writing content, in case a new set of objects gets loaded after in-VM restart and starts writing to the same file:
listener.closeQuietly();
listener = new StreamBuildListener(new NullStream());
break;
}
private AsynchronousExecution sleep() {
final AsynchronousExecution asynchronousExecution = new AsynchronousExecution() {
@Override public void interrupt() {
try {
completed.wait(1000);
} catch (InterruptedException x) {
try {
execution.interrupt(Result.ABORTED);
} catch (Exception x2) {
LOGGER.log(Level.WARNING, null, x2);
execution.interrupt(Result.ABORTED);
} catch (Exception x2) {
LOGGER.log(Level.WARNING, null, x2);
}
getExecutor().recordCauseOfInterruption(WorkflowRun.this, listener);
}
};
final AtomicReference<ScheduledFuture<?>> copyLogsTask = new AtomicReference<ScheduledFuture<?>>();
copyLogsTask.set(Timer.get().scheduleAtFixedRate(new Runnable() {
@Override public void run() {
synchronized (completed) {
if (completed.get()) {
asynchronousExecution.completed(null);
copyLogsTask.get().cancel(false);
return;
}
Executor exec = Executor.currentExecutor();
if (exec != null) {
exec.recordCauseOfInterruption(this, listener);
Jenkins jenkins = Jenkins.getInstance();
if (jenkins == null || jenkins.isTerminating()) {
LOGGER.log(Level.FINE, "shutting down, breaking waitForCompletion on {0}", this);
// Stop writing content, in case a new set of objects gets loaded after in-VM restart and starts writing to the same file:
listener.closeQuietly();
listener = new StreamBuildListener(new NullStream());
return;
}
copyLogs();
}
copyLogs();
}
}
}, 1, 1, TimeUnit.SECONDS));
return asynchronousExecution;
}

@GuardedBy("completed")
Expand Down Expand Up @@ -421,7 +426,6 @@ private void finish(Result r) {
assert completed != null;
synchronized (completed) {
completed.set(true);
completed.notifyAll();
}
FlowExecutionList.get().unregister(execution.getOwner());
}
Expand Down
Expand Up @@ -51,6 +51,7 @@
import org.kohsuke.accmod.restrictions.NoExternalUse;

import static java.util.logging.Level.*;
import jenkins.model.queue.Executable2;

/**
* @author Kohsuke Kawaguchi
Expand Down Expand Up @@ -364,7 +365,7 @@ private static final class Callback extends BodyExecutionCallback {
/**
* Occupies {@link Executor} while workflow uses this slave.
*/
private final class PlaceholderExecutable implements ContinuableExecutable {
private final class PlaceholderExecutable implements ContinuableExecutable, Executable2 {

private static final String COOKIE_VAR = "JENKINS_SERVER_COOKIE";

Expand Down Expand Up @@ -423,7 +424,7 @@ private final class PlaceholderExecutable implements ContinuableExecutable {
while (runningTasks.containsKey(cookie)) {
LOGGER.log(FINE, "waiting on {0}", cookie);
try {
runningTasks.wait();
runningTasks.wait(); // TODO rather throw AsynchronousExecution
} catch (InterruptedException x) {
if (Jenkins.getInstance() != null) {
LOGGER.log(FINE, "interrupted {0} as by Executor.doStop", cookie);
Expand Down

0 comments on commit ee2ca39

Please sign in to comment.