Skip to content

Commit

Permalink
[FIXED JENKINS-29571] Make CpsFlowExecution.owner transient (JENKINS-…
Browse files Browse the repository at this point in the history
…27704) to avoid problems after renames/moves.

Originally-Committed-As: b7014e66b8b064794efea6c78f35160c5250cc2e
  • Loading branch information
jglick committed Jul 22, 2015
1 parent 7ef7a8a commit e4d1b62
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 26 deletions.
Expand Up @@ -37,7 +37,6 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.thoughtworks.xstream.XStream;
import com.thoughtworks.xstream.XStreamException;
import com.thoughtworks.xstream.converters.Converter;
import com.thoughtworks.xstream.converters.MarshallingContext;
import com.thoughtworks.xstream.converters.UnmarshallingContext;
Expand Down Expand Up @@ -201,7 +200,7 @@ public class CpsFlowExecution extends FlowExecution {
/*package*/ /*final*/ Map<String,String> loadedScripts = new HashMap<String, String>();

private final boolean sandbox;
private /*almost final*/ FlowExecutionOwner owner;
private transient /*almost final*/ FlowExecutionOwner owner;

/**
* Loading of the program is asynchronous because it requires us to re-obtain stateful objects.
Expand All @@ -225,10 +224,12 @@ public class CpsFlowExecution extends FlowExecution {
* Start nodes that have been created, whose {@link BlockEndNode} is not yet created.
*/
@GuardedBy("this")
/*package*/ final Stack<BlockStartNode> startNodes = new Stack<BlockStartNode>();
/*package*/ /* almost final*/ Stack<BlockStartNode> startNodes = new Stack<BlockStartNode>();
private transient List<String> startNodesSerial; // used only between unmarshal and onLoad

@GuardedBy("this")
private final NavigableMap<Integer,FlowHead> heads = new TreeMap();
private /* almost final*/ NavigableMap<Integer,FlowHead> heads = new TreeMap<Integer,FlowHead>();
private transient Map<Integer,String> headsSerial; // used only between unmarshal and onLoad

private final AtomicInteger iota = new AtomicInteger();

Expand Down Expand Up @@ -383,7 +384,23 @@ public int iota() {
}

@Override
public void onLoad() {
public void onLoad(FlowExecutionOwner owner) throws IOException {
this.owner = owner;
storage = createStorage();
// heads could not be restored in unmarshal, so doing that now:
heads = new TreeMap<Integer,FlowHead>();
for (Map.Entry<Integer,String> entry : headsSerial.entrySet()) {
FlowHead h = new FlowHead(this, entry.getKey());
h.setForDeserialize(storage.getNode(entry.getValue()));
heads.put(h.getId(), h);
}
headsSerial = null;
// Same for startNodes:
startNodes = new Stack<BlockStartNode>();
for (String id : startNodesSerial) {
startNodes.add((BlockStartNode) storage.getNode(id));
}
startNodesSerial = null;
try {
if (!isComplete())
loadProgramAsync(getProgramDataFile());
Expand Down Expand Up @@ -766,12 +783,10 @@ void notifyListeners(FlowNode node) {
// so that the execution gets suspended while we are getting serialized

public static final class ConverterImpl implements Converter {
private final XStream xs;
private final ReflectionProvider ref;
private final Mapper mapper;

public ConverterImpl(XStream xs) {
this.xs = xs;
this.ref = xs.getReflectionProvider();
this.mapper = xs.getMapper();
}
Expand All @@ -786,7 +801,6 @@ public void marshal(Object source, HierarchicalStreamWriter w, MarshallingContex
writeChild(w, context, "result", e.result, Result.class);
writeChild(w, context, "script", e.script, String.class);
writeChild(w, context, "loadedScripts", e.loadedScripts, Map.class);
writeChild(w, context, "owner", e.owner, Object.class);
if (e.user != null) {
writeChild(w, context, "user", e.user, String.class);
}
Expand Down Expand Up @@ -814,17 +828,15 @@ private <T> void writeChild(HierarchicalStreamWriter w, MarshallingContext conte
}

public Object unmarshal(HierarchicalStreamReader reader, final UnmarshallingContext context) {
try {
CpsFlowExecution result;
if (context.currentObject()!=null) {
result = (CpsFlowExecution) context.currentObject();
} else {
result = (CpsFlowExecution) ref.newInstance(CpsFlowExecution.class);
}

FlowNodeStorage storage = result.storage;
Stack<BlockStartNode> startNodes = new Stack<BlockStartNode>();
Map<Integer,FlowHead> heads = new TreeMap<Integer, FlowHead>();
result.startNodesSerial = new ArrayList<String>();
result.headsSerial = new TreeMap<Integer,String>();

while (reader.hasMoreChildren()) {
reader.moveDown();
Expand All @@ -843,39 +855,29 @@ public Object unmarshal(HierarchicalStreamReader reader, final UnmarshallingCont
setField(result, "loadedScripts", loadedScripts);
} else
if (nodeName.equals("owner")) {
FlowExecutionOwner owner = (FlowExecutionOwner) readChild(reader, context, Object.class, result);
setField(result, "owner", owner);
setField(result, "storage", storage = result.createStorage());
readChild(reader, context, Object.class, result); // for compatibility; discarded
} else
if (nodeName.equals("user")) {
String user = readChild(reader, context, String.class, result);
setField(result, "user", user);
} else
if (nodeName.equals("head")) {
String[] head = readChild(reader, context, String.class, result).split(":");
FlowHead h = new FlowHead(result, Integer.parseInt(head[0]));
h.setForDeserialize(storage.getNode(head[1]));
heads.put(h.getId(), h);
result.headsSerial.put(Integer.parseInt(head[0]), head[1]);
} else
if (nodeName.equals("iota")) {
Integer iota = readChild(reader, context, Integer.class, result);
setField(result, "iota", new AtomicInteger(iota));
} else
if (nodeName.equals("start")) {
String id = readChild(reader, context, String.class, result);
startNodes.add((BlockStartNode) storage.getNode(id));
result.startNodesSerial.add(id);
}

reader.moveUp();
}

setField(result, "startNodes", startNodes);
setField(result, "heads", heads);

return result;
} catch (IOException e) {
throw new XStreamException("Failed to read back CpsFlowExecution",e);
}
}

private void setField(CpsFlowExecution result, String fieldName, Object value) {
Expand Down
Expand Up @@ -24,6 +24,7 @@

package org.jenkinsci.plugins.workflow.cps;

import org.jenkinsci.plugins.workflow.flow.FlowExecutionOwner;
import org.jenkinsci.plugins.workflow.test.steps.SemaphoreStep;
import static org.junit.Assert.*;
import org.junit.Test;
Expand All @@ -46,8 +47,9 @@ public class CpsFlowDefinition2Test extends AbstractCpsFlowTest {

assertFalse("Expected the execution to be suspended but it has completed", exec.isComplete());

FlowExecutionOwner owner = exec.getOwner();
exec = roundtripXStream(exec); // poor man's simulation of Jenkins restart
exec.onLoad();
exec.onLoad(owner);

// now resume workflow execution
SemaphoreStep.success("watch/1", null);
Expand Down

0 comments on commit e4d1b62

Please sign in to comment.