Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Merge pull request #18 from carlossg/thread-pool
[JENKINS-32695] Result from health checks are lost when more than 3 run at the same time
  • Loading branch information
stephenc committed Feb 2, 2016
2 parents 1de26f6 + f065ff9 commit ef2d2f9
Show file tree
Hide file tree
Showing 4 changed files with 302 additions and 20 deletions.
43 changes: 23 additions & 20 deletions src/main/java/jenkins/metrics/api/Metrics.java
Expand Up @@ -45,12 +45,11 @@
import hudson.security.Permission;
import hudson.security.PermissionGroup;
import hudson.security.PermissionScope;
import hudson.util.DaemonThreadFactory;
import hudson.util.ExceptionCatchingThreadFactory;
import hudson.util.PluginServletFilter;
import hudson.util.StreamTaskListener;
import hudson.util.TimeUnit2;
import jenkins.model.Jenkins;
import jenkins.metrics.util.HealthChecksThreadPool;

import org.kohsuke.accmod.Restricted;
import org.kohsuke.accmod.restrictions.NoExternalUse;
Expand All @@ -67,11 +66,7 @@
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.RejectedExecutionException;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand Down Expand Up @@ -112,19 +107,9 @@ public class Metrics extends Plugin {
*/
private static final Logger LOGGER = Logger.getLogger(Metrics.class.getName());
/**
* Thread pool for running health checks. We set the pool upper limit to 4 and we keep threads around for 5 seconds
* as this is a bursty pool used once per minute.
* Thread pool for running health checks.
*/
private static final ExecutorService threadPoolForHealthChecks = new ThreadPoolExecutor(0, 4,
5L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ExceptionCatchingThreadFactory(new DaemonThreadFactory(new ThreadFactory() {
private final AtomicInteger number = new AtomicInteger();

public Thread newThread(Runnable r) {
return new Thread(r, "Metrics-HealthChecks-" + number.incrementAndGet());
}
})));
private static ExecutorService threadPoolForHealthChecks;
/**
* The registry of metrics.
*/
Expand Down Expand Up @@ -312,10 +297,14 @@ public static void afterExtensionsAugmented() {
}
for (HealthCheckProvider p : Jenkins.getInstance().getExtensionList(HealthCheckProvider.class)) {
LOGGER.log(Level.FINER, "Registering health check provider {0} (type {1})", new Object[]{p, p.getClass()});
for (Map.Entry<String, HealthCheck> c : p.getHealthChecks().entrySet()) {
Map<String, HealthCheck> healthChecks = p.getHealthChecks();
for (Map.Entry<String, HealthCheck> c : healthChecks.entrySet()) {
plugin.healthCheckRegistry.register(c.getKey(), c.getValue());
}
LOGGER.log(Level.FINER, "Registered health check provider {0} (type {1}) with {2} checks: {3}",
new Object[] { p, p.getClass(), healthChecks.size(), healthChecks.keySet() });
}
threadPoolForHealthChecks = new HealthChecksThreadPool(healthCheckRegistry());
LOGGER.log(Level.FINE, "Metric provider and health check provider extensions registered");
}

Expand Down Expand Up @@ -430,6 +419,10 @@ public final void doRun() {
HealthChecker.class.getName() + " thread is still running. Execution aborted.");
return;
}
if (threadPoolForHealthChecks == null) {
LOGGER.info("Health checks thread pool not yet initialized, skipping until next execution");
return;
}
future = threadPoolForHealthChecks.submit(new Runnable() {
public void run() {
logger.log(Level.FINE, "Started " + HealthChecker.class.getName());
Expand All @@ -451,6 +444,11 @@ public void run() {
}
} catch (InterruptedException e) {
e.printStackTrace(l.fatalError("aborted"));
} catch (Exception e) {
logger.log(Level.SEVERE, "Error running " + HealthChecker.class.getName(), e);
if (l != null) {
e.printStackTrace(l.fatalError(e.getMessage()));
}
} finally {
if (l != null) {
l.closeQuietly();
Expand Down Expand Up @@ -490,6 +488,11 @@ private void execute(TaskListener listener) throws IOException, InterruptedExcep
SortedMap<String, HealthCheck.Result> results;
try {
results = registry.runHealthChecks(threadPoolForHealthChecks);
} catch (RejectedExecutionException e) {
// should never happen, as we are using a DiscardOldestPolicy in the thread pool queue
listener.error("Health checks execution was rejected instead of queued: {0}", e);
LOGGER.log(Level.WARNING, "Health checks execution was rejected instead of queued: {0}", e);
return;
} finally {
context.stop();
}
Expand Down
173 changes: 173 additions & 0 deletions src/main/java/jenkins/metrics/util/HealthChecksThreadPool.java
@@ -0,0 +1,173 @@
/*
* The MIT License
*
* Copyright (c) 2016, CloudBees, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package jenkins.metrics.util;

import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.kohsuke.accmod.Restricted;
import org.kohsuke.accmod.restrictions.DoNotUse;

import com.codahale.metrics.health.HealthCheckRegistry;

import hudson.util.DaemonThreadFactory;
import hudson.util.ExceptionCatchingThreadFactory;
import jenkins.metrics.api.Metrics.HealthChecker;

/**
 * Thread pool for running health checks. The pool size is 4 by default (configurable with the system property
 * {@code jenkins.metrics.util.HealthChecksThreadPool.maxThreadNumber}) and idle threads are kept around for 5 seconds
 * as this is a bursty pool used once per minute.
 *
 * <p>The backing queue is an unbounded {@link LinkedBlockingQueue}; its effective size is limited manually in
 * {@link #beforeExecute(Thread, Runnable)} to the current number of health checks, minus the threads in the pool,
 * plus one, as the {@link HealthChecker} itself is executed in the pool too. For example, with the default pool size
 * and 10 health checks we have the thread pool (4) + the queue (7) = 11 for the 10 health checks and the
 * HealthChecker.
 *
 * <p>When the queue grows over that capacity the oldest queued item is dropped and cancelled, to avoid running more
 * than one health check in each recurrence period. The {@link RejectedExecutionHandler} applies the same policy as a
 * safety net should the executor ever reject a submission.
 *
 * @since 3.1.2.3
 */
public class HealthChecksThreadPool extends ThreadPoolExecutor {

    private static final Logger LOGGER = Logger.getLogger(HealthChecksThreadPool.class.getName());

    /** Pool size used when the system property is absent or unusable. */
    private static final int DEFAULT_MAX_THREAD_POOL_SIZE = 4;

    private static final int MAX_THREAD_POOL_SIZE = resolveMaxThreadPoolSize();

    /** Number of rejected submissions; atomic because it is updated and read from different threads. */
    private static final AtomicLong REJECTED_EXECUTIONS = new AtomicLong();

    private final HealthCheckRegistry healthCheckRegistry;

    /**
     * Creates the pool for the health checks of the given registry.
     *
     * @param healthCheckRegistry registry whose number of health checks drives the allowed queue size
     */
    public HealthChecksThreadPool(HealthCheckRegistry healthCheckRegistry) {
        super(MAX_THREAD_POOL_SIZE, MAX_THREAD_POOL_SIZE, //
                5L, TimeUnit.SECONDS, //
                new LinkedBlockingQueue<Runnable>(), //
                new ExceptionCatchingThreadFactory(new DaemonThreadFactory(new ThreadFactory() {
                    private final AtomicInteger number = new AtomicInteger();

                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "Metrics-HealthChecks-" + number.incrementAndGet());
                    }
                })), new MetricsRejectedExecutionHandler(healthCheckRegistry));
        this.allowCoreThreadTimeOut(true); // allow stopping all threads if idle
        this.healthCheckRegistry = healthCheckRegistry;
        LOGGER.log(Level.FINE,
                "Created thread pool with a max of {0} threads (plus {1} in queue) for {2} health checks",
                new Object[] { getMaximumPoolSize(), queueCapacity(), healthCheckRegistry.getNames().size() });
    }

    /**
     * Reads the pool size from the system property, falling back to the default when the property is missing, not a
     * number or not strictly positive (a non-positive size would make the executor constructor fail).
     */
    private static int resolveMaxThreadPoolSize() {
        String property = HealthChecksThreadPool.class.getName() + ".maxThreadNumber";
        String value = System.getProperty(property);
        if (value == null) {
            return DEFAULT_MAX_THREAD_POOL_SIZE;
        }
        try {
            int parsed = Integer.parseInt(value.trim());
            if (parsed > 0) {
                return parsed;
            }
        } catch (NumberFormatException ignored) {
            // reported below together with the non-positive case
        }
        LOGGER.log(Level.WARNING, "Ignoring invalid value \"{0}\" for system property {1}, using {2} threads",
                new Object[] { value, property, DEFAULT_MAX_THREAD_POOL_SIZE });
        return DEFAULT_MAX_THREAD_POOL_SIZE;
    }

    /**
     * Queue capacity is dynamically calculated based on the number of health checks. One thread is taken by the
     * executor itself.
     *
     * @return the number of items allowed to wait in the queue, never negative
     */
    private int queueCapacity() {
        return Math.max(0, 1 + healthCheckRegistry.getNames().size() - MAX_THREAD_POOL_SIZE);
    }

    /**
     * Manually handle the queue size so it doesn't grow over our calculated queue capacity based on the number of
     * health checks.
     */
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        LOGGER.log(Level.FINEST, "Executing health check, pool/queue size is {0}/{1} for {2} health checks",
                new Object[] { getMaximumPoolSize(), queueCapacity(), healthCheckRegistry.getNames().size() });
        // Only drop when the queue is strictly over capacity: a queue that is exactly full is the expected state
        // right after a round of health checks has been submitted, and a capacity of 0 with an empty queue must not
        // trigger a drop. If there is any race condition MetricsRejectedExecutionHandler will catch it anyway.
        if (getQueue().size() > queueCapacity()) {
            dropOldestInQueue(this, healthCheckRegistry);
        }
        super.beforeExecute(t, r);
    }

    /**
     * Drop the oldest health check in the executor queue and cancel it. Does nothing when the queue is empty.
     *
     * @param executor executor whose queue head is discarded
     * @param healthCheckRegistry registry used only to describe the situation in the log message
     */
    static void dropOldestInQueue(ThreadPoolExecutor executor, HealthCheckRegistry healthCheckRegistry) {
        int queuedBeforeDrop = executor.getQueue().size();
        Runnable discarded = executor.getQueue().poll();
        if (discarded == null) {
            // nothing was queued, so there is nothing to drop, cancel or warn about
            return;
        }
        LOGGER.log(Level.WARNING,
                "Too many health check executions queued, dropping oldest one. This may mean some health checks are taking too long to execute:"
                        + " {0}, queue size={1}, health checks={2} ({3})",
                new Object[] { executor, queuedBeforeDrop, healthCheckRegistry.getNames(),
                        healthCheckRegistry.getNames().size() });
        cancelQueuedHealthCheck(discarded);
    }

    /**
     * Cancel the future execution, so that
     * {@link HealthCheckRegistry#runHealthChecks(java.util.concurrent.ExecutorService)} doesn't wait indefinitely. It
     * is not enough with removing it from the queue.
     */
    private static void cancelQueuedHealthCheck(Runnable discarded) {
        if (discarded instanceof Future) {
            // it has to be a Future
            ((Future<?>) discarded).cancel(false);
        } else {
            LOGGER.log(Level.WARNING, "HealthCheck Runnable is not an instance of Future: {0}", discarded);
        }
    }

    /**
     * @return how many submissions have been handed to the rejection handler so far
     */
    @Restricted(DoNotUse.class) // testing only
    public static long getRejectedExecutions() {
        return REJECTED_EXECUTIONS.get();
    }

    /**
     * Log the rejection, discard the first (oldest) item in the queue and retry. Should only happen when beforeExecute
     * is called simultaneously and doesn't preemptively avoid going over the calculated max size for the queue.
     */
    private static class MetricsRejectedExecutionHandler implements RejectedExecutionHandler {

        private final HealthCheckRegistry healthCheckRegistry;

        public MetricsRejectedExecutionHandler(HealthCheckRegistry healthCheckRegistry) {
            this.healthCheckRegistry = healthCheckRegistry;
        }

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            REJECTED_EXECUTIONS.incrementAndGet();
            LOGGER.log(Level.WARNING,
                    "Execution of health check was rejected:" + " {0}, queue size={1}, health checks={2} ({3})",
                    new Object[] { executor, executor.getQueue().size(), healthCheckRegistry.getNames(),
                            healthCheckRegistry.getNames().size() });

            // copied from DiscardOldestPolicy to ensure health check gets cancelled
            if (!executor.isShutdown()) {
                dropOldestInQueue(executor, healthCheckRegistry);
                executor.execute(r);
            }
        }
    }

}
@@ -0,0 +1,55 @@
/*
* The MIT License
*
* Copyright (c) 2016, CloudBees, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package jenkins.metrics.api;

import java.util.Map;
import java.util.Map.Entry;

import com.codahale.metrics.health.HealthCheck;

import edu.umd.cs.findbugs.annotations.NonNull;
import hudson.Extension;

/**
 * Test-only {@link HealthCheckProvider} that contributes six health checks. Each check counts its execution in
 * {@link #runs}, sleeps for one second and then reports itself as unhealthy.
 */
