Skip to content

Commit

Permalink
[FIXED JENKINS-31718] Preserve the BufferedInputStream when building …
Browse files Browse the repository at this point in the history
…the transport

- For now I do not have the energy to fix the tests to not need the 'null' backdoor.
- Most likely some non-mock based tests would be better and likely would have caught this regression sooner.
  • Loading branch information
stephenc authored and kohsuke committed Nov 24, 2015
1 parent bfbcfb3 commit 17aa60d
Show file tree
Hide file tree
Showing 10 changed files with 107 additions and 21 deletions.
74 changes: 68 additions & 6 deletions src/main/java/hudson/remoting/Capability.java
@@ -1,15 +1,14 @@
package hudson.remoting;

import hudson.remoting.Channel.Mode;

import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.OutputStream;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;

/**
* Represents additional features implemented on {@link Channel}.
Expand Down Expand Up @@ -214,6 +213,69 @@ protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, Clas

public static final Capability NONE = new Capability(0);

/**
* {@inheritDoc}
*/
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("Capability{");
boolean first = true;
if ((mask & MASK_MULTI_CLASSLOADER) != 0) {
first = false;
sb.append("Multi-ClassLoader RPC");
}
if ((mask & MASK_PIPE_THROTTLING) != 0) {
if (first) {
first = false;
} else {
sb.append(", ");
}
sb.append("Pipe throttling");
}
if ((mask & MASK_MIMIC_EXCEPTION) != 0) {
if (first) {
first = false;
} else {
sb.append(", ");
}
sb.append("Mimic Exception");
}
if ((mask & MASK_PREFETCH) != 0) {
if (first) {
first = false;
} else {
sb.append(", ");
}
sb.append("Prefetch");
}
if ((mask & GREEDY_REMOTE_INPUTSTREAM) != 0) {
if (first) {
first = false;
} else {
sb.append(", ");
}
sb.append("Greedy RemoteInputStream");
}
if ((mask & MASK_PROXY_WRITER_2_35) != 0) {
if (first) {
first = false;
} else {
sb.append(", ");
}
sb.append("Proxy writer 2.35");
}
if ((mask & MASK_CHUNKED_ENCODING) != 0) {
if (first) {
first = false;
} else {
sb.append(", ");
}
sb.append("Chunked encoding");
}
sb.append('}');
return sb.toString();
}

