Navigation Menu

Skip to content
This repository has been archived by the owner on Dec 15, 2021. It is now read-only.

Commit

Permalink
Merge pull request #88 from jtnord/failFast
Browse files Browse the repository at this point in the history
[JENKINS-26034] add option to fail fast on parallel branch failure.
  • Loading branch information
jglick committed Mar 13, 2015
2 parents f2b787c + 52189f1 commit 841d231
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 22 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Expand Up @@ -5,6 +5,7 @@ Only noting significant user-visible or major API changes, not internal code cle
## 1.4 (upcoming)

### User changes
* JENKINS-26034: added `failFast` option to the `parallel` step.
* JENKINS-26085: added `credentialsId` to the `git` step.
* JENKINS-26121: record the approver of an `input` step in build history.
* JENKINS-26122: Prepend `parallel` step execution logs with the branch label.
Expand Down
Expand Up @@ -23,6 +23,7 @@
import org.jenkinsci.plugins.workflow.support.visualization.table.FlowGraphTable;
import org.jenkinsci.plugins.workflow.support.visualization.table.FlowGraphTable.Row;
import org.jenkinsci.plugins.workflow.test.steps.SemaphoreStep;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runners.model.Statement;
import org.jvnet.hudson.test.Issue;
Expand Down Expand Up @@ -80,7 +81,7 @@ public void failure_in_subflow_will_cause_join_to_fail() throws Exception {
" b: { error 'died' },",

// make sure this branch takes longer than a
" a: { sleep 3; writeFile text: '', file: 'b.done' }",
" a: { sleep 3; writeFile text: '', file: 'a.done' }",
" )",
" assert false;",
" } catch (ParallelStepException e) {",
Expand All @@ -92,14 +93,84 @@ public void failure_in_subflow_will_cause_join_to_fail() throws Exception {

startBuilding().get();
assertBuildCompletedSuccessfully();
assert jenkins().getWorkspaceFor(p).child("b.done").exists();
assert jenkins().getWorkspaceFor(p).child("a.done").exists();

buildTable();
shouldHaveParallelStepsInTheOrder("b","a");
}
});
}


/**
* Failure in a branch will cause the join to fail.
*/
@Test @Issue("JENKINS-26034")
public void failure_in_subflow_will_fail_fast() throws Exception {
story.addStep(new Statement() {
@Override public void evaluate() throws Throwable {
p = jenkins().createProject(WorkflowJob.class, "demo");
p.setDefinition(new CpsFlowDefinition(join(
"import "+AbortException.class.getName(),
"import "+ParallelStepException.class.getName(),

"node {",
" try {",
" parallel(",
" b: { error 'died' },",

// make sure this branch takes longer than a
" a: { sleep 25; writeFile text: '', file: 'a.done' },",
" failFast: true",
" )",
" assert false",
" } catch (ParallelStepException e) {",
" echo e.toString()",
" assert e.name=='b'",
" assert e.cause instanceof AbortException",
" }",
"}"
)));

startBuilding().get();
assertBuildCompletedSuccessfully();
Assert.assertFalse("a should have aborted", jenkins().getWorkspaceFor(p).child("a.done").exists());

}
});
}

/**
* FailFast should not kill branches if there is no failure.
*/
@Test @Issue("JENKINS-26034")
public void failFast_has_no_effect_on_suceess() throws Exception {
story.addStep(new Statement() {
@Override public void evaluate() throws Throwable {
p = jenkins().createProject(WorkflowJob.class, "demo");
p.setDefinition(new CpsFlowDefinition(join(
"import "+AbortException.class.getName(),
"import "+ParallelStepException.class.getName(),

"node {",
" parallel(",
" a: { echo 'hello from a';sleep 1;echo 'goodbye from a' },",
" b: { echo 'hello from b';sleep 1;echo 'goodbye from b' },",
" c: { echo 'hello from c';sleep 1;echo 'goodbye from c' },",
// make sure this branch is quicker than the others.
" d: { echo 'hello from d' },",
" failFast: true",
" )",
"}"
)));

startBuilding().get();
assertBuildCompletedSuccessfully();
}
});
}