@Extension
public class HealthCheckProviderForTesting extends HealthCheckProvider {

    /**
     * Number of health check executions so far. Volatile so that a test thread polling the field observes the
     * updates made by the health check threads; writes go through {@link #recordRun()}.
     */
    public static volatile int runs;

    /** Guards the read-modify-write on {@link #runs}, as several checks may run concurrently. */
    private static final Object RUNS_LOCK = new Object();

    /**
     * @return the six short running health checks, keyed by name
     */
    @NonNull
    @Override
    public Map<String, HealthCheck> getHealthChecks() {
        return checks(check(1), check(2), check(3), check(4), check(5), check(6));
    }

    /**
     * Builds the named entry for the i-th health check.
     *
     * @param i number appended to the health check name
     * @return entry pairing the name with a check that always ends up unhealthy
     */
    private Entry<String, HealthCheck> check(int i) {
        return check("short-running-health-check-" + i, new HealthCheck() {
            @Override
            protected Result check() throws Exception {
                recordRun();
                Thread.sleep(1 * 1000);
                return Result.unhealthy("some error message");
            }
        });
    }

    /** Atomically counts one more health check execution. */
    private static void recordRun() {
        synchronized (RUNS_LOCK) {
            runs++;
        }
    }
}
51 changes: 51 additions & 0 deletions src/test/java/jenkins/metrics/api/HealthCheckerTest.java
@@ -0,0 +1,51 @@
/*
* The MIT License
*
* Copyright (c) 2016, CloudBees, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package jenkins.metrics.api;

import static org.junit.Assert.*;

import org.junit.Rule;
import org.junit.Test;
import org.jvnet.hudson.test.JenkinsRule;

import jenkins.metrics.api.Metrics.HealthChecker;
import jenkins.metrics.util.HealthChecksThreadPool;

/**
 * Verifies that the {@link HealthChecker} runs its health checks without any of them being rejected by the
 * {@link HealthChecksThreadPool}.
 */
public class HealthCheckerTest {

    @Rule
    public JenkinsRule j = new JenkinsRule();

    /**
     * Polls once per second until either the six test health checks have run or a rejection has been recorded, then
     * asserts that no execution was rejected.
     */
    @Test
    public void testHealthChecksAreNotRejected() throws Exception {
        for (;;) {
            // stop waiting as soon as all checks ran, or as soon as the failure condition shows up
            if (HealthCheckProviderForTesting.runs >= 6 || HealthChecksThreadPool.getRejectedExecutions() != 0) {
                break;
            }
            Thread.sleep(1000);
        }
        assertEquals(0, HealthChecksThreadPool.getRejectedExecutions());
    }

}

0 comments on commit ef2d2f9

Please sign in to comment.