Skip to content

Commit

Permalink
Merge pull request #76 from jglick/timeout-block-JENKINS-34637
Browse files Browse the repository at this point in the history
[JENKINS-34637] Failure to kill bodies from timeout
  • Loading branch information
jglick committed Oct 20, 2016
2 parents b5c8ca8 + da75238 commit bee2879
Show file tree
Hide file tree
Showing 8 changed files with 232 additions and 47 deletions.
15 changes: 7 additions & 8 deletions pom.xml
Expand Up @@ -28,8 +28,8 @@
<parent>
<groupId>org.jenkins-ci.plugins</groupId>
<artifactId>plugin</artifactId>
<version>2.15</version>
<relativePath />
<version>2.17</version>
<relativePath/>
</parent>
<groupId>org.jenkins-ci.plugins.workflow</groupId>
<artifactId>workflow-cps</artifactId>
Expand Down Expand Up @@ -63,7 +63,6 @@
</pluginRepositories>
<properties>
<jenkins.version>1.642.3</jenkins.version>
<jenkins-test-harness.version>2.15</jenkins-test-harness.version>
<npm.loglevel>--silent</npm.loglevel>
</properties>
<dependencies>
Expand All @@ -73,14 +72,14 @@
<version>2.4</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<groupId>org.jenkins-ci.plugins.workflow</groupId>
<artifactId>workflow-api</artifactId>
<version>2.3</version>
<version>2.4</version>
</dependency>
<dependency>
<groupId>org.jenkins-ci.plugins.workflow</groupId>
<artifactId>workflow-support</artifactId>
<version>2.6</version>
<version>2.9</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
Expand Down Expand Up @@ -109,9 +108,9 @@
<optional>true</optional>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<groupId>org.jenkins-ci.plugins.workflow</groupId>
<artifactId>workflow-support</artifactId>
<version>1.15</version>
<version>2.9</version>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
Expand Down
Expand Up @@ -8,9 +8,12 @@
import com.cloudbees.groovy.cps.impl.FunctionCallEnv;
import com.cloudbees.groovy.cps.impl.TryBlockEnv;
import com.cloudbees.groovy.cps.sandbox.SandboxInvoker;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.SettableFuture;
import hudson.model.Action;
import hudson.model.Result;
import hudson.util.Iterators;
import jenkins.model.CauseOfInterruption;
import org.jenkinsci.plugins.workflow.actions.BodyInvocationAction;
import org.jenkinsci.plugins.workflow.actions.ErrorAction;
Expand All @@ -25,9 +28,12 @@

import javax.annotation.concurrent.GuardedBy;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand All @@ -36,6 +42,8 @@
import static java.util.logging.Level.*;
import javax.annotation.Nonnull;
import static org.jenkinsci.plugins.workflow.cps.persistence.PersistenceContext.*;
import org.jenkinsci.plugins.workflow.graph.FlowNode;
import org.jenkinsci.plugins.workflow.graphanalysis.LinearBlockHoppingScanner;

/**
* {@link BodyExecution} impl for CPS.
Expand Down Expand Up @@ -172,29 +180,86 @@ private Continuable createContinuable(CpsThread currentThread, CpsCallableInvoca
}

@Override
public synchronized Collection<StepExecution> getCurrentExecutions() {
if (thread==null) return Collections.emptyList();

StepExecution s = thread.getStep();
if (s!=null) return Collections.singleton(s);
else return Collections.emptyList();
/**
 * Collects the {@link StepExecution}s currently running inside this body execution.
 *
 * The thread group is inspected from a callback passed to {@code runInCpsVmThread}; the calling
 * thread blocks for up to one minute waiting for that callback to publish its result.
 *
 * @return the step executions of every thread whose head lies inside this body (identified by
 *         reaching {@code startNodeId} while scanning backwards from the head); an empty
 *         collection if this body has no thread
 * @throws RuntimeException if the callback fails, the wait times out, or the calling thread is
 *         interrupted while waiting (in which case the interrupt flag is restored first)
 */
