Skip to content
This repository has been archived by the owner on Dec 15, 2021. It is now read-only.

Commit

Permalink
[JENKINS-25504]
Browse files Browse the repository at this point in the history
The previous attempt failed because CpsStepContext can get duplicated because of persistence.

This is a weaker fix that only waits for the "primary" body execution. That is, this doesn't work correctly if a Step is like parallel step that executes multiple bodies at the same time.

That said, the current ParallelStep implementation never reports itself completed until all the bodies check in, so this code should work correctly with all the known Step impls.
  • Loading branch information
kohsuke committed Nov 19, 2014
1 parent e2527ba commit 68de87c
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 1 deletion.
Expand Up @@ -40,9 +40,11 @@
import org.jenkinsci.plugins.workflow.graph.BlockEndNode;
import org.jenkinsci.plugins.workflow.graph.BlockStartNode;
import org.jenkinsci.plugins.workflow.graph.FlowNode;
import org.jenkinsci.plugins.workflow.steps.FlowInterruptedException;
import org.jenkinsci.plugins.workflow.steps.Step;
import org.jenkinsci.plugins.workflow.steps.StepContext;
import org.jenkinsci.plugins.workflow.steps.StepDescriptor;
import org.jenkinsci.plugins.workflow.steps.StepExecution;
import org.jenkinsci.plugins.workflow.support.DefaultStepContext;
import org.jenkinsci.plugins.workflow.support.concurrent.Futures;

Expand Down Expand Up @@ -319,6 +321,33 @@ public void onSuccess(CpsThreadGroup g) {
body = null;
CpsThread thread = getThread(g);
if (thread != null) {
CpsThread nit = thread.getNextInner();
if (nit!=thread) {
// can't mark this done until the inner thread is done.
// defer the processing until the inner thread is done
nit.addCompletionHandler(new FutureCallback<Object>() {
public void onSuccess(Object _) { scheduleNextRun(); }
public void onFailure(Throwable _) { scheduleNextRun(); }
});
if (getOutcome().isFailure()) {
// if the step with a currently running body reported a failure,
// make some effort to try to interrupt the running body
StepExecution s = nit.getStep();
if (s != null) {
// TODO: ideally this needs to work like interrupt, in that
// if s==null the next StepExecution gets interrupted when it happen
FlowInterruptedException cause = new FlowInterruptedException(Result.FAILURE);
cause.initCause(getOutcome().getAbnormal());
try {
s.stop(cause);
} catch (Exception e) {
LOGGER.log(Level.WARNING, "Failed to stop the body execution in response to the failure of the parent");
}
}
}
return;
}

if (n instanceof StepStartNode) {
FlowNode tip = thread.head.get();
parents.set(0, tip);
Expand Down
Expand Up @@ -31,10 +31,13 @@
import org.jenkinsci.plugins.workflow.cps.persistence.PersistIn;
import org.jenkinsci.plugins.workflow.steps.StepExecution;

import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
import java.util.logging.Logger;

Expand Down Expand Up @@ -99,6 +102,11 @@ public final class CpsThread implements Serializable {
*/
private StepExecution step;

/**
* Gets called when the thread is done.
*/
private final List<FutureCallback<Object>> completionHandlers = new ArrayList<FutureCallback<Object>>();

CpsThread(CpsThreadGroup group, int id, Continuable program, FlowHead head, ContextVariableSet contextVariables) {
this.group = group;
this.id = id;
Expand Down Expand Up @@ -209,6 +217,32 @@ boolean isAlive() {
return program.isResumable();
}

@CpsVmThreadOnly
void addCompletionHandler(FutureCallback<Object> h) {
completionHandlers.add(h);
}

@CpsVmThreadOnly
void fireCompletionHandlers(Outcome o) {
for (FutureCallback<Object> h : completionHandlers) {
if (o.isSuccess()) h.onSuccess(o.getNormal());
else h.onFailure(o.getAbnormal());
}
}

/**
* Finds the next younger {@link CpsThread} that shares the same {@link FlowHead}.
*
* Can be {@code this.}
*/
@CheckForNull CpsThread getNextInner() {
for (CpsThread t : group.threads.values()) {
if (t.id <= this.id) continue;
if (t.head==this.head) return t;
}
return null;
}

/**
* Schedules the execution of this thread from the last {@linkplain Continuable#suspend(Object)} point.
*
Expand Down Expand Up @@ -242,5 +276,4 @@ public static CpsThread current() {
// getExecution().getOwner() would be useful but seems problematic.
return "Thread #" + id + String.format(" @%h", this);
}

}
Expand Up @@ -275,6 +275,7 @@ private void run() throws IOException {

if (!t.isAlive()) {
LOGGER.fine("completed " + t);
t.fireCompletionHandlers(o); // do this after ErrorAction is set above

threads.remove(t.id);
if (threads.isEmpty()) {
Expand Down

0 comments on commit 68de87c

Please sign in to comment.