Skip to content
This repository has been archived by the owner on Dec 15, 2021. It is now read-only.

Commit

Permalink
[JENKINS-29705] added thread dump support
Browse files Browse the repository at this point in the history
Implemented the code to obtain the whole program state dump from
CpsThreadGroup.
  • Loading branch information
kohsuke committed Nov 15, 2015
1 parent 68a7212 commit 12cf061
Show file tree
Hide file tree
Showing 6 changed files with 282 additions and 2 deletions.
@@ -0,0 +1,108 @@
package org.jenkinsci.plugins.workflow.cps;

import org.apache.commons.lang.StringUtils;
import org.jenkinsci.plugins.workflow.cps.CpsThreadDump.ThreadInfo;
import org.jenkinsci.plugins.workflow.job.WorkflowJob;
import org.jenkinsci.plugins.workflow.job.WorkflowRun;
import org.jenkinsci.plugins.workflow.test.steps.SemaphoreStep;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.jvnet.hudson.test.JenkinsRule;

import java.util.ArrayList;
import java.util.List;

import static java.util.Arrays.asList;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;

/**
* @author Kohsuke Kawaguchi
*/
public class CpsThreadDumpTest {
@Rule
public JenkinsRule j = new JenkinsRule();
private WorkflowJob p;

@Before
public void setUp() throws Exception {
p = j.jenkins.createProject(WorkflowJob.class, "p");
}

@Test
public void threadDump() throws Exception {
p.setDefinition(new CpsFlowDefinition(StringUtils.join(asList(
"def foo() { bar() }",
"def bar() {",
" semaphore 'x'",
"}",
"foo()"
), "\n")));
WorkflowRun b = p.scheduleBuild2(0).waitForStart();
SemaphoreStep.waitForStart("x/1", b);
CpsFlowExecution e = (CpsFlowExecution) b.getExecution();
CpsThreadDump td = e.getThreadDump();
td.print(System.out);

{// verify that we got the right thread dump
List<ThreadInfo> threads = td.getThreads();
assertEquals(1, threads.size());
ThreadInfo t = threads.get(0);
assertThat(t.getHeadline(), containsString("Thread #0"));
assertStackTrace(t,
"org.jenkinsci.plugins.workflow.test.steps.SemaphoreStep$Execution.<>(Native Method)",
"WorkflowScript.bar(WorkflowScript:3)",
"WorkflowScript.foo(WorkflowScript:1)",
"WorkflowScript.run(WorkflowScript:5)");
}
}

@Test
public void threadDump2() throws Exception {
p.setDefinition(new CpsFlowDefinition(StringUtils.join(asList(
"def foo(x) { bar(x) }",// 1
"def bar(x) {",
" semaphore x", // 3
"}",
"def zot() {",
" parallel(", // 6
" b1:{ foo('x') },",
" b2:{ bar('y') });",
"}",
"zot()" // 10
), "\n")));
WorkflowRun b = p.scheduleBuild2(0).waitForStart();
SemaphoreStep.waitForStart("x/1", b);
SemaphoreStep.waitForStart("y/1", b);

CpsFlowExecution e = (CpsFlowExecution) b.getExecution();
CpsThreadDump td = e.getThreadDump();
td.print(System.out);

assertStackTrace( td.getThreads().get(0),
"org.jenkinsci.plugins.workflow.test.steps.SemaphoreStep$Execution.<>(Native Method)",
"WorkflowScript.bar(WorkflowScript:3)",
"WorkflowScript.foo(WorkflowScript:1)",
"WorkflowScript.zot(WorkflowScript:7)",
"org.jenkinsci.plugins.workflow.cps.steps.ParallelStepExecution.<>(Native Method)",
"WorkflowScript.zot(WorkflowScript:6)",
"WorkflowScript.run(WorkflowScript:10)");

assertStackTrace( td.getThreads().get(1),
"org.jenkinsci.plugins.workflow.test.steps.SemaphoreStep$Execution.<>(Native Method)",
"WorkflowScript.bar(WorkflowScript:3)",
"WorkflowScript.zot(WorkflowScript:8)");
}

private void assertStackTrace(ThreadInfo t, String... expected) {
assertThat(toString(t.getStackTrace()), is(asList(expected)));
}

private List<String> toString(List<StackTraceElement> in) {
List<String> r = new ArrayList<String>();
for (StackTraceElement e : in)
r.add(e.toString());
return r;
}
}
Expand Up @@ -173,7 +173,7 @@ private Continuable createContinuable(CpsThread currentThread, CpsCallableInvoca
return new Continuable(
// this source location is a place holder for the step implementation.
// perhaps at some point in the future we'll let the Step implementation control this.
inv.invoke(env, SourceLocation.UNKNOWN, onSuccess));
inv.invoke(env, null, onSuccess));
}

