Skip to content

Commit

Permalink
[JENKINS-51413, JENKINS-51414] Implement basic master-agent communica…
Browse files Browse the repository at this point in the history
…tion, master command invocation (#2)

* Implement basic master-agent communication

* Command transport implementation for Kafka

* Implement kafka producer and consumer connection pool

* Reorganize and fix build problem

* Refactor packagings
  • Loading branch information
pvtuan10 committed Jun 6, 2018
1 parent 9d2cd9a commit 64e5ce0
Show file tree
Hide file tree
Showing 24 changed files with 969 additions and 8 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Expand Up @@ -38,3 +38,6 @@ buildNumber.properties

# Avoid ignoring Maven wrapper jar file (.jar files are usually ignored)
!/.mvn/wrapper/maven-wrapper.jar

# plugin dependencies
work/
67 changes: 65 additions & 2 deletions agent/pom.xml
@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
Expand All @@ -14,4 +14,67 @@
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>Remoting Kafka Plugin Agent</name>
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<mainClass>io.jenkins.plugins.remotingkafka.Agent</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>args4j</groupId>
<artifactId>args4j</artifactId>
<version>2.33</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.jenkins-ci</groupId>
<artifactId>kafka-client-lib</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.jenkins-ci.main</groupId>
<artifactId>remoting</artifactId>
<version>${remoting.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.5</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.5</version>
</dependency>
</dependencies>
</project>
3 changes: 3 additions & 0 deletions agent/run.sh
@@ -0,0 +1,3 @@
#!/usr/bin/env bash
# The script is used for manual testing of the agent.
java -jar target/remoting-kafka-agent-1.0-SNAPSHOT-jar-with-dependencies.jar -name test -master http://localhost:9090 -kafkaURL 127.0.0.1:9092
83 changes: 83 additions & 0 deletions agent/src/main/java/io/jenkins/plugins/remotingkafka/Agent.java
@@ -0,0 +1,83 @@
package io.jenkins.plugins.remotingkafka;

import hudson.remoting.Command;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.kohsuke.args4j.CmdLineException;
import org.kohsuke.args4j.CmdLineParser;

import java.io.IOException;
import java.net.InetAddress;
import java.net.URL;
import java.util.Arrays;
import java.util.Properties;
import java.util.logging.Level;
import java.util.logging.Logger;

public class Agent {
private static final Logger LOGGER = Logger.getLogger(Agent.class.getName());

private final Options options;

public Agent(Options options) {
this.options = options;
}

public static void main(String... args) throws InterruptedException, IOException {
Options options = new Options();
Agent agent = new Agent(options);

CmdLineParser p = new CmdLineParser(options);
try {
p.parseArgument(args);
} catch (CmdLineException e) {
LOGGER.log(Level.SEVERE, "CmdLineException occurred during parseArgument", e);
p.printUsage(System.out);
System.exit(-1);
}

if (options.help) {
p.printUsage(System.out);
System.exit(0);
}

if (options.name == null) {
try {
agent.options.name = InetAddress.getLocalHost().getCanonicalHostName();
} catch (IOException e) {
LOGGER.severe("Failed to lookup the canonical hostname of this agent, please check system settings.");
LOGGER.severe("If not possible to resolve please specify a node name using the '-name' option");
System.exit(-1);
}
}

URL url = new URL(options.master);
String consumerTopic = url.getHost() + "-" + url.getPort() + "-" + options.name
+ KafkaConstants.CONNECT_SUFFIX;

// Consumer properties.
Properties consumerProps = new Properties();
consumerProps.put(KafkaConstants.BOOTSTRAP_SERVERS, options.kafkaURL);
consumerProps.put(KafkaConstants.GROUP_ID, "testID");
consumerProps.put(KafkaConstants.ENABLE_AUTO_COMMIT, "false");
consumerProps.put(KafkaConstants.KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put(KafkaConstants.VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
KafkaConsumerPool.getInstance().init(4, consumerProps);
KafkaConsumer<String, byte[]> consumer = KafkaConsumerPool.getInstance().getByteConsumer();
consumer.subscribe(Arrays.asList(consumerTopic));
LOGGER.info("Subscribed to topic: " + consumerTopic);
Command cmd = null;
while (true) {
ConsumerRecords<String, byte[]> records = consumer.poll(0);
for (ConsumerRecord<String, byte[]> record : records) {
if (record.key().equals("launch")) {
consumer.commitSync();
cmd = (Command) SerializationUtils.deserialize(record.value());
LOGGER.info("Received a command cmd=" + cmd);
}
}
}
}
}
18 changes: 18 additions & 0 deletions agent/src/main/java/io/jenkins/plugins/remotingkafka/Options.java
@@ -0,0 +1,18 @@
package io.jenkins.plugins.remotingkafka;

import org.kohsuke.args4j.Option;

public class Options {

@Option(name = "-name", usage = "Name of the agent")
public String name;

@Option(name = "-master", usage = "The complete target Jenkins URL like 'http://server:8080/jenkins/'.")
public String master;

@Option(name = "-help", aliases = "--help", usage = "Show the help screen")
public boolean help;

@Option(name = "-kafkaURL", usage = "Kafka host and port address identifier")
public String kafkaURL;
}
15 changes: 15 additions & 0 deletions commands.sh
@@ -0,0 +1,15 @@
#!/usr/bin/env bash
# Commands used in Kafka.

# Delete a topic:
kafka-topics.sh --zookeeper zookeeper:2181 --delete --topic test

# Create a topic:
kafka-topics.sh --create --zookeeper zookeeper:2181 \
--replication-factor 1 --partitions 1 --topic test

# Consumer console:
kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic my-topic \
--from-beginning
31 changes: 31 additions & 0 deletions kafka-client-lib/pom.xml
@@ -0,0 +1,31 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>remoting-kafka-plugin</artifactId>
<groupId>org.jenkins-ci</groupId>
<version>1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>kafka-client-lib</artifactId>
<version>1.0-SNAPSHOT</version>
<name>Kafka Client Module</name>

<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.jenkins-ci.main</groupId>
<artifactId>remoting</artifactId>
<version>${remoting.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>

</project>
@@ -0,0 +1,31 @@
package io.jenkins.plugins.remotingkafka;

public class KafkaConstants {
public static final String CONNECT_SUFFIX = "-connect";
// Common configs.
public static final String BOOTSTRAP_SERVERS = "bootstrap.servers";

// Producer configs. TODO - add more configs.
public static final String KEY_SERIALIZER = "key.serializer";
public static final String VALUE_SERIALIZER = "value.serializer";
public static final String ACKS = "acks";
public static final String BUFFER_MEMORY = "buffer.memory";
public static final String COMPRESSION_TYPE = "compression.type";
public static final String RETRIES = "retries";
public static final String SSL_KEY_PASSWORD = "ssl.key.password";
public static final String SSL_KEYSTORE_LOCATION = "ssl.keystore.location";
public static final String SSL_KEYSTORE_PASSWORD = "ssl.keystore.password";
public static final String SSL_TRUSTSTORE_LOCATION = "ssl.truststore.location";
public static final String SSL_TRUSTSTORE_PASSWORD = "ssl.truststore.password";
public static final String BATCH_SIZE = "batch.size";
public static final String CLIENT_ID = "client.id";
public static final String CONNECTIONS_MAX_IDLE = "connections.max.idle.ms";
public static final String LINGER = "linger.ms";
public static final String MAX_BLOCK = "max.block.ms";

// Consumer configs. TODO - add more configs.
public static final String GROUP_ID = "group.id";
public static final String ENABLE_AUTO_COMMIT = "enable.auto.commit";
public static final String KEY_DESERIALIZER = "key.deserializer";
public static final String VALUE_DESERIALIZER = "value.deserializer";
}
@@ -0,0 +1,65 @@
package io.jenkins.plugins.remotingkafka;

import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* Pooling mechanism to reuse Kafka consumers.
*/
public class KafkaConsumerPool {
private static final Logger LOGGER = Logger.getLogger(KafkaConsumerPool.class.getName());
private static final Object instanceLock = new Object();
private static final Object poolLock = new Object();
private static volatile KafkaConsumerPool instance;
private LinkedBlockingQueue<KafkaConsumer<String, byte[]>> byteConsumerPool;
private Properties byteConsumerProps;

private KafkaConsumerPool() {
byteConsumerPool = new LinkedBlockingQueue<>();
}

public static KafkaConsumerPool getInstance() {
if (instance == null) {
synchronized (instanceLock) {
if (instance == null) {
instance = new KafkaConsumerPool();
}
}
}
return instance;
}

public void init(int poolSize, Properties byteConsumerProps) {
if (byteConsumerPool.isEmpty()) {
this.byteConsumerProps = byteConsumerProps;
for (int i = 0; i < poolSize; i++) {
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(byteConsumerProps);
byteConsumerPool.add(consumer);
}
}
}

public KafkaConsumer<String, byte[]> getByteConsumer() {
synchronized (poolLock) {
while (byteConsumerPool.isEmpty()) {
try {
poolLock.wait();
} catch (InterruptedException e) {
LOGGER.log(Level.SEVERE, "InterruptedException while getting a byte consumer", e);
}
}
return byteConsumerPool.poll();
}
}

public void releaseByteConsumer() {
synchronized (poolLock) {
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(byteConsumerProps);
byteConsumerPool.add(consumer);
}
}
}
@@ -0,0 +1,31 @@
package io.jenkins.plugins.remotingkafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;

import java.util.Properties;
import java.util.logging.Logger;

public class KafkaProducerClient {
private static final Logger LOGGER = Logger.getLogger(KafkaProducerClient.class.getName());
private static KafkaProducerClient instance;
private Producer<String, byte[]> byteProducer;

private KafkaProducerClient() {

}

public static KafkaProducerClient getInstance() {
if (instance == null) {
instance = new KafkaProducerClient();
}
return instance;
}

public Producer<String, byte[]> getByteProducer(Properties producerProps) {
if (byteProducer == null) {
byteProducer = new KafkaProducer<>(producerProps);
}
return byteProducer;
}
}

0 comments on commit 64e5ce0

Please sign in to comment.