Skip to content

Commit 16a2da3

Browse files
authored
Merge pull request #337 from mucahitkurt/use-node-informer
Use Node informer to prevent hitting the API server for all time
2 parents 4160887 + 99a83ac commit 16a2da3

File tree

6 files changed

+47
-30
lines changed

6 files changed

+47
-30
lines changed

cmd/csi-provisioner/csi-provisioner.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ import (
4343

4444
utilfeature "k8s.io/apiserver/pkg/util/feature"
4545
"k8s.io/client-go/informers"
46+
"k8s.io/client-go/listers/core/v1"
4647
storagelisters "k8s.io/client-go/listers/storage/v1beta1"
4748
utilflag "k8s.io/component-base/cli/flag"
4849
csitrans "k8s.io/csi-translation-lib"
@@ -184,11 +185,13 @@ func main() {
184185
}
185186

186187
var csiNodeLister storagelisters.CSINodeLister
188+
var nodeLister v1.NodeLister
187189
var factory informers.SharedInformerFactory
188190
if ctrl.SupportsTopology(pluginCapabilities) {
189191
// Create informer to prevent hit the API server for all resource request
190192
factory = informers.NewSharedInformerFactory(clientset, ctrl.ResyncPeriodOfCsiNodeInformer)
191193
csiNodeLister = factory.Storage().V1beta1().CSINodes().Lister()
194+
nodeLister = factory.Core().V1().Nodes().Lister()
192195
}
193196

194197
// Create the provisioner: it implements the Provisioner interface expected by
@@ -207,7 +210,8 @@ func main() {
207210
supportsMigrationFromInTreePluginName,
208211
*strictTopology,
209212
translator,
210-
csiNodeLister)
213+
csiNodeLister,
214+
nodeLister)
211215

212216
provisionController = controller.NewProvisionController(
213217
clientset,
@@ -224,7 +228,7 @@ func main() {
224228
cacheSyncResult := factory.WaitForCacheSync(stopCh)
225229
for _, v := range cacheSyncResult {
226230
if !v {
227-
klog.Fatalf("Failed to sync CsiNodeInformer!")
231+
klog.Fatalf("Failed to sync Informers!")
228232
}
229233
}
230234
}

pkg/controller/controller.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ import (
4747
"k8s.io/klog"
4848

4949
"google.golang.org/grpc"
50+
corelisters "k8s.io/client-go/listers/core/v1"
5051
storagelisters "k8s.io/client-go/listers/storage/v1beta1"
5152
)
5253

@@ -197,6 +198,7 @@ type csiProvisioner struct {
197198
strictTopology bool
198199
translator ProvisionerCSITranslator
199200
csiNodeLister storagelisters.CSINodeLister
201+
nodeLister corelisters.NodeLister
200202
}
201203

202204
var _ controller.Provisioner = &csiProvisioner{}
@@ -257,7 +259,8 @@ func NewCSIProvisioner(client kubernetes.Interface,
257259
supportsMigrationFromInTreePluginName string,
258260
strictTopology bool,
259261
translator ProvisionerCSITranslator,
260-
csiNodeLister storagelisters.CSINodeLister) controller.Provisioner {
262+
csiNodeLister storagelisters.CSINodeLister,
263+
nodeLister corelisters.NodeLister) controller.Provisioner {
261264

262265
csiClient := csi.NewControllerClient(grpcClient)
263266
provisioner := &csiProvisioner{
@@ -276,6 +279,7 @@ func NewCSIProvisioner(client kubernetes.Interface,
276279
strictTopology: strictTopology,
277280
translator: translator,
278281
csiNodeLister: csiNodeLister,
282+
nodeLister: nodeLister,
279283
}
280284
return provisioner
281285
}
@@ -504,7 +508,8 @@ func (p *csiProvisioner) ProvisionExt(options controller.ProvisionOptions) (*v1.
504508
options.StorageClass.AllowedTopologies,
505509
options.SelectedNode,
506510
p.strictTopology,
507-
p.csiNodeLister)
511+
p.csiNodeLister,
512+
p.nodeLister)
508513
if err != nil {
509514
return nil, controller.ProvisioningNoChange, fmt.Errorf("error generating accessibility requirements: %v", err)
510515
}

pkg/controller/controller_test.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -401,7 +401,7 @@ func TestCreateDriverReturnsInvalidCapacityDuringProvision(t *testing.T) {
401401

402402
pluginCaps, controllerCaps := provisionCapabilities()
403403
csiProvisioner := NewCSIProvisioner(nil, 5*time.Second, "test-provisioner", "test",
404-
5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, csitrans.New(), nil)
404+
5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, csitrans.New(), nil, nil)
405405

406406
// Requested PVC with requestedBytes storage
407407
deletePolicy := v1.PersistentVolumeReclaimDelete
@@ -1464,7 +1464,7 @@ func runProvisionTest(t *testing.T, k string, tc provisioningTestcase, requested
14641464

14651465
pluginCaps, controllerCaps := provisionCapabilities()
14661466
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn,
1467-
nil, provisionDriverName, pluginCaps, controllerCaps, supportsMigrationFromInTreePluginName, false, csitrans.New(), nil)
1467+
nil, provisionDriverName, pluginCaps, controllerCaps, supportsMigrationFromInTreePluginName, false, csitrans.New(), nil, nil)
14681468