public Collection<StepExecution> getCurrentExecutions() {
    CpsThread t;
    synchronized (this) {
        t = thread;
        if (t == null) {
            return Collections.emptySet();
        }
    }
    final SettableFuture<Collection<StepExecution>> result = SettableFuture.create();
    t.getExecution().runInCpsVmThread(new FutureCallback<CpsThreadGroup>() {
        @Override public void onSuccess(CpsThreadGroup g) {
            try {
                List<StepExecution> executions = new ArrayList<>();
                // cf. trick in CpsFlowExecution.getCurrentExecutions(true)
                // Keyed by head, so when several threads share a head only the last one iterated is kept.
                Map<FlowHead, CpsThread> threadsByHead = new LinkedHashMap<>();
                for (CpsThread candidate : g.threads.values()) {
                    threadsByHead.put(candidate.head, candidate);
                }
                for (CpsThread candidate : threadsByHead.values()) {
                    // TODO seems cumbersome to have to go through the flow graph to find out whether a head is a descendant of ours, yet FlowHead does not seem to retain a parent field
                    LinearBlockHoppingScanner scanner = new LinearBlockHoppingScanner();
                    scanner.setup(candidate.head.get());
                    for (FlowNode node : scanner) {
                        if (node.getId().equals(startNodeId)) {
                            // this head is inside this body execution
                            StepExecution execution = candidate.getStep();
                            if (execution != null) {
                                executions.add(execution);
                            }
                            break;
                        }
                    }
                }
                result.set(executions);
            } catch (Exception x) {
                // Publish the failure so the waiting caller does not sit until the timeout.
                result.setException(x);
            }
        }
        @Override public void onFailure(Throwable t) {
            result.setException(t);
        }
    });
    try {
        return result.get(1, TimeUnit.MINUTES);
    } catch (InterruptedException x) {
        // Restore the interrupt flag so callers further up can still observe the interruption.
        Thread.currentThread().interrupt();
        throw new RuntimeException(x);
    } catch (ExecutionException | TimeoutException x) {
        // TODO access to CpsThreadGroup.threads must be restricted to the CPS VM thread, but the API signature does not allow us to return a ListenableFuture or throw checked exceptions
        throw new RuntimeException(x);
    }
}

