Skip to content

Commit

Permalink
[JENKINS-31801] Initial work on throttle(category) step - needs tests
Browse files Browse the repository at this point in the history
  • Loading branch information
abayer committed Mar 8, 2017
1 parent d177460 commit af877cd
Show file tree
Hide file tree
Showing 7 changed files with 370 additions and 18 deletions.
26 changes: 15 additions & 11 deletions pom.xml
Expand Up @@ -26,7 +26,7 @@ THE SOFTWARE.
<parent>
<groupId>org.jenkins-ci.plugins</groupId>
<artifactId>plugin</artifactId>
<version>2.6</version>
<version>2.23</version>
</parent>

<artifactId>throttle-concurrents</artifactId>
Expand All @@ -44,10 +44,10 @@ THE SOFTWARE.
</licenses>

<properties>
<jenkins.version>1.609.3</jenkins.version>
<jenkins.version>1.642.3</jenkins.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<compileSource>1.6</compileSource>
<compileTarget>1.6</compileTarget>
<compileSource>1.7</compileSource>
<compileTarget>1.7</compileTarget>
<!--TODO: do not fail on errors-->
<findbugs.failOnError>false</findbugs.failOnError>
</properties>
Expand Down Expand Up @@ -107,7 +107,17 @@ THE SOFTWARE.
<artifactId>matrix-project</artifactId>
<version>1.4.1</version>
</dependency>

<dependency>
<groupId>org.jenkins-ci.plugins.workflow</groupId>
<artifactId>workflow-api</artifactId>
<version>2.3</version>
</dependency>
<dependency>
<groupId>org.jenkins-ci.plugins.workflow</groupId>
<artifactId>workflow-step-api</artifactId>
<version>2.3</version>
</dependency>

<!-- Dependencies for test -->
<dependency>
<groupId>org.jenkins-ci.plugins</groupId>
Expand Down Expand Up @@ -140,12 +150,6 @@ THE SOFTWARE.
<version>0.7.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>18.0</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

Expand Up @@ -3,36 +3,50 @@
import hudson.Extension;
import hudson.matrix.MatrixConfiguration;
import hudson.model.AbstractDescribableImpl;
import hudson.model.Computer;
import hudson.model.Descriptor;
import hudson.model.Item;
import hudson.model.ItemGroup;
import hudson.model.Job;
import hudson.model.JobProperty;
import hudson.model.JobPropertyDescriptor;
import hudson.model.Queue;
import hudson.model.Run;
import hudson.model.TaskListener;
import hudson.plugins.throttleconcurrents.pipeline.ThrottledStepInfo;
import hudson.util.FormValidation;
import hudson.util.ListBoxModel;
import hudson.Util;
import hudson.matrix.MatrixBuild;
import hudson.matrix.MatrixProject;
import hudson.matrix.MatrixRun;

import java.io.IOException;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.WeakHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;

import jenkins.model.Jenkins;

import net.sf.json.JSONObject;