14691469
out := &csi.CreateVolumeResponse{
14701470
Volume: &csi.Volume{
@@ -2031,7 +2031,7 @@ func TestProvisionFromSnapshot(t *testing.T) {
20312031

20322032
pluginCaps, controllerCaps := provisionFromSnapshotCapabilities()
20332033
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn,
2034-
client, driverName, pluginCaps, controllerCaps, "", false, csitrans.New(), nil)
2034+
client, driverName, pluginCaps, controllerCaps, "", false, csitrans.New(), nil, nil)
20352035

20362036
out := &csi.CreateVolumeResponse{
20372037
Volume: &csi.Volume{
@@ -2201,11 +2201,11 @@ func TestProvisionWithTopologyEnabled(t *testing.T) {
22012201

22022202
clientSet := fakeclientset.NewSimpleClientset(nodes, csiNodes)
22032203

2204-
csiNodeLister, stopChan := csiNodeLister(clientSet, t)
2204+
csiNodeLister, nodeLister, stopChan := listers(clientSet)
22052205
defer close(stopChan)
22062206

22072207
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5,
2208-
csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, csitrans.New(), csiNodeLister)
2208+
csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, csitrans.New(), csiNodeLister, nodeLister)
22092209

22102210
pv, err := csiProvisioner.Provision(controller.ProvisionOptions{
22112211
StorageClass: &storagev1.StorageClass{},
@@ -2260,7 +2260,7 @@ func TestProvisionWithTopologyDisabled(t *testing.T) {
22602260
clientSet := fakeclientset.NewSimpleClientset()
22612261
pluginCaps, controllerCaps := provisionWithTopologyCapabilities()
22622262
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5,
2263-
csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, csitrans.New(), nil)
2263+
csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, csitrans.New(), nil, nil)
22642264

22652265
out := &csi.CreateVolumeResponse{
22662266
Volume: &csi.Volume{
@@ -2440,7 +2440,7 @@ func runDeleteTest(t *testing.T, k string, tc deleteTestcase) {
24402440

24412441
pluginCaps, controllerCaps := provisionCapabilities()
24422442
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5,
2443-
csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, csitrans.New(), nil)
2443+
csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, csitrans.New(), nil, nil)
24442444

24452445
err = csiProvisioner.Delete(tc.persistentVolume)
24462446
if tc.expectErr && err == nil {
@@ -3096,7 +3096,7 @@ func TestProvisionFromPVC(t *testing.T) {
30963096
}
30973097

30983098
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn,
3099-
nil, driverName, pluginCaps, controllerCaps, "", false, csitrans.New(), nil)
3099+
nil, driverName, pluginCaps, controllerCaps, "", false, csitrans.New(), nil, nil)
31003100

31013101
pv, err := csiProvisioner.Provision(tc.volOpts)
31023102
if tc.expectErr && err == nil {
@@ -3175,7 +3175,7 @@ func TestProvisionWithMigration(t *testing.T) {
31753175
pluginCaps, controllerCaps := provisionCapabilities()
31763176
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner",
31773177
"test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps,
3178-
inTreePluginName, false, mockTranslator, nil)
3178+
inTreePluginName, false, mockTranslator, nil, nil)
31793179

31803180
// Set up return values (AnyTimes to avoid overfitting on implementation)
31813181

@@ -3335,7 +3335,7 @@ func TestDeleteMigration(t *testing.T) {
33353335
pluginCaps, controllerCaps := provisionCapabilities()
33363336
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner",
33373337
"test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "",
3338-
false, mockTranslator, nil)
3338+
false, mockTranslator, nil, nil)
33393339

33403340
// Set mock return values (AnyTimes to avoid overfitting on implementation details)
33413341
mockTranslator.EXPECT().IsPVMigratable(gomock.Any()).Return(tc.expectTranslation).AnyTimes()

pkg/controller/topology.go

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"k8s.io/apimachinery/pkg/util/version"
3535
utilfeature "k8s.io/apiserver/pkg/util/feature"
3636
"k8s.io/client-go/kubernetes"
37+
corelisters "k8s.io/client-go/listers/core/v1"
3738
storagelisters "k8s.io/client-go/listers/storage/v1beta1"
3839
"k8s.io/klog"
3940
)
@@ -152,7 +153,8 @@ func GenerateAccessibilityRequirements(
152153
allowedTopologies []v1.TopologySelectorTerm,
153154
selectedNode *v1.Node,
154155
strictTopology bool,
155-
csiNodeLister storagelisters.CSINodeLister) (*csi.TopologyRequirement, error) {
156+
csiNodeLister storagelisters.CSINodeLister,
157+
nodeLister corelisters.NodeLister) (*csi.TopologyRequirement, error) {
156158
requirement := &csi.TopologyRequirement{}
157159

158160
var (
@@ -219,7 +221,7 @@ func GenerateAccessibilityRequirements(
219221
requisiteTerms = flatten(allowedTopologies)
220222
} else {
221223
// Aggregate existing topologies in nodes across the entire cluster.
222-
requisiteTerms, err = aggregateTopologies(kubeClient, driverName, selectedCSINode, csiNodeLister)
224+
requisiteTerms, err = aggregateTopologies(kubeClient, driverName, selectedCSINode, csiNodeLister, nodeLister)
223225
if err != nil {
224226
return nil, err
225227
}
@@ -309,7 +311,8 @@ func aggregateTopologies(
309311
kubeClient kubernetes.Interface,
310312
driverName string,
311313
selectedCSINode *storage.CSINode,
312-
csiNodeLister storagelisters.CSINodeLister) ([]topologyTerm, error) {
314+
csiNodeLister storagelisters.CSINodeLister,
315+
nodeLister corelisters.NodeLister) ([]topologyTerm, error) {
313316

314317
// 1. Determine topologyKeys to use for aggregation
315318
var topologyKeys []string
@@ -371,15 +374,14 @@ func aggregateTopologies(
371374
if err != nil {
372375
return nil, err
373376
}
374-
// TODO (#144): use informers
375-
nodes, err := kubeClient.CoreV1().Nodes().List(metav1.ListOptions{LabelSelector: selector})
377+
nodes, err := nodeLister.List(selector)
376378
if err != nil {
377379
return nil, fmt.Errorf("error listing nodes: %v", err)
378380
}
379381

380382
var terms []topologyTerm
381-
for _, node := range nodes.Items {
382-
term, _ := getTopologyFromNode(&node, topologyKeys)
383+
for _, node := range nodes {
384+
term, _ := getTopologyFromNode(node, topologyKeys)
383385
terms = append(terms, term)
384386
}
385387
if len(terms) == 0 {
@@ -508,7 +510,7 @@ func getTopologyFromNode(node *v1.Node, topologyKeys []string) (term topologyTer
508510
return term, false
509511
}
510512

511-
func buildTopologyKeySelector(topologyKeys []string) (string, error) {
513+
func buildTopologyKeySelector(topologyKeys []string) (labels.Selector, error) {
512514
var expr []metav1.LabelSelectorRequirement
513515
for _, key := range topologyKeys {
514516
expr = append(expr, metav1.LabelSelectorRequirement{
@@ -523,10 +525,10 @@ func buildTopologyKeySelector(topologyKeys []string) (string, error) {
523525

524526
selector, err := metav1.LabelSelectorAsSelector(&labelSelector)
525527
if err != nil {
526-
return "", fmt.Errorf("error parsing topology keys selector: %v", err)
528+
return nil, fmt.Errorf("error parsing topology keys selector: %v", err)
527529
}
528530

529-
return selector.String(), nil
531+
return selector, nil
530532
}
531533

532534
func (t topologyTerm) clone() topologyTerm {

pkg/controller/topology_test.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"k8s.io/apimachinery/pkg/util/sets"
2828
"k8s.io/client-go/informers"
2929
fakeclientset "k8s.io/client-go/kubernetes/fake"
30+
corelisters "k8s.io/client-go/listers/core/v1"
3031
"k8s.io/client-go/listers/storage/v1beta1"
3132
"k8s.io/kubernetes/pkg/apis/core/helper"
3233
)
@@ -392,7 +393,7 @@ func TestStatefulSetSpreading(t *testing.T) {
392393

393394
kubeClient := fakeclientset.NewSimpleClientset(nodes, csiNodes)
394395

395-
csiNodeLister, stopChan := csiNodeLister(kubeClient, t)
396+
csiNodeLister, nodeLister, stopChan := listers(kubeClient)
396397
defer close(stopChan)
397398

398399
for name, tc := range testcases {
@@ -410,6 +411,7 @@ func TestStatefulSetSpreading(t *testing.T) {
410411
nil,
411412
strictTopology,
412413
csiNodeLister,
414+
nodeLister,
413415
)
414416

415417
if err != nil {
@@ -804,6 +806,7 @@ func TestAllowedTopologies(t *testing.T) {
804806
nil, /* selectedNode */
805807
strictTopology,
806808
nil,
809+
nil,
807810
)
808811

809812
if err != nil {
@@ -1080,7 +1083,7 @@ func TestTopologyAggregation(t *testing.T) {
10801083

10811084
kubeClient := fakeclientset.NewSimpleClientset(nodes, csiNodes)
10821085

1083-
csiNodeLister, stopChan := csiNodeLister(kubeClient, t)
1086+
csiNodeLister, nodeLister, stopChan := listers(kubeClient)
10841087
defer close(stopChan)
10851088

10861089
var selectedNode *v1.Node
@@ -1095,6 +1098,7 @@ func TestTopologyAggregation(t *testing.T) {
10951098
selectedNode,
10961099
strictTopology,
10971100
csiNodeLister,
1101+
nodeLister,
10981102
)
10991103

11001104
if tc.expectError {
@@ -1332,7 +1336,7 @@ func TestPreferredTopologies(t *testing.T) {
13321336
kubeClient := fakeclientset.NewSimpleClientset(nodes, csiNodes)
13331337
selectedNode := &nodes.Items[0]
13341338

1335-
csiNodeLister, stopChan := csiNodeLister(kubeClient, t)
1339+
csiNodeLister, nodeLister, stopChan := listers(kubeClient)
13361340
defer close(stopChan)
13371341

13381342
requirements, err := GenerateAccessibilityRequirements(
@@ -1343,6 +1347,7 @@ func TestPreferredTopologies(t *testing.T) {
13431347
selectedNode,
13441348
strictTopology,
13451349
csiNodeLister,
1350+
nodeLister,
13461351
)
13471352

13481353
if tc.expectError {
@@ -1545,11 +1550,12 @@ func requisiteEqual(t1, t2 []*csi.Topology) bool {
15451550
return unchecked.Len() == 0
15461551
}
15471552

1548-
func csiNodeLister(kubeClient *fakeclientset.Clientset, t *testing.T) (v1beta1.CSINodeLister, chan struct{}) {
1553+
func listers(kubeClient *fakeclientset.Clientset) (v1beta1.CSINodeLister, corelisters.NodeLister, chan struct{}) {
15491554
factory := informers.NewSharedInformerFactory(kubeClient, ResyncPeriodOfCsiNodeInformer)
15501555
stopChan := make(chan struct{})
15511556
csiNodeLister := factory.Storage().V1beta1().CSINodes().Lister()
1557+
nodeLister := factory.Core().V1().Nodes().Lister()
15521558
factory.Start(stopChan)
15531559
factory.WaitForCacheSync(stopChan)
1554-
return csiNodeLister, stopChan
1560+
return csiNodeLister, nodeLister, stopChan
15551561
}

vendor/modules.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,7 @@ k8s.io/apiserver/pkg/util/feature
246246
# k8s.io/client-go v0.0.0-20190918200256-06eb1244587a => k8s.io/client-go v0.0.0-20190918200256-06eb1244587a
247247
k8s.io/client-go/informers
248248
k8s.io/client-go/kubernetes
249+
k8s.io/client-go/listers/core/v1
249250
k8s.io/client-go/listers/storage/v1beta1
250251
k8s.io/client-go/rest
251252
k8s.io/client-go/tools/clientcmd
@@ -416,7 +417,6 @@ k8s.io/client-go/listers/batch/v2alpha1
416417
k8s.io/client-go/listers/certificates/v1beta1
417418
k8s.io/client-go/listers/coordination/v1
418419
k8s.io/client-go/listers/coordination/v1beta1
419-
k8s.io/client-go/listers/core/v1
420420
k8s.io/client-go/listers/events/v1beta1
421421
k8s.io/client-go/listers/extensions/v1beta1
422422
k8s.io/client-go/listers/networking/v1

0 commit comments

Comments
 (0)