Skip to content

Commit

Permalink
[JENKINS-47965, JENKINS-38696] - IOHub should not wait infinitely for…
Browse files Browse the repository at this point in the history
… Selector in Windows. (#221)

* [JENKINS-47965, JENKINS-38696] - IOHub should not wait infinitely for Selector in Windows.

Windows implementation of NIO Channel Selector does not request "Select now" in https://github.com/jenkinsci/remoting/blob/6165d6fb6a71a7f4fce52ce5fc4cac479052ce04/src/main/java/org/jenkinsci/remoting/protocol/IOHub.java#L436 and hence calls selector#select()... and this method has no timeout.
When the code gets into this branch, Selector will be waiting infinitely without calling selector#isOpen(). Such implementation relies on the Selector implementation, which shout interrupt the select() wait if the selector is being closed. But apparently it does not no my machine (Win 10, amd64, Oracle JDK 8u131)

This change adds wait timeout and makes the IOHub implementation to loop in the logic.

* [JENKINS-47965] - Lame implementation of IOHub Selector wakeup thread for Windows

* [JENKINS-47965] - Polish the implementation

* [JENKINS-38696] - Add Windows back to CI and make @jtnord happy

* [JENKINS-47965] - Simplify the logic

* [JENKINS-47965] - Replace Thread.sleep() by object wait()/notify()

* [JENKINS-47965] - Cleanup the locking logic

* [JENKINS-47965] - Address comments from @jtnord, run Selector Watcher in Unix systems as well
  • Loading branch information
oleg-nenashev committed Dec 22, 2017
1 parent 3f07563 commit a916c64
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 6 deletions.
3 changes: 1 addition & 2 deletions Jenkinsfile
Expand Up @@ -8,8 +8,7 @@ properties([[$class: 'BuildDiscarderProperty',
/* These platforms correspond to labels in ci.jenkins.io, see:
* https://github.com/jenkins-infra/documentation/blob/master/ci.adoc
*/
//TODO: Enable Windows once JENKINS-38696 is fixed. No sense to spend CPU cycles before that
List platforms = ['linux']
List platforms = ['linux', 'windows']
Map branches = [:]

for (int i = 0; i < platforms.size(); ++i) {
Expand Down
76 changes: 72 additions & 4 deletions src/main/java/org/jenkinsci/remoting/protocol/IOHub.java
Expand Up @@ -55,6 +55,7 @@
import javax.annotation.Nonnull;
import javax.annotation.OverridingMethodsMustInvokeSuper;
import javax.annotation.concurrent.GuardedBy;

import org.jenkinsci.remoting.util.ByteBufferPool;
import org.jenkinsci.remoting.util.DirectByteBufferPool;
import org.kohsuke.accmod.Restricted;
Expand All @@ -71,6 +72,13 @@ public class IOHub implements Executor, Closeable, Runnable, ByteBufferPool {
* Our logger.
*/
private static final Logger LOGGER = Logger.getLogger(IOHub.class.getName());

/**
* Defines the Selector wakeup timeout via a system property. Defaults to {@code 1000ms}.
* @since TODO
*/
private static final long SELECTOR_WAKEUP_TIMEOUT_MS = Long.getLong(IOHub.class.getName() + ".selectorWakeupTimeout", 1000);

/**
* The next ID to use.
*/
Expand All @@ -83,6 +91,9 @@ public class IOHub implements Executor, Closeable, Runnable, ByteBufferPool {
* Our selector.
*/
private final Selector selector;
private volatile boolean ioHubRunning = false;
private final Object selectorLockObject = new Object();

/**
* Our executor.
*/
Expand Down Expand Up @@ -126,6 +137,7 @@ public class IOHub implements Executor, Closeable, Runnable, ByteBufferPool {
*/
private IOHub(Executor executor) throws IOException {
this.selector = Selector.open();
this.ioHubRunning = true;
this.executor = executor;
this.bufferPool = new DirectByteBufferPool(16916, Runtime.getRuntime().availableProcessors() * 4);
}
Expand All @@ -140,6 +152,8 @@ private IOHub(Executor executor) throws IOException {
public static IOHub create(Executor executor) throws IOException {
IOHub result = new IOHub(executor);
executor.execute(result);
LOGGER.log(Level.FINE, "Staring an additional Selector wakeup thread. See JENKINS-47965 for more info");
executor.execute(new IOHubSelectorWatcher(result));
return result;
}

Expand Down Expand Up @@ -408,6 +422,10 @@ public final void unregister(SelectableChannel channel) {
selectionKey.attach(null);
}

private String getThreadNameBase(String executorThreadName) {
return "IOHub#" + _id + ": Selector[keys:" + selector.keys().size() + ", gen:" + gen + "] / " + executorThreadName;
}

/**
* {@inheritDoc}
*/
Expand All @@ -419,9 +437,7 @@ public final void run() {
long cpuOverheatProtection = System.nanoTime();
try {
while (isOpen()) {
selectorThread.setName(
"IOHub#" + _id + ": Selector[keys:" + selector.keys().size() + ", gen:" + gen + "] / " + oldName
);
selectorThread.setName(getThreadNameBase(oldName));
try {
processScheduledTasks();
boolean wantSelectNow = processRegistrations();
Expand All @@ -433,8 +449,12 @@ public final void run() {
// in an immediately ready selection key, hence we use the non-blocking form
selected = selector.selectNow();
} else {
selected = selector.select(); // WINDOWS!!! Waiting the whole timeout anyway if timeout specified?!?
// On Windows the select(timeout) operation ALWAYS waits for the timeout,
// so we workaround it by IOHubSelectorWatcher
// "Ubuntu on Windows also qualifies as Windows, so we just rely on the wakeup thread ad use infinite timeout"
selected = selector.select();
}

if (selected == 0) {
// don't stress the GC by creating instantiating the selected keys
continue;
Expand Down Expand Up @@ -489,6 +509,54 @@ public final void run() {
// ignore, happens routinely
} finally {
selectorThread.setName(oldName);
ioHubRunning = false;
synchronized (selectorLockObject) {
selectorLockObject.notifyAll();
}
}
}

/**
* This is an artificial thread, which monitors IOHub Selector and wakes it up if it waits for more than 1 second.
* It is a workaround for Selector#select(long timeout) on Windows, where the call always waits for the entire timeout before returning back.
* Since the same behavior happens on Unix emulation layer in Windows, we run this thread on Unix platforms as well.
*/
private static class IOHubSelectorWatcher implements Runnable {

private final IOHub iohub;

public IOHubSelectorWatcher(IOHub iohub) {
this.iohub = iohub;
}

@Override
public void run() {
final Thread watcherThread = Thread.currentThread();
final String oldName = watcherThread.getName();
final String watcherName = "Windows IOHub Watcher for " + iohub.getThreadNameBase(oldName);
LOGGER.log(Level.FINEST, "{0}: Started", watcherName);
try {
watcherThread.setName(watcherName);
while (true) {
synchronized (iohub.selectorLockObject) {
if (iohub.ioHubRunning) {
iohub.selectorLockObject.wait(SELECTOR_WAKEUP_TIMEOUT_MS);
} else {
break;
}
}
iohub.selector.wakeup();
}
} catch (InterruptedException ex) {
// interrupted
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.log(Level.FINE, "{0}: Interrupted", ex);
}
return;
} finally {
watcherThread.setName(oldName);
LOGGER.log(Level.FINEST, "{0}: Finished", watcherName);
}
}
}

Expand Down

0 comments on commit a916c64

Please sign in to comment.