Skip to content

Commit

Permalink
[JENKINS-38381] Prototype API to receive asynchronous notifications o…
Browse files Browse the repository at this point in the history
…f process output or exit code.
  • Loading branch information
jglick committed Sep 29, 2016
1 parent e174ce7 commit eaa6b02
Show file tree
Hide file tree
Showing 8 changed files with 289 additions and 42 deletions.
14 changes: 7 additions & 7 deletions pom.xml
Expand Up @@ -3,7 +3,8 @@
<parent>
<groupId>org.jenkins-ci.plugins</groupId>
<artifactId>plugin</artifactId>
<version>2.7</version>
<version>2.15</version>
<relativePath/>
</parent>
<artifactId>durable-task</artifactId>
<version>1.13-SNAPSHOT</version>
Expand All @@ -14,26 +15,25 @@
<licenses>
<license>
<name>MIT License</name>
<url>http://www.opensource.org/licenses/mit-license.php</url>
<url>https://opensource.org/licenses/MIT</url>
</license>
</licenses>

<properties>
<jenkins.version>1.609.1</jenkins.version>
<java.level>6</java.level>
<findbugs.failOnError>false</findbugs.failOnError>
<jenkins.version>1.642.3</jenkins.version>
<hpi-plugin.version>1.120</hpi-plugin.version>
</properties>

<repositories>
<repository>
<id>repo.jenkins-ci.org</id>
<url>http://repo.jenkins-ci.org/public/</url>
<url>https://repo.jenkins-ci.org/public/</url>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>repo.jenkins-ci.org</id>
<url>http://repo.jenkins-ci.org/public/</url>
<url>https://repo.jenkins-ci.org/public/</url>
</pluginRepository>
</pluginRepositories>

Expand Down
Expand Up @@ -179,25 +179,17 @@ private synchronized int pid(FilePath ws) throws IOException, InterruptedExcepti
return pid;
}