static {
try {
PREAMBLE = "<===[JENKINS REMOTING CAPACITY]===>".getBytes("UTF-8");
Expand Down
20 changes: 19 additions & 1 deletion src/main/java/hudson/remoting/ChannelBuilder.java
@@ -1,5 +1,7 @@
package hudson.remoting;

import java.util.logging.Level;
import java.util.logging.Logger;
import org.jenkinsci.remoting.CallableDecorator;
import hudson.remoting.Channel.Mode;
import org.jenkinsci.remoting.Role;
Expand Down Expand Up @@ -32,6 +34,12 @@
* @author Kohsuke Kawaguchi
*/
public class ChannelBuilder {

/**
* Our logger.
*/
private static final Logger LOGGER = Logger.getLogger(ChannelBuilder.class.getName());

private final String name;
private final ExecutorService executors;
private ClassLoader base = this.getClass().getClassLoader();
Expand Down Expand Up @@ -332,13 +340,17 @@ protected CommandTransport negotiate(final InputStream is, final OutputStream os
//
// so use magic preamble and discard all the data up to that to improve robustness.

LOGGER.log(Level.FINER, "Sending capability preamble: {0}", capability);
capability.writePreamble(os);

Mode mode = this.getMode();

if(mode!= Mode.NEGOTIATE) {
LOGGER.log(Level.FINER, "Sending mode preamble: {0}", mode);
os.write(mode.preamble);
os.flush(); // make sure that stream preamble is sent to the other end. avoids dead-lock
} else {
LOGGER.log(Level.FINER, "Awaiting mode preamble...");
}

{// read the input until we hit preamble
Expand All @@ -359,20 +371,25 @@ protected CommandTransport negotiate(final InputStream is, final OutputStream os
switch (i) {
case 0:
case 1:
LOGGER.log(Level.FINER, "Received mode preamble: {0}", modes[i]);
// transmission mode negotiation
if(mode==Mode.NEGOTIATE) {
// now we know what the other side wants, so send the consistent preamble
mode = modes[i];
LOGGER.log(Level.FINER, "Sending agreed mode preamble: {0}", mode);
os.write(mode.preamble);
os.flush(); // make sure that stream preamble is sent to the other end. avoids dead-lock
} else {
if(modes[i]!=mode)
throw new IOException("Protocol negotiation failure");
}
LOGGER.log(Level.FINE, "Channel name {0} negotiated mode {1} with capability {2}",
new Object[]{name, mode, cap});

return makeTransport(is, os, mode, cap);
case 2:
cap = Capability.read(is);
LOGGER.log(Level.FINER, "Received capability preamble: {0}", cap);
break;
}
ptr[i]=0; // reset
Expand All @@ -396,7 +413,8 @@ protected CommandTransport negotiate(final InputStream is, final OutputStream os
* The negotiated input stream that hides
* @param os
* {@linkplain CommandTransport#getUnderlyingStream() the underlying stream}.
* @param
* @param mode
* The mode to create the transport in.
* @param cap
* Capabilities of the other side, as determined during the handshaking.
*/
Expand Down
12 changes: 8 additions & 4 deletions src/main/java/org/jenkinsci/remoting/engine/JnlpProtocol.java
Expand Up @@ -26,12 +26,13 @@
import hudson.remoting.Channel;
import hudson.remoting.ChannelBuilder;
import hudson.remoting.EngineListener;

import javax.annotation.Nullable;
import java.io.BufferedInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import javax.annotation.Nullable;

/**
* Handshake protocol used by JNLP slave when initiating a connection to
Expand Down Expand Up @@ -74,7 +75,7 @@ public Channel establishChannel(Socket socket, ChannelBuilder channelBuilder) th
DataOutputStream outputStream = new DataOutputStream(socket.getOutputStream());
BufferedInputStream inputStream = new BufferedInputStream(socket.getInputStream());
if(performHandshake(outputStream, inputStream)) {
return buildChannel(socket, channelBuilder);
return buildChannel(socket, channelBuilder, inputStream);
}

return null;
Expand All @@ -96,9 +97,12 @@ abstract boolean performHandshake(DataOutputStream outputStream,
*
* @param socket The established {@link Socket} connection to the master.
* @param channelBuilder The {@link ChannelBuilder} to use.
* @param inputStream The {@link BufferedInputStream} of {@link Socket#getInputStream()} or {@code null} if there
* is none (TODO remove the null possibility).
* @return The constructed channel
*/
abstract Channel buildChannel(Socket socket, ChannelBuilder channelBuilder) throws IOException;
abstract Channel buildChannel(Socket socket, ChannelBuilder channelBuilder,
BufferedInputStream inputStream) throws IOException;

// The expected response from the master on successful completion of the
// handshake.
Expand Down
Expand Up @@ -75,9 +75,9 @@ boolean performHandshake(DataOutputStream outputStream,
}

@Override
Channel buildChannel(Socket socket, ChannelBuilder channelBuilder) throws IOException {
Channel buildChannel(Socket socket, ChannelBuilder channelBuilder, BufferedInputStream inputStream) throws IOException {
return channelBuilder.build(
new BufferedInputStream(socket.getInputStream()),
inputStream == null ? new BufferedInputStream(socket.getInputStream()) : inputStream, // TODO drop null
new BufferedOutputStream(socket.getOutputStream()));
}

Expand Down
Expand Up @@ -89,9 +89,9 @@ boolean performHandshake(DataOutputStream outputStream,
}

@Override
Channel buildChannel(Socket socket, ChannelBuilder channelBuilder) throws IOException {
Channel buildChannel(Socket socket, ChannelBuilder channelBuilder, BufferedInputStream inputStream) throws IOException {
return channelBuilder.build(
new BufferedInputStream(socket.getInputStream()),
inputStream == null ? new BufferedInputStream(socket.getInputStream()) : inputStream, // TODO drop null
new BufferedOutputStream(socket.getOutputStream()));
}

Expand Down
Expand Up @@ -157,9 +157,11 @@ boolean performHandshake(DataOutputStream outputStream,
}

@Override
Channel buildChannel(Socket socket, ChannelBuilder channelBuilder) throws IOException {
Channel buildChannel(Socket socket, ChannelBuilder channelBuilder, BufferedInputStream inputStream) throws IOException {
return channelBuilder.build(
new CipherInputStream(socket.getInputStream(), channelCiphers.getDecryptCipher()),
new CipherInputStream(
inputStream == null ? socket.getInputStream() : inputStream, // TODO drop null
channelCiphers.getDecryptCipher()),
new CipherOutputStream(socket.getOutputStream(), channelCiphers.getEncryptCipher())
);
}
Expand Down
Expand Up @@ -117,6 +117,6 @@ public void testBuildChannel() throws Exception {
whenNew(BufferedInputStream.class).withArguments(mockInputStream).thenReturn(mockBufferedInputStream);
when(mockChannelBuilder.build(mockBufferedInputStream, mockBufferedOutputStream)).thenReturn(mockChannel);

assertSame(mockChannel, protocol.buildChannel(mockSocket, mockChannelBuilder));
assertSame(mockChannel, protocol.buildChannel(mockSocket, mockChannelBuilder, null));
}
}
Expand Up @@ -181,6 +181,6 @@ public void testBuildChannel() throws Exception {
whenNew(BufferedInputStream.class).withArguments(mockInputStream).thenReturn(mockBufferedInputStream);
when(mockChannelBuilder.build(mockBufferedInputStream, mockBufferedOutputStream)).thenReturn(mockChannel);

assertSame(mockChannel, protocol.buildChannel(mockSocket, mockChannelBuilder));
assertSame(mockChannel, protocol.buildChannel(mockSocket, mockChannelBuilder, null));
}
}
Expand Up @@ -287,6 +287,6 @@ public void testBuildChannel() throws Exception {
when(mockChannelBuilder.build(mockCipherInputStream, mockCipherOutputStream))
.thenReturn(mockChannel);

assertSame(mockChannel, protocol.buildChannel(mockSocket, mockChannelBuilder));
assertSame(mockChannel, protocol.buildChannel(mockSocket, mockChannelBuilder, null));
}
}
Expand Up @@ -80,7 +80,7 @@ public void testHandshakeFails() throws Exception {
@Test
public void testHandshakeSucceeds() throws Exception {
when(mockProtocol.performHandshake(mockDataOutputStream, mockBufferedInputStream)).thenReturn(true);
when(mockProtocol.buildChannel(mockSocket, mockChannelBuilder)).thenReturn(mockChannel);
when(mockProtocol.buildChannel(mockSocket, mockChannelBuilder, null)).thenReturn(mockChannel);

assertSame(mockChannel, mockProtocol.establishChannel(mockSocket, mockChannelBuilder));
}
Expand Down

0 comments on commit 17aa60d

Please sign in to comment.