Skip to content

Commit

Permalink
Merge pull request #3 from ydubreuil/fix-socket-leak
Browse files Browse the repository at this point in the history
[JENKINS-27555]: Re-implements JNA agent with non-blocking code
  • Loading branch information
stephenc committed Jun 2, 2015
2 parents f0dc141 + 166ca5a commit 23afe23
Showing 1 changed file with 98 additions and 42 deletions.
Expand Up @@ -18,6 +18,7 @@
*/
package com.cloudbees.jenkins.plugins.sshagent.jna;

import jnr.enxio.channels.NativeSelectorProvider;
import jnr.posix.POSIXFactory;
import jnr.unixsocket.UnixServerSocket;
import jnr.unixsocket.UnixServerSocketChannel;
Expand All @@ -34,6 +35,11 @@
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.logging.Level;
import java.util.logging.Logger;


/**
Expand All @@ -47,7 +53,8 @@ public class AgentServer {
private UnixSocketAddress address;
private UnixServerSocketChannel channel;
private UnixServerSocket socket;
private volatile boolean stopped = false;
private Selector selector;
private volatile boolean selectable = true;

public AgentServer() {
this(new AgentImpl());
Expand All @@ -65,31 +72,52 @@ public String start() throws Exception {
authSocket = createLocalSocketAddress();
address = new UnixSocketAddress(new File(authSocket));
channel = UnixServerSocketChannel.open();
channel.configureBlocking(true);
channel.configureBlocking(false);
socket = channel.socket();
socket.bind(address);
stopped = false;
selector = NativeSelectorProvider.getInstance().openSelector();

channel.register(selector, SelectionKey.OP_ACCEPT, new SshAgentServerSocketHandler());

POSIXFactory.getPOSIX().chmod(authSocket, 0600);
thread = new Thread("SSH Agent thread") {
public void run() {
try {
while (!stopped) {
final UnixSocketChannel clientSock = channel.accept();
clientSock.configureBlocking(true);
new SshAgentSession(clientSock, agent);
}
} catch (Exception e) {
if (!stopped) {
e.printStackTrace();
}
}
}
};

thread = new Thread(new AgentSocketAcceptor(), "SSH Agent socket acceptor " + authSocket);
thread.setDaemon(true);
thread.start();
return authSocket;
}

final class AgentSocketAcceptor implements Runnable {
public void run() {
try {
while (selectable) {
// The select() will be woke up if some new connection
// have occurred, or if the selector has been explicitly
// woke up
if (selector.select() > 0) {
Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();

while(selectedKeys.hasNext()) {
SelectionKey key = selectedKeys.next();
selectedKeys.remove();

if (key.isValid()) {
EventHandler processor = ((EventHandler) key.attachment());
processor.process(key);
}
}
} else {
break;
}
}

LOGGER.log(Level.FINE, "Death of thread " + Thread.currentThread().getName());
} catch (IOException ioe) {
LOGGER.log(Level.WARNING, "Error while waiting for events", ioe);
}
}
}

static String createLocalSocketAddress() throws IOException {
String name;
if (OsUtils.isUNIX()) {
Expand All @@ -105,64 +133,92 @@ static String createLocalSocketAddress() throws IOException {
}

public void close() {
stopped = true;
selectable = false;
selector.wakeup();

// forcibly close remaining sockets
for (SelectionKey key : selector.keys()) {
if (key != null) {
safelyClose(key.channel());
}
}

safelyClose(selector);
agent.close();
safelyClose(channel);
if (authSocket != null) {
FileUtils.deleteQuietly(new File(authSocket));
}
}

protected class SshAgentSession extends AbstractAgentClient implements Runnable {
interface EventHandler {
void process(SelectionKey key) throws IOException;
}

final class SshAgentServerSocketHandler implements EventHandler {
public final void process(SelectionKey key) throws IOException {
try {
UnixSocketChannel clientChannel = channel.accept();
clientChannel.configureBlocking(false);
clientChannel.register(selector, SelectionKey.OP_READ, new SshAgentSessionSocketHandler(clientChannel));
} catch (IOException ex) {
LOGGER.log(Level.WARNING, "failed to accept new connection", ex);
safelyClose(channel);
throw ex;
}
}
}

final class SshAgentSessionSocketHandler extends AbstractAgentClient implements EventHandler {

private final UnixSocketChannel channel;
private final UnixSocketChannel sessionChannel;

public SshAgentSession(UnixSocketChannel channel, SshAgent agent) {
public SshAgentSessionSocketHandler(UnixSocketChannel sessionChannel) {
super(agent);
this.channel = channel;
new Thread(this).start();
this.sessionChannel = sessionChannel;
}

public void run() {
public void process(SelectionKey key) {
try {
ByteBuffer buf = ByteBuffer.allocate(1024);
while (!stopped) {
buf.rewind();
int result = channel.read(buf);
if (result > 0) {
buf.flip();
messageReceived(new Buffer(buf.array(), buf.position(), buf.remaining()));
} else {
break;
}
int result = sessionChannel.read(buf);

if (result > 0) {
buf.flip();
messageReceived(new Buffer(buf.array(), buf.position(), buf.remaining()));
return;
}
} catch (Exception e) {
if (!stopped) {
e.printStackTrace();

if (result == -1) {
// EOF => remote closed the connection, cancel the selection key and close the channel.
key.cancel();
sessionChannel.close();
}
} finally {
safelyClose(channel);
} catch (IOException e) {
LOGGER.log(Level.INFO, "Could not write response to socket", e);
key.cancel();
safelyClose(sessionChannel);
}
}

protected void reply(Buffer buf) throws IOException {
ByteBuffer b = ByteBuffer.wrap(buf.array(), buf.rpos(), buf.available());
int result = channel.write(b);
int result = sessionChannel.write(b);
if (result < 0) {
throw new IOException("Could not write response to socket");
}
}

}

private static void safelyClose(Closeable channel) {
if (channel != null) {
try {
channel.close();
} catch (IOException e) {
// ignore
LOGGER.log(Level.INFO, "Error while closing resource", e);
}
}
}

private static final Logger LOGGER = Logger.getLogger(AgentServer.class.getName());
}

0 comments on commit 23afe23

Please sign in to comment.