Skip to content

Commit

Permalink
Merge pull request #54 from jenkinsci/fix-memory-leak
Browse files Browse the repository at this point in the history
[JENKINS-28844] Fix memory leak
  • Loading branch information
kohsuke committed Jul 13, 2015
2 parents 05329f5 + b6714b7 commit bf81838
Show file tree
Hide file tree
Showing 9 changed files with 534 additions and 41 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Expand Up @@ -373,7 +373,7 @@ THE SOFTWARE.
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>findbugs-maven-plugin</artifactId>
<version>2.5.2</version>
<version>3.0.1</version>
<configuration>
<excludeFilterFile>${basedir}/src/findbugs-filter.xml</excludeFilterFile>
<failOnError>true</failOnError>
Expand Down
132 changes: 121 additions & 11 deletions src/main/java/hudson/remoting/Channel.java
Expand Up @@ -23,16 +23,17 @@
*/
package hudson.remoting;

import org.jenkinsci.remoting.CallableDecorator;
import edu.umd.cs.findbugs.annotations.SuppressWarnings;
import hudson.remoting.CommandTransport.CommandReceiver;
import hudson.remoting.PipeWindow.Key;
import hudson.remoting.PipeWindow.Real;
import hudson.remoting.forward.ForwarderFactory;
import hudson.remoting.forward.ListeningPort;
import hudson.remoting.forward.PortForwarder;
import org.jenkinsci.remoting.CallableDecorator;
import org.jenkinsci.remoting.RoleChecker;