@Override
Expand Down
Expand Up @@ -529,7 +529,7 @@ private void loadProgramFailed(final Throwable problem, SettableFuture<CpsThread
*
* <p>
* If the {@link CpsThreadGroup} deserializatoin fails, {@link FutureCallback#onFailure(Throwable)} will
* be invoked (on a random thread, since {@link CpsVmThread} doesn't exist without a valid program.)
* be invoked (on a random thread, since CpsVmThread doesn't exist without a valid program.)
*/
void runInCpsVmThread(final FutureCallback<CpsThreadGroup> callback) {
if (programPromise == null) {
Expand Down Expand Up @@ -637,6 +637,31 @@ public void onFailure(Throwable t) {
return r;
}

/**
* Synchronously obtain the current state of the workflow program.
*
* <p>
* The workflow can be already completed, or it can be in the process of
*/
public CpsThreadDump getThreadDump() {
if (programPromise == null || isComplete()) {
return CpsThreadDump.EMPTY;
}
if (!programPromise.isDone()) {
// CpsThreadGroup state isn't ready yet, but this is probably one of the common cases
// when one wants to obtain the stack trace. Can we do anything better?
return CpsThreadDump.UNKNOWN;
}

try {
return programPromise.get().getThreadDump();
} catch (InterruptedException e) {
throw new AssertionError(); // since we are checking programPromise.isDone() upfront
} catch (ExecutionException e) {
return CpsThreadDump.from(new Exception("Failed to resurrect program state",e));
}
}

@Override
public synchronized boolean isCurrentHead(FlowNode n) {
for (FlowHead h : heads.values()) {
Expand Down
Expand Up @@ -259,6 +259,10 @@ public Future<Object> resume(Outcome v) {
return promise;
}

public List<StackTraceElement> getStackTrace() {
return program.getStackTrace();
}

private static final Logger LOGGER = Logger.getLogger(CpsThread.class.getName());

private static final long serialVersionUID = 1L;
Expand Down
@@ -0,0 +1,139 @@
package org.jenkinsci.plugins.workflow.cps;

import org.jenkinsci.plugins.workflow.steps.StepExecution;

import java.io.PrintStream;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;

/**
* Immutable snapshot of a state of {@link CpsThreadGroup}.
*
* @author Kohsuke Kawaguchi
*/
public final class CpsThreadDump {
private final List<ThreadInfo> threads = new ArrayList<ThreadInfo>();

public static final class ThreadInfo {
private final String headline;
private final List<StackTraceElement> stack = new ArrayList<StackTraceElement>();

/**
* Given a list of {@link CpsThread}s that share the same {@link FlowHead}, in the order
* from outer to inner, reconstruct the thread stack.
*/
private ThreadInfo(List<CpsThread> e) {
CpsThread head = e.get(e.size() - 1);
headline = head.toString();

ListIterator<CpsThread> itr = e.listIterator(e.size());
while (itr.hasPrevious()) {
CpsThread t = itr.previous();

StepExecution s = t.getStep();
if (s !=null) {
stack.add(new StackTraceElement(s.getClass().getName(), "<>", null, -2));
}
stack.addAll(t.getStackTrace());
}
}

/**
* Create a fake {@link ThreadInfo} from a {@link Throwable} that copies its
* stack trace history.
*/
public ThreadInfo(Throwable t) {
headline = t.toString();
stack.addAll(Arrays.asList(t.getStackTrace()));
}

/**
* Can be empty but never be null. First element is the top of the stack.
*/
public List<StackTraceElement> getStackTrace() {
return Collections.unmodifiableList(stack);
}

public String getHeadline() {
return headline;
}

public void print(PrintWriter w) {
w.println(headline);
for (StackTraceElement e : stack) {
w.println("\tat " + e);
}
}

@Override
public String toString() {
StringWriter sw = new StringWriter();
print(new PrintWriter(sw));
return sw.toString();
}
}

/**
* Use one of the {@link #from(CpsThreadGroup)} method.
*/
private CpsThreadDump() {
}

public List<ThreadInfo> getThreads() {
return Collections.unmodifiableList(threads);
}

public void print(PrintStream ps) {
print(new PrintWriter(ps,true));
}

public void print(PrintWriter w) {
for (ThreadInfo t : threads)
t.print(w);
}

@Override
public String toString() {
StringWriter sw = new StringWriter();
print(new PrintWriter(sw));
return sw.toString();
}

public static CpsThreadDump from(Throwable t) {
CpsThreadDump td = new CpsThreadDump();
td.threads.add(new ThreadInfo(t));
return td;}

public static CpsThreadDump from(CpsThreadGroup g) {
// all the threads that share the same head form a logically single thread
Map<FlowHead, List<CpsThread>> m = new LinkedHashMap<FlowHead,List<CpsThread>>();
for (CpsThread t : g.threads.values()) {
List<CpsThread> l = m.get(t.head);
if (l==null) m.put(t.head, l = new ArrayList<CpsThread>());
l.add(t);
}

CpsThreadDump td = new CpsThreadDump();
for (List<CpsThread> e : m.values())
td.threads.add(new ThreadInfo(e));
return td;
}

/**
* Constant that indicates everything is done and no thread is alive.
*/
public static CpsThreadDump EMPTY = new CpsThreadDump();

/**
* Constant that indicates the state of {@link CpsThreadGroup} is unknown and so it is not possible
* to produe a thread dump.
*/
public static CpsThreadDump UNKNOWN = from(new Exception("Program state is not yet known"));
}
Expand Up @@ -322,6 +322,10 @@ public void run() {
});
}

public CpsThreadDump getThreadDump() {
return CpsThreadDump.from(this);
}

/**
* Persists the current state of {@link CpsThreadGroup}.
*/
Expand Down

0 comments on commit 12cf061

Please sign in to comment.