Skip to content

Commit

Permalink
Merge pull request #41 from jenkinsci/network-JENKINS-50597
Browse files Browse the repository at this point in the history
[JENKINS-50597] Network behavior tuning III
  • Loading branch information
carlossg committed Jun 1, 2018
2 parents bb65de8 + d8ca445 commit 2fece88
Show file tree
Hide file tree
Showing 4 changed files with 195 additions and 57 deletions.
4 changes: 2 additions & 2 deletions pom.xml
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>org.jenkins-ci.plugins</groupId>
<artifactId>plugin</artifactId>
<version>3.11</version>
<version>3.12</version>
<relativePath />
</parent>
<groupId>io.jenkins.plugins</groupId>
Expand Down Expand Up @@ -191,7 +191,7 @@
<dependency>
<groupId>org.jenkins-ci.plugins.workflow</groupId>
<artifactId>workflow-step-api</artifactId>
<version>2.15</version>
<version>2.16-rc310.04f07c15faaf</version> <!-- TODO https://github.com/jenkinsci/workflow-step-api-plugin/pull/37 -->
<scope>test</scope>
</dependency>
<dependency>
Expand Down
Expand Up @@ -25,6 +25,7 @@
package io.jenkins.plugins.artifact_manager_jclouds;

import com.github.rholder.retry.Attempt;
import com.github.rholder.retry.AttemptTimeLimiters;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -55,26 +56,30 @@
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies;
import com.google.common.base.Predicate;
import com.google.common.util.concurrent.UncheckedTimeoutException;
import hudson.AbortException;
import hudson.EnvVars;
import hudson.FilePath;
import hudson.Launcher;
import hudson.Util;
import hudson.model.BuildListener;
import hudson.model.Computer;
import hudson.model.Run;
import hudson.model.TaskListener;
import hudson.remoting.VirtualChannel;
import hudson.slaves.WorkspaceList;
import hudson.util.DirScanner;
import hudson.util.io.ArchiverFactory;
import io.jenkins.plugins.artifact_manager_jclouds.BlobStoreProvider.HttpMethod;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import jenkins.MasterToSlaveFileCallable;
import jenkins.model.ArtifactManager;
import jenkins.util.JenkinsJVM;
import jenkins.util.VirtualFile;
import org.apache.commons.io.IOUtils;
import org.apache.http.client.HttpResponseException;
import org.kohsuke.accmod.Restricted;
import org.kohsuke.accmod.restrictions.NoExternalUse;

Expand Down Expand Up @@ -138,6 +143,33 @@ public void archive(FilePath workspace, Launcher launcher, BuildListener listene
listener.getLogger().printf("Uploaded %s artifact(s) to %s%n", artifactUrls.size(), provider.toURI(provider.getContainer(), getBlobPath("artifacts/")));
}

private static class UploadToBlobStorage extends MasterToSlaveFileCallable<Void> {
private static final long serialVersionUID = 1L;

private final Map<String, URL> artifactUrls; // e.g. "target/x.war", "http://..."
private final TaskListener listener;
// Bind when constructed on the master side; on the agent side, deserialize those values.
private final int stopAfterAttemptNumber = UPLOAD_STOP_AFTER_ATTEMPT_NUMBER;
private final long waitMultiplier = UPLOAD_WAIT_MULTIPLIER;
private final long waitMaximum = UPLOAD_WAIT_MAXIMUM;
private final long timeout = UPLOAD_TIMEOUT;

UploadToBlobStorage(Map<String, URL> artifactUrls, TaskListener listener) {
this.artifactUrls = artifactUrls;
this.listener = listener;
}

@Override
public Void invoke(File f, VirtualChannel channel) throws IOException, InterruptedException {
for (Map.Entry<String, URL> entry : artifactUrls.entrySet()) {
Path local = f.toPath().resolve(entry.getKey());
URL url = entry.getValue();
uploadFile(local, url, listener, stopAfterAttemptNumber, waitMultiplier, waitMaximum, timeout);
}
return null;
}
}

