Skip to content

Commit

Permalink
Merge pull request #34 from jenkinsci/network-JENKINS-50597
Browse files Browse the repository at this point in the history
[JENKINS-50597] Network behavior tuning
  • Loading branch information
carlossg committed May 28, 2018
2 parents cb85963 + 073804d commit d4ff575
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 37 deletions.
11 changes: 11 additions & 0 deletions pom.xml
Expand Up @@ -159,6 +159,17 @@
<version>1.7</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.rholder</groupId>
<artifactId>guava-retrying</artifactId>
<version>2.0.0</version>
<exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
Expand Down
Expand Up @@ -24,6 +24,7 @@

package io.jenkins.plugins.artifact_manager_jclouds;

import com.github.rholder.retry.Attempt;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
Expand All @@ -37,6 +38,7 @@
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand All @@ -48,6 +50,11 @@
import org.jclouds.blobstore.options.ListContainerOptions;
import org.jenkinsci.plugins.workflow.flow.StashManager;

import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.RetryListener;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies;
import hudson.AbortException;
import hudson.EnvVars;
import hudson.FilePath;
Expand All @@ -61,10 +68,12 @@
import hudson.util.DirScanner;
import hudson.util.io.ArchiverFactory;
import io.jenkins.plugins.artifact_manager_jclouds.BlobStoreProvider.HttpMethod;
import java.util.concurrent.TimeUnit;
import jenkins.MasterToSlaveFileCallable;
import jenkins.model.ArtifactManager;
import jenkins.util.VirtualFile;
import org.apache.commons.io.IOUtils;
import org.apache.http.client.HttpResponseException;
import org.kohsuke.accmod.Restricted;
import org.kohsuke.accmod.restrictions.NoExternalUse;

Expand Down Expand Up @@ -124,7 +133,7 @@ public void archive(FilePath workspace, Launcher launcher, BuildListener listene
artifactUrls.put(entry.getValue(), provider.toExternalURL(blob, HttpMethod.PUT));
}

workspace.act(new UploadToBlobStorage(artifactUrls));
workspace.act(new UploadToBlobStorage(artifactUrls, listener));
listener.getLogger().printf("Uploaded %s artifact(s) to %s%n", artifactUrls.size(), provider.toURI(provider.getContainer(), getBlobPath("artifacts/")));
}

