Skip to content
This repository has been archived by the owner on Dec 15, 2021. It is now read-only.

Commit

Permalink
Merge pull request #176 from amuniz/JENKINS-29875
Browse files Browse the repository at this point in the history
[JENKINS-29875] New API for long lived synchronous steps
  • Loading branch information
jglick committed Aug 18, 2015
2 parents bfcd19e + 46eedec commit 3570b88
Show file tree
Hide file tree
Showing 13 changed files with 370 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Expand Up @@ -5,6 +5,7 @@ Only noting significant user changes, not internal code cleanups and minor bug f
## 1.10 (upcoming)

* [JENKINS-29890](https://issues.jenkins-ci.org/browse/JENKINS-29890): `input` step submitter was not being consistently logged.
* [JENKINS-25879](https://issues.jenkins-ci.org/browse/JENKINS-25879), [JENKINS-29875](https://issues.jenkins-ci.org/browse/JENKINS-29875): New API to run long lived tasks that could block on I/O in a separate thread avoiding to block main CPS VM thread.

## 1.9 (Aug 06 2015)

Expand Down
@@ -0,0 +1,250 @@
package org.jenkinsci.plugins.workflow.steps;

import static org.junit.Assert.assertTrue;
import hudson.model.Result;
import hudson.model.TaskListener;
import hudson.model.Run;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import jenkins.model.Jenkins;

import org.jenkinsci.plugins.workflow.cps.CpsFlowDefinition;
import org.jenkinsci.plugins.workflow.cps.nodes.StepNode;
import org.jenkinsci.plugins.workflow.flow.FlowExecution;
import org.jenkinsci.plugins.workflow.graph.FlowGraphWalker;
import org.jenkinsci.plugins.workflow.graph.FlowNode;
import org.jenkinsci.plugins.workflow.job.WorkflowJob;
import org.jenkinsci.plugins.workflow.job.WorkflowRun;
import org.jenkinsci.plugins.workflow.test.steps.SemaphoreStep;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.jvnet.hudson.test.BuildWatcher;
import org.jvnet.hudson.test.JenkinsRule;
import org.jvnet.hudson.test.TestExtension;
import org.kohsuke.stapler.DataBoundConstructor;

import com.google.inject.Inject;

public class SynchronousNonBlockingStepTest {

@Rule
public JenkinsRule j = new JenkinsRule();

@ClassRule
public static BuildWatcher buildWatcher = new BuildWatcher();

@Test
public void basicNonBlockingStep() throws Exception {
WorkflowJob p = j.jenkins.createProject(WorkflowJob.class, "p");
p.setDefinition(new CpsFlowDefinition("node {\n" +
"echo 'First message'\n" +
"syncnonblocking 'wait'\n" +
"echo 'Second message'\n" +
"}"));
WorkflowRun b = p.scheduleBuild2(0).getStartCondition().get();

// Wait for syncnonblocking to be started
System.out.println("Waiting to syncnonblocking to start...");
SynchronousNonBlockingStep.waitForStart("wait", b);

// At this point the execution is paused inside the synchronous non-blocking step
// Check for FlowNode created
FlowGraphWalker walker = new FlowGraphWalker(b.getExecution());
boolean found = false;
// TODO: use iterator when https://github.com/jenkinsci/workflow-plugin/pull/178 merged
for (FlowNode n = walker.next(); n != null; n = walker.next()) {
if (n instanceof StepNode && ((StepNode) n).getDescriptor() instanceof SynchronousNonBlockingStep.DescriptorImpl) {
found = true;
break;
}
}

System.out.println("Checking flow node added...");
assertTrue("FlowNode has to be added just when the step starts running", found);

// Check for message the test message sent to context listener
System.out.println("Checking build log message present...");
j.waitForMessage("Test Sync Message", b);
// The last step did not run yet
j.assertLogContains("First message", b);
j.assertLogNotContains("Second message", b);

// Let syncnonblocking to continue
SynchronousNonBlockingStep.notify("wait");

System.out.println("Waiting until syncnonblocking (and the full flow) finishes");
j.waitForCompletion(b);
System.out.println("Build finished. Continue.");
// Check for the last message
j.assertLogContains("Second message", b);
j.assertBuildStatusSuccess(b);
}

@Test
public void interruptedTest() throws Exception {
WorkflowJob p = j.jenkins.createProject(WorkflowJob.class, "p");
p.setDefinition(new CpsFlowDefinition("node {\n" +
"echo 'First message'\n" +
"try { syncnonblocking 'wait' } catch(InterruptedException e) { echo 'Interrupted!' }\n" +
"echo 'Second message'\n" +
"}"));
WorkflowRun b = p.scheduleBuild2(0).getStartCondition().get();

// Wait for syncnonblocking to be started
System.out.println("Waiting to syncnonblocking to start...");
SynchronousNonBlockingStep.waitForStart("wait", b);

// At this point syncnonblocking is waiting for an interruption

FlowExecution e = b.getExecutionPromise().get();
// Let's force a call to stop. This will try to send an interruption to the run Thread
e.interrupt(Result.ABORTED);
System.out.println("Looking for interruption received log message");
j.waitForMessage("Interrupted!", b);
j.waitForCompletion(b);
}

@Test
public void parallelTest() throws Exception {
WorkflowJob p = j.jenkins.createProject(WorkflowJob.class, "p");
p.setDefinition(new CpsFlowDefinition("node {\n" +
"echo 'First message'\n" +
"parallel( a: { syncnonblocking 'wait0'; echo 'a branch'; }, b: { semaphore 'wait1'; echo 'b branch'; } )\n" +
"echo 'Second message'\n" +
"}"));
WorkflowRun b = p.scheduleBuild2(0).getStartCondition().get();

SynchronousNonBlockingStep.waitForStart("wait0", b);
SemaphoreStep.success("wait1/1", null);

// Wait for "b" branch to print its message
j.waitForMessage("b branch", b);
System.out.println("b branch finishes");

// Check that "a" branch is effectively blocked
j.assertLogNotContains("a branch", b);

// Notify "a" branch
System.out.println("Continue on wait0");
SynchronousNonBlockingStep.notify("wait0");

// Wait for "a" branch to finish
j.waitForMessage("a branch", b);
j.waitForCompletion(b);
}

public static final class SynchronousNonBlockingStep extends AbstractStepImpl implements Serializable {

public static final class State {
private static final Map<File,State> states = new HashMap<File,State>();
static synchronized State get() {
File home = Jenkins.getActiveInstance().getRootDir();
State state = states.get(home);
if (state == null) {
state = new State();
states.put(home, state);
}
return state;
}
private State() {}
final Set<String> started = new HashSet<String>();
}

private String id;

@DataBoundConstructor
public SynchronousNonBlockingStep(String id) {
this.id = id;
}

public String getId() {
return id;
}

public static void waitForStart(String id, Run<?,?> b) throws IOException, InterruptedException {
State s = State.get();
synchronized (s) {
while (!s.started.contains(id)) {
if (b != null && !b.isBuilding()) {
throw new AssertionError();
}
s.wait(1000);
}
}
}

public static final void notify(String id) {
State s = State.get();
synchronized (s) {
if (s.started.remove(id)) {
s.notifyAll();
}
}
}

public static class StepExecutionImpl extends AbstractSynchronousNonBlockingStepExecution<Void> {

@Inject(optional=true)
private transient SynchronousNonBlockingStep step;

@StepContextParameter
private transient TaskListener listener;

@Override
protected Void run() throws Exception {
System.out.println("Starting syncnonblocking " + step.getId());
// Send a test message to the listener
listener.getLogger().println("Test Sync Message");

State s = State.get();
synchronized (s) {
s.started.add(step.getId());
s.notifyAll();
}

// Wait until somone (main test thread) notify us
System.out.println("Sleeping inside the syncnonblocking thread (" + step.getId() + ")");
synchronized (s) {
while (s.started.contains(step.getId())) {
s.wait(1000);
}
}
System.out.println("Continue syncnonblocking " + step.getId());

return null;
}

private static final long serialVersionUID = 1L;
}

@TestExtension
public static final class DescriptorImpl extends AbstractStepDescriptorImpl {

public DescriptorImpl() {
super(StepExecutionImpl.class);
}

@Override
public String getFunctionName() {
return "syncnonblocking";
}

@Override
public String getDisplayName() {
return "Sync non-blocking Test step";
}

}

private static final long serialVersionUID = 1L;

}
}
Expand Up @@ -6,18 +6,21 @@
import hudson.model.Run;
import hudson.model.TaskListener;
import hudson.remoting.VirtualChannel;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import javax.inject.Inject;

import jenkins.MasterToSlaveFileCallable;
import jenkins.util.BuildListenerAdapter;

/**
* @author Kohsuke Kawaguchi
*/
public class ArtifactArchiverStepExecution extends AbstractSynchronousStepExecution<Void> {
public class ArtifactArchiverStepExecution extends AbstractSynchronousNonBlockingStepExecution<Void> {

@StepContextParameter
private transient TaskListener listener;
Expand Down
@@ -1,6 +1,7 @@
package org.jenkinsci.plugins.workflow.steps;

import com.google.inject.Inject;

import hudson.AbortException;
import hudson.FilePath;
import hudson.model.Run;
Expand All @@ -16,7 +17,7 @@
/**
* @author Kohsuke Kawaguchi
*/
public class ArtifactUnarchiverStepExecution extends AbstractSynchronousStepExecution<List<FilePath>> {
public class ArtifactUnarchiverStepExecution extends AbstractSynchronousNonBlockingStepExecution<List<FilePath>> {
@StepContextParameter
private transient FilePath ws;

Expand Down
Expand Up @@ -25,6 +25,7 @@
package org.jenkinsci.plugins.workflow.steps;

import com.google.inject.Inject;

import hudson.Extension;
import hudson.FilePath;
import hudson.Launcher;
Expand All @@ -34,11 +35,14 @@
import hudson.model.TaskListener;
import hudson.tasks.Builder;
import hudson.tasks.Publisher;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import jenkins.model.Jenkins;
import jenkins.tasks.SimpleBuildStep;

import org.kohsuke.stapler.DataBoundConstructor;

/**
Expand All @@ -52,7 +56,7 @@ public final class CoreStep extends AbstractStepImpl {
this.delegate = delegate;
}

private static final class Execution extends AbstractSynchronousStepExecution<Void> {
private static final class Execution extends AbstractSynchronousNonBlockingStepExecution<Void> {

@Inject private transient CoreStep step;
@StepContextParameter private transient Run<?,?> run;
Expand Down
Expand Up @@ -59,7 +59,7 @@ public DescriptorImpl() {

}

public static final class Execution extends AbstractSynchronousStepExecution<Boolean> {
public static final class Execution extends AbstractSynchronousNonBlockingStepExecution<Boolean> {

@Inject private transient FileExistsStep step;
@StepContextParameter private transient FilePath workspace;
Expand Down
Expand Up @@ -24,10 +24,12 @@
package org.jenkinsci.plugins.workflow.steps;

import com.google.inject.Inject;

import hudson.AbortException;
import hudson.Extension;
import hudson.model.TaskListener;
import jenkins.plugins.mailer.tasks.MimeMessageBuilder;

import org.apache.commons.lang.StringUtils;
import org.kohsuke.stapler.DataBoundConstructor;
import org.kohsuke.stapler.DataBoundSetter;
Expand All @@ -38,6 +40,7 @@
import javax.mail.MessagingException;
import javax.mail.Transport;
import javax.mail.internet.MimeMessage;

import java.io.UnsupportedEncodingException;

/**
Expand Down Expand Up @@ -97,7 +100,7 @@ public DescriptorImpl() {
/**
* @author <a href="mailto:tom.fennelly@gmail.com">tom.fennelly@gmail.com</a>
*/
public static class MailStepExecution extends AbstractSynchronousStepExecution<Void> {
public static class MailStepExecution extends AbstractSynchronousNonBlockingStepExecution<Void> {

private static final long serialVersionUID = 1L;

Expand Down
Expand Up @@ -27,8 +27,11 @@
import hudson.Extension;
import hudson.FilePath;
import hudson.Util;

import java.io.InputStream;

import javax.inject.Inject;

import org.apache.commons.io.IOUtils;
import org.kohsuke.stapler.DataBoundConstructor;
import org.kohsuke.stapler.DataBoundSetter;
Expand Down Expand Up @@ -75,7 +78,7 @@ public DescriptorImpl() {

}

public static final class Execution extends AbstractSynchronousStepExecution<String> {
public static final class Execution extends AbstractSynchronousNonBlockingStepExecution<String> {

@Inject private transient ReadFileStep step;
@StepContextParameter private transient FilePath workspace;
Expand Down

0 comments on commit 3570b88

Please sign in to comment.