Skip to content

Commit

Permalink
Merge pull request #47 from svanoort/defer-action-write-mk2
Browse files Browse the repository at this point in the history
Granular flownode persistence and new node storage engine [JENKINS-47172] [JENKINS-47173]
  • Loading branch information
svanoort committed Jan 22, 2018
2 parents c0c4bbe + 3643c96 commit 5f8c043
Show file tree
Hide file tree
Showing 13 changed files with 899 additions and 95 deletions.
18 changes: 16 additions & 2 deletions pom.xml
Expand Up @@ -33,7 +33,7 @@
</parent>
<groupId>org.jenkins-ci.plugins.workflow</groupId>
<artifactId>workflow-support</artifactId>
<version>2.17-SNAPSHOT</version>
<version>2.17-durability-v4-SNAPSHOT</version> <!-- Temp version to isolate work, remove before merge -->
<packaging>hpi</packaging>
<name>Pipeline: Supporting APIs</name>
<url>https://wiki.jenkins.io/display/JENKINS/Pipeline+Supporting+APIs+Plugin</url>
Expand Down Expand Up @@ -78,7 +78,8 @@
<dependency>
<groupId>org.jenkins-ci.plugins.workflow</groupId>
<artifactId>workflow-api</artifactId>
<version>2.22</version>
<!-- Temp version until https://github.com/jenkinsci/workflow-api-plugin/pull/55 gets released -->
<version>2.25-durability-beta-2</version>
</dependency>
<dependency>
<groupId>org.jenkins-ci.plugins</groupId>
Expand Down Expand Up @@ -195,4 +196,17 @@
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.jenkins-ci.tools</groupId>
<artifactId>maven-hpi-plugin</artifactId>
<extensions>true</extensions>
<configuration>
<!-- Update with release -->
<compatibleSinceVersion>2.17-durability-v4-SNAPSHOT</compatibleSinceVersion>
</configuration>
</plugin>
</plugins>
</build>
</project>
Expand Up @@ -61,7 +61,7 @@ public abstract class DefaultStepContext extends StepContext {
T value = doGet(key);
if (key == EnvVars.class) {
Run<?,?> run = get(Run.class);
EnvironmentAction a = run.getAction(EnvironmentAction.class);
EnvironmentAction a = run == null ? null : run.getAction(EnvironmentAction.class);
EnvVars customEnvironment = a != null ? a.getEnvironment() : run.getEnvironment(get(TaskListener.class));
return key.cast(EnvironmentExpander.getEffectiveEnvironment(customEnvironment, (EnvVars) value, get(EnvironmentExpander.class)));
} else if (key == Launcher.class) {
Expand Down
@@ -0,0 +1,40 @@
package org.jenkinsci.plugins.workflow.support;

import hudson.XmlFile;
import hudson.util.XStream2;

import javax.annotation.Nonnull;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.InvalidPathException;
import java.nio.file.StandardOpenOption;

/**
* Utilities to assist with IO and in some cases improve performance specifically for pipeline.
*/
public class PipelineIOUtils {
/**
* Convenience method to transparently write data directly or atomicly using {@link hudson.util.AtomicFileWriter}.
* @param toWrite Object to write to file
* @param location File to write object to
* @param xstream xstream to use for output
* @param atomicWrite If true, do an atomic write, otherwise do a direct write to file.
* @throws IOException
*/
public static void writeByXStream(@Nonnull Object toWrite, @Nonnull File location, @Nonnull XStream2 xstream, boolean atomicWrite) throws IOException {
if (atomicWrite) {
XmlFile file = new XmlFile(xstream, location);
file.write(toWrite);
} else {
try(OutputStream os = new BufferedOutputStream(
Files.newOutputStream(location.toPath(), StandardOpenOption.WRITE, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING))){
xstream.toXMLUTF8(toWrite, os); // No atomic nonsense, just write and write and write!
} catch (InvalidPathException ipe) {
throw new IOException(ipe);
}
}
}
}
Expand Up @@ -46,6 +46,7 @@
import org.apache.commons.jelly.XMLOutput;
import org.jenkinsci.plugins.workflow.actions.FlowNodeAction;
import org.jenkinsci.plugins.workflow.actions.LogAction;
import org.jenkinsci.plugins.workflow.actions.PersistentAction;
import org.jenkinsci.plugins.workflow.flow.FlowExecutionOwner;
import org.jenkinsci.plugins.workflow.flow.GraphListener;
import org.jenkinsci.plugins.workflow.graph.FlowNode;
Expand All @@ -58,7 +59,7 @@
*
* @author Kohsuke Kawaguchi
*/
public class LogActionImpl extends LogAction implements FlowNodeAction {
public class LogActionImpl extends LogAction implements FlowNodeAction, PersistentAction {

private static final Logger LOGGER = Logger.getLogger(LogActionImpl.class.getName());

Expand Down
@@ -0,0 +1,249 @@
/*
* The MIT License
*
* Copyright (c) 2017, CloudBees, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/

package org.jenkinsci.plugins.workflow.support.storage;

import hudson.Util;
import hudson.model.Action;
import hudson.util.IOUtils;
import hudson.util.XStream2;
import org.jenkinsci.plugins.workflow.actions.FlowNodeAction;
import org.jenkinsci.plugins.workflow.flow.FlowExecution;
import org.jenkinsci.plugins.workflow.graph.FlowNode;
import org.jenkinsci.plugins.workflow.flow.FlowDurabilityHint;
import org.jenkinsci.plugins.workflow.support.PipelineIOUtils;

import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;

/**
* {@link FlowNodeStorage} implementation that stores all the {@link FlowNode}s together in one file for efficient bulk I/O
*
* <p>This defers persisting until {@link #flush()} is called (or until we flush individual nodes explicitly or by
* storing them without specifying delayWritingActions=true. It also doesn't use the atomic write operations.
*
* Performance characteristics: much better use of the filesystem and far more efficient read/write if you do it all at once.
* HOWEVER, if you insist on explicitly writing out each node, this reverts to overall O(n^2) performance, where n is node count.
*
* For these reasons, this implementation should <strong>only</strong> be used where {@link FlowDurabilityHint#isPersistWithEveryStep()}
* is <strong>false</strong>.
*/
public class BulkFlowNodeStorage extends FlowNodeStorage {
private final File dir;

private final FlowExecution exec;

/** Lazy-loaded mapping. */
private transient HashMap<String, Tag> nodes = null;

/** If true, we've been modified since last flush. */
private boolean isModified = false;

File getStoreFile() throws IOException {
return new File(dir, "flowNodeStore.xml");
}

public BulkFlowNodeStorage(FlowExecution exec, File dir) {
this.exec = exec;
this.dir = dir;
this.nodes = null;
this.setAvoidAtomicWrite(true);
}

/** Loads the nodes listing, lazily - so loading the {@link FlowExecution} doesn't trigger a more complex load. */
HashMap<String, Tag> getOrLoadNodes() throws IOException {
if (nodes == null) {
if (dir.exists()) {
File storeFile = getStoreFile();
if (storeFile.exists()) {
HashMap<String, Tag> roughNodes = null;
try {
roughNodes = (HashMap<String, Tag>) (XSTREAM.fromXML(getStoreFile()));
} catch (Exception ex) {
nodes = new HashMap<String, Tag>();
throw new IOException("Failed to read nodes", ex);
}
if (roughNodes == null) {
nodes = new HashMap<String, Tag>();
throw new IOException("Unable to load nodes, invalid data");
}
for (Tag t : roughNodes.values()) {
FlowNode fn = t.node;
try {
FlowNode$exec.set(fn, exec);
} catch (IllegalAccessException e) {
throw (IllegalAccessError) new IllegalAccessError("Failed to set owner").initCause(e);
}
t.storeActions();
for (FlowNodeAction a : Util.filter(t.actions(), FlowNodeAction.class)) {
a.onLoad(fn);
}
}
nodes = roughNodes;
} else {
nodes = new HashMap<String, Tag>();
}
} else {
IOUtils.mkdirs(dir);
nodes = new HashMap<String, Tag>();
}
}
return nodes;
}

@Override
@CheckForNull
public FlowNode getNode(@Nonnull String id) throws IOException {
Tag t = getOrLoadNodes().get(id);
return (t != null) ? t.node : null;
}

public void storeNode(@Nonnull FlowNode n, boolean delayWritingActions) throws IOException {
Tag t = getOrLoadNodes().get(n.getId());
if (t != null) {
t.node = n;
List<Action> act = n.getActions();
t.actions = act.toArray(new Action[act.size()]);
} else {
getOrLoadNodes().put(n.getId(), new Tag(n, n.getActions()));
}
isModified = true;
if (!delayWritingActions) {
flush();
}
}

@Override
public void storeNode(FlowNode n) throws IOException {
flushNode(n);
}

/**
* Persists a single FlowNode to disk (if not already persisted).
* @param n Node to persist
* @throws IOException
*/
@Override
public void flushNode(@Nonnull FlowNode n) throws IOException {
storeNode(n, false);
}

/** Force persisting any nodes that had writing deferred */
@Override
public void flush() throws IOException {
if (nodes != null && isModified) {
if (!dir.exists()) {
IOUtils.mkdirs(dir);
}
PipelineIOUtils.writeByXStream(nodes, getStoreFile(), XSTREAM, !this.isAvoidAtomicWrite());

isModified = false;
}
}

public List<Action> loadActions(@Nonnull FlowNode node) throws IOException {
Tag t = getOrLoadNodes().get(node.getId());
return (t != null) ? t.actions() : Collections.<Action>emptyList();
}

/**
* Just stores this one node
*/
public void saveActions(@Nonnull FlowNode node, @Nonnull List<Action> actions) throws IOException {
HashMap<String, Tag> map = getOrLoadNodes();
Tag t = map.get(node.getId());
if (t != null) {
t.node = node;
List<Action> act = node.getActions();
t.actions = act.toArray(new Action[act.size()]);
} else {
map.put(node.getId(), new Tag(node, actions));
}
isModified = true;
}

/** Have we written everything to disk that we need to, or is there something waiting to be written */
public boolean isPersistedFully() {
return !isModified;
}


/**
* To group node and their actions together into one object.
*/
private static class Tag {
/* @Nonnull except perhaps after deserialization */ FlowNode node;
private @CheckForNull Action[] actions;

private Tag(@Nonnull FlowNode node, @Nonnull List<Action> actions) {
this.node = node;
this.actions = actions.isEmpty() ? null : actions.toArray(new Action[actions.size()]);
}

private void storeActions() { // We've already loaded the actions, may as well store them to the FlowNode
try {
FlowNode_setActions.invoke(this.node, actions());
} catch (InvocationTargetException|IllegalAccessException ex) {
throw new RuntimeException(ex);
}
}

public @Nonnull List<Action> actions() {
return actions != null ? Arrays.asList(actions) : Collections.<Action>emptyList();
}
}

public static final XStream2 XSTREAM = new XStream2();

private static final Field FlowNode$exec;
private static final Method FlowNode_setActions;

static {
// Aliases reduce the amount of data persisted to disk
XSTREAM.alias("Tag", Tag.class);
// Maybe alias for UninstantiatedDescribable too, if we add a structs dependency
XSTREAM.aliasPackage("cps.n", "org.jenkinsci.plugins.workflow.cps.nodes");
XSTREAM.aliasPackage("wf.a", "org.jenkinsci.plugins.workflow.actions");
XSTREAM.aliasPackage("s.a", "org.jenkinsci.plugins.workflow.support.actions");
XSTREAM.aliasPackage("cps.a", "org.jenkinsci.plugins.workflow.cps.actions");
try {
// Ugly, but we do not want public getters and setters for internal state on FlowNodes.
FlowNode$exec = FlowNode.class.getDeclaredField("exec");
FlowNode$exec.setAccessible(true);
FlowNode_setActions = FlowNode.class.getDeclaredMethod("setActions", List.class);
FlowNode_setActions.setAccessible(true);
} catch (NoSuchFieldException|NoSuchMethodException e) {
throw new Error(e);
}
}
}

0 comments on commit 5f8c043

Please sign in to comment.