Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Merge pull request #1 from tfennelly/JENKINS-35137
[FIX JENKINS-35137] EventSource reconnect killing old EvenDispatcher and losing channel subs
  • Loading branch information
tfennelly committed May 26, 2016
2 parents 8718846 + 30cb65e commit cb47b17
Show file tree
Hide file tree
Showing 9 changed files with 227 additions and 125 deletions.
2 changes: 1 addition & 1 deletion package.json
@@ -1,6 +1,6 @@
{
"name": "@jenkins-cd/sse-gateway",
"version": "0.0.2",
"version": "0.0.3",
"description": "Client API for the Jenkins SSE Gateway plugin. Browser UI push events from Jenkins.",
"main": "src/main/js/index.js",
"files" : [
Expand Down
36 changes: 33 additions & 3 deletions src/main/java/org/jenkinsci/plugins/ssegateway/Endpoint.java
Expand Up @@ -52,6 +52,7 @@
import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.HttpSession;
import java.io.IOException;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand All @@ -65,7 +66,9 @@
@Extension
public class Endpoint extends CrumbExclusion implements RootAction {

protected static final String SSE_GATEWAY_URL = "/sse-gateway";
public static final String SSE_GATEWAY_URL = "/sse-gateway";
public static final String SSE_LISTEN_URL_PREFIX = SSE_GATEWAY_URL + "/listen/";

private static final Logger LOGGER = Logger.getLogger(Endpoint.class.getName());

public Endpoint() throws ServletException {
Expand Down Expand Up @@ -107,6 +110,30 @@ public String getUrlName() {
return SSE_GATEWAY_URL;
}

@Restricted(DoNotUse.class) // Web only
public HttpResponse doConnect(StaplerRequest request, StaplerResponse response) throws IOException {
String clientId = request.getParameter("clientId");

if (clientId == null) {
throw new IOException("No 'clientId' parameter specified in connect request.");
}

HttpSession session = request.getSession();
EventDispatcher dispatcher = EventDispatcherFactory.getDispatcher(clientId, session);

// If there was already a dispatcher with this ID, then remove
// all subscriptions from it and reuse the instance.
if (dispatcher != null) {
dispatcher.unsubscribeAll();
} else {
// Else create a new instance with this id.
EventDispatcherFactory.newDispatcher(clientId, session);
}

response.setStatus(HttpServletResponse.SC_OK);
return HttpResponses.okJSON();
}

@RequirePOST
@Restricted(DoNotUse.class) // Web only
public HttpResponse doConfigure(StaplerRequest request, StaplerResponse response) throws IOException {
Expand Down Expand Up @@ -170,9 +197,12 @@ public void doFilter(ServletRequest servletRequest, ServletResponse servletRespo
HttpServletRequest httpServletRequest = (HttpServletRequest) servletRequest;
String requestedResource = getRequestedResourcePath(httpServletRequest);

if (requestedResource.equals(SSE_GATEWAY_URL + "/listen")) {
if (requestedResource.startsWith(SSE_LISTEN_URL_PREFIX)) {
HttpServletResponse httpServletResponse = (HttpServletResponse) servletResponse;
EventDispatcherFactory.start(httpServletRequest, httpServletResponse);
String clientId = requestedResource.substring(SSE_LISTEN_URL_PREFIX.length());

clientId = URLDecoder.decode(clientId, "UTF-8");
EventDispatcherFactory.start(clientId, httpServletRequest, httpServletResponse);
return; // Do not allow this request on to Stapler
}
}
Expand Down
Expand Up @@ -64,7 +64,7 @@ public abstract class EventDispatcher implements Serializable {
public static final String SESSION_SYNC_OBJ = "org.jenkinsci.plugins.ssegateway.sse.session.sync";
private static final Logger LOGGER = Logger.getLogger(EventDispatcher.class.getName());

private final String id = UUID.randomUUID().toString();
private String id = null;
private final PubsubBus bus;
private Map<EventFilter, ChannelSubscriber> subscribers = new CopyOnWriteMap.Hash<>();

Expand All @@ -80,9 +80,16 @@ public Map<EventFilter, ChannelSubscriber> getSubscribers() {
}

public final String getId() {
if (id == null) {
throw new IllegalStateException("Call to getId before the ID ewas set.");
}
return id;
}

public void setId(String id) {
this.id = id;
}

public boolean dispatchEvent(String name, String data) throws IOException, ServletException {
HttpServletResponse response = getResponse();

Expand Down
Expand Up @@ -64,25 +64,30 @@ public class EventDispatcherFactory {
}
}

public static EventDispatcher start(HttpServletRequest request, HttpServletResponse response) {
public static EventDispatcher start(@Nonnull String clientId, @Nonnull HttpServletRequest request, @Nonnull HttpServletResponse response) {
try {
HttpSession session = request.getSession();
EventDispatcher instance = newDispatcher(session);
Jenkins jenkins = Jenkins.getInstance();

instance.start(request, response);
instance.setDefaultHeaders();
EventDispatcher dispatcher = EventDispatcherFactory.getDispatcher(clientId, session);

if (dispatcher == null) {
LOGGER.log(Level.WARNING, String.format("Unknown dispatcher client Id '%s'. Creating a new one. Make sure you are calling 'connect' before 'listen'.", clientId));
dispatcher = EventDispatcherFactory.newDispatcher(clientId, session);
}

dispatcher.start(request, response);
dispatcher.setDefaultHeaders();

JSONObject openData = new JSONObject();

openData.put("dispatcher", instance.getId());
openData.put("dispatcher", dispatcher.getId());

if (Functions.getIsUnitTest()) {
openData.put("sessionid", session.getId());
openData.put("cookieName", session.getServletContext().getSessionCookieConfig().getName());

// Crumb needed for testing because we use it to fire off some
// test builds via the POST API.
Jenkins jenkins = Jenkins.getInstance();
CrumbIssuer crumbIssuer = jenkins.getCrumbIssuer();
if (crumbIssuer != null) {
JSONObject crumb = new JSONObject();
Expand All @@ -95,9 +100,9 @@ public static EventDispatcher start(HttpServletRequest request, HttpServletRespo
}
}

instance.dispatchEvent("open", openData.toString());
dispatcher.dispatchEvent("open", openData.toString());

return instance;
return dispatcher;
} catch (Exception e) {
throw new IllegalStateException("Unexpected Exception.", e);
}
Expand All @@ -121,14 +126,16 @@ public synchronized static Map<String, EventDispatcher> getDispatchers(@Nonnull
/**
* Create a new {@link EventDispatcher} instance and attach it to the user session.
*
* @param clientId The dispatcher client Id.
* @param session The {@link HttpSession}.
* @return The new {@link EventDispatcher} instance.
*/
public synchronized static EventDispatcher newDispatcher(@Nonnull HttpSession session) {
public synchronized static EventDispatcher newDispatcher(@Nonnull String clientId, @Nonnull HttpSession session) {
Map<String, EventDispatcher> dispatchers = getDispatchers(session);
try {
EventDispatcher dispatcher = runtimeClass.newInstance();
dispatchers.put(dispatcher.getId(), dispatcher);
dispatcher.setId(clientId);
dispatchers.put(clientId, dispatcher);
return dispatcher;
} catch (Exception e) {
throw new IllegalStateException("Unexpected Exception.", e);
Expand Down
36 changes: 36 additions & 0 deletions src/main/js/ajax.js
@@ -1,5 +1,41 @@
var json = require('./json');

exports.get = function (url, onSuccess) {
var http = new XMLHttpRequest();

http.onreadystatechange = function () {
if (http.readyState === 4) {
if (http.status >= 200 && http.status < 300) {
try {
var responseJSON = JSON.parse(http.responseText);
// The request may have succeeded, but there might have been
// some processing error on the backend and a hudson.util.HttpResponses
// JSON response.
if (responseJSON.status && responseJSON.status === 'error') {
console.error('SSE Gateway error response to '
+ url + ': '
+ responseJSON.message);
}

if (onSuccess) {
onSuccess(responseJSON);
}
} catch (e) {
// Not a JSON response.
}
} else {
console.error('SSE Gateway error to ' + url + ': ');
console.error(http);
}
}
};

http.open('GET', url, true);

http.setRequestHeader('Accept', 'application/json');
http.send();
};

exports.post = function (data, toUrl, jenkinsSessionInfo) {
var http = new XMLHttpRequest();

Expand Down
8 changes: 7 additions & 1 deletion src/main/js/index.js
Expand Up @@ -16,7 +16,13 @@ function noEventSource() {
if (eventSourceSupported) {
var internal = require('./sse-client');

internal.connect();
/**
* Connect the SSE client to the server..
* @param clientId The SSE client ID. This is a scrint but should be unique i.e. something
* not likely to clash with another SSE instance in the same session.
* @param onConnect Optionsal onConnect function.
*/
exports.connect = internal.connect;

/**
* Subscribe to a channel.
Expand Down
58 changes: 37 additions & 21 deletions src/main/js/sse-client.js
Expand Up @@ -3,6 +3,7 @@ var ajax = require('./ajax');
var json = require('./json');
var jenkinsUrl = jsModules.getRootURL();
var eventSource = undefined;
var eventSourceListenerQueue = [];
var jenkinsSessionInfo = undefined;
var subscriptions = [];
var channelListeners = {};
Expand Down Expand Up @@ -30,30 +31,46 @@ function scheduleDoConfigure() {
nextDoConfigureTimeout = setTimeout(doConfigure, 100);
}

exports.connect = function (onConnect) {
exports.connect = function (clientId, onConnect) {
if (eventSource) {
return;
}

if (typeof clientId !== 'string') {
console.error("SSE clientId not specified in 'connect' request.");
return;
}

if (!eventSourceSupported) {
console.warn("This browser does not support EventSource. Where's the polyfill?");
// TODO: Need to add browser poly-fills for stuff like this
// See https://github.com/remy/polyfills/blob/master/EventSource.js
} else {
var listenUrl = jenkinsUrl + 'sse-gateway/listen';
var EventSource = window.EventSource;
var source = new EventSource(listenUrl);

source.addEventListener('open', function (e) {
if (e.data) {
jenkinsSessionInfo = JSON.parse(e.data);
if (onConnect) {
onConnect(jenkinsSessionInfo);
var connectUrl = jenkinsUrl + 'sse-gateway/connect?clientId='
+ encodeURIComponent(clientId);

ajax.get(connectUrl, function () {
var listenUrl = jenkinsUrl + 'sse-gateway/listen/' + encodeURIComponent(clientId);
var EventSource = window.EventSource;
var source = new EventSource(listenUrl);

source.addEventListener('open', function (e) {
if (e.data) {
jenkinsSessionInfo = JSON.parse(e.data);
if (onConnect) {
onConnect(jenkinsSessionInfo);
}
}
}, false);

// Add any listeners that have been requested to be added.
for (var i = 0; i < eventSourceListenerQueue.length; i++) {
var config = eventSourceListenerQueue[i];
source.addEventListener(config.channelName, config.listener, false);
}
}, false);

eventSource = source;
eventSource = source;
});
}
};

Expand All @@ -65,10 +82,6 @@ exports.disconnect = function () {
};

exports.subscribe = function () {
if (!eventSource) {
return undefined;
}

clearDoConfigure();

var channelName;
Expand Down Expand Up @@ -124,10 +137,6 @@ exports.subscribe = function () {
};

exports.unsubscribe = function (callback) {
if (!eventSource) {
return;
}

clearDoConfigure();

// callback is the only mandatory param
Expand Down Expand Up @@ -177,7 +186,14 @@ function addChannelListener(channelName) {
}
};
channelListeners[channelName] = listener;
eventSource.addEventListener(channelName, listener, false);
if (eventSource) {
eventSource.addEventListener(channelName, listener, false);
} else {
eventSourceListenerQueue.push({
channelName: channelName,
listener: listener
});
}
}

function containsAll(object, filter) {
Expand Down

0 comments on commit cb47b17

Please sign in to comment.