@Override
public boolean delete() throws IOException, InterruptedException {
String blobPath = getBlobPath("");
Expand Down Expand Up @@ -197,6 +229,10 @@ private static final class Stash extends MasterToSlaveFileCallable<Integer> {
private final boolean useDefaultExcludes;
private final String tempDir;
private final TaskListener listener;
private final int stopAfterAttemptNumber = UPLOAD_STOP_AFTER_ATTEMPT_NUMBER;
private final long waitMultiplier = UPLOAD_WAIT_MULTIPLIER;
private final long waitMaximum = UPLOAD_WAIT_MAXIMUM;
private final long timeout = UPLOAD_TIMEOUT;

Stash(URL url, String includes, String excludes, boolean useDefaultExcludes, String tempDir, TaskListener listener) throws IOException {
this.url = url;
Expand All @@ -222,7 +258,7 @@ public Integer invoke(File f, VirtualChannel channel) throws IOException, Interr
throw new IOException(e);
}
if (count > 0) {
uploadFile(tmp, url, listener);
uploadFile(tmp, url, listener, stopAfterAttemptNumber, waitMultiplier, waitMaximum, timeout);
}
return count;
} finally {
Expand Down Expand Up @@ -323,55 +359,62 @@ private BlobStoreContext getContext() throws IOException {
return provider.getContext();
}

private static class UploadToBlobStorage extends MasterToSlaveFileCallable<Void> {
private static final long serialVersionUID = 1L;

private final Map<String, URL> artifactUrls; // e.g. "target/x.war", "http://..."
private final TaskListener listener;

UploadToBlobStorage(Map<String, URL> artifactUrls, TaskListener listener) {
this.artifactUrls = artifactUrls;
this.listener = listener;
}

@Override
public Void invoke(File f, VirtualChannel channel) throws IOException, InterruptedException {
for (Map.Entry<String, URL> entry : artifactUrls.entrySet()) {
Path local = f.toPath().resolve(entry.getKey());
URL url = entry.getValue();
uploadFile(local, url, listener);
}
return null;
private static final class HTTPAbortException extends AbortException {
final int code;
HTTPAbortException(int code, String message) {
super(message);
this.code = code;
}
}

/**
* Number of upload attempts of nonfatal errors before giving up.
*/
static int UPLOAD_STOP_AFTER_ATTEMPT_NUMBER = Integer.getInteger(JCloudsArtifactManager.class.getName() + ".UPLOAD_STOP_AFTER_ATTEMPT_NUMBER", 10);
/**
* Initial number of milliseconds between first and second upload attempts.
* Subsequent ones increase exponentially.
* Note that this is not a <em>randomized</em> exponential backoff;
* and the base of the exponent is hard-coded to 2.
*/
static long UPLOAD_WAIT_MULTIPLIER = Long.getLong(JCloudsArtifactManager.class.getName() + ".UPLOAD_WAIT_MULTIPLIER", 100);
/**
* Maximum number of seconds between upload attempts.
*/
static long UPLOAD_WAIT_MAXIMUM = Long.getLong(JCloudsArtifactManager.class.getName() + ".UPLOAD_WAIT_MAXIMUM", 300);
/**
* Number of seconds to permit a single upload attempt to take.
*/
static long UPLOAD_TIMEOUT = Long.getLong(JCloudsArtifactManager.class.getName() + ".UPLOAD_TIMEOUT", /* 15m */15 * 60);

private static final ExecutorService executors = JenkinsJVM.isJenkinsJVM() ? Computer.threadPoolForRemoting : Executors.newCachedThreadPool();

/**
* Upload a file to a URL
*/
@SuppressWarnings("Convert2Lambda") // bogus use of generics (type variable should have been on class); cannot be made into a lambda
private static void uploadFile(Path f, URL url, final TaskListener listener) throws IOException {
private static void uploadFile(Path f, URL url, final TaskListener listener, int stopAfterAttemptNumber, long waitMultiplier, long waitMaximum, long timeout) throws IOException, InterruptedException {
String urlSafe = url.toString().replaceFirst("[?].+$", "?…");
try {
Predicate<Throwable> nonfatal = x -> x instanceof IOException && (!(x instanceof HttpResponseException) || ((HttpResponseException) x).getStatusCode() >= 500);
AtomicReference<Throwable> lastError = new AtomicReference<>();
RetryerBuilder.<Void>newBuilder().
retryIfException(nonfatal).
retryIfException(x -> x instanceof IOException && (!(x instanceof HTTPAbortException) || ((HTTPAbortException) x).code >= 500) || x instanceof UncheckedTimeoutException).
withRetryListener(new RetryListener() {
@Override
public <Void> void onRetry(Attempt<Void> attempt) {
if (attempt.hasException()) {
Throwable t = attempt.getExceptionCause();
if (nonfatal.apply(t)) {
listener.getLogger().println("Retrying upload after: " + t);
}
lastError.set(attempt.getExceptionCause());
}
}
}).
// TODO all scalars configurable via system property
withStopStrategy(StopStrategies.stopAfterAttempt(10)).
// Note that this is not a _randomized_ exponential backoff; and the base of the exponent is hard-coded to 2.
withWaitStrategy(WaitStrategies.exponentialWait(100, 5, TimeUnit.MINUTES)).
// TODO withAttemptTimeLimiter(…).
withStopStrategy(StopStrategies.stopAfterAttempt(stopAfterAttemptNumber)).
withWaitStrategy(WaitStrategies.exponentialWait(waitMultiplier, waitMaximum, TimeUnit.SECONDS)).
withAttemptTimeLimiter(AttemptTimeLimiters.fixedTimeLimit(timeout, TimeUnit.SECONDS, executors)).
build().call(() -> {
Throwable t = lastError.get();
if (t != null) {
listener.getLogger().println("Retrying upload after: " + (t instanceof AbortException ? t.getMessage() : t.toString()));
}
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
connection.setDoOutput(true);
connection.setRequestMethod("PUT");
Expand All @@ -385,7 +428,7 @@ public <Void> void onRetry(Attempt<Void> attempt) {
try (InputStream err = connection.getErrorStream()) {
diag = err != null ? IOUtils.toString(err, connection.getContentEncoding()) : null;
}
throw new HttpResponseException(responseCode, String.format("Failed to upload %s to %s, response: %d %s, body: %s", f.toAbsolutePath(), urlSafe, responseCode, connection.getResponseMessage(), diag));
throw new HTTPAbortException(responseCode, String.format("Failed to upload %s to %s, response: %d %s, body: %s", f.toAbsolutePath(), urlSafe, responseCode, connection.getResponseMessage(), diag));
}
return null;
});
Expand All @@ -395,6 +438,8 @@ public <Void> void onRetry(Attempt<Void> attempt) {
throw (IOException) x2;
} else if (x2 instanceof RuntimeException) {
throw (RuntimeException) x2;
} else if (x2 instanceof InterruptedException) {
throw (InterruptedException) x2;
} else { // Error?
throw new RuntimeException(x);
}
Expand Down
Expand Up @@ -34,17 +34,16 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.io.IOUtils;
import org.apache.http.ConnectionClosedException;
import org.apache.http.HttpEntity;
import org.apache.http.HttpEntityEnclosingRequest;
import org.apache.http.HttpRequest;
import org.apache.http.HttpResponse;
import org.apache.http.HttpVersion;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.bootstrap.HttpServer;
import org.apache.http.impl.bootstrap.ServerBootstrap;
import org.apache.http.message.BasicStatusLine;
import org.apache.http.protocol.HttpContext;
import org.apache.http.protocol.HttpRequestHandler;
import org.jclouds.ContextBuilder;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.BlobStoreContext;
Expand Down Expand Up @@ -72,17 +71,17 @@ public String getPrefix() {
public String getContainer() {
return "container";
}
private static final Map<String, Integer> fails = new ConcurrentHashMap<>();

private static final Map<String, HttpRequestHandler> specialHandlers = new ConcurrentHashMap<>();

/**
* Requests that the <em>next</em> HTTP access to a particular presigned URL should fail with a 4xx/5xx error.
* Requests that the <em>next</em> HTTP access to a particular presigned URL should behave specially.
* @param method upload or download
* @param key the blob’s {@link StorageMetadata#getName}
* @param code the status code, or 0 to just make the request fail without sending a proper response
* @param handler what to do instead
*/
static void failIn(HttpMethod method, String key, int code) {
fails.put(method + ":" + key, code);
static void speciallyHandle(HttpMethod method, String key, HttpRequestHandler handler) {
specialHandlers.put(method + ":" + key, handler);
}

@Override
Expand All @@ -98,13 +97,9 @@ public synchronized BlobStoreContext getContext() throws IOException {
}
String container = m.group(1);
String key = m.group(2);
Integer failure = fails.remove(method + ":" + key);
if (failure != null) {
if (failure == 0) {
throw new IllegalStateException("Refusing to even send a status code for " + container + ":" + key);
}
response.setStatusLine(new BasicStatusLine(HttpVersion.HTTP_1_0, failure, "simulated " + failure + " failure"));
response.setEntity(new StringEntity("Detailed explanation of " + failure + "."));
HttpRequestHandler specialHandler = specialHandlers.remove(method + ":" + key);
if (specialHandler != null) {
specialHandler.handle(request, response, _context);
return;
}
BlobStore blobStore = context.getBlobStore();
Expand Down Expand Up @@ -136,7 +131,13 @@ public synchronized BlobStoreContext getContext() throws IOException {
}
}
}).
setExceptionLogger(x -> LOGGER.log(Level.INFO, "error thrown in HTTP service", x)).
setExceptionLogger(x -> {
if (x instanceof ConnectionClosedException) {
LOGGER.info(x.toString());
} else {
LOGGER.log(Level.INFO, "error thrown in HTTP service", x);
}
}).
create();
server.start();
baseURL = new URL("http://" + server.getInetAddress().getHostName() + ":" + server.getLocalPort() + "/");
Expand Down

0 comments on commit 2fece88

Please sign in to comment.