Skip to content

Commit

Permalink
[FIXED JENKINS-34637] CpsBodyExecution.cancel was failing to interrup…
Browse files Browse the repository at this point in the history
…t the innermost execution, and block-scoped StepExecution.stop does not generally kill its body (JENKINS-26148).

getCurrentExecutions was also in direct violation of its Javadoc, though it does not appear to have ever been called, much less tested.
  • Loading branch information
jglick committed Oct 18, 2016
1 parent 72bba7f commit 6933a49
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 23 deletions.
11 changes: 5 additions & 6 deletions pom.xml
Expand Up @@ -28,8 +28,8 @@
<parent>
<groupId>org.jenkins-ci.plugins</groupId>
<artifactId>plugin</artifactId>
<version>2.15</version>
<relativePath />
<version>2.17</version>
<relativePath/>
</parent>
<groupId>org.jenkins-ci.plugins.workflow</groupId>
<artifactId>workflow-cps</artifactId>
Expand Down Expand Up @@ -63,7 +63,6 @@
</pluginRepositories>
<properties>
<jenkins.version>1.642.3</jenkins.version>
<jenkins-test-harness.version>2.15</jenkins-test-harness.version>
<npm.loglevel>--silent</npm.loglevel>
</properties>
<dependencies>
Expand All @@ -80,7 +79,7 @@
<dependency>
<groupId>org.jenkins-ci.plugins.workflow</groupId>
<artifactId>workflow-support</artifactId>
<version>2.6</version>
<version>2.9-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
Expand Down Expand Up @@ -109,9 +108,9 @@
<optional>true</optional>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<groupId>org.jenkins-ci.plugins.workflow</groupId>
<artifactId>workflow-support</artifactId>
<version>1.15</version>
<version>2.9-SNAPSHOT</version> <!-- TODO https://github.com/jenkinsci/workflow-support-plugin/pull/20 -->
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
Expand Down
Expand Up @@ -8,9 +8,11 @@
import com.cloudbees.groovy.cps.impl.FunctionCallEnv;
import com.cloudbees.groovy.cps.impl.TryBlockEnv;
import com.cloudbees.groovy.cps.sandbox.SandboxInvoker;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.FutureCallback;
import hudson.model.Action;
import hudson.model.Result;
import hudson.util.Iterators;
import jenkins.model.CauseOfInterruption;
import org.jenkinsci.plugins.workflow.actions.BodyInvocationAction;
import org.jenkinsci.plugins.workflow.actions.ErrorAction;
Expand All @@ -25,9 +27,12 @@

import javax.annotation.concurrent.GuardedBy;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand All @@ -36,6 +41,8 @@
import static java.util.logging.Level.*;
import javax.annotation.Nonnull;
import static org.jenkinsci.plugins.workflow.cps.persistence.PersistenceContext.*;
import org.jenkinsci.plugins.workflow.graph.FlowNode;
import org.jenkinsci.plugins.workflow.graphanalysis.LinearBlockHoppingScanner;

