Skip to content

Commit

Permalink
[FIXED JENKINS-7809] expose child process stdout as InputStream.
Browse files Browse the repository at this point in the history
 I added a mode where one can launch a child process without the built-in pumping support, making it trivial to read until EOF in the distributed environments.

 This concludes the fix.
  • Loading branch information
kohsuke committed Feb 16, 2011
1 parent e8971a5 commit 96dd84b
Show file tree
Hide file tree
Showing 8 changed files with 395 additions and 45 deletions.
3 changes: 3 additions & 0 deletions changelog.html
Expand Up @@ -81,6 +81,9 @@
(<a href="http://issues.jenkins-ci.org/browse/JENKINS-7275">issue 7275</a>)
<li class=rfe>
<tt>BuildWrapper</tt>s can now act on the build in progress before the checkout occurs.
<li class=rfe>
Improved the process forking abstractions so that plugins can more easily read from child processes.
(<a href="http://issues.jenkins-ci.org/browse/JENKINS-7809">issue 7809</a>)
</ul>
</div><!--=TRUNK-END=-->

Expand Down
195 changes: 173 additions & 22 deletions core/src/main/java/hudson/Launcher.java
@@ -1,7 +1,7 @@
/*
* The MIT License
*
* Copyright (c) 2004-2009, Sun Microsystems, Inc., Kohsuke Kawaguchi, Stephen Connolly
* Copyright (c) 2004-2009, Sun Microsystems, Inc., Kohsuke Kawaguchi, Stephen Connolly, CloudBees, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand All @@ -24,7 +24,6 @@
package hudson;

import hudson.Proc.LocalProc;
import hudson.Proc.RemoteProc;
import hudson.model.Computer;
import hudson.model.Hudson;
import hudson.model.TaskListener;
Expand All @@ -44,7 +43,9 @@
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Map;
import java.util.List;
Expand Down Expand Up @@ -144,8 +145,19 @@ public final class ProcStarter {
protected boolean[] masks;
protected FilePath pwd;
protected OutputStream stdout = NULL_OUTPUT_STREAM, stderr;
protected InputStream stdin = new NullInputStream(0);
protected InputStream stdin = NULL_INPUT_STREAM;
protected String[] envs;
/**
* True to reverse the I/O direction.
*
* For example, if {@link #reverseStdout}==true, then we expose
* {@link InputStream} from {@link Proc} and expect the client to read from it,
* whereas normally we take {@link OutputStream} via {@link #stdout(OutputStream)}
* and feed stdout into that output.
*
* @since 1.399
*/
protected boolean reverseStdin, reverseStdout, reverseStderr;