import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
import java.io.Closeable;
import java.io.IOException;
Expand Down Expand Up @@ -153,12 +154,7 @@ public class Channel implements VirtualChannel, IChannel, Closeable {
* Remembers last I/O ID issued from locally to the other side, per thread.
* int[1] is used as a holder of int.
*/
private final ThreadLocal<int[]> lastIoId = new ThreadLocal<int[]>() {
@Override
protected int[] initialValue() {
return new int[1];
}
};
private final ThreadLocal<int[]> lastIoId = new ThreadLastIoId();

/**
* Records the {@link Request}s being executed on this channel, sent by the remote peer.
Expand Down Expand Up @@ -194,6 +190,14 @@ protected int[] initialValue() {
* and therefore should be OK.
*/
private final WeakHashMap<PipeWindow.Key, WeakReference<PipeWindow>> pipeWindows = new WeakHashMap<PipeWindow.Key, WeakReference<PipeWindow>>();
/**
* There are cases where complex object cycles can cause a closed channel to fail to be garbage collected,
* these typically arrise when an {@link #export(Class, Object)} is {@link #setProperty(Object, Object)}
* (a supported and intended use case), the {@link Ref} allows us to break the object cycle on channel
* termination and simplify the circles into chains which can then be collected easily by the garbage collector.
* @since FIXME after merge
*/
private final Ref reference = new Ref(this);

/**
* Registered listeners.
Expand Down Expand Up @@ -504,6 +508,17 @@ public void terminate(IOException e) {
});
}

/**
* Gets the {@link Ref} for this {@link Channel}. The {@link Ref} will be {@linkplain Ref#clear()}ed when
* the channel is terminated in order to break any complex object cycles.
* @return the {@link Ref} for this {@link Channel}
* @since FIXME after merge
*/
@Nonnull
/*package*/ Ref ref() {
return reference;
}

/**
* Callback "interface" for changes in the state of {@link Channel}.
*/
Expand Down Expand Up @@ -814,6 +829,10 @@ public void terminate(IOException e) {
synchronized (this) {
if (e == null) throw new IllegalArgumentException();
outClosed = inClosed = e;
// we need to clear these out early in order to ensure that a GC operation while
// proceding with the close does not result in a batch of UnexportCommand instances
// being attempted to send over the locked and now closing channel.
RemoteInvocationHandler.notifyChannelTermination(this);
try {
transport.closeRead();
} catch (IOException x) {
Expand All @@ -833,6 +852,8 @@ public void terminate(IOException e) {
executingCalls.clear();
}
exportedObjects.abort(e);
// break any object cycles into simple chains to simplify work for the garbage collector
reference.clear();
} finally {
notifyAll();
}
Expand Down Expand Up @@ -936,7 +957,7 @@ public boolean isInClosed() {
* Returns true if this channel has any of the security restrictions enabled.
*
* @deprecated
* Use methods like {@link #allowsRemoteClassLoading()} and {@link #allowsArbitraryCallable()}
* Use methods like {@link #isRemoteClassLoadingAllowed()} and {@link #isArbitraryCallableAllowed()}
* to test individual features.
*/
@Deprecated
Expand All @@ -948,7 +969,7 @@ public boolean isRestricted() {
* Activates/disables all the security restriction mode.
*
* @deprecated
* Use methods like {@link #allowClassLoading(boolean)} and {@link #allowArbitraryCallable(boolean)}
* Use methods like {@link #setRemoteClassLoadingAllowed(boolean)} and {@link #setArbitraryCallableAllowed(boolean)}
* to control individual features.
*/
@Deprecated
Expand Down Expand Up @@ -1128,7 +1149,14 @@ public synchronized void close() throws IOException {
public synchronized void close(Throwable diagnosis) throws IOException {
if(outClosed!=null) return; // already closed

send(new CloseCommand(this,diagnosis));
try {
send(new CloseCommand(this, diagnosis));
} catch (IOException e) {
// send should only ever - worst case - throw an IOException so we'll just catch that and not Throwable
logger.log(Level.WARNING, "Having to terminate early", e);
terminate(e);
return;
}
outClosed = new IOException().initCause(diagnosis); // last command sent. no further command allowed. lock guarantees that no command will slip inbetween
notifyAll();
try {
Expand Down Expand Up @@ -1430,7 +1458,11 @@ private void updateLastHeard() {

/*package*/ static Channel setCurrent(Channel channel) {
Channel old = CURRENT.get();
CURRENT.set(channel);
if (channel == null) {
CURRENT.remove();
} else {
CURRENT.set(channel);
}
return old;
}

Expand Down Expand Up @@ -1484,4 +1516,82 @@ public static Channel current() {
// to avoid situations like this, create proxy classes that we need during the classloading
jarLoaderProxy=RemoteInvocationHandler.getProxyClass(JarLoader.class);
}

/**
* Do not use an anonymous inner class as that can cause a {@code this} reference to escape.
*/
private static class ThreadLastIoId extends ThreadLocal<int[]> {
@Override
protected int[] initialValue() {
return new int[1];
}
}

/**
* A reference for the {@link Channel} that can be cleared out on {@link #close()}/{@link #terminate(IOException)}.
* Could probably be replaced with {@link AtomicReference} but then we would not retain the only change being
* from valid channel to {@code null} channel symmantics of this class.
* @since FIXME after merge
* @see #reference
*/
/*package*/ static final class Ref {
/**
* The channel.
*/
@CheckForNull
private Channel channel;

/**
* Constructor.
* @param channel the {@link Channel}.
*/
private Ref(@CheckForNull Channel channel) {
this.channel = channel;
}

/**
* Returns the {@link Channel} or {@code null} if the the {@link Channel} has been closed/terminated.
* @return the {@link Channel} or {@code null}
*/
@CheckForNull
public Channel channel() {
return channel;
}

/**
* Clears the {@link #channel} to signify that the {@link Channel} has been closed and break any complex
* object cycles that might prevent the full garbage collection of the channel's associated object tree.
*/
public void clear() {
channel = null;
}

/**
* {@inheritDoc}
*/
@Override
public boolean equals(Object o) {
// compare based on instance identity
return this == o;
}

/**
* {@inheritDoc}
*/
@Override
public int hashCode() {
return System.identityHashCode(this);
}

/**
* {@inheritDoc}
*/
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("Channel.Ref{");
sb.append("channel=").append(channel);
sb.append('}');
return sb.toString();
}
}
}
4 changes: 3 additions & 1 deletion src/main/java/hudson/remoting/JarCacheSupport.java
Expand Up @@ -36,7 +36,9 @@ public abstract class JarCacheSupport extends JarCache {
/**
* Throttle the jar downloading activity so that it won't eat up all the channel bandwidth.
*/
private final ExecutorService downloader = new AtmostOneThreadExecutor();
private final ExecutorService downloader = new AtmostOneThreadExecutor(
new NamingThreadFactory(new DaemonThreadFactory(), JarCacheSupport.class.getSimpleName())
);

@Override
public Future<URL> resolve(final Channel channel, final long sum1, final long sum2) throws IOException, InterruptedException {
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/hudson/remoting/Launcher.java
Expand Up @@ -158,7 +158,7 @@ public void setConnectTo(String target) {
System.err.println("Illegal parameter: "+target);
System.exit(1);
}
connectionTarget = new InetSocketAddress(tokens[0],Integer.valueOf(tokens[1]));
connectionTarget = new InetSocketAddress(tokens[0],Integer.parseInt(tokens[1]));
}

/**
Expand Down
56 changes: 56 additions & 0 deletions src/main/java/hudson/remoting/NamingThreadFactory.java
@@ -0,0 +1,56 @@
/*
* The MIT License
*
* Copyright 2013 Jesse Glick.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/

package hudson.remoting;

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Thread factory that sets thread name so we know who is responsible for so many threads being created.
* @since FIXME after merge
*/
public class NamingThreadFactory implements ThreadFactory {
private final AtomicInteger threadNum = new AtomicInteger();
private final ThreadFactory delegate;
private final String name;

/**
* Creates a new naming factory.
* @param delegate a baseline factory, such as {@link Executors#defaultThreadFactory} or {@link DaemonThreadFactory}
* @param name an identifier to be used in thread names; might be e.g. your {@link Class#getSimpleName}
*/
public NamingThreadFactory(ThreadFactory delegate, String name) {
this.delegate = delegate;
this.name = name;
}

@Override
public Thread newThread(Runnable r) {
Thread result = delegate.newThread(r);
result.setName(String.format("%s [#%d]", name, threadNum.incrementAndGet()));
return result;
}
}
31 changes: 24 additions & 7 deletions src/main/java/hudson/remoting/RemoteClassLoader.java
Expand Up @@ -46,6 +46,8 @@

import org.jenkinsci.constant_pool_scanner.ConstantPoolScanner;

import javax.annotation.CheckForNull;

import static hudson.remoting.Util.*;
import static java.util.logging.Level.*;

Expand Down Expand Up @@ -83,7 +85,7 @@ final class RemoteClassLoader extends URLClassLoader {
/**
* Remote peer that the {@link #proxy} is connected to.
*/
private final Channel channel;
private /*mostly final*/ Channel.Ref channel;

private final Map<String,URLish> resourceMap = new HashMap<String,URLish>();
private final Map<String,Vector<URLish>> resourcesMap = new HashMap<String,Vector<URLish>>();
Expand Down Expand Up @@ -113,13 +115,24 @@ public static ClassLoader create(ClassLoader parent, IClassLoader proxy) {

private RemoteClassLoader(ClassLoader parent, IClassLoader proxy) {
super(new URL[0],parent);
this.channel = RemoteInvocationHandler.unwrap(proxy);
final Channel channel = RemoteInvocationHandler.unwrap(proxy);
this.channel = channel == null ? null : channel.ref();
this.underlyingProxy = proxy;
if (!channel.remoteCapability.supportsPrefetch() || channel.getJarCache()==null)
if (channel == null || !channel.remoteCapability.supportsPrefetch() || channel.getJarCache()==null)
proxy = new DumbClassLoaderBridge(proxy);
this.proxy = proxy;
}

/**
* Returns the backing channel or {@code null} if the channel is disconnected or otherwise unavailable.
* @return the backing channel or {@code null}.
* @since FIXME after merge
*/
@CheckForNull
private Channel channel() {
return this.channel == null ? null : this.channel.channel();
}

/**
* If this {@link RemoteClassLoader} represents a classloader from the specified channel,
* return its exported OID. Otherwise return -1.
Expand All @@ -133,7 +146,8 @@ protected Class<?> findClass(String name) throws ClassNotFoundException {
// first attempt to load from locally fetched jars
return super.findClass(name);
} catch (ClassNotFoundException e) {
if(!channel.isRemoteClassLoadingAllowed())
final Channel channel = channel();
if(channel == null || !channel.isRemoteClassLoadingAllowed())
throw e;
// delegate to remote
if (channel.remoteCapability.supportsMultiClassLoaderRPC()) {
Expand Down Expand Up @@ -309,7 +323,8 @@ private Class<?> loadClassFile(String name, byte[] bytes) {
throw new ClassFormatError(name + " is <8 bytes long");
}
short bytecodeLevel = (short) ((bytes[6] << 8) + (bytes[7] & 0xFF) - 44);
if (bytecodeLevel > channel.maximumBytecodeLevel) {
final Channel channel = channel();
if (channel != null && bytecodeLevel > channel.maximumBytecodeLevel) {
throw new ClassFormatError("this channel is restricted to JDK 1." + channel.maximumBytecodeLevel + " compatibility but " + name + " was compiled for 1." + bytecodeLevel);
}

Expand Down Expand Up @@ -347,7 +362,8 @@ private void definePackage(String name) {
public URL findResource(String name) {
// first attempt to load from locally fetched jars
URL url = super.findResource(name);
if(url!=null || !channel.isRemoteClassLoadingAllowed()) return url;
final Channel channel = channel();
if(url!=null || channel == null || !channel.isRemoteClassLoadingAllowed()) return url;

try {
if(resourceMap.containsKey(name)) {
Expand Down Expand Up @@ -393,7 +409,8 @@ private static Vector<URL> toURLs(Vector<URLish> src) throws MalformedURLExcepti
}

public Enumeration<URL> findResources(String name) throws IOException {
if(!channel.isRemoteClassLoadingAllowed())
final Channel channel = channel();
if(channel == null || !channel.isRemoteClassLoadingAllowed())
return EMPTY_ENUMERATION;

// TODO: use the locally fetched jars to speed up the look up
Expand Down

0 comments on commit bf81838

Please sign in to comment.