Skip to content
This repository has been archived by the owner on Dec 15, 2021. It is now read-only.

Commit

Permalink
[JENKINS-25504] Async step should continue executing until all the bo…
Browse files Browse the repository at this point in the history
…dies are done and the outcome is set.

Previously, as soon as an outcome is set the step was considered done, even when the body was running.
This corrupts the flow graph as multiple CpsThreads collide on trying to update the same FlowHead.
  • Loading branch information
kohsuke committed Nov 19, 2014
1 parent 4076fe3 commit 1e843f1
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 6 deletions.
Expand Up @@ -269,6 +269,7 @@ private void setOutcome(Outcome o) {
this.outcome = o;
notifyAll(); // wake up everyone waiting for the outcome.
}
context.incrementEndedBodies();
context.saveState();
}

Expand Down
Expand Up @@ -134,13 +134,15 @@ public CpsBodyExecution start() {
throw new IllegalStateException("Can't specify Actions if there will be no StepStartNode");
}

if (owner.isCompleted()) {
if (owner.isOutcomeSet()) {
// if this step is already done, no further body invocations can happen doing so will end up
// causing two CpsThreads competing on the same FlowHead.
// if this restriction ever needs to be lifted, the newly launched body will have to run in a separate thread.
throw new IllegalStateException("The " + owner.getStepDescriptor().getFunctionName() + " step has already completed.");
}

owner.incrementStartedBodies();

if (owner.isSyncMode()) {
// we call 'launch' later from DSL.ThreadTaskImpl.
// in this mode, the first thread inherits the same thread, but
Expand Down
Expand Up @@ -79,6 +79,33 @@
* graph. Wherever we need {@link CpsFlowExecution} we do that by following {@link FlowExecutionOwner}, and
* when we need pointers to individual objects inside, we use IDs (such as {@link #id}}.
*
*
* <h2>State Transitions</h2>
* <p>
* A step execution goes through the following state transition.
*
* <pre>
*
* INITIAL STATE ----> StepExecution.start() ---> done synchronously?
* |
* +--(Yes)--> DONE STATE
* |
* +--(No)--+
* |
* +----------------------------------------------------------+
* |
* |
* | || ||
* | ||----> body executions ----+---->||
* | || ^ | ||
* +--> ASYNC EXEC STATE --|| | | ||----> DONE STATE
* || +------------+ ||
* || ||
* || ||
* ||----> outcome set ------------->||
* || ||
* </pre>
*
* @author Kohsuke Kawaguchi
* @see Step#start(StepContext)
*/
Expand All @@ -89,7 +116,7 @@ public class CpsStepContext extends DefaultStepContext { // TODO add XStream cla
private static final Logger LOGGER = Logger.getLogger(CpsStepContext.class.getName());

@GuardedBy("this")
private transient Outcome outcome;
private Outcome outcome;

// see class javadoc.
// transient because if it's serialized and deserialized, it should come back in the async mode.
Expand Down Expand Up @@ -129,6 +156,11 @@ public class CpsStepContext extends DefaultStepContext { // TODO add XStream cla
*/
final Map<Integer,String> bodyInvHeads = new TreeMap<Integer,String>();

@GuardedBy("this")
private int startedBodies;
@GuardedBy("this")
private int endedBodies;

/**
* If the invocation of the body is requested, this object remembers how to start it.
*
Expand Down Expand Up @@ -276,15 +308,15 @@ protected <T> T doGet(Class<T> key) throws IOException, InterruptedException {
public synchronized void onFailure(Throwable t) {
if (t==null)
throw new IllegalArgumentException();
if (isCompleted())
if (isOutcomeSet())
throw new IllegalStateException("Already completed", t);
this.outcome = new Outcome(null,t);

scheduleNextRun();
}

public synchronized void onSuccess(Object returnValue) {
if (isCompleted())
if (isOutcomeSet())
throw new IllegalStateException("Already completed");
this.outcome = new Outcome(returnValue,null);

Expand All @@ -301,6 +333,13 @@ private void scheduleNextRun() {
return;
}

if (!isCompleted()) {
// two move on to the next step we need both the outcome be set and all the bodies done.
// this method gest called every time situation changes on both fronts, so if we aren't ready yet
// this method will be called back again.
return;
}

try {
final FlowNode n = getNode();
final CpsFlowExecution flow = getFlowExecution();
Expand Down Expand Up @@ -356,10 +395,20 @@ public void setResult(Result r) {
return (CpsFlowExecution)executionRef.get();
}

synchronized boolean isCompleted() {
/**
* This {@link StepContext} has received {@link FutureCallback} calls to set the result.
*/
synchronized boolean isOutcomeSet() {
return outcome!=null;
}

/**
* {@linkplain #isOutcomeSet() the outcome is set} as well as all the body invocations have completed.
*/
synchronized boolean isCompleted() {
return isOutcomeSet() && startedBodies==endedBodies;
}

synchronized boolean isSyncMode() {
return syncMode;
}
Expand Down Expand Up @@ -397,7 +446,7 @@ synchronized Outcome getOutcome() {
synchronized boolean switchToAsyncMode() {
if (!syncMode) throw new AssertionError();
syncMode = false;
return !isCompleted();
return !isOutcomeSet();
}

@Override public ListenableFuture<Void> saveState() {
Expand All @@ -423,6 +472,24 @@ synchronized boolean switchToAsyncMode() {
}
}

/**
* Called whenever new {@linkplain CpsBodyInvoker#start() a new body gets started}.
*/
/*package*/ synchronized void incrementStartedBodies() {
startedBodies++;
}

/**
* Called whenever the body execution completes
* {@linkplain CpsBodyExecution#onSuccess successfully} or
* {@linkplain CpsBodyExecution#onFailure otherwise}.
*/
/*package*/ synchronized void incrementEndedBodies() {
endedBodies++;
// do we have both the result set and the bodies all done?
scheduleNextRun();
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down

0 comments on commit 1e843f1

Please sign in to comment.