Skip to content

Commit c031af3

Browse files
committed
Fix #468 - properly close clients when they are unused and no longer cached
1 parent 99e1445 commit c031af3

12 files changed

+273
-125
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ apply from: "https://raw.githubusercontent.com/gocd/gocd-plugin-gradle-task-help
2020

2121
gocdPlugin {
2222
id = 'cd.go.contrib.elasticagent.kubernetes'
23-
pluginVersion = '4.1.0'
23+
pluginVersion = '4.1.1'
2424
goCdVersion = '21.4.0'
2525
name = 'Kubernetes Elastic Agent Plugin'
2626
description = 'Kubernetes Based Elastic Agent Plugins for GoCD'

src/main/java/cd/go/contrib/elasticagent/KubernetesAgentInstances.java

Lines changed: 30 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -87,13 +87,13 @@ private KubernetesInstance createKubernetesInstance(CreateAgentRequest request,
8787
return null;
8888
}
8989

90-
KubernetesClient client = factory.client(settings);
91-
KubernetesInstance instance = kubernetesInstanceFactory.create(request, settings, client, pluginRequest);
92-
consoleLogAppender.accept(String.format("Creating pod: %s", instance.name()));
93-
register(instance);
94-
consoleLogAppender.accept(String.format("Agent pod %s created. Waiting for it to register to the GoCD server.", instance.name()));
95-
96-
return instance;
90+
try (KubernetesClientFactory.CachedClient client = factory.client(settings)) {
91+
KubernetesInstance instance = kubernetesInstanceFactory.create(request, settings, client.get(), pluginRequest);
92+
consoleLogAppender.accept(String.format("Creating pod: %s", instance.name()));
93+
register(instance);
94+
consoleLogAppender.accept(String.format("Agent pod %s created. Waiting for it to register to the GoCD server.", instance.name()));
95+
return instance;
96+
}
9797
}
9898

