Skip to content
This repository has been archived by the owner on Dec 15, 2021. It is now read-only.

Commit

Permalink
[JENKINS-29875] New API for long lived synchronous steps
Browse files Browse the repository at this point in the history
  • Loading branch information
amuniz committed Aug 11, 2015
1 parent 4f5e0af commit f379738
Show file tree
Hide file tree
Showing 5 changed files with 294 additions and 3 deletions.
@@ -0,0 +1,162 @@
package org.jenkinsci.plugins.workflow.steps;

import hudson.model.Run;
import hudson.model.TaskListener;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;

import jenkins.model.Jenkins;

import org.jenkinsci.plugins.workflow.steps.AbstractStepDescriptorImpl;
import org.jenkinsci.plugins.workflow.steps.AbstractStepImpl;
import org.jenkinsci.plugins.workflow.steps.AbstractSynchronousNonBlockingStepExecution;
import org.jenkinsci.plugins.workflow.steps.StepContext;
import org.jvnet.hudson.test.JenkinsRule;
import org.jvnet.hudson.test.TestExtension;
import org.kohsuke.stapler.DataBoundConstructor;

import com.google.inject.Inject;

public final class SynchronousNonBlockingStep extends AbstractStepImpl implements Serializable {

/** State of semaphore steps within one Jenkins home and thus (possibly restarting) test. */
private static final class State {
private static final Map<File,State> states = new HashMap<File,State>();
static synchronized State get() {
File home = Jenkins.getActiveInstance().getRootDir();
State state = states.get(home);
if (state == null) {
state = new State();
states.put(home, state);
}
return state;
}
private State() {}
private final Map<String,Integer> iota = new HashMap<String,Integer>();
synchronized int allocateNumber(String id) {
Integer old = iota.get(id);
if (old == null) {
old = 0;
}
int number = old + 1;
iota.put(id, number);
return number;
}
/** map from {@link #k} to serial form of {@link StepContext} */
final Map<String,String> contexts = new HashMap<String,String>();
final Map<String,Object> returnValues = new HashMap<String,Object>();
final Map<String,Throwable> errors = new HashMap<String,Throwable>();
final Set<String> started = new HashSet<String>();
}

private final String id;
private final int number;

@DataBoundConstructor
public SynchronousNonBlockingStep(String id) {
this.id = id;
number = State.get().allocateNumber(id);
}

public String getId() {
return id;
}

private String k() {
return id + "/" + number;
}

public StepContext getContext() {
return getContext(State.get(), k());
}

private static StepContext getContext(State s, String k) {
return (StepContext) Jenkins.XSTREAM.fromXML(s.contexts.get(k));
}

public static void waitForStart(@Nonnull String k, @CheckForNull Run<?,?> b) throws IOException, InterruptedException {
State s = State.get();
synchronized (s) {
while (!s.started.contains(k)) {
if (b != null && !b.isBuilding()) {
throw new AssertionError(JenkinsRule.getLog(b));
}
s.wait(1000);
}
}
}

public static class StepExecutionImpl extends AbstractSynchronousNonBlockingStepExecution<Void> {

@Inject(optional=true)
private SynchronousNonBlockingStep step;

@Override
protected Void run() throws Exception {
// Send a test message to the listener
getContext().get(TaskListener.class).getLogger().println("Test Sync Message");

State s = State.get();
String k = step.k();
if (s.returnValues.containsKey(k)) {
System.err.println("Immediately running " + k);
getContext().onSuccess(s.returnValues.get(k));
} else if (s.errors.containsKey(k)) {
System.err.println("Immediately failing " + k);
getContext().onFailure(s.errors.get(k));
} else {
System.err.println("Blocking " + k);
s.contexts.put(k, Jenkins.XSTREAM.toXML(getContext()));
}

// Let's wait 2 seconds to give time to the main test thread to reach the wait point
Thread.sleep(2000);

synchronized (s) {
s.started.add(k);
s.notifyAll();
}

// Let's wait 2 additional seconds after unblocking the test thread
// During this 2 seconds it will check for the messages in the build log
System.out.println("Sleeping inside the syncnonblocking thread");
Thread.sleep(2000);
System.out.println("Continue syncnonblocking");

return null;
}

private static final long serialVersionUID = 1L;
}

@TestExtension
public static final class DescriptorImpl extends AbstractStepDescriptorImpl {

public DescriptorImpl() {
super(StepExecutionImpl.class);
}

@Override
public String getFunctionName() {
return "syncnonblocking";
}

@Override
public String getDisplayName() {
return "Sync non-blocking Test step";
}

}

private static final long serialVersionUID = 1L;

}
@@ -0,0 +1,61 @@
package org.jenkinsci.plugins.workflow.steps;

import static org.junit.Assert.assertTrue;

import org.jenkinsci.plugins.workflow.cps.CpsFlowDefinition;
import org.jenkinsci.plugins.workflow.graph.FlowGraphWalker;
import org.jenkinsci.plugins.workflow.graph.FlowNode;
import org.jenkinsci.plugins.workflow.job.WorkflowJob;
import org.jenkinsci.plugins.workflow.job.WorkflowRun;
import org.junit.Rule;
import org.junit.Test;
import org.jvnet.hudson.test.JenkinsRule;