@Override public Integer exitStatus(FilePath workspace, Launcher launcher) throws IOException, InterruptedException {
Integer status = super.exitStatus(workspace, launcher);
if (status != null) {
return status;
}
@Override protected Integer specialExitStatus(FilePath workspace, Launcher launcher) throws IOException, InterruptedException {
int _pid = pid(workspace);
if (_pid > 0 && !ProcessLiveness.isAlive(workspace.getChannel(), _pid, launcher)) {
// it looks like the process has disappeared. one last check to make sure it's not a result of a race condition,
// then if we still don't have the exit code, use fake exit code to distinguish from 0 (success) and 1+ (observed failure)
// it looks like the process has disappeared; use fake exit code to distinguish from 0 (success) and 1+ (observed failure)
// TODO would be better to have exitStatus accept a TaskListener so we could print an informative message
status = super.exitStatus(workspace, launcher);
if (status == null) {
status = -1;
}
return status;
return -1;
} else if (_pid == 0 && /* compatibility */ startTime > 0 && System.currentTimeMillis() - startTime > 1000 * LAUNCH_FAILURE_TIMEOUT) {
return -2; // apparently never started
} else {
return null;
}
return null;
}

@Override public String getDiagnostics(FilePath workspace, Launcher launcher) throws IOException, InterruptedException {
Expand Down
13 changes: 13 additions & 0 deletions src/main/java/org/jenkinsci/plugins/durabletask/Controller.java
Expand Up @@ -42,6 +42,19 @@
*/
public abstract class Controller implements Serializable {

/**
 * Starts asynchronous observation of the process, letting the master be notified when output becomes available or the process exits.
 * Call this right after the process is launched, and again each time the connection to the agent is re-established.
 * In this mode {@link #writeLog} should not be called, and {@link #exitStatus(FilePath, Launcher)} need not be called frequently,
 * although an occasional call is still advisable to confirm that the process is still running.
 * <p>This default implementation always throws {@link UnsupportedOperationException}.
 * @param workspace the workspace in use
 * @param handler a remotable callback
 * @throws UnsupportedOperationException when this mode is not available, so you must fall back to polling {@link #writeLog} and {@link #exitStatus(FilePath, Launcher)}
 */
public void watch(@Nonnull FilePath workspace, @Nonnull Handler handler) throws IOException, InterruptedException, UnsupportedOperationException {
    // Name the concrete controller class so the caller can tell which implementation lacks support.
    String implementation = getClass().getName();
    throw new UnsupportedOperationException("Asynchronous mode is not implemented in " + implementation);
}

/**
* Obtains any new task log output.
* Could use a serializable field to keep track of how much output has been previously written.
Expand Down
Expand Up @@ -53,8 +53,9 @@ public abstract class DurableTask extends AbstractDescribableImpl<DurableTask> i
public abstract Controller launch(EnvVars env, FilePath workspace, Launcher launcher, TaskListener listener) throws IOException, InterruptedException;

/**
* Requests that standard output of the task be captured rather than streamed to {@link Controller#writeLog}.
* If so, you may call {@link Controller#getOutput}.
* Requests that standard output of the task be captured rather than streamed.
* If you use {@link Controller#watch}, standard output will not be sent to {@link Handler#output}; it will be included in {@link Handler#exited} instead.
* Otherwise (using polling mode), standard output will not be sent to {@link Controller#writeLog}; call {@link Controller#getOutput} to collect.
* Standard error should still be streamed to the log.
* Should be called prior to {@link #launch} to take effect.
* @throws UnsupportedOperationException if this implementation does not support that mode
Expand Down
Expand Up @@ -30,19 +30,31 @@
import hudson.Util;
import hudson.model.TaskListener;
import hudson.remoting.Channel;
import hudson.remoting.DaemonThreadFactory;
import hudson.remoting.NamingThreadFactory;
import hudson.remoting.RemoteOutputStream;
import hudson.remoting.VirtualChannel;
import hudson.slaves.WorkspaceList;
import hudson.util.LogTaskListener;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.CheckForNull;
import jenkins.MasterToSlaveFileCallable;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.input.CountingInputStream;

/**
* A task which forks some external command and then waits for log and status files to be updated/created.
Expand Down Expand Up @@ -79,6 +91,10 @@ protected FileMonitoringController doLaunch(FilePath workspace, Launcher launche
throw new AbstractMethodError("override either doLaunch or launchWithCookie");
}

/**
* Tails a log file and watches for an exit status file.
* Must be remotable so that {@link #watch} can transfer the implementation.
*/
protected static class FileMonitoringController extends Controller {

/** Absolute path of {@link #controlDir(FilePath)}. */
Expand All @@ -91,6 +107,7 @@ protected static class FileMonitoringController extends Controller {

/**
* Byte offset in the file that has been reported thus far.
* Only used if {@link #writeLog(FilePath, OutputStream)} is used; not used for {@link #watch}.
*/
private long lastLocation;

Expand Down Expand Up @@ -130,7 +147,6 @@ private static class WriteLog extends MasterToSlaveFileCallable<Long> {
if (toRead > Integer.MAX_VALUE) { // >2Gb of output at once is unlikely
throw new IOException("large reads not yet implemented");
}
// TODO is this efficient for large amounts of output? Would it be better to stream data, or return a byte[] from the callable?
byte[] buf = new byte[(int) toRead];
raf.readFully(buf);
sink.write(buf);
Expand All @@ -144,22 +160,42 @@ private static class WriteLog extends MasterToSlaveFileCallable<Long> {
}
}

// TODO would be more efficient to allow API to consolidate writeLog with exitStatus (save an RPC call)
@Override public Integer exitStatus(FilePath workspace, Launcher launcher) throws IOException, InterruptedException {
FilePath status = getResultFile(workspace);
if (status.exists()) {
try {
return Integer.parseInt(status.readToString().trim());
} catch (NumberFormatException x) {
throw new IOException("corrupted content in " + status + ": " + x, x);
}
return readStatus(status);
} else {
return null;
Integer code = specialExitStatus(workspace, launcher);
if (code != null) {
// recheck normal file to defend against race conditions
if (status.exists()) {
return readStatus(status);
}
// Make sure that an exitStatus with a decorated launcher will ultimately result in Handler.exited being called
// and the task idled, even if the result file was never created normally:
status.write(Integer.toString(code), null);
}
return code;
}
}

/**
 * Parses the integer exit code stored in a result file.
 * @param status the file whose (whitespace-trimmed) text is expected to be a decimal integer
 * @return the parsed exit code
 * @throws IOException if the file cannot be read, or its content is not a valid integer
 */
private int readStatus(FilePath status) throws IOException, InterruptedException {
    String text = status.readToString().trim();
    try {
        return Integer.parseInt(text);
    } catch (NumberFormatException x) {
        // Report as an I/O problem, keeping the parse failure as the cause.
        throw new IOException("corrupted content in " + status + ": " + x, x);
    }
}

/**
* A way to provide specialized exit statuses other than watching {@link #getResultFile}.
* <p>This base implementation ignores both arguments and always returns null; subclasses may override it.
* @param workspace the workspace in use
* @param launcher a launcher which an override may use to inspect the process
* @return a possible exit status, or null for the default behavior
* @throws IOException declared for the benefit of overrides; never thrown by this base implementation
* @throws InterruptedException declared for the benefit of overrides; never thrown by this base implementation
*/
protected @CheckForNull Integer specialExitStatus(FilePath workspace, Launcher launcher) throws IOException, InterruptedException {
return null;
}

/**
 * Reads the whole content of {@link #getOutputFile} into memory.
 * @param workspace the workspace in use, used to locate the output file
 * @param launcher not used by this implementation
 * @return every byte currently in the output file
 * @throws IOException if the output file cannot be opened or read
 */
@Override public byte[] getOutput(FilePath workspace, Launcher launcher) throws IOException, InterruptedException {
    // TODO could perhaps be more efficient for large files to send a MasterToSlaveFileCallable<byte[]>
    // IOUtils.toByteArray does not close its argument, so the stream must be closed here to avoid leaking it.
    try (java.io.InputStream in = getOutputFile(workspace).read()) {
        return IOUtils.toByteArray(in);
    }
}

Expand Down Expand Up @@ -229,7 +265,96 @@ public FilePath getOutputFile(FilePath workspace) throws IOException, Interrupte
}
}

/**
 * Starts watching by dispatching a {@link StartWatching} callable through {@link FilePath#actAsync}.
 * The future returned by that call is discarded, so this method does not wait for the callable to finish.
 * @param workspace the workspace in use; the callable is sent to wherever it resides
 * @param handler the callback handed to the watcher
 */
@Override public void watch(FilePath workspace, Handler handler) throws IOException, InterruptedException, ClassCastException {
    StartWatching starter = new StartWatching(this, handler);
    workspace.actAsync(starter);
}

/**
 * Locates the file holding the last-read log position when {@link #watch} is in use.
 * @param workspace the workspace in use
 * @return the {@code last-location.txt} child of the control directory
 */
public FilePath getLastLocationFile(FilePath workspace) throws IOException, InterruptedException {
    FilePath dir = controlDir(workspace);
    return dir.child("last-location.txt");
}

private static final long serialVersionUID = 1L;
}

/** Shared scheduler for watchers; created on first use and only accessed through {@link #watchService()}. */
private static ScheduledExecutorService watchService;

/**
 * Returns the shared scheduler, creating it on the first call.
 * Synchronized so that concurrent first callers cannot create two pools.
 */
private static synchronized ScheduledExecutorService watchService() {
    if (watchService != null) {
        return watchService;
    }
    NamingThreadFactory threads = new NamingThreadFactory(new DaemonThreadFactory(), "FileMonitoringTask watcher");
    watchService = new /*ErrorLogging*/ScheduledThreadPoolExecutor(5, threads);
    return watchService;
}

/**
 * Callable dispatched by {@link FileMonitoringController#watch}: when invoked it wraps the workspace directory
 * in a {@link FilePath} and submits a {@link Watcher} to the shared watch service.
 */
private static class StartWatching extends MasterToSlaveFileCallable<Void> {

    private static final long serialVersionUID = 1L;

    /** Controller whose log and result files are to be watched. */
    private final FileMonitoringController controller;
    /** Callback that receives output and the exit notification. */
    private final Handler handler;

    StartWatching(FileMonitoringController controller, Handler handler) {
        this.controller = controller;
        this.handler = handler;
    }

    /**
     * Schedules the first watcher run and returns immediately.
     * @param workspace the workspace directory as a local file
     * @param channel not used
     * @return always null
     */
    @Override public Void invoke(File workspace, VirtualChannel channel) throws IOException, InterruptedException {
        FilePath localWorkspace = new FilePath(workspace);
        Watcher watcher = new Watcher(controller, localWorkspace, handler);
        watchService().submit(watcher);
        return null;
    }

}

/**
 * Polls a controller's log and result files and forwards what it finds to a {@link Handler}.
 * Each run offers any log bytes beyond the recorded last location to {@link Handler#output},
 * then either reports the exit status via {@link Handler#exited} or reschedules itself.
 * Any exception ends the watch: it is logged and no further run is scheduled.
 */
private static class Watcher implements Runnable {

    // note that LOGGER here is going to the agent log, not master log
    private static final Launcher localLauncher = new Launcher.LocalLauncher(new LogTaskListener(LOGGER, Level.INFO));

    private final FileMonitoringController controller;
    private final FilePath workspace;
    private final Handler handler;

    Watcher(FileMonitoringController controller, FilePath workspace, Handler handler) {
        this.controller = controller;
        this.workspace = workspace;
        this.handler = handler;
    }

    @Override public void run() {
        try {
            Integer exitStatus = controller.exitStatus(workspace, localLauncher); // check before collecting output, in case the process is just now finishing
            long lastLocation = 0;
            FilePath lastLocationFile = controller.getLastLocationFile(workspace);
            if (lastLocationFile.exists()) {
                // Trim before parsing (as is done for the result file) so stray whitespace
                // does not abort the watch with a NumberFormatException.
                lastLocation = Long.parseLong(lastLocationFile.readToString().trim());
            }
            FilePath logFile = controller.getLogFile(workspace);
            long len = logFile.length();
            if (len > lastLocation) {
                assert !logFile.isRemote();
                try (FileChannel ch = FileChannel.open(Paths.get(logFile.getRemote()), StandardOpenOption.READ)) {
                    // Count what the handler actually consumes so that only those bytes are skipped next time.
                    CountingInputStream cis = new CountingInputStream(Channels.newInputStream(ch.position(lastLocation)));
                    handler.output(cis);
                    lastLocationFile.write(Long.toString(lastLocation + cis.getByteCount()), null);
                }
            }
            if (exitStatus != null) {
                byte[] output;
                if (controller.getOutputFile(workspace).exists()) {
                    output = controller.getOutput(workspace, localLauncher);
                } else {
                    output = null;
                }
                handler.exited(exitStatus, output);
            } else {
                watchService().schedule(this, 100, TimeUnit.MILLISECONDS); // TODO consider an adaptive timeout as in DurableTaskStep.Execution in polling mode
            }
        } catch (Exception x) {
            if (x instanceof InterruptedException) {
                // Do not swallow the interruption: restore the flag for whoever owns this thread.
                Thread.currentThread().interrupt();
            }
            LOGGER.log(Level.WARNING, "giving up on watching " + controller.controlDir, x);
        }
    }

}

}
67 changes: 67 additions & 0 deletions src/main/java/org/jenkinsci/plugins/durabletask/Handler.java
@@ -0,0 +1,67 @@
/*
* The MIT License
*
* Copyright 2016 CloudBees, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/

package org.jenkinsci.plugins.durabletask;

import hudson.FilePath;
import hudson.Launcher;
import hudson.remoting.VirtualChannel;
import java.io.InputStream;
import java.io.Serializable;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/**
* A remote handler which may be sent to an agent and handle process output and results.
* It is {@link Serializable} so that an instance can be transferred to the agent.
* If it needs to communicate with the master, you may use {@link VirtualChannel#export}.
* @see Controller#watch
*/
public abstract class Handler implements Serializable {

/**
* Notification that new process output is available.
* <p>Should only be called when at least one byte is available.
* Whatever bytes are actually read will not be offered on the next call, if there is one; there is no need to close the stream.
* <p>There is no guarantee that output is offered in the form of complete lines of text,
* though in the typical case of line-oriented output it is likely that it will end in a newline.
* <p>Buffering is the responsibility of the caller, and {@link InputStream#markSupported} may be false.
* @param stream a way to read process output which has not already been handled
* @throws Exception if anything goes wrong, this watch is deactivated
*/
public abstract void output(@Nonnull InputStream stream) throws Exception;

/**
* Notification that the process has exited or vanished.
* {@link #output} should have been called with any final uncollected output before this method is called.
* <p>Any metadata associated with the process may be deleted after this call completes, rendering subsequent {@link Controller} calls unsatisfiable.
* <p>Note that unlike {@link Controller#exitStatus(FilePath, Launcher)}, no specialized {@link Launcher} is available on the agent,
* so if there are specialized techniques for determining process liveness they will not be considered here;
* you still need to occasionally poll for an exit status from the master.
* @param code the exit code, if known (0 conventionally represents success); may be negative for anomalous conditions such as a missing process
* @param output standard output captured, if {@link DurableTask#captureOutput} was called; else null
* @throws Exception if anything goes wrong, this watch is deactivated
*/
public abstract void exited(int code, @Nullable byte[] output) throws Exception;

}

0 comments on commit eaa6b02

Please sign in to comment.