9999
private boolean isAgentCreatedForJob(Long jobId) {
@@ -110,8 +110,9 @@ private boolean isAgentCreatedForJob(Long jobId) {
110110
public void terminate(String agentId, PluginSettings settings) {
111111
KubernetesInstance instance = instances.get(agentId);
112112
if (instance != null) {
113-
KubernetesClient client = factory.client(settings);
114-
instance.terminate(client);
113+
try (KubernetesClientFactory.CachedClient client = factory.client(settings)) {
114+
instance.terminate(client.get());
115+
}
115116
} else {
116117
LOG.warn(format("Requested to terminate an instance that does not exist {0}.", agentId));
117118
}
@@ -152,16 +153,18 @@ public void refreshAll(PluginSettings properties) {
152153
LOG.debug("[Refresh Instances] Syncing k8s elastic agent pod information for cluster {}.", properties);
153154
PodList list = null;
154155
try {
155-
KubernetesClient client = factory.client(properties);
156-
list = client.pods().list();
156+
try (KubernetesClientFactory.CachedClient client = factory.client(properties)) {
157+
list = client.get().pods().list();
158+
}
157159
} catch (Exception e) {
158160
LOG.error("Error occurred while trying to list kubernetes pods:", e);
159161

160162
if (e.getCause() instanceof SocketTimeoutException) {
161163
LOG.error("Error caused due to SocketTimeoutException. This generally happens due to stale kubernetes client. Clearing out existing kubernetes client and creating a new one!");
162164
factory.clearOutExistingClient();
163-
KubernetesClient client = factory.client(properties);
164-
list = client.pods().list();
165+
try (KubernetesClientFactory.CachedClient client = factory.client(properties)) {
166+
list = client.get().pods().list();
167+
}
165168
}
166169
}
167170

@@ -195,23 +198,23 @@ public void register(KubernetesInstance instance) {
195198
private KubernetesAgentInstances unregisteredAfterTimeout(PluginSettings settings, Agents knownAgents) throws Exception {
196199
Duration period = settings.getAutoRegisterPeriod();
197200
KubernetesAgentInstances unregisteredInstances = new KubernetesAgentInstances();
198-
KubernetesClient client = factory.client(settings);
199-
200-
for (String instanceName : instances.keySet()) {
201-
if (knownAgents.containsAgentWithId(instanceName)) {
202-
continue;
203-
}
201+
try (KubernetesClientFactory.CachedClient client = factory.client(settings)) {
202+
for (String instanceName : instances.keySet()) {
203+
if (knownAgents.containsAgentWithId(instanceName)) {
204+
continue;
205+
}
204206

205-
Pod pod = getPod(client, instanceName);
206-
if (pod == null) {
207-
LOG.debug(String.format("[server-ping] Pod with name %s is already deleted.", instanceName));
208-
continue;
209-
}
207+
Pod pod = getPod(client.get(), instanceName);
208+
if (pod == null) {
209+
LOG.debug(String.format("[server-ping] Pod with name %s is already deleted.", instanceName));
210+
continue;
211+
}
210212

211-
Instant createdAt = Constants.KUBERNETES_POD_CREATION_TIME_FORMAT.parse(pod.getMetadata().getCreationTimestamp(), Instant::from);
213+
Instant createdAt = Constants.KUBERNETES_POD_CREATION_TIME_FORMAT.parse(pod.getMetadata().getCreationTimestamp(), Instant::from);
212214

213-
if (clock.now().isAfter(createdAt.plus(period))) {
214-
unregisteredInstances.register(kubernetesInstanceFactory.fromKubernetesPod(pod));
215+
if (clock.now().isAfter(createdAt.plus(period))) {
216+
unregisteredInstances.register(kubernetesInstanceFactory.fromKubernetesPod(pod));
217+
}
215218
}
216219
}
217220

src/main/java/cd/go/contrib/elasticagent/KubernetesClientFactory.java

Lines changed: 67 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
2222

2323
import java.util.concurrent.TimeUnit;
24+
import java.util.concurrent.atomic.AtomicInteger;
2425

2526
import static cd.go.contrib.elasticagent.KubernetesPlugin.LOG;
2627
import static cd.go.contrib.elasticagent.utils.Util.isBlank;
@@ -29,13 +30,13 @@
2930

3031
public class KubernetesClientFactory {
3132
private static final KubernetesClientFactory KUBERNETES_CLIENT_FACTORY = new KubernetesClientFactory();
32-
private final Clock clock;
33-
private KubernetesClient client;
34-
private PluginSettings clusterProfileConfigurations;
35-
private long clientCreatedTime;
36-
private long kubernetesClientRecycleIntervalInMinutes = -1;
3733
public static final String CLIENT_RECYCLE_SYSTEM_PROPERTY_KEY = "go.kubernetes.elastic-agent.plugin.client.recycle.interval.in.minutes";
3834

35+
private final Clock clock;
36+
37+
private volatile CachedClient client;
38+
private volatile long kubernetesClientRecycleIntervalInMinutes = -1;
39+
3940
KubernetesClientFactory() {
4041
this(Clock.DEFAULT);
4142
}
@@ -50,32 +51,30 @@ public static KubernetesClientFactory instance() {
5051
return KUBERNETES_CLIENT_FACTORY;
5152
}
5253

53-
public synchronized KubernetesClient client(PluginSettings clusterProfileConfigurations) {
54+
public synchronized CachedClient client(PluginSettings clusterProfileConfigurations) {
5455
clearOutClientOnTimer();
55-
if (clusterProfileConfigurations.equals(this.clusterProfileConfigurations) && this.client != null) {
56+
if (this.client != null && clusterProfileConfigurations.equals(this.client.clusterProfileConfigurations)) {
5657
LOG.debug("Using previously created client.");
58+
this.client.leases.incrementAndGet();
5759
return this.client;
5860
}
5961

6062
LOG.debug(format("Creating a new client because {0}.", (client == null) ? "client is null" : "cluster profile configurations has changed"));
61-
this.clusterProfileConfigurations = clusterProfileConfigurations;
63+
clearOutExistingClient();
6264
this.client = createClientFor(clusterProfileConfigurations);
63-
this.clientCreatedTime = this.clock.now().toEpochMilli();
6465
LOG.debug("New client is created.");
6566

6667
return this.client;
6768
}
6869

6970
private void clearOutClientOnTimer() {
70-
long currentTime = this.clock.now().toEpochMilli();
71-
long differenceInMinutes = TimeUnit.MILLISECONDS.toMinutes(currentTime - this.clientCreatedTime);
72-
if (differenceInMinutes > getKubernetesClientRecycleInterval()) {
71+
if (client != null && TimeUnit.MILLISECONDS.toMinutes(this.clock.now().toEpochMilli() - this.client.clientCreatedTime) > getKubernetesClientRecycleInterval()) {
7372
LOG.info("Recycling kubernetes client on timer...");
7473
clearOutExistingClient();
7574
}
7675
}
7776

78-
private KubernetesClient createClientFor(PluginSettings pluginSettings) {
77+
private CachedClient createClientFor(PluginSettings pluginSettings) {
7978
Config config = Config.autoConfigure(null);
8079

8180
setIfNotBlank(config::setMasterUrl, pluginSettings.getClusterUrl());
@@ -84,13 +83,12 @@ private KubernetesClient createClientFor(PluginSettings pluginSettings) {
8483
setIfNotBlank(config::setCaCertData, pluginSettings.getCaCertData());
8584
config.setRequestTimeout(pluginSettings.getClusterRequestTimeout());
8685

87-
return new KubernetesClientBuilder().withConfig(config).build();
86+
return new CachedClient(new KubernetesClientBuilder().withConfig(config).build(), pluginSettings);
8887
}
8988

90-
public void clearOutExistingClient() {
89+
public synchronized void clearOutExistingClient() {
9190
if (this.client != null) {
92-
LOG.debug("Terminating existing kubernetes client...");
93-
this.client.close();
91+
this.client.closeIfUnused();
9492
this.client = null;
9593
}
9694
}
@@ -109,12 +107,63 @@ private long getKubernetesClientRecycleInterval() {
109107
}
110108

111109
try {
112-
this.kubernetesClientRecycleIntervalInMinutes = Integer.valueOf(property);
110+
this.kubernetesClientRecycleIntervalInMinutes = Integer.parseInt(property);
113111
} catch (Exception e) {
114112
//set default value to 10 minutes when parsing user input fails
115113
this.kubernetesClientRecycleIntervalInMinutes = 10;
116114
}
117115

118116
return this.kubernetesClientRecycleIntervalInMinutes;
119117
}
118+
119+
public class CachedClient implements AutoCloseable {
120+
121+
private final KubernetesClient client;
122+
private final PluginSettings clusterProfileConfigurations;
123+
private final AtomicInteger leases = new AtomicInteger(1);
124+
private final long clientCreatedTime;
125+
private volatile boolean closed;
126+
127+
CachedClient(KubernetesClient client, PluginSettings clusterProfileConfigurations) {
128+
this.client = client;
129+
this.clusterProfileConfigurations = clusterProfileConfigurations;
130+
this.clientCreatedTime = KubernetesClientFactory.this.clock.now().toEpochMilli();
131+
}
132+
133+
public KubernetesClient get() {
134+
return client;
135+
}
136+
137+
public int leases() {
138+
return leases.get();
139+
}
140+
141+
public boolean isClosed() {
142+
return closed;
143+
}
144+
145+
@Override
146+
public void close() {
147+
releaseLease();
148+
}
149+
150+
private void releaseLease() {
151+
// Close the client only if it is not the same as the one we have cached
152+
if (leases.decrementAndGet() == 0 && this != KubernetesClientFactory.this.client && !closed) {
153+
closeUnderlyingClient();
154+
}
155+
}
156+
157+
public void closeIfUnused() {
158+
if (leases() == 0 && !closed) {
159+
closeUnderlyingClient();
160+
}
161+
}
162+
163+
private void closeUnderlyingClient() {
164+
LOG.debug("Terminating existing kubernetes client...");
165+
client.close();
166+
closed = true;
167+
}
168+
}
120169
}

src/main/java/cd/go/contrib/elasticagent/executors/AgentStatusReportExecutor.java

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -55,26 +55,26 @@ public GoPluginApiResponse execute() {
5555
String elasticAgentId = request.getElasticAgentId();
5656
JobIdentifier jobIdentifier = request.getJobIdentifier();
5757
LOG.info(format("[status-report] Generating status report for agent: {0} with job: {1}", elasticAgentId, jobIdentifier));
58-
KubernetesClient client = factory.client(request.clusterProfileProperties());
58+
try (KubernetesClientFactory.CachedClient client = factory.client(request.clusterProfileProperties())) {
59+
try {
60+
Pod pod;
61+
if (!isBlank(elasticAgentId)) {
62+
pod = findPodUsingElasticAgentId(elasticAgentId, client.get());
63+
} else {
64+
pod = findPodUsingJobIdentifier(jobIdentifier, client.get());
65+
}
5966

60-
try {
61-
Pod pod;
62-
if (!isBlank(elasticAgentId)) {
63-
pod = findPodUsingElasticAgentId(elasticAgentId, client);
64-
} else {
65-
pod = findPodUsingJobIdentifier(jobIdentifier, client);
66-
}
67-
68-
KubernetesElasticAgent elasticAgent = KubernetesElasticAgent.fromPod(client, pod, jobIdentifier);
67+
KubernetesElasticAgent elasticAgent = KubernetesElasticAgent.fromPod(client.get(), pod, jobIdentifier);
6968

70-
final String statusReportView = statusReportViewBuilder.build(statusReportViewBuilder.getTemplate("agent-status-report.template.ftlh"), elasticAgent);
69+
final String statusReportView = statusReportViewBuilder.build(statusReportViewBuilder.getTemplate("agent-status-report.template.ftlh"), elasticAgent);
7170

72-
final JsonObject responseJSON = new JsonObject();
73-
responseJSON.addProperty("view", statusReportView);
71+
final JsonObject responseJSON = new JsonObject();
72+
responseJSON.addProperty("view", statusReportView);
7473

75-
return DefaultGoPluginApiResponse.success(responseJSON.toString());
76-
} catch (Exception e) {
77-
return StatusReportGenerationErrorHandler.handle(statusReportViewBuilder, e);
74+
return DefaultGoPluginApiResponse.success(responseJSON.toString());
75+
} catch (Exception e) {
76+
return StatusReportGenerationErrorHandler.handle(statusReportViewBuilder, e);
77+
}
7878
}
7979
}
8080

src/main/java/cd/go/contrib/elasticagent/executors/ClusterStatusReportExecutor.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import com.thoughtworks.go.plugin.api.response.DefaultGoPluginApiResponse;
2626
import com.thoughtworks.go.plugin.api.response.GoPluginApiResponse;
2727
import freemarker.template.Template;
28-
import io.fabric8.kubernetes.client.KubernetesClient;
2928

3029
import static cd.go.contrib.elasticagent.KubernetesPlugin.LOG;
3130

@@ -49,8 +48,10 @@ public ClusterStatusReportExecutor(ClusterStatusReportRequest request, PluginSta
4948
public GoPluginApiResponse execute() {
5049
try {
5150
LOG.info("[status-report] Generating status report.");
52-
KubernetesClient client = factory.client(request.clusterProfileProperties());
53-
final KubernetesCluster kubernetesCluster = new KubernetesCluster(client);
51+
final KubernetesCluster kubernetesCluster;
52+
try (KubernetesClientFactory.CachedClient client = factory.client(request.clusterProfileProperties())) {
53+
kubernetesCluster = new KubernetesCluster(client.get());
54+
}
5455
final Template template = statusReportViewBuilder.getTemplate("status-report.template.ftlh");
5556
final String statusReportView = statusReportViewBuilder.build(template, kubernetesCluster);
5657

src/test/java/cd/go/contrib/elasticagent/KubernetesAgentInstancesIntegrationTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,9 @@
2424
import io.fabric8.kubernetes.client.dsl.PodResource;
2525
import org.junit.jupiter.api.BeforeEach;
2626
import org.junit.jupiter.api.Test;
27+
import org.mockito.Answers;
2728
import org.mockito.ArgumentCaptor;
2829
import org.mockito.Mock;
29-
import org.mockito.Mockito;
3030
import org.mockito.stubbing.Answer;
3131

3232
import java.util.ArrayList;
@@ -45,7 +45,7 @@
4545

4646
public class KubernetesAgentInstancesIntegrationTest {
4747

48-
@Mock
48+
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
4949
private KubernetesClientFactory mockedKubernetesClientFactory;
5050

5151
@Mock
@@ -72,7 +72,7 @@ public class KubernetesAgentInstancesIntegrationTest {
7272
public void setUp() {
7373
openMocks(this);
7474
kubernetesAgentInstances = new KubernetesAgentInstances(mockedKubernetesClientFactory);
75-
when(mockedKubernetesClientFactory.client(any())).thenReturn(mockKubernetesClient);
75+
when(mockedKubernetesClientFactory.client(any()).get()).thenReturn(mockKubernetesClient);
7676

7777
when(pods.resource(any(Pod.class))).thenAnswer((Answer<PodResource>) invocation -> {
7878
Object[] args = invocation.getArguments();

src/test/java/cd/go/contrib/elasticagent/KubernetesAgentInstancesTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import io.fabric8.kubernetes.client.dsl.PodResource;
2727
import org.junit.jupiter.api.BeforeEach;
2828
import org.junit.jupiter.api.Test;
29+
import org.mockito.Answers;
2930
import org.mockito.InOrder;
3031
import org.mockito.Mock;
3132

@@ -43,7 +44,7 @@
4344
import static org.mockito.MockitoAnnotations.openMocks;
4445

4546
public class KubernetesAgentInstancesTest {
46-
@Mock
47+
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
4748
KubernetesClientFactory factory;
4849

4950
@Mock
@@ -79,7 +80,7 @@ public void setUp() {
7980
when(mockCreateAgentRequest.properties()).thenReturn(testProperties);
8081
when(mockPluginSettings.getMaxPendingPods()).thenReturn(10);
8182
when(mockPluginSettings.getClusterRequestTimeout()).thenReturn(10000);
82-
when(factory.client(mockPluginSettings)).thenReturn(mockKubernetesClient);
83+
when(factory.client(mockPluginSettings).get()).thenReturn(mockKubernetesClient);
8384
JobIdentifier jobId = new JobIdentifier("test", 1L, "Test pipeline", "test name", "1", "test job", 100L);
8485
when(mockCreateAgentRequest.jobIdentifier()).thenReturn(jobId);
8586

0 commit comments

Comments
 (0)