public ProcStarter cmds(String... args) {
return cmds(Arrays.asList(args));
Expand Down Expand Up @@ -266,6 +278,50 @@ public String[] envs() {
return envs;
}

/**
* Indicates that the caller will pump {@code stdout} from the child process
* via {@link Proc#getStdout()} (whereas by default you call {@link #stdout(OutputStream)}
* and let Jenkins pump stdout into your {@link OutputStream} of choosing.
*
* <p>
* When this method is called, {@link Proc#getStdout()} will read the combined output
* of {@code stdout/stderr} from the child process, unless {@link #readStderr()} is called
* separately, which lets the caller read those two streams separately.
*
* @since 1.399
*/
public ProcStarter readStdout() {
reverseStdout = true;
stdout = stderr = null;
return this;
}

/**
* In addition to the effect of {@link #readStdout()}, indicate that the caller will pump {@code stderr}
* from the child process separately from {@code stdout}. The stderr will be readable from
* {@link Proc#getStderr()} while {@link Proc#getStdout()} reads from stdout.
*
* @since 1.399
*/
public ProcStarter readStderr() {
reverseStdout = true;
reverseStderr = true;
return this;
}

/**
* Indicates that the caller will directly write to the child process {@code stin} }via
* {@link Proc#getStdin()} (whereas by default you call {@link #stdin(InputStream)}
* and let Jenkins pump your {@link InputStream} of choosing to stdin.
* @since 1.399
*/
public ProcStarter writeStdin() {
reverseStdin = true;
stdin = null;
return this;
}


/**
* Starts the new process as configured.
*/
Expand All @@ -284,7 +340,11 @@ public int join() throws IOException, InterruptedException {
* Copies a {@link ProcStarter}.
*/
public ProcStarter copy() {
return new ProcStarter().cmds(commands).pwd(pwd).masks(masks).stdin(stdin).stdout(stdout).stderr(stderr).envs(envs);
ProcStarter rhs = new ProcStarter().cmds(commands).pwd(pwd).masks(masks).stdin(stdin).stdout(stdout).stderr(stderr).envs(envs);
rhs.reverseStdin = this.reverseStdin;
rhs.reverseStderr = this.reverseStderr;
rhs.reverseStdout = this.reverseStdout;
return rhs;
}
}

Expand Down Expand Up @@ -635,7 +695,11 @@ public Proc launch(ProcStarter ps) throws IOException {
for ( int idx = 0 ; idx < jobCmd.length; idx++ )
jobCmd[idx] = jobEnv.expand(ps.commands.get(idx));

return new LocalProc(jobCmd, Util.mapToEnv(jobEnv), ps.stdin, ps.stdout, ps.stderr, toFile(ps.pwd));
return new LocalProc(jobCmd, Util.mapToEnv(jobEnv),
ps.reverseStdin ?LocalProc.SELFPUMP_INPUT:ps.stdin,
ps.reverseStdout?LocalProc.SELFPUMP_OUTPUT:ps.stdout,
ps.reverseStderr?LocalProc.SELFPUMP_OUTPUT:ps.stderr,
toFile(ps.pwd));
}

private File toFile(FilePath f) {
Expand Down Expand Up @@ -716,10 +780,14 @@ public RemoteLauncher(TaskListener listener, VirtualChannel channel, boolean isU
public Proc launch(ProcStarter ps) throws IOException {
final OutputStream out = ps.stdout == null ? null : new RemoteOutputStream(new CloseProofOutputStream(ps.stdout));
final OutputStream err = ps.stderr==null ? null : new RemoteOutputStream(new CloseProofOutputStream(ps.stderr));
final InputStream in = ps.stdin==null ? null : new RemoteInputStream(ps.stdin);
final InputStream in = (ps.stdin==null || ps.stdin==NULL_INPUT_STREAM) ? null : new RemoteInputStream(ps.stdin,false);
final String workDir = ps.pwd==null ? null : ps.pwd.getRemote();

return new RemoteProc(getChannel().callAsync(new RemoteLaunchCallable(ps.commands, ps.masks, ps.envs, in, out, err, workDir, listener)));
try {
return new ProcImpl(getChannel().call(new RemoteLaunchCallable(ps.commands, ps.masks, ps.envs, in, ps.reverseStdin, out, ps.reverseStdout, err, ps.reverseStderr, workDir, listener)));
} catch (InterruptedException e) {
throw (IOException)new InterruptedIOException().initCause(e);
}
}

public Channel launchChannel(String[] cmd, OutputStream err, FilePath _workDir, Map<String,String> envOverrides) throws IOException, InterruptedException {
Expand Down Expand Up @@ -762,9 +830,64 @@ public Void call() throws RuntimeException {

private static final long serialVersionUID = 1L;
}

public static final class ProcImpl extends Proc {
private final RemoteProcess process;
private final IOTriplet io;

public ProcImpl(RemoteProcess process) {
this.process = process;
this.io = process.getIOtriplet();
}

@Override
public void kill() throws IOException, InterruptedException {
process.kill();
}

@Override
public int join() throws IOException, InterruptedException {
return process.join();
}

@Override
public boolean isAlive() throws IOException, InterruptedException {
return process.isAlive();
}

@Override
public InputStream getStdout() {
return io.stdout;
}

@Override
public InputStream getStderr() {
return io.stderr;
}

@Override
public OutputStream getStdin() {
return io.stdin;
}
}
}

private static class IOTriplet implements Serializable {
InputStream stdout,stderr;
OutputStream stdin;
private static final long serialVersionUID = 1L;
}
/**
* Remoting interface of a remote process
*/
static interface RemoteProcess {
int join() throws InterruptedException, IOException;
void kill() throws IOException, InterruptedException;
boolean isAlive() throws IOException, InterruptedException;
IOTriplet getIOtriplet();
}

private static class RemoteLaunchCallable implements Callable<Integer,IOException> {
private static class RemoteLaunchCallable implements Callable<RemoteProcess,IOException> {
private final List<String> cmd;
private final boolean[] masks;
private final String[] env;
Expand All @@ -773,8 +896,9 @@ private static class RemoteLaunchCallable implements Callable<Integer,IOExceptio
private final OutputStream err;
private final String workDir;
private final TaskListener listener;
private final boolean reverseStdin, reverseStdout, reverseStderr;

RemoteLaunchCallable(List<String> cmd, boolean[] masks, String[] env, InputStream in, OutputStream out, OutputStream err, String workDir, TaskListener listener) {
RemoteLaunchCallable(List<String> cmd, boolean[] masks, String[] env, InputStream in, boolean reverseStdin, OutputStream out, boolean reverseStdout, OutputStream err, boolean reverseStderr, String workDir, TaskListener listener) {
this.cmd = new ArrayList<String>(cmd);
this.masks = masks;
this.env = env;
Expand All @@ -783,26 +907,51 @@ private static class RemoteLaunchCallable implements Callable<Integer,IOExceptio
this.err = err;
this.workDir = workDir;
this.listener = listener;
this.reverseStdin = reverseStdin;
this.reverseStdout = reverseStdout;
this.reverseStderr = reverseStderr;
}

public Integer call() throws IOException {
public RemoteProcess call() throws IOException {
Launcher.ProcStarter ps = new LocalLauncher(listener).launch();
ps.cmds(cmd).masks(masks).envs(env).stdin(in).stdout(out).stderr(err);
if(workDir!=null) ps.pwd(workDir);
if (reverseStdin) ps.writeStdin();
if (reverseStdout) ps.readStdout();
if (reverseStderr) ps.readStderr();

Proc p = ps.start();
try {
return p.join();
} catch (InterruptedException e) {
return -1;
} finally {
// make sure I/O is delivered to the remote before we return
try {
Channel.current().syncIO();
} catch (Throwable _) {
// this includes a failure to sync, slave.jar too old, etc
final Proc p = ps.start();

return Channel.current().export(RemoteProcess.class,new RemoteProcess() {
public int join() throws InterruptedException, IOException {
try {
return p.join();
} finally {
// make sure I/O is delivered to the remote before we return
try {
Channel.current().syncIO();
} catch (Throwable _) {
// this includes a failure to sync, slave.jar too old, etc
}
}
}
}

public void kill() throws IOException, InterruptedException {
p.kill();
}

public boolean isAlive() throws IOException, InterruptedException {
return p.isAlive();
}

public IOTriplet getIOtriplet() {
IOTriplet r = new IOTriplet();
if (reverseStdout) r.stdout = new RemoteInputStream(p.getStdout());
if (reverseStderr) r.stderr = new RemoteInputStream(p.getStderr());
if (reverseStdin) r.stdin = new RemoteOutputStream(p.getStdin());
return r;
}
});
}

private static final long serialVersionUID = 1L;
Expand Down Expand Up @@ -873,5 +1022,7 @@ private static EnvVars inherit(Map<String,String> overrides) {
*/
public static boolean showFullPath = false;

private static final NullInputStream NULL_INPUT_STREAM = new NullInputStream(0);

private static final Logger LOGGER = Logger.getLogger(Launcher.class.getName());
}

0 comments on commit 96dd84b

Please sign in to comment.