Skip to content

Commit

Permalink
[JENKINS-38252] Async subs configuration (#11)
Browse files Browse the repository at this point in the history
* Removing synchronization around subscription configuration

Make it async

* Removed unneeded synchronization around channel subs counters

* Capture authentication when the dispatcher is created

Doing it async results in it being done outside the context of a user request, resulting in every subscription being created for ANONYMOUS and therefore many events not being delivered because of permissions restrictions.
  • Loading branch information
tfennelly committed Sep 24, 2016
1 parent 965c833 commit 4c2e428
Show file tree
Hide file tree
Showing 5 changed files with 323 additions and 165 deletions.
125 changes: 17 additions & 108 deletions src/main/java/org/jenkinsci/plugins/ssegateway/Endpoint.java
Expand Up @@ -29,10 +29,7 @@
import hudson.util.HttpResponses;
import hudson.util.PluginServletFilter;
import jenkins.model.Jenkins;
import net.sf.json.JSONArray;
import net.sf.json.JSONException;
import net.sf.json.JSONObject;
import org.jenkins.pubsub.EventFilter;
import org.jenkinsci.plugins.ssegateway.sse.EventDispatcher;
import org.jenkinsci.plugins.ssegateway.sse.EventDispatcherFactory;
import org.kohsuke.accmod.Restricted;
Expand All @@ -55,9 +52,6 @@
import java.io.File;
import java.io.IOException;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand Down Expand Up @@ -153,65 +147,27 @@ public HttpResponse doConnect(StaplerRequest request, StaplerResponse response)
@RequirePOST
@Restricted(DoNotUse.class) // Web only
public HttpResponse doConfigure(StaplerRequest request, StaplerResponse response) throws IOException {
HttpSession session = request.getSession();
int failedSubscribes = 0;
String batchId = request.getParameter("batchId");

LOGGER.log(Level.FINE, "Processing configuration request. batchId={0}", batchId);

// We want to ensure that, at one time, only one set of configurations are being applied,
// for a given user session.
synchronized (EventDispatcher.SSEHttpSessionListener.getSessionSyncObj(session)) {
SubscriptionConfig subscriptionConfig = SubscriptionConfig.fromRequest(request);

if (subscriptionConfig.dispatcherId == null) {
response.setStatus(HttpServletResponse.SC_BAD_REQUEST);
return HttpResponses.errorJSON("'dispatcherId' not specified.");
} else if (!subscriptionConfig.hasConfigs()) {
response.setStatus(HttpServletResponse.SC_BAD_REQUEST);
return HttpResponses.errorJSON("No 'subscribe' or 'unsubscribe' configurations provided in configuration request.");
} else {
EventDispatcher dispatcher = EventDispatcherFactory.getDispatcher(subscriptionConfig.dispatcherId, request.getSession());
SubscriptionConfigQueue.SubscriptionConfig subscriptionConfig = SubscriptionConfigQueue.SubscriptionConfig.fromRequest(request);

if (dispatcher == null) {
return HttpResponses.errorJSON("Failed Jenkins SSE Gateway configuration request. Unknown SSE event dispatcher " + subscriptionConfig.dispatcherId);
}

if (subscriptionConfig.unsubscribeAll) {
dispatcher.unsubscribeAll();
}
for (EventFilter filter : subscriptionConfig.unsubscribeSet) {
if (dispatcher.unsubscribe(filter)) {
EventHistoryStore.onChannelUnsubscribe(filter.getChannelName());
}
}
for (EventFilter filter : subscriptionConfig.subscribeSet) {
if (dispatcher.subscribe(filter)) {
EventHistoryStore.onChannelSubscribe(filter.getChannelName());
} else {
failedSubscribes++;
}
}

if (batchId != null) {
try {
JSONObject data = new JSONObject();
data.put("batchId", batchId);
data.put("dispatcherId", dispatcher.getId());
data.put("dispatcherInst", System.identityHashCode(dispatcher));
dispatcher.dispatchEvent("configure", data.toString());
} catch (ServletException e) {
LOGGER.log(Level.SEVERE, "Error sending configuration ACK for batchId=" + batchId, e);
}
}
}
LOGGER.log(Level.FINE, "Processing configuration request. batchId={0}", subscriptionConfig.getBatchId());

if (subscriptionConfig.getDispatcherId() == null) {
response.setStatus(HttpServletResponse.SC_BAD_REQUEST);
return HttpResponses.errorJSON("'dispatcherId' not specified.");
} else if (!subscriptionConfig.hasConfigs()) {
response.setStatus(HttpServletResponse.SC_BAD_REQUEST);
return HttpResponses.errorJSON("No 'subscribe' or 'unsubscribe' configurations provided in configuration request.");
}

// The requests are added to a queue and processed async. A
// status notification will be pushed to the client async.
boolean queuedOkay = SubscriptionConfigQueue.add(subscriptionConfig);

response.setStatus(HttpServletResponse.SC_OK);
if (failedSubscribes == 0) {
if (queuedOkay) {
return HttpResponses.okJSON();
} else {
return HttpResponses.errorJSON("One or more event channel subscriptions were not successful. Check the server logs.");
return HttpResponses.errorJSON("Unable to process channel subscription request at this time.");
}
}

Expand All @@ -224,6 +180,7 @@ private static class SSEListenChannelFilter implements Filter {

@Override
public void init(FilterConfig filterConfig) throws ServletException {
SubscriptionConfigQueue.start();
}

@Override
Expand Down Expand Up @@ -251,60 +208,12 @@ public void doFilter(ServletRequest servletRequest, ServletResponse servletRespo

@Override
public void destroy() {
SubscriptionConfigQueue.stop();
}
}

private static String getRequestedResourcePath(HttpServletRequest httpServletRequest) {
return httpServletRequest.getRequestURI().substring(httpServletRequest.getContextPath().length());
}

private static class SubscriptionConfig {
private String dispatcherId;
private List<EventFilter> subscribeSet = Collections.emptyList();
private List<EventFilter> unsubscribeSet = Collections.emptyList();
private boolean unsubscribeAll = false;

private static SubscriptionConfig fromRequest(StaplerRequest request) throws IOException {
JSONObject payload = Util.readJSONPayload(request);
SubscriptionConfig config = new SubscriptionConfig();

config.dispatcherId = payload.optString("dispatcherId", null);
if (config.dispatcherId != null) {
config.subscribeSet = extractFilterSet(payload, "subscribe");
config.unsubscribeSet = extractFilterSet(payload, "unsubscribe");
if (config.unsubscribeSet.isEmpty()) {
String unsubscribe = payload.optString("unsubscribe", null);
if ("*".equals(unsubscribe) || "all".equalsIgnoreCase(unsubscribe)) {
config.unsubscribeAll = true;
}
}
}

return config;
}

private static List<EventFilter> extractFilterSet(JSONObject payload, String key) {
JSONArray jsonObjs = payload.optJSONArray(key);

if (jsonObjs != null && !jsonObjs.isEmpty()) {
List<EventFilter> filterSet = new ArrayList<>();
for (int i = 0; i < jsonObjs.size(); i++) {
try {
JSONObject jsonObj = jsonObjs.getJSONObject(i);
EventFilter filter = (EventFilter) jsonObj.toBean(EventFilter.class);
filterSet.add(filter);
} catch (JSONException e) {
LOGGER.log(Level.SEVERE, "Invalid SSE payload. Expecting an array of JSON Objects for property " + key, e);
}
}
return filterSet;
}

return Collections.emptyList();
}

public boolean hasConfigs() {
return !(subscribeSet.isEmpty() && unsubscribeSet.isEmpty()) || unsubscribeAll;
}
}
}
Expand Up @@ -148,29 +148,18 @@ static void store(@Nonnull Message message) {
}
}

public static synchronized void onChannelSubscribe(@Nonnull String channelName) {
public static void onChannelSubscribe(@Nonnull String channelName) {
if (historyRoot == null) {
return;
}

AtomicInteger counter = getChannelSubsCounter(channelName);
counter.incrementAndGet();

EventHistoryLogger logger = channelLoggers.get(channelName);
if (logger == null) {
logger = new EventHistoryLogger(counter);
PubsubBus.getBus().subscribe(channelName, logger, ACL.SYSTEM, null);
channelLoggers.put(channelName, logger);
}
getChannelSubsCounter(channelName).incrementAndGet();
}

public static synchronized void onChannelUnsubscribe(@Nonnull String channelName) {
public static void onChannelUnsubscribe(@Nonnull String channelName) {
if (historyRoot == null) {
return;
}

AtomicInteger counter = getChannelSubsCounter(channelName);
counter.decrementAndGet();
getChannelSubsCounter(channelName).decrementAndGet();
}

static int getChannelEventCount(@Nonnull String channelName) throws IOException {
Expand Down Expand Up @@ -283,11 +272,26 @@ public void run() {
}
}

private static synchronized AtomicInteger getChannelSubsCounter(@Nonnull String channelName) {
private static AtomicInteger getChannelSubsCounter(@Nonnull String channelName) {
AtomicInteger counter = channelSubsCounters.get(channelName);
if (counter == null) {
counter = newChannelSubsCounter(channelName);
}
return counter;
}

private static synchronized AtomicInteger newChannelSubsCounter(@Nonnull String channelName) {
AtomicInteger counter = channelSubsCounters.get(channelName);
if (counter == null) {
counter = new AtomicInteger(0);
channelSubsCounters.put(channelName, counter);

EventHistoryLogger logger = channelLoggers.get(channelName);
if (logger == null) {
logger = new EventHistoryLogger(counter);
PubsubBus.getBus().subscribe(channelName, logger, ACL.SYSTEM, null);
channelLoggers.put(channelName, logger);
}
}
return counter;
}
Expand Down

0 comments on commit 4c2e428

Please sign in to comment.