/**
* {@link BodyExecution} impl for CPS.
Expand Down Expand Up @@ -172,29 +179,63 @@ private Continuable createContinuable(CpsThread currentThread, CpsCallableInvoca
}

@Override
public synchronized Collection<StepExecution> getCurrentExecutions() {
public Collection<StepExecution> getCurrentExecutions() {
if (thread==null) return Collections.emptyList();

StepExecution s = thread.getStep();
if (s!=null) return Collections.singleton(s);
else return Collections.emptyList();
List<StepExecution> executions = new ArrayList<>();
// cf. trick in CpsFlowExecution.getCurrentExecutions(true)
Map<FlowHead, CpsThread> m = new LinkedHashMap<>();
// TODO access to CpsThreadGroup.threads should be restricted to the CPS VM thread, but the API signature does not allow us to return a promise or throw InterruptedException
for (CpsThread t : thread.group.threads.values()) {
m.put(t.head, t);
}
for (CpsThread t : m.values()) {
// TODO seems cumbersome to have to go through the flow graph to find out whether a head is a descendant of ours, yet FlowHead does not seem to retain a parent field
LinearBlockHoppingScanner scanner = new LinearBlockHoppingScanner();
scanner.setup(t.head.get());
for (FlowNode node : scanner) {
if (node.getId().equals(startNodeId)) {
// this head is inside this body execution
StepExecution execution = t.getStep();
if (execution != null) {
executions.add(execution);
}
break;
}
}
}
return executions;
}

@Override
public boolean cancel(final CauseOfInterruption... causes) {
// 'stopped' and 'thread' are updated atomically
final CpsThread t;
CpsThread t;
synchronized (this) {
if (isDone()) return false; // already complete
stopped = new FlowInterruptedException(Result.ABORTED, causes); // TODO: the fact that I'm hard-coding exception seems to indicate an abstraction leak. Come back and think about this.
// TODO should perhaps rather override cancel(Throwable) and make this overload just delegate to that one
stopped = new FlowInterruptedException(Result.ABORTED, causes);
t = this.thread;
}

if (t!=null) {
t.getExecution().runInCpsVmThread(new FutureCallback<CpsThreadGroup>() {
@Override
public void onSuccess(CpsThreadGroup g) {
t.stop(stopped);
// Similar to getCurrentExecutions but we want the raw CpsThread, not a StepExecution; cf. CpsFlowExecution.interrupt
Map<FlowHead, CpsThread> m = new LinkedHashMap<>();
for (CpsThread t : thread.group.threads.values()) {
m.put(t.head, t);
}
for (CpsThread t : Iterators.reverse(ImmutableList.copyOf(m.values()))) {
LinearBlockHoppingScanner scanner = new LinearBlockHoppingScanner();
scanner.setup(t.head.get());
for (FlowNode node : scanner) {
if (node.getId().equals(startNodeId)) {
t.stop(stopped);
break;
}
}
}
}

@Override
Expand Down
Expand Up @@ -408,6 +408,8 @@ public void onSuccess(CpsThreadGroup g) {
FlowInterruptedException cause = new FlowInterruptedException(Result.FAILURE, new BodyFailed());
cause.initCause(getOutcome().getAbnormal());
try {
// TODO JENKINS-26148/JENKINS-34637 this is probably wrong: should interrupt the innermost execution
// (the “next” one could be block-scoped, and we would want to interrupt all parallel heads)
s.stop(cause);
} catch (Exception e) {
LOGGER.log(Level.WARNING, "Failed to stop the body execution in response to the failure of the parent");
Expand Down
@@ -1,8 +1,18 @@
package org.jenkinsci.plugins.workflow.cps;

import com.google.common.base.Function;
import com.google.common.collect.Sets;
import groovy.lang.Closure;
import hudson.model.Result;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import javax.annotation.Nonnull;
import org.codehaus.groovy.runtime.ScriptBytecodeAdapter;
import org.hamcrest.Matchers;
import org.jenkinsci.plugins.workflow.actions.ErrorAction;
import org.jenkinsci.plugins.workflow.cps.nodes.StepEndNode;
import org.jenkinsci.plugins.workflow.cps.nodes.StepNode;
Expand All @@ -13,22 +23,25 @@
import org.jenkinsci.plugins.workflow.steps.AbstractStepDescriptorImpl;
import org.jenkinsci.plugins.workflow.steps.AbstractStepExecutionImpl;
import org.jenkinsci.plugins.workflow.steps.AbstractStepImpl;
import org.jenkinsci.plugins.workflow.steps.BodyExecution;
import org.jenkinsci.plugins.workflow.steps.BodyExecutionCallback;
import org.jenkinsci.plugins.workflow.steps.StepExecution;
import org.jenkinsci.plugins.workflow.test.steps.SemaphoreStep;
import static org.junit.Assert.*;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.jvnet.hudson.test.BuildWatcher;
import org.jvnet.hudson.test.Issue;
import org.jvnet.hudson.test.JenkinsRule;
import org.jvnet.hudson.test.TestExtension;
import org.kohsuke.stapler.DataBoundConstructor;

import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
public class CpsBodyExecutionTest {

import static org.junit.Assert.assertEquals;
@ClassRule public static BuildWatcher buildWatcher = new BuildWatcher();
@Rule public JenkinsRule jenkins = new JenkinsRule();

/**
* @author Kohsuke Kawaguchi
*/
public class CpsBodyExecutionTest extends AbstractCpsFlowTest {
/**
* When the body of a step is synchronous and explodes, the failure should be recorded and the pipeline job
* should move on.
Expand Down Expand Up @@ -121,4 +134,64 @@ public void stop(@Nonnull Throwable cause) throws Exception {

}

@Issue("JENKINS-34637")
@Test public void currentExecutions() throws Exception {
WorkflowJob p = jenkins.createProject(WorkflowJob.class);
p.setDefinition(new CpsFlowDefinition("parallel main: {retainsBody {parallel a: {retainsBody {semaphore 'a'}}, b: {retainsBody {semaphore 'b'}}}}, aside: {semaphore 'c'}", true));
WorkflowRun b = p.scheduleBuild2(0).waitForStart();
SemaphoreStep.waitForStart("a/1", b);
SemaphoreStep.waitForStart("b/1", b);
SemaphoreStep.waitForStart("c/1", b);
final RetainsBodyStep.Execution[] execs = new RetainsBodyStep.Execution[3];
StepExecution.applyAll(RetainsBodyStep.Execution.class, new Function<RetainsBodyStep.Execution, Void>() {
@Override public Void apply(RetainsBodyStep.Execution exec) {
execs[exec.count] = exec;
return null;
}
}).get();
assertNotNull(execs[0]);
assertNotNull(execs[1]);
assertNotNull(execs[2]);
final Set<SemaphoreStep.Execution> semaphores = new HashSet<>();
StepExecution.applyAll(SemaphoreStep.Execution.class, new Function<SemaphoreStep.Execution, Void>() {
@Override public Void apply(SemaphoreStep.Execution exec) {
if (exec.getStatus().matches("waiting on [ab]/1")) {
semaphores.add(exec);
}
return null;
}
}).get();
assertThat(semaphores, Matchers.<SemaphoreStep.Execution>iterableWithSize(2));
Collection<StepExecution> currentExecutions1 = execs[1].body.getCurrentExecutions(); // A or B, does not matter
assertThat(/* irritatingly, iterableWithSize does not show the collection in its mismatch message */currentExecutions1.toString(),
currentExecutions1, Matchers.<StepExecution>iterableWithSize(1));
Collection<StepExecution> currentExecutions2 = execs[2].body.getCurrentExecutions();
assertThat(currentExecutions2, Matchers.<StepExecution>iterableWithSize(1));
assertEquals(semaphores, Sets.union(Sets.newLinkedHashSet(currentExecutions1), Sets.newLinkedHashSet(currentExecutions2)));
assertEquals(semaphores, Sets.newLinkedHashSet(execs[0].body.getCurrentExecutions())); // the top-level one
execs[0].body.cancel();
SemaphoreStep.success("c/1", null);
jenkins.assertBuildStatus(Result.ABORTED, jenkins.waitForCompletion(b));
}
public static class RetainsBodyStep extends AbstractStepImpl {
@DataBoundConstructor public RetainsBodyStep() {}
@TestExtension("currentExecutions") public static class DescriptorImpl extends AbstractStepDescriptorImpl {
public DescriptorImpl() {super(Execution.class);}
@Override public String getFunctionName() {return "retainsBody";}
@Override public boolean takesImplicitBlockArgument() {return true;}
}
public static class Execution extends AbstractStepExecutionImpl {
static int counter;
BodyExecution body;
int count = counter++;
@Override public boolean start() throws Exception {
body = getContext().newBodyInvoker().withCallback(BodyExecutionCallback.wrap(getContext())).start();
return false;
}
@Override public void stop(Throwable cause) throws Exception {
throw new AssertionError("block #" + count + " not supposed to be killed directly", cause);
}
}
}

}

0 comments on commit 6933a49

Please sign in to comment.