@Override
public boolean cancel(final CauseOfInterruption... causes) {
// 'stopped' and 'thread' are updated atomically
final CpsThread t;
CpsThread t;
synchronized (this) {
if (isDone()) return false; // already complete
stopped = new FlowInterruptedException(Result.ABORTED, causes); // TODO: the fact that I'm hard-coding exception seems to indicate an abstraction leak. Come back and think about this.
// TODO should perhaps rather override cancel(Throwable) and make this overload just delegate to that one
stopped = new FlowInterruptedException(Result.ABORTED, causes);
t = this.thread;
}

if (t!=null) {
t.getExecution().runInCpsVmThread(new FutureCallback<CpsThreadGroup>() {
@Override
public void onSuccess(CpsThreadGroup g) {
t.stop(stopped);
// Similar to getCurrentExecutions but we want the raw CpsThread, not a StepExecution; cf. CpsFlowExecution.interrupt
Map<FlowHead, CpsThread> m = new LinkedHashMap<>();
for (CpsThread t : thread.group.threads.values()) {
m.put(t.head, t);
}
for (CpsThread t : Iterators.reverse(ImmutableList.copyOf(m.values()))) {
LinearBlockHoppingScanner scanner = new LinearBlockHoppingScanner();
scanner.setup(t.head.get());
for (FlowNode node : scanner) {
if (node.getId().equals(startNodeId)) {
t.stop(stopped);
break;
}
}
}
}

@Override
Expand Down
Expand Up @@ -61,6 +61,10 @@ public class CpsFlowDefinition extends FlowDefinition {
private final String script;
private final boolean sandbox;

/**
 * Creates a definition for the given script by delegating to
 * {@link #CpsFlowDefinition(String, boolean)} with {@code false} as the second argument.
 * NOTE(review): the second argument is presumably the sandbox flag, which would make this
 * constructor run the script unsandboxed; confirm against the two-argument constructor.
 *
 * @param script passed through unchanged to the two-argument constructor
 * @deprecated use {@link #CpsFlowDefinition(String, boolean)} instead
 */
@Deprecated
public CpsFlowDefinition(String script) {
this(script, false);
}
Expand Down
Expand Up @@ -65,8 +65,10 @@
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;
import jenkins.model.CauseOfInterruption;
import jenkins.util.ContextResettingExecutorService;
import org.codehaus.groovy.runtime.InvokerInvocationException;
import org.jenkinsci.plugins.workflow.cps.nodes.StepNode;

import static org.jenkinsci.plugins.workflow.cps.persistence.PersistenceContext.*;

Expand Down Expand Up @@ -98,6 +100,7 @@ public class CpsStepContext extends DefaultStepContext { // TODO add XStream cla

@GuardedBy("this")
private transient Outcome outcome;
private transient Throwable whenOutcomeDelivered;

// see class javadoc.
// transient because if it's serialized and deserialized, it should come back in the async mode.
Expand Down Expand Up @@ -321,23 +324,46 @@ protected <T> T doGet(Class<T> key) throws IOException, InterruptedException {
if (t == null) {
throw new IllegalArgumentException();
}
if (isCompleted()) {
LOGGER.log(Level.WARNING, "already completed " + this, new IllegalStateException(t));
return;
}
this.outcome = new Outcome(null,t);

scheduleNextRun();
completed(new Outcome(null, t));
}

@Override public synchronized void onSuccess(Object returnValue) {
if (isCompleted()) {
LOGGER.log(Level.WARNING, "already completed " + this, new IllegalStateException());
return;
}
this.outcome = new Outcome(returnValue,null);
completed(new Outcome(returnValue, null));

scheduleNextRun();
}

/**
 * Records the outcome of this step, or logs diagnostics if an outcome was already recorded.
 *
 * The first outcome delivered is stored, the next run is scheduled, and a {@link Throwable}
 * capturing the current stack is remembered so that a later duplicate delivery can report
 * where the first one came from. Any later delivery is never stored: it is ignored at
 * {@code FINE} level when it is a {@link FlowInterruptedException} carrying a
 * {@link BodyFailed} cause, and otherwise logged as a warning together with both the new and
 * the earlier outcome.
 *
 * @param newOutcome the outcome being delivered now
 */
private void completed(@Nonnull Outcome newOutcome) {
    if (outcome == null) {
        outcome = newOutcome;
        scheduleNextRun();
        whenOutcomeDelivered = new Throwable();
    } else {
        Throwable failure = newOutcome.getAbnormal();
        if (failure instanceof FlowInterruptedException) {
            for (CauseOfInterruption cause : ((FlowInterruptedException) failure).getCauses()) {
                if (cause instanceof BodyFailed) {
                    LOGGER.log(Level.FINE, "already completed " + this + " and now received body failure", failure);
                    // Predictable that the error would be thrown up here; quietly ignore it.
                    return;
                }
            }
        }
        LOGGER.log(Level.WARNING, "already completed " + this, new IllegalStateException("delivered here"));
        if (failure != null) {
            LOGGER.log(Level.INFO, "new failure", failure);
        } else {
            // Report the value that was just delivered, not the one recorded earlier.
            LOGGER.log(Level.INFO, "new success: {0}", newOutcome.getNormal());
        }
        if (whenOutcomeDelivered != null) {
            LOGGER.log(Level.INFO, "previously delivered here", whenOutcomeDelivered);
        }
        // From here on, 'failure' refers to the outcome that was recorded first.
        failure = outcome.getAbnormal();
        if (failure != null) {
            LOGGER.log(Level.INFO, "earlier failure", failure);
        } else {
            LOGGER.log(Level.INFO, "earlier success: {0}", outcome.getNormal());
        }
    }
}

/**
Expand Down Expand Up @@ -379,9 +405,11 @@ public void onSuccess(CpsThreadGroup g) {
if (s != null) {
// TODO: ideally this needs to work like interrupt, in that
// if s==null the next StepExecution gets interrupted when it happens
FlowInterruptedException cause = new FlowInterruptedException(Result.FAILURE);
FlowInterruptedException cause = new FlowInterruptedException(Result.FAILURE, new BodyFailed());
cause.initCause(getOutcome().getAbnormal());
try {
// TODO JENKINS-26148/JENKINS-34637 this is probably wrong: should interrupt the innermost execution
// (the “next” one could be block-scoped, and we would want to interrupt all parallel heads)
s.stop(cause);
} catch (Exception e) {
LOGGER.log(Level.WARNING, "Failed to stop the body execution in response to the failure of the parent");
Expand Down Expand Up @@ -419,6 +447,12 @@ public void onFailure(Throwable t) {
}
}

/**
 * Cause of interruption whose short description reports that the body of a block-scoped
 * step failed.
 */
private static class BodyFailed extends CauseOfInterruption {

    /**
     * @return a fixed, human-readable description of this cause
     */
    @Override
    public String getShortDescription() {
        final String description = "Body of block-scoped step failed";
        return description;
    }

}

@Override
public void setResult(Result r) {
try {
Expand Down Expand Up @@ -519,7 +553,14 @@ public int hashCode() {
}

@Override public String toString() {
return "CpsStepContext[" + id + "]:" + executionRef;
String function = null;
if (node instanceof StepNode) {
StepDescriptor d = ((StepNode) node).getDescriptor();
if (d != null) {
function = d.getFunctionName();
}
}
return "CpsStepContext[" + id + ":" + function + "]:" + executionRef;
}

private static final long serialVersionUID = 1L;
Expand Down
Expand Up @@ -231,7 +231,7 @@ void fireCompletionHandlers(Outcome o) {
/**
* Finds the next younger {@link CpsThread} that shares the same {@link FlowHead}.
*
* Can be {@code this.}
* Cannot be {@code this}.
*/
@CheckForNull CpsThread getNextInner() {
for (CpsThread t : group.threads.values()) {
Expand Down
Expand Up @@ -126,7 +126,10 @@ public void onFailure(StepContext context, Throwable t) {
if (handler.originalFailure == null) {
handler.originalFailure = new SimpleEntry<String, Throwable>(name, t);
} else {
handler.originalFailure.getValue().addSuppressed(t);
Throwable originalT = handler.originalFailure.getValue();
if (t != originalT) { // could be the same abort being delivered across branches
originalT.addSuppressed(t);
}
}
checkAllDone(true);
}
Expand Down

0 comments on commit bee2879

Please sign in to comment.