@Test
public void localMethodCallWithinBranch() {
story.addStep(new Statement() {
Expand Down
@@ -1,9 +1,11 @@
package org.jenkinsci.plugins.workflow.cps.steps;

import com.cloudbees.groovy.cps.Outcome;

import groovy.lang.Closure;
import hudson.Extension;
import hudson.model.TaskListener;

import org.jenkinsci.plugins.workflow.cps.CpsVmThreadOnly;
import org.jenkinsci.plugins.workflow.cps.persistence.PersistIn;
import org.jenkinsci.plugins.workflow.steps.BodyExecutionCallback;
Expand All @@ -13,8 +15,10 @@
import org.jenkinsci.plugins.workflow.steps.StepExecution;

import java.io.Serializable;
import java.util.AbstractMap.SimpleEntry;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
Expand All @@ -30,13 +34,18 @@
* @author Kohsuke Kawaguchi
*/
public class ParallelStep extends Step {

/** should a failure in a parallel branch terminate other still executing branches. */
private final boolean failFast;

/**
* All the sub-workflows as {@link Closure}s, keyed by their names.
*/
/*package*/ final Map<String,Closure> closures;

public ParallelStep(Map<String,Closure> closures) {
public ParallelStep(Map<String,Closure> closures, boolean failFast) {
this.closures = closures;
this.failFast = failFast;
}

@Override
Expand All @@ -45,24 +54,46 @@ public StepExecution start(StepContext context) throws Exception {
return new ParallelStepExecution(this, context);
}

/*package*/ boolean isFailFast() {
return failFast;
}


@PersistIn(PROGRAM)
static class ResultHandler implements Serializable {
private final StepContext context;
private final ParallelStepExecution stepExecution;
private final boolean failFast;
/** Have we called stop on the StepExecution? */
private boolean stopSent = false;
/** if we failFast we need to record the first failure */
private SimpleEntry<String,Throwable> originalFailure = null;

/**
* Collect the results of sub-workflows as they complete.
* The key set is fully populated from the beginning.
*/
private final Map<String,Outcome> outcomes = new HashMap<String, Outcome>();

ResultHandler(StepContext context) {
ResultHandler(StepContext context, ParallelStepExecution parallelStepExecution, boolean failFast) {
this.context = context;
this.stepExecution = parallelStepExecution;
this.failFast = failFast;
}

Callback callbackFor(String name) {
outcomes.put(name, null);
return new Callback(this, name);
}

private void stopSent() {
stopSent = true;
}

private boolean isStopSent() {
return stopSent;
}

private static class Callback extends BodyExecutionCallback {

private final ResultHandler handler;
Expand All @@ -76,63 +107,100 @@ private static class Callback extends BodyExecutionCallback {
@Override
public void onSuccess(StepContext context, Object result) {
handler.outcomes.put(name, new Outcome(result, null));
checkAllDone();
checkAllDone(false);
}

@Override
public void onFailure(StepContext context, Throwable t) {
handler.outcomes.put(name, new Outcome(null, t));
checkAllDone();
if (handler.originalFailure == null) {
handler.originalFailure = new SimpleEntry<String, Throwable>(name, t);
}
checkAllDone(true);
}

private void checkAllDone() {
private void checkAllDone(boolean stepFailed) {
Map<String,Object> success = new HashMap<String, Object>();
Entry<String,Outcome> failure = null;
for (Entry<String,Outcome> e : handler.outcomes.entrySet()) {
Outcome o = e.getValue();

if (o==null)
return; // some of the results are not yet ready
if (o==null) {
// some of the results are not yet ready
if (stepFailed && handler.failFast && ! handler.isStopSent()) {
handler.stopSent();
try {
handler.stepExecution.stop(new FailFastException());
}
catch (Exception ignored) {
// ignored.
}
}
return;
}
if (o.isFailure()) {
failure= e;
if (handler.originalFailure == null) {
// in case the plugin is upgraded whilst a parallel step is running
handler.originalFailure = new SimpleEntry<String, Throwable>(e.getKey(), e.getValue().getAbnormal());
}
// recorded in the onFailure
} else {
success.put(e.getKey(), o.getNormal());
}
}

// all done
if (failure!=null) {
if (handler.originalFailure!=null) {
// wrap the exception so that the call stack leading up to parallel is visible
handler.context.onFailure(new ParallelStepException(failure.getKey(), failure.getValue().getAbnormal()));
handler.context.onFailure(new ParallelStepException(handler.originalFailure.getKey(), handler.originalFailure.getValue()));
} else {
handler.context.onSuccess(success);
}
}

private static final long serialVersionUID = 1L;
}

private static final long serialVersionUID = 1L;
}

/** Internal exception that is only used internally to abort a parallel body in the case of a failFast body failing. */
private static final class FailFastException extends Exception {
private static final long serialVersionUID = 1L;
}

@Extension
public static class DescriptorImpl extends StepDescriptor {
private final static String FAIL_FAST_FLAG = "failFast";

@Override
public String getFunctionName() {
return "parallel";
}

@Override
public Step newInstance(Map<String,Object> arguments) {
boolean failFast = false;
Map<String,Closure<?>> closures = new LinkedHashMap<String, Closure<?>>();
for (Entry<String,Object> e : arguments.entrySet()) {
if (!(e.getValue() instanceof Closure))
throw new IllegalArgumentException("Expected a closure but found "+e.getKey()+"="+e.getValue());
if ((e.getValue() instanceof Closure)) {
closures.put(e.getKey(), (Closure<?>)e.getValue());
}
else if (FAIL_FAST_FLAG.equals(e.getKey()) && e.getValue() instanceof Boolean) {
failFast = (Boolean)e.getValue();
}
else {
throw new IllegalArgumentException("Expected a closure or failFast but found "+e.getKey()+"="+e.getValue());
}
}
return new ParallelStep((Map)arguments);
return new ParallelStep((Map)closures, failFast);
}

@Override public Map<String,Object> defineArguments(Step step) throws UnsupportedOperationException {
return new TreeMap<String,Object>(((ParallelStep) step).closures);
ParallelStep ps = (ParallelStep) step;
Map<String,Object> retVal = new TreeMap<String,Object>(ps.closures);
if (ps.failFast) {
retVal.put(FAIL_FAST_FLAG, Boolean.TRUE);
}
return retVal;
}

@Override
Expand Down
Expand Up @@ -39,7 +39,7 @@ public boolean start() throws Exception {
CpsStepContext cps = (CpsStepContext) getContext();
CpsThread t = CpsThread.current();

ResultHandler r = new ResultHandler(cps);
ResultHandler r = new ResultHandler(cps, this, parallelStep.isFailFast());

for (Entry<String,Closure> e : parallelStep.closures.entrySet()) {
BodyExecution body = cps.newBodyInvoker(t.getGroup().export(e.getValue()))
Expand Down
Expand Up @@ -27,12 +27,14 @@ THE SOFTWARE.
<j:jelly xmlns:j="jelly:core" xmlns:f="/lib/form">
<f:block>
<p>
(No visual configuration.) Takes a map from branch names to closures:
(No visual configuration.) Takes a map from branch names to closures and an optional argument <code>failFast</code>
which will terminate all branches upon a failure in any other branch:
</p>
<pre>parallel firstBranch: {
// do something
}, secondBranch: {
// do something else
}</pre>
},
failFast: true|false</pre>
</f:block>
</j:jelly>

0 comments on commit 841d231

Please sign in to comment.