Expand Down Expand Up @@ -168,7 +177,7 @@ public void stash(String name, FilePath workspace, Launcher launcher, EnvVars en
Blob blob = blobStore.blobBuilder(path).build();
blob.getMetadata().setContainer(provider.getContainer());
URL url = provider.toExternalURL(blob, HttpMethod.PUT);
int count = workspace.act(new Stash(url, includes, excludes, useDefaultExcludes, WorkspaceList.tempDir(workspace).getRemote()));
int count = workspace.act(new Stash(url, includes, excludes, useDefaultExcludes, WorkspaceList.tempDir(workspace).getRemote(), listener));
if (count == 0 && !allowEmpty) {
throw new AbortException("No files included in stash");
}
Expand All @@ -181,13 +190,15 @@ private static final class Stash extends MasterToSlaveFileCallable<Integer> {
private final String includes, excludes;
private final boolean useDefaultExcludes;
private final String tempDir;
private final TaskListener listener;

Stash(URL url, String includes, String excludes, boolean useDefaultExcludes, String tempDir) throws IOException {
Stash(URL url, String includes, String excludes, boolean useDefaultExcludes, String tempDir, TaskListener listener) throws IOException {
this.url = url;
this.includes = includes;
this.excludes = excludes;
this.useDefaultExcludes = useDefaultExcludes;
this.tempDir = tempDir;
this.listener = listener;
}

@Override
Expand All @@ -205,7 +216,7 @@ public Integer invoke(File f, VirtualChannel channel) throws IOException, Interr
throw new IOException(e);
}
if (count > 0) {
uploadFile(tmp, url);
uploadFile(tmp, url, listener);
}
return count;
} finally {
Expand Down Expand Up @@ -304,19 +315,19 @@ private static class UploadToBlobStorage extends MasterToSlaveFileCallable<Void>
private static final long serialVersionUID = 1L;

private final Map<String, URL> artifactUrls; // e.g. "target/x.war", "http://..."
private final TaskListener listener;

UploadToBlobStorage(Map<String, URL> artifactUrls) {
UploadToBlobStorage(Map<String, URL> artifactUrls, TaskListener listener) {
this.artifactUrls = artifactUrls;
this.listener = listener;
}

@Override
public Void invoke(File f, VirtualChannel channel) throws IOException, InterruptedException {
for (Map.Entry<String, URL> entry : artifactUrls.entrySet()) {
Path local = f.toPath().resolve(entry.getKey());
URL url = entry.getValue();
LOGGER.log(Level.FINE, "Uploading {0} to {1}",
new String[] { local.toAbsolutePath().toString(), url.toString() });
uploadFile(local, url);
uploadFile(local, url, listener);
}
return null;
}
Expand All @@ -325,23 +336,53 @@ public Void invoke(File f, VirtualChannel channel) throws IOException, Interrupt
/**
* Upload a file to a URL
*/
private static void uploadFile(Path f, URL url) throws IOException {
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
connection.setDoOutput(true);
connection.setRequestMethod("PUT");
connection.setFixedLengthStreamingMode(Files.size(f)); // prevent loading file in memory
try (OutputStream out = connection.getOutputStream()) {
Files.copy(f, out);
}
int responseCode = connection.getResponseCode();
@SuppressWarnings("Convert2Lambda") // bogus use of generics (type variable should have been on class); cannot be made into a lambda
private static void uploadFile(Path f, URL url, final TaskListener listener) throws IOException {
String urlSafe = url.toString().replaceFirst("[?].+$", "?…");
if (responseCode < 200 || responseCode >= 300) {
String diag;
try (InputStream err = connection.getErrorStream()) {
diag = err != null ? IOUtils.toString(err, connection.getContentEncoding()) : null;
try {
RetryerBuilder.<Void>newBuilder().
retryIfException(x -> x instanceof IOException && (!(x instanceof HttpResponseException) || ((HttpResponseException) x).getStatusCode() >= 500)).
withRetryListener(new RetryListener() {
@Override
public <Void> void onRetry(Attempt<Void> attempt) {
if (attempt.hasException()) {
listener.getLogger().println("Retrying upload after: " + attempt.getExceptionCause());
}
}
}).
// TODO all scalars configurable via system property
withStopStrategy(StopStrategies.stopAfterAttempt(10)).
// Note that this is not a _randomized_ exponential backoff; and the base of the exponent is hard-coded to 2.
withWaitStrategy(WaitStrategies.exponentialWait(100, 5, TimeUnit.MINUTES)).
// TODO withAttemptTimeLimiter(…).
build().call(() -> {
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
connection.setDoOutput(true);
connection.setRequestMethod("PUT");
connection.setFixedLengthStreamingMode(Files.size(f)); // prevent loading file in memory
try (OutputStream out = connection.getOutputStream()) {
Files.copy(f, out);
}
int responseCode = connection.getResponseCode();
if (responseCode < 200 || responseCode >= 300) {
String diag;
try (InputStream err = connection.getErrorStream()) {
diag = err != null ? IOUtils.toString(err, connection.getContentEncoding()) : null;
}
throw new HttpResponseException(responseCode, String.format("Failed to upload %s to %s, response: %d %s, body: %s", f.toAbsolutePath(), urlSafe, responseCode, connection.getResponseMessage(), diag));
}
return null;
});
} catch (ExecutionException | RetryException x) { // *sigh*, checked exceptions
Throwable x2 = x.getCause();
if (x2 instanceof IOException) {
throw (IOException) x2;
} else if (x2 instanceof RuntimeException) {
throw (RuntimeException) x2;
} else { // Error?
throw new RuntimeException(x);
}
throw new IOException(String.format("Failed to upload %s to %s, response: %d %s, body: %s", f.toAbsolutePath(), urlSafe, responseCode, connection.getResponseMessage(), diag));
}
LOGGER.log(Level.FINE, "Uploaded {0} to {1}: {2}", new Object[] { f.toAbsolutePath(), urlSafe, responseCode });
}

}
Expand Up @@ -27,13 +27,13 @@
import java.io.IOException;
import java.net.URI;
import java.net.URL;
import java.util.Set;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.io.IOUtils;
import org.apache.http.ExceptionLogger;
import org.apache.http.HttpEntity;
import org.apache.http.HttpEntityEnclosingRequest;
import org.apache.http.HttpRequest;
Expand All @@ -45,7 +45,6 @@
import org.apache.http.impl.bootstrap.ServerBootstrap;
import org.apache.http.message.BasicStatusLine;
import org.apache.http.protocol.HttpContext;
import org.eclipse.jetty.util.ConcurrentHashSet;
import org.jclouds.ContextBuilder;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.BlobStoreContext;
Expand Down Expand Up @@ -74,15 +73,16 @@ public String getContainer() {
return "container";
}

private static final Set<String> fails = new ConcurrentHashSet<>();
private static final Map<String, Integer> fails = new ConcurrentHashMap<>();

/**
* Requests that the <em>next</em> HTTP access to a particular presigned URL should fail with a 500 error.
* Requests that the <em>next</em> HTTP access to a particular presigned URL should fail with a 4xx/5xx error.
* @param method upload or download
* @param key the blob’s {@link StorageMetadata#getName}
* @param code the status code, or 0 to just make the request fail without sending a proper response
*/
static void failIn(HttpMethod method, String key) {
fails.add(method + ":" + key);
static void failIn(HttpMethod method, String key, int code) {
fails.put(method + ":" + key, code);
}

@Override
Expand All @@ -98,9 +98,13 @@ public synchronized BlobStoreContext getContext() throws IOException {
}
String container = m.group(1);
String key = m.group(2);
if (fails.remove(method + ":" + key)) {
response.setStatusLine(new BasicStatusLine(HttpVersion.HTTP_1_0, 500, "simulated failure"));
response.setEntity(new StringEntity("Detailed explanation."));
Integer failure = fails.remove(method + ":" + key);
if (failure != null) {
if (failure == 0) {
throw new IllegalStateException("Refusing to even send a status code for " + container + ":" + key);
}
response.setStatusLine(new BasicStatusLine(HttpVersion.HTTP_1_0, failure, "simulated " + failure + " failure"));
response.setEntity(new StringEntity("Detailed explanation of " + failure + "."));
return;
}
BlobStore blobStore = context.getBlobStore();
Expand Down Expand Up @@ -132,7 +136,7 @@ public synchronized BlobStoreContext getContext() throws IOException {
}
}
}).
setExceptionLogger(ExceptionLogger.STD_ERR).
setExceptionLogger(x -> LOGGER.log(Level.INFO, "error thrown in HTTP service", x)).
create();
server.start();
baseURL = new URL("http://" + server.getInetAddress().getHostName() + ":" + server.getLocalPort() + "/");
Expand Down
Expand Up @@ -68,13 +68,33 @@ public void configureManager() throws Exception {
}

@Test
public void exceptionArchiving() throws Exception {
public void unrecoverableExceptionArchiving() throws Exception {
WorkflowJob p = r.createProject(WorkflowJob.class, "p");
r.createSlave("remote", null, null);
MockBlobStore.failIn(BlobStoreProvider.HttpMethod.PUT, "p/1/artifacts/f");
MockBlobStore.failIn(BlobStoreProvider.HttpMethod.PUT, "p/1/artifacts/f", 400);
p.setDefinition(new CpsFlowDefinition("node('remote') {writeFile file: 'f', text: '.'; archiveArtifacts 'f'}", true));
WorkflowRun b = r.assertBuildStatus(Result.FAILURE, p.scheduleBuild2(0));
r.assertLogContains("/container/p/1/artifacts/f?…, response: 500 simulated failure, body: Detailed explanation.", b);
r.assertLogContains("/container/p/1/artifacts/f?…, response: 400 simulated 400 failure, body: Detailed explanation of 400.", b);
}

@Test
public void recoverableExceptionArchiving() throws Exception {
WorkflowJob p = r.createProject(WorkflowJob.class, "p");
r.createSlave("remote", null, null);
MockBlobStore.failIn(BlobStoreProvider.HttpMethod.PUT, "p/1/artifacts/f", 500);
p.setDefinition(new CpsFlowDefinition("node('remote') {writeFile file: 'f', text: '.'; archiveArtifacts 'f'}", true));
WorkflowRun b = r.buildAndAssertSuccess(p);
r.assertLogContains("/container/p/1/artifacts/f?…, response: 500 simulated 500 failure, body: Detailed explanation of 500.", b);
}

@Test
public void networkExceptionArchiving() throws Exception {
WorkflowJob p = r.createProject(WorkflowJob.class, "p");
r.createSlave("remote", null, null);
MockBlobStore.failIn(BlobStoreProvider.HttpMethod.PUT, "p/1/artifacts/f", 0);
p.setDefinition(new CpsFlowDefinition("node('remote') {writeFile file: 'f', text: '.'; archiveArtifacts 'f'}", true));
WorkflowRun b = r.buildAndAssertSuccess(p);
// currently prints a ‘java.net.SocketException: Connection reset’ but not sure if we really care
}

}

0 comments on commit d4ff575

Please sign in to comment.