Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[JENKINS-51413, JENKINS-51414] Implement basic master-agent communica…
…tion, master command invocation (#2) * Implement basic master-agent communication * Command transport implementation for Kafka * Implement kafka producer and consumer connection pool * Reorganize and fix build problem * Refactor packagings
- Loading branch information
Showing
24 changed files
with
969 additions
and
8 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
#!/usr/bin/env bash | ||
# The script is used for manual testing of the agent. | ||
java -jar target/remoting-kafka-agent-1.0-SNAPSHOT-jar-with-dependencies.jar -name test -master http://localhost:9090 -kafkaURL 127.0.0.1:9092 |
83 changes: 83 additions & 0 deletions
83
agent/src/main/java/io/jenkins/plugins/remotingkafka/Agent.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,83 @@ | ||
package io.jenkins.plugins.remotingkafka; | ||
|
||
import hudson.remoting.Command; | ||
import org.apache.commons.lang3.SerializationUtils; | ||
import org.apache.kafka.clients.consumer.ConsumerRecord; | ||
import org.apache.kafka.clients.consumer.ConsumerRecords; | ||
import org.apache.kafka.clients.consumer.KafkaConsumer; | ||
import org.kohsuke.args4j.CmdLineException; | ||
import org.kohsuke.args4j.CmdLineParser; | ||
|
||
import java.io.IOException; | ||
import java.net.InetAddress; | ||
import java.net.URL; | ||
import java.util.Arrays; | ||
import java.util.Properties; | ||
import java.util.logging.Level; | ||
import java.util.logging.Logger; | ||
|
||
public class Agent { | ||
private static final Logger LOGGER = Logger.getLogger(Agent.class.getName()); | ||
|
||
private final Options options; | ||
|
||
public Agent(Options options) { | ||
this.options = options; | ||
} | ||
|
||
public static void main(String... args) throws InterruptedException, IOException { | ||
Options options = new Options(); | ||
Agent agent = new Agent(options); | ||
|
||
CmdLineParser p = new CmdLineParser(options); | ||
try { | ||
p.parseArgument(args); | ||
} catch (CmdLineException e) { | ||
LOGGER.log(Level.SEVERE, "CmdLineException occurred during parseArgument", e); | ||
p.printUsage(System.out); | ||
System.exit(-1); | ||
} | ||
|
||
if (options.help) { | ||
p.printUsage(System.out); | ||
System.exit(0); | ||
} | ||
|
||
if (options.name == null) { | ||
try { | ||
agent.options.name = InetAddress.getLocalHost().getCanonicalHostName(); | ||
} catch (IOException e) { | ||
LOGGER.severe("Failed to lookup the canonical hostname of this agent, please check system settings."); | ||
LOGGER.severe("If not possible to resolve please specify a node name using the '-name' option"); | ||
System.exit(-1); | ||
} | ||
} | ||
|
||
URL url = new URL(options.master); | ||
String consumerTopic = url.getHost() + "-" + url.getPort() + "-" + options.name | ||
+ KafkaConstants.CONNECT_SUFFIX; | ||
|
||
// Consumer properties. | ||
Properties consumerProps = new Properties(); | ||
consumerProps.put(KafkaConstants.BOOTSTRAP_SERVERS, options.kafkaURL); | ||
consumerProps.put(KafkaConstants.GROUP_ID, "testID"); | ||
consumerProps.put(KafkaConstants.ENABLE_AUTO_COMMIT, "false"); | ||
consumerProps.put(KafkaConstants.KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer"); | ||
consumerProps.put(KafkaConstants.VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); | ||
KafkaConsumerPool.getInstance().init(4, consumerProps); | ||
KafkaConsumer<String, byte[]> consumer = KafkaConsumerPool.getInstance().getByteConsumer(); | ||
consumer.subscribe(Arrays.asList(consumerTopic)); | ||
LOGGER.info("Subscribed to topic: " + consumerTopic); | ||
Command cmd = null; | ||
while (true) { | ||
ConsumerRecords<String, byte[]> records = consumer.poll(0); | ||
for (ConsumerRecord<String, byte[]> record : records) { | ||
if (record.key().equals("launch")) { | ||
consumer.commitSync(); | ||
cmd = (Command) SerializationUtils.deserialize(record.value()); | ||
LOGGER.info("Received a command cmd=" + cmd); | ||
} | ||
} | ||
} | ||
} | ||
} |
18 changes: 18 additions & 0 deletions
18
agent/src/main/java/io/jenkins/plugins/remotingkafka/Options.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
package io.jenkins.plugins.remotingkafka; | ||
|
||
import org.kohsuke.args4j.Option; | ||
|
||
public class Options { | ||
|
||
@Option(name = "-name", usage = "Name of the agent") | ||
public String name; | ||
|
||
@Option(name = "-master", usage = "The complete target Jenkins URL like 'http://server:8080/jenkins/'.") | ||
public String master; | ||
|
||
@Option(name = "-help", aliases = "--help", usage = "Show the help screen") | ||
public boolean help; | ||
|
||
@Option(name = "-kafkaURL", usage = "Kafka host and port address identifier") | ||
public String kafkaURL; | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
#!/usr/bin/env bash | ||
# Commands used in Kafka. | ||
|
||
# Delete a topic: | ||
kafka-topics.sh --zookeeper zookeeper:2181 --delete --topic test | ||
|
||
# Create a topic: | ||
kafka-topics.sh --create --zookeeper zookeeper:2181 \ | ||
--replication-factor 1 --partitions 1 --topic test | ||
|
||
# Consumer console: | ||
kafka-console-consumer.sh \ | ||
--bootstrap-server localhost:9092 \ | ||
--topic my-topic \ | ||
--from-beginning |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
<?xml version="1.0" encoding="UTF-8"?> | ||
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | ||
xmlns="http://maven.apache.org/POM/4.0.0" | ||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | ||
<parent> | ||
<artifactId>remoting-kafka-plugin</artifactId> | ||
<groupId>org.jenkins-ci</groupId> | ||
<version>1.0-SNAPSHOT</version> | ||
<relativePath>../pom.xml</relativePath> | ||
</parent> | ||
<modelVersion>4.0.0</modelVersion> | ||
|
||
<artifactId>kafka-client-lib</artifactId> | ||
<version>1.0-SNAPSHOT</version> | ||
<name>Kafka Client Module</name> | ||
|
||
<dependencies> | ||
<dependency> | ||
<groupId>org.apache.kafka</groupId> | ||
<artifactId>kafka-clients</artifactId> | ||
<version>1.1.0</version> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.jenkins-ci.main</groupId> | ||
<artifactId>remoting</artifactId> | ||
<version>${remoting.version}</version> | ||
<scope>provided</scope> | ||
</dependency> | ||
</dependencies> | ||
|
||
</project> |
31 changes: 31 additions & 0 deletions
31
kafka-client-lib/src/main/java/io/jenkins/plugins/remotingkafka/KafkaConstants.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
package io.jenkins.plugins.remotingkafka; | ||
|
||
public class KafkaConstants { | ||
public static final String CONNECT_SUFFIX = "-connect"; | ||
// Common configs. | ||
public static final String BOOTSTRAP_SERVERS = "bootstrap.servers"; | ||
|
||
// Producer configs. TODO - add more configs. | ||
public static final String KEY_SERIALIZER = "key.serializer"; | ||
public static final String VALUE_SERIALIZER = "value.serializer"; | ||
public static final String ACKS = "acks"; | ||
public static final String BUFFER_MEMORY = "buffer.memory"; | ||
public static final String COMPRESSION_TYPE = "compression.type"; | ||
public static final String RETRIES = "retries"; | ||
public static final String SSL_KEY_PASSWORD = "ssl.key.password"; | ||
public static final String SSL_KEYSTORE_LOCATION = "ssl.keystore.location"; | ||
public static final String SSL_KEYSTORE_PASSWORD = "ssl.keystore.password"; | ||
public static final String SSL_TRUSTSTORE_LOCATION = "ssl.truststore.location"; | ||
public static final String SSL_TRUSTSTORE_PASSWORD = "ssl.truststore.password"; | ||
public static final String BATCH_SIZE = "batch.size"; | ||
public static final String CLIENT_ID = "client.id"; | ||
public static final String CONNECTIONS_MAX_IDLE = "connections.max.idle.ms"; | ||
public static final String LINGER = "linger.ms"; | ||
public static final String MAX_BLOCK = "max.block.ms"; | ||
|
||
// Consumer configs. TODO - add more configs. | ||
public static final String GROUP_ID = "group.id"; | ||
public static final String ENABLE_AUTO_COMMIT = "enable.auto.commit"; | ||
public static final String KEY_DESERIALIZER = "key.deserializer"; | ||
public static final String VALUE_DESERIALIZER = "value.deserializer"; | ||
} |
65 changes: 65 additions & 0 deletions
65
kafka-client-lib/src/main/java/io/jenkins/plugins/remotingkafka/KafkaConsumerPool.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
package io.jenkins.plugins.remotingkafka; | ||
|
||
import org.apache.kafka.clients.consumer.KafkaConsumer; | ||
|
||
import java.util.Properties; | ||
import java.util.concurrent.LinkedBlockingQueue; | ||
import java.util.logging.Level; | ||
import java.util.logging.Logger; | ||
|
||
/** | ||
* Pooling mechanism to reuse Kafka consumers. | ||
*/ | ||
public class KafkaConsumerPool { | ||
private static final Logger LOGGER = Logger.getLogger(KafkaConsumerPool.class.getName()); | ||
private static final Object instanceLock = new Object(); | ||
private static final Object poolLock = new Object(); | ||
private static volatile KafkaConsumerPool instance; | ||
private LinkedBlockingQueue<KafkaConsumer<String, byte[]>> byteConsumerPool; | ||
private Properties byteConsumerProps; | ||
|
||
private KafkaConsumerPool() { | ||
byteConsumerPool = new LinkedBlockingQueue<>(); | ||
} | ||
|
||
public static KafkaConsumerPool getInstance() { | ||
if (instance == null) { | ||
synchronized (instanceLock) { | ||
if (instance == null) { | ||
instance = new KafkaConsumerPool(); | ||
} | ||
} | ||
} | ||
return instance; | ||
} | ||
|
||
public void init(int poolSize, Properties byteConsumerProps) { | ||
if (byteConsumerPool.isEmpty()) { | ||
this.byteConsumerProps = byteConsumerProps; | ||
for (int i = 0; i < poolSize; i++) { | ||
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(byteConsumerProps); | ||
byteConsumerPool.add(consumer); | ||
} | ||
} | ||
} | ||
|
||
public KafkaConsumer<String, byte[]> getByteConsumer() { | ||
synchronized (poolLock) { | ||
while (byteConsumerPool.isEmpty()) { | ||
try { | ||
poolLock.wait(); | ||
} catch (InterruptedException e) { | ||
LOGGER.log(Level.SEVERE, "InterruptedException while getting a byte consumer", e); | ||
} | ||
} | ||
return byteConsumerPool.poll(); | ||
} | ||
} | ||
|
||
public void releaseByteConsumer() { | ||
synchronized (poolLock) { | ||
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(byteConsumerProps); | ||
byteConsumerPool.add(consumer); | ||
} | ||
} | ||
} |
31 changes: 31 additions & 0 deletions
31
kafka-client-lib/src/main/java/io/jenkins/plugins/remotingkafka/KafkaProducerClient.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
package io.jenkins.plugins.remotingkafka; | ||
|
||
import org.apache.kafka.clients.producer.KafkaProducer; | ||
import org.apache.kafka.clients.producer.Producer; | ||
|
||
import java.util.Properties; | ||
import java.util.logging.Logger; | ||
|
||
public class KafkaProducerClient { | ||
private static final Logger LOGGER = Logger.getLogger(KafkaProducerClient.class.getName()); | ||
private static KafkaProducerClient instance; | ||
private Producer<String, byte[]> byteProducer; | ||
|
||
private KafkaProducerClient() { | ||
|
||
} | ||
|
||
public static KafkaProducerClient getInstance() { | ||
if (instance == null) { | ||
instance = new KafkaProducerClient(); | ||
} | ||
return instance; | ||
} | ||
|
||
public Producer<String, byte[]> getByteProducer(Properties producerProps) { | ||
if (byteProducer == null) { | ||
byteProducer = new KafkaProducer<>(producerProps); | ||
} | ||
return byteProducer; | ||
} | ||
} |
Oops, something went wrong.