import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
import org.jenkinsci.plugins.workflow.flow.FlowExecution;
import org.jenkinsci.plugins.workflow.flow.FlowExecutionOwner;
import org.jenkinsci.plugins.workflow.steps.StepExecution;
import org.kohsuke.stapler.DataBoundConstructor;
import org.kohsuke.stapler.QueryParameter;
import org.kohsuke.stapler.StaplerRequest;
Expand Down Expand Up @@ -229,29 +243,82 @@ static List<Queue.Task> getCategoryTasks(String category) {
assert category != null && !category.equals("");
List<Queue.Task> categoryTasks = new ArrayList<Queue.Task>();
Collection<ThrottleJobProperty> properties;
DescriptorImpl descriptor = Jenkins.getActiveInstance().getDescriptorByType(DescriptorImpl.class);
DescriptorImpl descriptor = Jenkins.getActiveInstance().getDescriptorByType(DescriptorImpl.class);
synchronized (descriptor.propertiesByCategoryLock) {
Map<ThrottleJobProperty,Void> _properties = descriptor.propertiesByCategory.get(category);
Map<ThrottleJobProperty, Void> _properties = descriptor.propertiesByCategory.get(category);
properties = _properties != null ? new ArrayList<ThrottleJobProperty>(_properties.keySet()) : Collections.<ThrottleJobProperty>emptySet();
}
for (ThrottleJobProperty t : properties) {
if (t.getThrottleEnabled()) {
if (t.getCategories() != null && t.getCategories().contains(category)) {
Job<?,?> p = t.owner;
Job<?, ?> p = t.owner;
if (/*is a task*/ p instanceof Queue.Task && /* not deleted */getItem(p.getParent(), p.getName()) == p &&
/* has not since been reconfigured */ p.getProperty(ThrottleJobProperty.class) == t) {
categoryTasks.add((Queue.Task) p);
if (p instanceof MatrixProject && t.isThrottleMatrixConfigurations()) {
for (MatrixConfiguration mc : ((MatrixProject)p).getActiveConfigurations()) {
for (MatrixConfiguration mc : ((MatrixProject) p).getActiveConfigurations()) {
categoryTasks.add(mc);
}
}
}
}
}
}

return categoryTasks;
}

static List<ThrottledStepInfo> getThrottledPipelinesForCategory(String category) {
List<ThrottledStepInfo> throttledPipelines = new ArrayList<>();

DescriptorImpl descriptor = Jenkins.getActiveInstance().getDescriptorByType(DescriptorImpl.class);
for (Map.Entry<String,Integer> currentPipeline : descriptor.getThrottledPipelinesForCategory(category).entrySet()) {
Run<?,?> flowNodeRun = Run.fromExternalizableId(currentPipeline.getKey());

if (flowNodeRun == null) {
// No run found, so remove the throttle.
descriptor.removeThrottledPipelineForCategory(currentPipeline.getKey(), category, null);
} else if (!(flowNodeRun instanceof FlowExecutionOwner.Executable)) {
// If for some reason we've somehow ended up with a non-pipeline job, remove the throttle.
descriptor.removeThrottledPipelineForCategory(currentPipeline.getKey(), category, null);
} else if (!flowNodeRun.isBuilding()) {
// The run is done building, so remove the throttle.
descriptor.removeThrottledPipelineForCategory(currentPipeline.getKey(), category, null);
} else {
FlowExecutionOwner owner = ((FlowExecutionOwner.Executable)flowNodeRun).asFlowExecutionOwner();
FlowExecution execution = owner.getOrNull();
if (execution == null) {
// For some reason, the flow execution is null, so again? Remove the throttle.
descriptor.removeThrottledPipelineForCategory(currentPipeline.getKey(), category, null);
} else {
try {
for (StepExecution stepExec : execution.getCurrentExecutions(false).get()) {
try {
Computer c = stepExec.getContext().get(Computer.class);
ThrottledStepInfo candidateInfo = stepExec.getContext().get(ThrottledStepInfo.class);
if (c != null && candidateInfo != null) {
ThrottledStepInfo info = candidateInfo.forCategory(category);
if (info != null) {
if (info.getNode() == null && c.getNode() != null) {
info.setNode(c.getNode().getNodeName());
}
throttledPipelines.add(info);
}
}
} catch (IOException e) {
// TODO: What do we do here if anything?
}
}
} catch (InterruptedException | ExecutionException e) {
// TODO: What do we do here if anything?
}
}
}
}

return throttledPipelines;
}

private static Item getItem(ItemGroup group, String name) {
if (group instanceof Jenkins) {
return ((Jenkins) group).getItemMap().get(name);
Expand All @@ -262,7 +329,11 @@ private static Item getItem(ItemGroup group, String name) {

@Extension
public static final class DescriptorImpl extends JobPropertyDescriptor {
private static final Logger LOGGER = Logger.getLogger(DescriptorImpl.class.getName());

private List<ThrottleCategory> categories;

private Map<String,Map<String,Integer>> throttledPipelines;

/** Map from category names, to properties including that category. */
private transient Map<String,Map<ThrottleJobProperty,Void>> propertiesByCategory
Expand Down Expand Up @@ -369,7 +440,108 @@ public ListBoxModel doFillCategoryItems() {

return m;
}


@Override
public void load() {
super.load();
if (throttledPipelines == null) {
throttledPipelines = new TreeMap<>();
}
LOGGER.log(Level.FINE, "load: {0}", throttledPipelines);
}

@Override
public void save() {
super.save();
LOGGER.log(Level.FINE, "save: {0}", throttledPipelines);
}

@Nonnull
public synchronized Map<String,Integer> getThrottledPipelinesForCategory(@Nonnull String category) {
return internalGetThrottledPipelinesForCategory(category);
}

@Nonnull
private Map<String,Integer> internalGetThrottledPipelinesForCategory(@Nonnull String category) {
if (getCategoryByName(category) != null) {
if (throttledPipelines.containsKey(category)) {
return throttledPipelines.get(category);
}
}
return new TreeMap<>();
}

public synchronized void addThrottledPipelineForCategory(@Nonnull String runId,
@Nonnull String category,
TaskListener listener) {
if (getCategoryByName(category) == null) {
if (listener != null) {
listener.getLogger().println(Messages.ThrottleJobProperty_DescriptorImpl_NoSuchCategory(category));
}
} else {
Map<String,Integer> currentPipelines = internalGetThrottledPipelinesForCategory(category);

if (!currentPipelines.containsKey(runId)) {
currentPipelines.put(runId, 1);
} else {
currentPipelines.put(runId, currentPipelines.get(runId) + 1);
}

throttledPipelines.put(category, currentPipelines);
}
}

public synchronized void removeThrottledPipelineForCategory(@Nonnull String runId,
@Nonnull String category,
TaskListener listener) {
if (getCategoryByName(category) == null) {
if (listener != null) {
listener.getLogger().println(Messages.ThrottleJobProperty_DescriptorImpl_NoSuchCategory(category));
}
} else {
Map<String,Integer> currentPipelines = internalGetThrottledPipelinesForCategory(category);

if (!currentPipelines.isEmpty()) {
if (currentPipelines.containsKey(runId)) {
Integer currentCount = currentPipelines.get(runId);
if (currentCount > 1) {
currentPipelines.put(runId, currentCount - 1);
} else {
currentPipelines.remove(runId);
}
}
}

if (currentPipelines.isEmpty()) {
throttledPipelines.remove(category);
} else {
throttledPipelines.put(category, currentPipelines);
}
}
}

public synchronized void removeAllFromRunForCategory(@Nonnull String runId,
@Nonnull String category,
TaskListener listener) {
if (getCategoryByName(category) == null) {
if (listener != null) {
listener.getLogger().println(Messages.ThrottleJobProperty_DescriptorImpl_NoSuchCategory(category));
}
} else {
Map<String,Integer> currentPipelines = internalGetThrottledPipelinesForCategory(category);

if (!currentPipelines.isEmpty()) {
if (currentPipelines.containsKey(runId)) {
currentPipelines.remove(runId);
}
}
if (currentPipelines.isEmpty()) {
throttledPipelines.remove(category);
} else {
throttledPipelines.put(category, currentPipelines);
}
}
}
}

public static final class ThrottleCategory extends AbstractDescribableImpl<ThrottleCategory> {
Expand Down
Expand Up @@ -16,6 +16,7 @@
import hudson.model.labels.LabelAtom;
import hudson.model.queue.CauseOfBlockage;
import hudson.model.queue.QueueTaskDispatcher;
import hudson.plugins.throttleconcurrents.pipeline.ThrottledStepInfo;
import hudson.security.ACL;
import hudson.security.NotSerilizableSecurityContext;
import hudson.model.Action;
Expand Down Expand Up @@ -97,17 +98,21 @@ else if (tjp.getThrottleOption().equals("category")) {

// Double check category itself isn't null
if (category != null) {
int runCount = 0;

// Max concurrent per node for category
int maxConcurrentPerNode = getMaxConcurrentPerNodeBasedOnMatchingLabels(
node, category, category.getMaxConcurrentPerNode().intValue());
if (maxConcurrentPerNode > 0) {
int runCount = 0;
for (Task catTask : categoryTasks) {
if (jenkins.getQueue().isPending(catTask)) {
return CauseOfBlockage.fromMessage(Messages._ThrottleQueueTaskDispatcher_BuildPending());
}
runCount += buildsOfProjectOnNode(node, catTask);
}
List<ThrottledStepInfo> throttledPipelines = ThrottleJobProperty.getThrottledPipelinesForCategory(catNm);
runCount += pipelinesOnNode(node, throttledPipelines);

// This would mean that there are as many or more builds currently running than are allowed.
if (runCount >= maxConcurrentPerNode) {
return CauseOfBlockage.fromMessage(Messages._ThrottleQueueTaskDispatcher_MaxCapacityOnNode(runCount));
Expand Down Expand Up @@ -228,6 +233,8 @@ else if (tjp.getThrottleOption().equals("category")) {
}
totalRunCount += buildsOfProjectOnAllNodes(catTask);
}
List<ThrottledStepInfo> throttledPipelines = ThrottleJobProperty.getThrottledPipelinesForCategory(catNm);
totalRunCount += pipelinesOnAllNodes(throttledPipelines);

if (totalRunCount >= maxConcurrentTotal) {
return CauseOfBlockage.fromMessage(Messages._ThrottleQueueTaskDispatcher_MaxCapacityTotal(totalRunCount));
Expand Down Expand Up @@ -363,6 +370,24 @@ private ThrottleJobProperty getThrottleJobProperty(Task task) {
return null;
}

private int pipelinesOnNode(@Nonnull Node node, @Nonnull List<ThrottledStepInfo> throttledPipelines) {
int runCount = 0;

String nodeName = node.getNodeName();

for (ThrottledStepInfo info : throttledPipelines) {
if (nodeName.equals(info.getNode())) {
runCount++;
}
}

return runCount;
}

private int pipelinesOnAllNodes(@Nonnull List<ThrottledStepInfo> throttledPipelines) {
return throttledPipelines.size();
}

private int buildsOfProjectOnNode(Node node, Task task) {
if (!shouldBeThrottled(task, getThrottleJobProperty(task))) {
return 0;
Expand Down

0 comments on commit af877cd

Please sign in to comment.