Skip to content

Commit

Permalink
[JENKINS-21251] Initial calculation of wait time was susseptable to
Browse files Browse the repository at this point in the history
clock drift.

The calculation on how long to wait in various places on the code was
suseptable to the clock changing as multiple calls where made to
System.currentTimeMillis() for the initial calculation.

Changed this so that we use a single call to System.nanoTime and made just
a single call for the initial calculation.

There is still a potential issue for any callers of Channel.getLastHeard()
this will be addressed in a future change and has only been noted in this
commit.
  • Loading branch information
jtnord committed Feb 20, 2015
1 parent e62cef4 commit 75fb355
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 36 deletions.
9 changes: 5 additions & 4 deletions src/main/java/hudson/remoting/AtmostOneThreadExecutor.java
Expand Up @@ -71,10 +71,11 @@ public boolean isTerminated() {

public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
synchronized (q) {
long start = System.currentTimeMillis();
long end = start+unit.toMillis(timeout);
while (isAlive() && System.currentTimeMillis()<end) {
q.wait(end-System.currentTimeMillis());
long now = System.nanoTime();
long end = now + unit.toNanos(timeout);
while (isAlive() && (end - now > 0L)) {
q.wait(TimeUnit.NANOSECONDS.toMillis(end - now));
now = System.nanoTime();
}
}
return isTerminated();
Expand Down
19 changes: 15 additions & 4 deletions src/main/java/hudson/remoting/Channel.java
Expand Up @@ -51,6 +51,7 @@
import java.util.WeakHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
Expand Down Expand Up @@ -476,7 +477,7 @@ protected Channel(ChannelBuilder settings, CommandTransport transport) throws IO

transport.setup(this, new CommandReceiver() {
public void handle(Command cmd) {
lastHeard = System.currentTimeMillis();
updateLastHeard();
if (logger.isLoggable(Level.FINE))
logger.fine("Received " + cmd);
try {
Expand Down Expand Up @@ -1006,9 +1007,12 @@ public void checkRoles(RoleChecker checker) throws SecurityException {
* @since 1.299
*/
public synchronized void join(long timeout) throws InterruptedException {
long start = System.currentTimeMillis();
while(System.currentTimeMillis()-start<timeout && (inClosed==null || outClosed==null))
wait(timeout+start-System.currentTimeMillis());
long now = System.nanoTime();
long end = now + TimeUnit.MILLISECONDS.toNanos(timeout);
while ((end - now > 0L) && (inClosed == null || outClosed == null)) {
wait(TimeUnit.NANOSECONDS.toMillis(end - now));
now = System.nanoTime();
}
}

/**
Expand Down Expand Up @@ -1386,12 +1390,19 @@ public ExportTable.ExportList startExportRecording() {
}

/**
* TODO: this is not safe against clock skew and is called from jenkins core (and potentially plugins)
* @see #lastHeard
*/
public long getLastHeard() {
// TODO - this is not safe against clock skew and is called from jenkins core (and potentially plugins)
return lastHeard;
}

private void updateLastHeard() {
// TODO - this is not safe against clock skew and is called from jenkins core (and potentially plugins)
lastHeard = System.currentTimeMillis();
}

/*package*/ static Channel setCurrent(Channel channel) {
Channel old = CURRENT.get();
CURRENT.set(channel);
Expand Down
27 changes: 14 additions & 13 deletions src/main/java/hudson/remoting/PingThread.java
Expand Up @@ -23,17 +23,16 @@
*/
package hudson.remoting;

import org.jenkinsci.remoting.Role;
import org.jenkinsci.remoting.RoleChecker;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ExecutionException;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Logger;

import static java.util.concurrent.TimeUnit.MILLISECONDS;

/**
* Periodically perform a ping.
*
Expand All @@ -42,7 +41,7 @@
* or when the disconnection is not properly detected.
*
* <p>
* {@link #onDead()} method needs to be overrided to define
* {@link #onDead()} method needs to be overridden to define
* what to do when a connection appears to be dead.
*
* @author Kohsuke Kawaguchi
Expand Down Expand Up @@ -81,14 +80,15 @@ public PingThread(Channel channel) {
public void run() {
try {
while(true) {
long nextCheck = System.currentTimeMillis()+interval;
long nextCheck = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(interval);

ping();

// wait until the next check
long diff;
while((diff=nextCheck-System.currentTimeMillis())>0)
Thread.sleep(diff);
while((diff = nextCheck - System.nanoTime()) > 0) {
Thread.sleep(TimeUnit.NANOSECONDS.toMillis(diff));
}
}
} catch (ChannelClosedException e) {
LOGGER.fine(getName()+" is closed. Terminating");
Expand All @@ -103,13 +103,13 @@ public void run() {
private void ping() throws IOException, InterruptedException {
Future<?> f = channel.callAsync(new Ping());
long start = System.currentTimeMillis();
long end = start +timeout;

long remaining;
long end = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout);
long remaining = end - System.nanoTime();

do {
remaining = end-System.currentTimeMillis();
try {
f.get(Math.max(0,remaining),MILLISECONDS);
f.get(Math.max(TimeUnit.MILLISECONDS.toNanos(10),remaining),TimeUnit.NANOSECONDS);
return;
} catch (ExecutionException e) {
if (e.getCause() instanceof RequestAbortedException)
Expand All @@ -120,9 +120,10 @@ private void ping() throws IOException, InterruptedException {
// get method waits "at most the amount specified in the timeout",
// so let's make sure that it really waited enough
}
remaining = end - System.nanoTime();
} while(remaining>0);

onDead(new TimeoutException("Ping started on "+start+" hasn't completed at "+System.currentTimeMillis()));//.initCause(e)
onDead(new TimeoutException("Ping started at "+start+" hasn't completed by "+System.currentTimeMillis()));//.initCause(e)
}

/**
Expand Down
15 changes: 8 additions & 7 deletions src/main/java/hudson/remoting/Request.java
Expand Up @@ -256,21 +256,22 @@ public RSP get() throws InterruptedException, ExecutionException {
}

public RSP get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
synchronized(Request.this) {
synchronized (Request.this) {
// wait until the response arrives
// Note that the wait method can wake up for no reasons at all (AKA spurious wakeup),
long end = System.currentTimeMillis() + unit.toMillis(timeout);
long now;
while(response==null && (now=System.currentTimeMillis())<end) {
long now = System.nanoTime();
long end = now + unit.toNanos(timeout);
while (response == null && (end - now > 0L)) {
if (isCancelled()) {
throw new CancellationException();
}
Request.this.wait(Math.max(1,end-now));
Request.this.wait(Math.max(1, TimeUnit.NANOSECONDS.toMillis(end - now)));
now = System.nanoTime();
}
if(response==null)
if (response == null)
throw new TimeoutException();

if(response.exception!=null)
if (response.exception != null)
throw new ExecutionException(response.exception);

return response.returnValue;
Expand Down
9 changes: 5 additions & 4 deletions src/main/java/hudson/remoting/SingleLaneExecutorService.java
Expand Up @@ -86,10 +86,11 @@ public synchronized boolean isTerminated() {
}

public synchronized boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
long start = System.currentTimeMillis();
long end = start+unit.toMillis(timeout);
while (!isTerminated() && System.currentTimeMillis()<end) {
wait(end - System.currentTimeMillis());
long now = System.nanoTime();
long end = now + unit.toNanos(timeout);
while (!isTerminated() && (end - now) > 0L) {
wait(TimeUnit.NANOSECONDS.toMillis(end - now));
now = System.nanoTime();
}
return isTerminated();
}
Expand Down
12 changes: 8 additions & 4 deletions src/main/java/hudson/remoting/SynchronousExecutorService.java
Expand Up @@ -33,12 +33,16 @@ public synchronized boolean isTerminated() {
}

public synchronized boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
long end = System.currentTimeMillis() + unit.toMillis(timeout);
long now = System.nanoTime();
long end = now + unit.toNanos(timeout);

while (count!=0) {
long d = end - System.currentTimeMillis();
if (d<0) return false;
wait(d);
long d = end - now;
if (d<0) {
return false;
}
wait(TimeUnit.NANOSECONDS.toMillis(d));
now = System.nanoTime();
}
return true;
}
Expand Down

0 comments on commit 75fb355

Please sign in to comment.