Skip to content
This repository has been archived by the owner on Dec 15, 2021. It is now read-only.

Commit

Permalink
Merge pull request #213 from jglick/many-steps-JENKINS-30055
Browse files Browse the repository at this point in the history
[JENKINS-30055] Handle a large number of steps in quick succession
  • Loading branch information
jglick committed Jan 11, 2016
2 parents 97ce55c + 3686605 commit 0fdf534
Show file tree
Hide file tree
Showing 14 changed files with 338 additions and 180 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Expand Up @@ -4,6 +4,7 @@ Only noting significant user changes, not internal code cleanups and minor bug f

## 1.13 (upcoming)

* [JENKINS-30055](https://issues.jenkins-ci.org/browse/JENKINS-30055): poor performance and file handle leaks when running a script with an enormous number of steps in quick succession
* [JENKINS-26126](https://issues.jenkins-ci.org/browse/JENKINS-26126) (partial): introspect Workflow steps to generate static reference documentation (link from _Snippet Generator_). Planned to be used for IDE auto-completion as well.
* [JENKINS-31614](https://issues.jenkins-ci.org/browse/JENKINS-31614): avoiding various deadlocks involving `Queue`.
* [JENKINS-31897](https://issues.jenkins-ci.org/browse/JENKINS-31897): parameters with default values may now be omitted from the `parameters` option to the `build` step.
Expand Down
@@ -0,0 +1,70 @@
/*
* The MIT License
*
* Copyright 2015 CloudBees, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/

package org.jenkinsci.plugins.workflow;

import java.util.List;
import org.jenkinsci.plugins.workflow.cps.CpsFlowDefinition;
import org.jenkinsci.plugins.workflow.flow.FlowExecution;
import org.jenkinsci.plugins.workflow.graph.FlowNode;
import org.jenkinsci.plugins.workflow.job.WorkflowJob;
import org.jenkinsci.plugins.workflow.job.WorkflowRun;
import static org.junit.Assert.*;
import org.junit.Test;
import org.junit.Rule;
import org.junit.runners.model.Statement;
import org.jvnet.hudson.test.Issue;
import org.jvnet.hudson.test.RestartableJenkinsRule;

public class ScalabilityTest {

@Rule public RestartableJenkinsRule story = new RestartableJenkinsRule();

@Issue("JENKINS-30055")
@Test public void manySteps() {
story.addStep(new Statement() {
@Override public void evaluate() throws Throwable {
WorkflowJob p = story.j.jenkins.createProject(WorkflowJob.class, "p");
p.setDefinition(new CpsFlowDefinition("for (int i = 0; i < 10000; i++) {echo \"iteration #${i}\"}", true));
WorkflowRun b = story.j.assertBuildStatusSuccess(p.scheduleBuild2(0));
story.j.assertLogContains("iteration #9876", b); // arbitrary iteration close to the end
}
});
story.addStep(new Statement() {
@Override public void evaluate() throws Throwable {
WorkflowJob p = story.j.jenkins.getItemByFullName("p", WorkflowJob.class);
WorkflowRun b = p.getLastBuild();
story.j.assertLogContains("iteration #5432", b);
FlowExecution execution = b.getExecution();
assertNotNull(execution);
FlowNode node = execution.getNode("5678");
assertNotNull(node);
List<FlowNode> parents = node.getParents();
assertEquals(1, parents.size());
assertEquals("5677", parents.get(0).getId());
}
});
}

}
Expand Up @@ -158,8 +158,17 @@ public String getUrl() throws IOException {
*/
public abstract void interrupt(Result r, CauseOfInterruption... causes) throws IOException, InterruptedException;

/**
* Add a listener to changes in the flow graph structure.
* @param listener a listener to add
*/
public abstract void addListener(GraphListener listener);

/**
* Reverse of {@link #addListener}.
*/
public /*abstract*/ void removeListener(GraphListener listener) {}

/**
* Checks whether this flow execution has finished executing completely.
*/
Expand Down
Expand Up @@ -38,4 +38,12 @@ public interface GraphListener {
* One of the use cases of this listener is to persist the state of {@link FlowExecution}.
*/
void onNewHead(FlowNode node);

/**
* Listener which should be notified of events immediately as they occur.
* You must be very careful not to acquire locks or block.
* If you do not implement this marker interface, you will receive notifications in batched deliveries.
*/
interface Synchronous extends GraphListener {}

}
Expand Up @@ -24,29 +24,50 @@

package org.jenkinsci.plugins.workflow.graph;

import java.io.IOException;
import org.jenkinsci.plugins.workflow.flow.FlowExecution;

import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nonnull;

/**
* @author Kohsuke Kawaguchi
* @author Jesse Glick
* End of a block.
* @see BlockStartNode
*/
public abstract class BlockEndNode<START extends BlockStartNode> extends FlowNode {
private final START start;
private transient START start;
private final String startId;

public BlockEndNode(FlowExecution exec, String id, START start, FlowNode... parents) {
super(exec, id, parents);
this.start = start;
startId = start.getId();
}

public BlockEndNode(FlowExecution exec, String id, START start, List<FlowNode> parents) {
super(exec, id, parents);
this.start = start;
startId = start.getId();
}

public START getStartNode() {
/**
* Returns the matching start node.
* @return an earlier node matching this block
* @throws IllegalStateException if the start node could not be reloaded after deserialization
*/
public @Nonnull START getStartNode() {
if (start == null) {
try {
start = (START) getExecution().getNode(startId);
if (start == null) {
throw new IllegalStateException("Matching start node " + startId + " lost from deserialization");
}
} catch (IOException x) {
throw new IllegalStateException("Could not load matching start node: " + x);
}
}
return start;
}

Expand Down
Expand Up @@ -35,6 +35,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.logging.Level;
import static java.util.logging.Level.*;
import java.util.logging.Logger;
import javax.annotation.CheckForNull;
Expand All @@ -53,26 +54,37 @@
*/
@ExportedBean
public abstract class FlowNode extends Actionable implements Saveable {
private final List<FlowNode> parents;
private transient List<FlowNode> parents;
private final List<String> parentIds;

private final String id;

// this is a copy-on-write array so synchronization isn't needed between reader & writer.
@edu.umd.cs.findbugs.annotations.SuppressWarnings("IS2_INCONSISTENT_SYNC")
private transient CopyOnWriteArrayList<Action> actions;
private transient CopyOnWriteArrayList<Action> actions = new CopyOnWriteArrayList<Action>();

private transient final FlowExecution exec;

protected FlowNode(FlowExecution exec, String id, List<FlowNode> parents) {
this.id = id;
this.exec = exec;
this.parents = ImmutableList.copyOf(parents);
parentIds = ids();
}

protected FlowNode(FlowExecution exec, String id, FlowNode... parents) {
this.id = id;
this.exec = exec;
this.parents = ImmutableList.copyOf(parents);
parentIds = ids();
}

private List<String> ids() {
List<String> ids = new ArrayList<String>(parents.size());
for (FlowNode n : parents) {
ids.add(n.id);
}
return ids;
}

/**
Expand Down Expand Up @@ -103,9 +115,24 @@ public final boolean isRunning() {
* Returns a read-only view of parents.
*/
public List<FlowNode> getParents() {
if (parents == null) {
parents = loadParents(parentIds);
}
return parents;
}

private List<FlowNode> loadParents(List<String> parentIds) {
List<FlowNode> _parents = new ArrayList<FlowNode>(parentIds.size());
for (String parentId : parentIds) {
try {
_parents.add(exec.getNode(parentId));
} catch (IOException x) {
LOGGER.log(Level.WARNING, "failed to load parents of " + id, x);
}
}
return _parents;
}

@Restricted(DoNotUse.class)
@Exported(name="parents")
public List<String> getParentIds() {
Expand Down
Expand Up @@ -27,9 +27,9 @@
import org.jenkinsci.plugins.workflow.flow.FlowExecution;

/**
* @author Kohsuke Kawaguchi
* @author Jesse Glick
* @deprecated unused
*/
@Deprecated
public class ForkNode extends BlockStartNode {
public ForkNode(FlowExecution storage, String id, FlowNode... parents) {
super(storage, id, parents);
Expand Down
Expand Up @@ -29,9 +29,9 @@
import java.util.List;

/**
* @author Kohsuke Kawaguchi
* @author Jesse Glick
* @deprecated unused
*/
@Deprecated
public class JoinNode extends BlockEndNode<ForkNode> {
public JoinNode(FlowExecution exec, String id, ForkNode forkNode, List<FlowNode> parents) {
super(exec, id, forkNode, parents);
Expand Down
Expand Up @@ -93,7 +93,10 @@

import static com.thoughtworks.xstream.io.ExtendedHierarchicalStreamWriterHelper.*;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import hudson.BulkChange;
import hudson.init.Terminator;
import hudson.model.Queue;
import hudson.model.Saveable;
import hudson.model.User;
import hudson.security.ACL;
import java.beans.Introspector;
Expand Down Expand Up @@ -714,6 +717,12 @@ public void addListener(GraphListener listener) {
listeners.add(listener);
}

@Override public void removeListener(GraphListener listener) {
if (listeners != null) {
listeners.remove(listener);
}
}

@Override
public void interrupt(Result result, CauseOfInterruption... causes) throws IOException, InterruptedException {
setResult(result);
Expand Down Expand Up @@ -793,10 +802,36 @@ synchronized FlowHead getFirstHead() {
return heads.firstEntry().getValue();
}

void notifyListeners(FlowNode node) {
void notifyListeners(List<FlowNode> nodes, boolean synchronous) {
if (listeners != null) {
for (GraphListener listener : listeners) {
listener.onNewHead(node);
Saveable s = Saveable.NOOP;
try {
Queue.Executable exec = owner.getExecutable();
if (exec instanceof Saveable) {
s = (Saveable) exec;
}
} catch (IOException x) {
LOGGER.log(Level.WARNING, "failed to notify listeners of changes to " + nodes + " in " + this, x);
}
BulkChange bc = new BulkChange(s);
try {
for (FlowNode node : nodes) {
for (GraphListener listener : listeners) {
if (listener instanceof GraphListener.Synchronous == synchronous) {
listener.onNewHead(node);
}
}
}
} finally {
if (synchronous) {
bc.abort(); // hack to skip save—we are holding a lock
} else {
try {
bc.commit();
} catch (IOException x) {
LOGGER.log(Level.WARNING, null, x);
}
}
}
}
}
Expand Down
Expand Up @@ -42,7 +42,10 @@
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
Expand Down Expand Up @@ -307,6 +310,8 @@ private void run() throws IOException {
}
}

private transient List<FlowNode> nodesToNotify;
private static final Object nodesToNotifyLock = new Object();
/**
* Notifies listeners of the new {@link FlowHead}.
*
Expand All @@ -315,9 +320,24 @@ private void run() throws IOException {
@CpsVmThreadOnly
/*package*/ void notifyNewHead(final FlowNode head) {
assertVmThread();
execution.notifyListeners(Collections.singletonList(head), true);
synchronized (nodesToNotifyLock) {
if (nodesToNotify == null) {
nodesToNotify = new ArrayList<FlowNode>();
}
nodesToNotify.add(head);
}
runner.execute(new Runnable() {
public void run() {
execution.notifyListeners(head);
List<FlowNode> _nodesToNotify;
synchronized (nodesToNotifyLock) {
if (nodesToNotify == null) {
return;
}
_nodesToNotify = nodesToNotify;
nodesToNotify = null;
}
execution.notifyListeners(_nodesToNotify, false);
}
});
}
Expand Down
Expand Up @@ -30,6 +30,7 @@
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Collections;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.jenkinsci.plugins.workflow.actions.ErrorAction;
Expand Down Expand Up @@ -119,7 +120,9 @@ void setNewHead(FlowNode v) {
} else {
// in recovering from error and such situation, we sometimes need to grow the graph
// without running the program.
execution.notifyListeners(v);
// TODO can CpsThreadGroup.notifyNewHead be used instead to notify both kinds of listeners?
execution.notifyListeners(Collections.singletonList(v), true);
execution.notifyListeners(Collections.singletonList(v), false);
}
} catch (IOException e) {
LOGGER.log(Level.FINE, "Failed to record new head: " + v, e);
Expand Down

0 comments on commit 0fdf534

Please sign in to comment.