Navigation Menu

Skip to content

Commit

Permalink
[FIXED JENKINS-9882] Use Java5 ExecutorService for thread management.
Browse files Browse the repository at this point in the history
It is a better implementation of the thread pool, except one catch
about how it prefers queueing over adding more threads.
  • Loading branch information
kohsuke committed Mar 15, 2012
1 parent 9108ce8 commit ae51133
Show file tree
Hide file tree
Showing 4 changed files with 337 additions and 319 deletions.
116 changes: 116 additions & 0 deletions src/java/winstone/BoundedExecutorService.java
@@ -0,0 +1,116 @@
package winstone;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* Wraps {@link Executor} so that we only ask the wrapped Executor to execute N number of tasks
* at any given time.
*
* <p>
* The intention is to use this with {@link ThreadPoolExecutor} with {@link SynchronousQueue}
* with unbounded max capacity (so that for up to N tasks we keep creating more threads for work,
* but beyond that we start to push the tasks into the queue of an infinite capacity.)
*
* <p>
* This is necessary because {@link ThreadPoolExecutor} tries to push work into the queue
* first and only create more threads once the queue is full, so for a queue with infinite
* capacity it'll never create threads beyond the core pool size.
* See http://www.kimchy.org/juc-executorservice-gotcha/ for more discussion of this.
*
* <p>
* Because there's no call back to tell us when the wrapped {@link ExecutorService} has
* finished executing something, this class needs to hand out the next task slightly
* before the wrapped {@link ExecutorService} is done with the previous task. The net result
* is that the wrapped {@link ExecutorService} will end up running N+1 threads (of which
* 1 is almost always idle.) I'm not sure how to fix this.
*
* @author Kohsuke Kawaguchi
*/
public class BoundedExecutorService extends AbstractExecutorService {
/**
* The FIFO queue of tasks waiting to be handed to the wrapped {@link ExecutorService}.
*/
private final List<Runnable> tasks = new LinkedList<Runnable>();

private final ExecutorService base;
private final int max;

/**
* How many tasks the wrapped {@link ExecutorService} is executing right now?
* Touched only in a synchronized block.
*/
private int current;

private boolean isShutdown = false;

public BoundedExecutorService(ExecutorService base, int max) {
this.base = base;
this.max = max;
}

public synchronized void execute(final Runnable r) {
if (isShutdown)
throw new RejectedExecutionException("already shut down");
tasks.add(r);
if (current < max)
scheduleNext();
}

private synchronized void scheduleNext() {
if (tasks.isEmpty()) {
if (isShutdown)
base.shutdown();
return;
}
final Runnable task = tasks.remove(0);
base.execute(new Runnable() {
public void run() {
try {
task.run();
} finally {
done();
}
}
});
current++;
}

private synchronized void done() {
current--;
scheduleNext(); // we already know that current<max
}

public synchronized void shutdown() {
isShutdown = true;
if (tasks.isEmpty())
base.shutdown();
}

public synchronized List<Runnable> shutdownNow() {
isShutdown = true;
List<Runnable> r = base.shutdownNow();
r.addAll(tasks);
tasks.clear();
return r;
}

public boolean isShutdown() {
return isShutdown;
}

public boolean isTerminated() {
return base.isTerminated();
}

public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return base.awaitTermination(timeout,unit);
}
}
182 changes: 42 additions & 140 deletions src/java/winstone/ObjectPool.java
Expand Up @@ -11,10 +11,14 @@
import java.io.IOException;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* Holds the object pooling code for Winstone. Presently this is only responses
Expand All @@ -23,30 +27,27 @@
* @author <a href="mailto:rick_knowles@hotmail.com">Rick Knowles</a>
* @version $Id: ObjectPool.java,v 1.9 2006/11/18 14:56:59 rickknowles Exp $
*/
public class ObjectPool implements Runnable {
public class ObjectPool {
private static final long FLUSH_PERIOD = 60000L;

private int STARTUP_REQUEST_HANDLERS_IN_POOL = 5;
private int MAX_IDLE_REQUEST_HANDLERS_IN_POOL = 50;
private int MAX_REQUEST_HANDLERS_IN_POOL = 1000;
private int MAX_IDLE_REQUEST_HANDLERS_IN_POOL = 20;
private int MAX_REQUEST_HANDLERS_IN_POOL = 200;
private long RETRY_PERIOD = 1000;
private int START_REQUESTS_IN_POOL = 10;
private int MAX_REQUESTS_IN_POOL = 1000;
private int START_RESPONSES_IN_POOL = 10;
private int MAX_RESPONSES_IN_POOL = 1000;
private List unusedRequestHandlerThreads;
private List usedRequestHandlerThreads;
private List unusedRequestPool;
private List unusedResponsePool;
private Object requestHandlerSemaphore = true;
private Object requestPoolSemaphore = true;
private Object responsePoolSemaphore = true;
private int threadIndex = 0;

private final ExecutorService requestHandler;

private List<WinstoneRequest> unusedRequestPool;
private List<WinstoneResponse> unusedResponsePool;
private final Object requestPoolSemaphore = new Object();
private final Object responsePoolSemaphore = new Object();
private boolean simulateModUniqueId;
private boolean saveSessions;

private Thread thread;

/**
* Constructs an instance of the object pool, including handlers, requests
* and responses
Expand All @@ -55,9 +56,22 @@ public ObjectPool(Map args) throws IOException {
this.simulateModUniqueId = Option.SIMULATE_MOD_UNIQUE_ID.get(args);
this.saveSessions = Option.USE_SAVED_SESSIONS.get(args);

// Build the initial pool of handler threads
this.unusedRequestHandlerThreads = new ArrayList();
this.usedRequestHandlerThreads = new ArrayList();
ExecutorService es = new ThreadPoolExecutor(MAX_IDLE_REQUEST_HANDLERS_IN_POOL, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS, // idle thread will only hang around for 60 secs
new SynchronousQueue<Runnable>(),
new ThreadFactory() {
private int threadIndex;
public synchronized Thread newThread(Runnable r) {
String threadName = Launcher.RESOURCES.getString(
"RequestHandlerThread.ThreadName", "" + (++threadIndex));

// allocate a thread to run on this object
Thread thread = new Thread(r, threadName);
thread.setDaemon(true);
return thread;
}
});
requestHandler = new BoundedExecutorService(es,MAX_REQUEST_HANDLERS_IN_POOL);

// Build the request/response pools
this.unusedRequestPool = new ArrayList();
Expand All @@ -68,143 +82,31 @@ public ObjectPool(Map args) throws IOException {
MAX_REQUEST_HANDLERS_IN_POOL = Option.HANDLER_COUNT_MAX.get(args);
MAX_IDLE_REQUEST_HANDLERS_IN_POOL = Option.HANDLER_COUNT_MAX_IDLE.get(args);

// Start the base set of handler threads
for (int n = 0; n < STARTUP_REQUEST_HANDLERS_IN_POOL; n++) {
this.unusedRequestHandlerThreads
.add(new RequestHandlerThread(this,
this.threadIndex++, this.simulateModUniqueId,
this.saveSessions));
}

// Initialise the request/response pools
for (int n = 0; n < START_REQUESTS_IN_POOL; n++) {
this.unusedRequestPool.add(new WinstoneRequest());
}
for (int n = 0; n < START_RESPONSES_IN_POOL; n++) {
this.unusedResponsePool.add(new WinstoneResponse());
}

this.thread = new Thread(this, "WinstoneObjectPoolMgmt");
this.thread.setDaemon(true);
this.thread.start();
}

public void run() {
boolean interrupted = false;
while (!interrupted) {
try {
Thread.sleep(FLUSH_PERIOD);
removeUnusedRequestHandlers();
} catch (InterruptedException err) {
interrupted = true;
}
}
this.thread = null;
}

private void removeUnusedRequestHandlers() {
// Check max idle requestHandler count
synchronized (this.requestHandlerSemaphore) {
// If we have too many idle request handlers
while (this.unusedRequestHandlerThreads.size() > MAX_IDLE_REQUEST_HANDLERS_IN_POOL) {
RequestHandlerThread rh = (RequestHandlerThread) this.unusedRequestHandlerThreads.get(0);
rh.destroy();
this.unusedRequestHandlerThreads.remove(rh);
}
}
}

public void destroy() {
synchronized (this.requestHandlerSemaphore) {
Collection usedHandlers = new ArrayList(this.usedRequestHandlerThreads);
for (Object usedHandler : usedHandlers) releaseRequestHandler((RequestHandlerThread) usedHandler);
Collection unusedHandlers = new ArrayList(this.unusedRequestHandlerThreads);
for (Object unusedHandler : unusedHandlers) ((RequestHandlerThread) unusedHandler).destroy();
this.unusedRequestHandlerThreads.clear();
}
if (this.thread != null) {
this.thread.interrupt();
}
requestHandler.shutdown();
}

/**
* Once the socket request comes in, this method is called. It reserves a
* request handler, then delegates the socket to that class. When it
* finishes, the handler is released back into the pool.
*/
public void handleRequest(Socket socket, Listener listener)
throws IOException, InterruptedException {
RequestHandlerThread rh = null;
synchronized (this.requestHandlerSemaphore) {
// If we have any spare, get it from the pool
int unused = this.unusedRequestHandlerThreads.size();
if (unused > 0) {
rh = (RequestHandlerThread) this.unusedRequestHandlerThreads.remove(unused - 1);
this.usedRequestHandlerThreads.add(rh);
Logger.log(Logger.FULL_DEBUG, Launcher.RESOURCES,
"ObjectPool.UsingRHPoolThread", "" + this.usedRequestHandlerThreads.size(),
"" + this.unusedRequestHandlerThreads.size());
}

// If we are out (and not over our limit), allocate a new one
else if (this.usedRequestHandlerThreads.size() < MAX_REQUEST_HANDLERS_IN_POOL) {
rh = new RequestHandlerThread(this,
this.threadIndex++, this.simulateModUniqueId,
this.saveSessions);
this.usedRequestHandlerThreads.add(rh);
Logger.log(Logger.FULL_DEBUG, Launcher.RESOURCES,
"ObjectPool.NewRHPoolThread", "" + this.usedRequestHandlerThreads.size(),
"" + this.unusedRequestHandlerThreads.size());
}

// otherwise throw fail message - we've blown our limit
else {
// Possibly insert a second chance here ? Delay and one retry ?
// Remember to release the lock first
Logger.log(Logger.WARNING, Launcher.RESOURCES,
"ObjectPool.NoRHPoolThreadsRetry");
// socket.close();
// throw new UnavailableException("NoHandlersAvailable");
}
}

if (rh != null)
rh.commenceRequestHandling(socket, listener);
else {
// Sleep for a set period and try again from the pool
Thread.sleep(RETRY_PERIOD);

synchronized (this.requestHandlerSemaphore) {
if (this.usedRequestHandlerThreads.size() < MAX_REQUEST_HANDLERS_IN_POOL) {
rh = new RequestHandlerThread(this,
this.threadIndex++, this.simulateModUniqueId,
this.saveSessions);
this.usedRequestHandlerThreads.add(rh);
Logger.log(Logger.FULL_DEBUG, Launcher.RESOURCES,
"ObjectPool.NewRHPoolThread", "" + this.usedRequestHandlerThreads.size(),
"" + this.unusedRequestHandlerThreads.size());
}
}
if (rh != null)
rh.commenceRequestHandling(socket, listener);
else {
Logger.log(Logger.WARNING, Launcher.RESOURCES,
"ObjectPool.NoRHPoolThreads");
socket.close();
}
}
}

/**
* Release the handler back into the pool
*/
public void releaseRequestHandler(RequestHandlerThread rh) {
synchronized (this.requestHandlerSemaphore) {
this.usedRequestHandlerThreads.remove(rh);
this.unusedRequestHandlerThreads.add(rh);
Logger.log(Logger.FULL_DEBUG, Launcher.RESOURCES,
"ObjectPool.ReleasingRHPoolThread", "" + this.usedRequestHandlerThreads.size(),
"" + this.unusedRequestHandlerThreads.size());
public void handleRequest(Socket socket, Listener listener) throws IOException, InterruptedException {
try {
requestHandler.submit(new RequestHandlerThread(this.simulateModUniqueId,this.saveSessions,socket,listener));
} catch (RejectedExecutionException e) {
Logger.log(Logger.WARNING, Launcher.RESOURCES,
"ObjectPool.NoRHPoolThreads");
socket.close();
}
}

Expand All @@ -217,7 +119,7 @@ public WinstoneRequest getRequestFromPool() throws IOException {
// If we have any spare, get it from the pool
int unused = this.unusedRequestPool.size();
if (unused > 0) {
req = (WinstoneRequest) this.unusedRequestPool.remove(unused - 1);
req = this.unusedRequestPool.remove(unused - 1);
Logger.log(Logger.FULL_DEBUG, Launcher.RESOURCES,
"ObjectPool.UsingRequestFromPool", ""
+ this.unusedRequestPool.size());
Expand Down Expand Up @@ -250,7 +152,7 @@ public WinstoneResponse getResponseFromPool() {
// If we have any spare, get it from the pool
int unused = this.unusedResponsePool.size();
if (unused > 0) {
rsp = (WinstoneResponse) this.unusedResponsePool.remove(unused - 1);
rsp = this.unusedResponsePool.remove(unused - 1);
Logger.log(Logger.FULL_DEBUG, Launcher.RESOURCES,
"ObjectPool.UsingResponseFromPool", ""
+ this.unusedResponsePool.size());
Expand Down

0 comments on commit ae51133

Please sign in to comment.