public class SynchronousNonBlockingStepTest {

@Rule
public JenkinsRule j = new JenkinsRule();

@Test
public void basicNonBlockingStep() throws Exception {
WorkflowJob p = j.jenkins.createProject(WorkflowJob.class, "p");
p.setDefinition(new CpsFlowDefinition("node {\n" +
"echo 'First message'\n" +
"syncnonblocking 'wait'\n" +
"echo 'Second message'\n" +
"}"));
WorkflowRun b = p.scheduleBuild2(0).getStartCondition().get();

// Stop here until syncnonblocking has printed the message "Test Sync Message"
SynchronousNonBlockingStep.waitForStart("wait/1", b);

// At this point the execution is paused inside the synchronous non-blocking step
// Check for FlowNode created
FlowGraphWalker walker = new FlowGraphWalker(b.getExecution());
boolean found = false;
for (FlowNode n = walker.next(); n != null; n = walker.next()) {
if (n.getDisplayName().equals("Sync non-blocking Test step")) {
found = true;
break;
}
}

System.out.println("Checking flow node added...");
assertTrue("FlowNode has to be added just when the step starts running", found);

// Check for message the test message sent to context listener
System.out.println("Checking build log message present...");
j.assertLogContains("Test Sync Message", b);
// The last step did not run yet
j.assertLogNotContains("Second message", b);

// No need to call success since it's called in the syncnonblocking thread
// SynchronousNonBlockingStep.success("wait/1", null);
System.out.println("Waiting until syncnonblocking (and the full flow) finishes");
j.waitUntilNoActivity();
System.out.println("Build finished. Continue.");
// Check for the last message
j.assertLogContains("Second message", b);
j.assertBuildStatusSuccess(b);
}
}
Expand Up @@ -31,13 +31,16 @@
import hudson.model.listeners.SCMListener;
import hudson.scm.SCM;
import hudson.scm.SCMRevisionState;

import java.io.File;
import java.io.Serializable;

import javax.annotation.Nonnull;
import javax.inject.Inject;

import org.jenkinsci.plugins.workflow.steps.AbstractStepDescriptorImpl;
import org.jenkinsci.plugins.workflow.steps.AbstractStepImpl;
import org.jenkinsci.plugins.workflow.steps.AbstractSynchronousStepExecution;
import org.jenkinsci.plugins.workflow.steps.AbstractSynchronousNonBlockingStepExecution;
import org.jenkinsci.plugins.workflow.steps.StepContextParameter;
import org.kohsuke.stapler.DataBoundSetter;

Expand Down Expand Up @@ -67,7 +70,7 @@ public boolean isChangelog() {

protected abstract @Nonnull SCM createSCM();

public static final class StepExecutionImpl extends AbstractSynchronousStepExecution<Void> {
public static final class StepExecutionImpl extends AbstractSynchronousNonBlockingStepExecution<Void> {

@Inject private transient SCMStep step;
@StepContextParameter private transient Run<?,?> run;
Expand Down
@@ -0,0 +1,64 @@
package org.jenkinsci.plugins.workflow.steps;

import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import jenkins.util.Timer;

/**
* {@link StepExecution} that always executes synchronously and does not block the CPS VM thread.
* @param <T> the type of the return value (may be {@link Void})
*/
public abstract class AbstractSynchronousNonBlockingStepExecution<T> extends AbstractStepExecutionImpl {

private transient volatile ScheduledFuture<?> task;

protected AbstractSynchronousNonBlockingStepExecution() {
}

protected AbstractSynchronousNonBlockingStepExecution(StepContext context) {
super(context);
}

/**
* Meat of the execution.
*
* When this method returns, a step execution is over.
*/
protected abstract T run() throws Exception;

@Override
public final boolean start() throws Exception {
task = Timer.get().schedule(new StepRunner(this), 0, TimeUnit.MILLISECONDS);
return false;
}

/**
* If the computation is going synchronously, try to cancel that.
*/
@Override
public void stop(Throwable cause) throws Exception {
if (task != null) {
task.cancel(false);
}
getContext().onFailure(cause);
}

private class StepRunner implements Runnable {

private final AbstractSynchronousNonBlockingStepExecution<T> step;

public StepRunner(AbstractSynchronousNonBlockingStepExecution<T> step) {
this.step = step;
}

@Override
public void run() {
try {
step.getContext().onSuccess(step.run());
} catch (Exception e) {
step.getContext().onFailure(e);
}
}
}
}
Expand Up @@ -6,7 +6,8 @@
import static hudson.model.Result.ABORTED;

/**
* {@link StepExecution} that always executes synchronously.
* {@link StepExecution} that always executes synchronously. This API should be used for short-lived tasks that
* return almost instantly. For long-lived tasks use {@link AbstractSynchronousNonBlockingStepExecution}.
* @param <T> the type of the return value (may be {@link Void})
* @author Kohsuke Kawaguchi
*/
Expand Down

0 comments on commit f379738

Please sign in to comment.