Skip to content

Commit

Permalink
Merge pull request #25 from jenkinsci/parallel-errors
Browse files Browse the repository at this point in the history
Fix [JENKINS-38089] which is caused by one specific edge case with incomplete parallels
  • Loading branch information
svanoort committed Nov 29, 2016
2 parents e44f506 + 21ada24 commit 7a3bd90
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 3 deletions.
Expand Up @@ -327,7 +327,9 @@ ArrayDeque<ParallelBlockStart> leastCommonAncestor(@Nonnull final Set<FlowNode>
}

// Walk through, merging flownodes one-by-one until everything has merged to one ancestor
while (iterators.size() > 1) {
boolean mergedAll = false;
// Ends when we merged all branches together, or hit the start of the flow without it
while (!mergedAll && iterators.size() > 0) {
ListIterator<Filterator<FlowNode>> itIterator = iterators.listIterator();
ListIterator<FlowPiece> pieceIterator = livePieces.listIterator();

Expand Down Expand Up @@ -374,6 +376,9 @@ ArrayDeque<ParallelBlockStart> leastCommonAncestor(@Nonnull final Set<FlowNode>
// Merging removes the piece & its iterator from heads
itIterator.remove();
pieceIterator.remove();
if (iterators.size() == 1) { // Merged in the final branch
mergedAll = true;
}
}
}
}
Expand All @@ -386,6 +391,7 @@ ArrayDeque<ParallelBlockStart> leastCommonAncestor(@Nonnull final Set<FlowNode>
protected void setHeads(@Nonnull Collection<FlowNode> heads) {
if (heads.size() > 1) {
parallelBlockStartStack = leastCommonAncestor(new LinkedHashSet<FlowNode>(heads));
assert parallelBlockStartStack.size() > 0;
currentParallelStart = parallelBlockStartStack.pop();
currentParallelStartNode = currentParallelStart.forkStart;
myCurrent = currentParallelStart.unvisited.pop();
Expand Down
Expand Up @@ -33,7 +33,6 @@
import org.jenkinsci.plugins.workflow.cps.steps.ParallelStep;
import org.jenkinsci.plugins.workflow.flow.FlowExecution;
import org.jenkinsci.plugins.workflow.graph.BlockStartNode;
import org.jenkinsci.plugins.workflow.graph.FlowGraphWalker;
import org.jenkinsci.plugins.workflow.graph.FlowNode;
import org.jenkinsci.plugins.workflow.job.WorkflowJob;
import org.jenkinsci.plugins.workflow.job.WorkflowRun;
Expand All @@ -49,7 +48,6 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
Expand Down Expand Up @@ -406,6 +404,74 @@ public void testLeastCommonAncestor() throws Exception {
Assert.assertEquals(2, starts.size());
}

@Test
public void testVariousParallelCombos() throws Exception {
WorkflowJob job = r.jenkins.createProject(WorkflowJob.class, "ParallelTimingBug");
job.setDefinition(new CpsFlowDefinition(
"stage 'test' \n" +
" parallel([\n" + // ID 5 is start
" 'unit': {\n" +
" retry(1) {\n" +
" sleep 1;\n" +
" sleep 10; echo 'hello'; \n" +
" }\n" +
" },\n" +
" 'otherunit': {\n" +
" retry(1) {\n" +
" sleep 1;\n" +
" sleep 5; \n" +
" echo 'goodbye' \n" +
" }\n" +
" }\n" + // end of branch:
" ])\n"
));
/*Node dump follows, format:
[ID]{parent,ids}(millisSinceStartOfRun) flowNodeClassName stepDisplayName [st=startId if a block end node]
Action format:
- actionClassName actionDisplayName
------------------------------------------------------------------------------------------
[2]{}FlowStartNode Start of Pipeline
[3]{2}StepAtomNode test
[4]{3}StepStartNode Execute in parallel : Start
[6]{4}StepStartNode Branch: unit
[7]{4}StepStartNode Branch: otherunit
A [8]{6}StepStartNode Retry the body up to N times : Start
A [9]{8}StepStartNode Retry the body up to N times : Body : Start
B [10]{7}StepStartNode Retry the body up to N times : Start
B [11]{10}StepStartNode Retry the body up to N times : Body : Start
A [12]{9}StepAtomNode Sleep
B [13]{11}StepAtomNode Sleep
A [14]{12}StepAtomNode Sleep
B [15]{13}StepAtomNode Sleep
B [16]{15}StepAtomNode Print Message
B [17]{16}StepEndNode Retry the body up to N times : Body : End [st=11]
B [18]{17}StepEndNode Retry the body up to N times : End [st=10]
B [19]{18}StepEndNode Execute in parallel : Body : End [st=7]
A [20]{14}StepAtomNode Print Message
A [21]{20}StepEndNode Retry the body up to N times : Body : End [st=9]
A [22]{21}StepEndNode Retry the body up to N times : End [st=8]
A [23]{22}StepEndNode Execute in parallel : Body : End [st=6]
[24]{23,19}StepEndNode Execute in parallel : End [st=4]
[25]{24}FlowEndNode End of Pipeline [st=2]*/
WorkflowRun b = r.assertBuildStatusSuccess(job.scheduleBuild2(0));
FlowExecution exec = b.getExecution();
ForkScanner scan = new ForkScanner();

// Test different start points in branch A & B, 20 and 19 were one error case.
for (int i=0; i < 4; i++) {
for (int j=0; j<5; j++) {
int branchANodeId = i+20;
int branchBNodeId = j+15;
System.out.println("Starting test with nodes "+branchANodeId+","+branchBNodeId);
ArrayList<FlowNode> starts = new ArrayList<FlowNode>();
FlowTestUtils.addNodesById(starts, exec, branchANodeId, branchBNodeId);
List<FlowNode> all = scan.filteredNodes(starts, Predicates.<FlowNode>alwaysTrue());
Assert.assertEquals(new HashSet<FlowNode>(all).size(), all.size());
scan.reset();
}
}
}

/** For nodes, see {@link #SIMPLE_PARALLEL_RUN} */
@Test
public void testSimpleVisitor() throws Exception {
Expand Down

0 comments on commit 7a3bd90

Please sign in to comment.