Skip to content

Commit fbaf15f

Browse files
chore(source/crd): improve coverage, fix deprecations (#5489)
1 parent 17de613 commit fbaf15f

File tree

2 files changed

+175
-25
lines changed

2 files changed

+175
-25
lines changed

source/crd.go

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ import (
2626
"k8s.io/apimachinery/pkg/watch"
2727
"k8s.io/client-go/tools/cache"
2828

29+
"sigs.k8s.io/external-dns/source/annotations"
30+
2931
log "github.com/sirupsen/logrus"
3032
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3133
"k8s.io/apimachinery/pkg/labels"
@@ -95,7 +97,7 @@ func NewCRDClientForAPIVersionKind(client kubernetes.Interface, kubeConfig, apiS
9597
}
9698

9799
scheme := runtime.NewScheme()
98-
addKnownTypes(scheme, groupVersion)
100+
_ = addKnownTypes(scheme, groupVersion)
99101

100102
config.GroupVersion = &groupVersion
101103
config.APIPath = "/apis"
@@ -120,14 +122,14 @@ func NewCRDSource(crdClient rest.Interface, namespace, kind string, annotationFi
120122
}
121123
if startInformer {
122124
// external-dns already runs its sync-handler periodically (controlled by `--interval` flag) to ensure any
123-
// missed or dropped events are handled. specify a resync period 0 to avoid unnecessary sync handler invocations.
125+
// missed or dropped events are handled. specify resync period 0 to avoid unnecessary sync handler invocations.
124126
informer := cache.NewSharedInformer(
125127
&cache.ListWatch{
126-
ListFunc: func(lo metav1.ListOptions) (result runtime.Object, err error) {
127-
return sourceCrd.List(context.TODO(), &lo)
128+
ListWithContextFunc: func(ctx context.Context, lo metav1.ListOptions) (result runtime.Object, err error) {
129+
return sourceCrd.List(ctx, &lo)
128130
},
129-
WatchFunc: func(lo metav1.ListOptions) (watch.Interface, error) {
130-
return sourceCrd.watch(context.TODO(), &lo)
131+
WatchFuncWithContext: func(ctx context.Context, lo metav1.ListOptions) (watch.Interface, error) {
132+
return sourceCrd.watch(ctx, &lo)
131133
},
132134
},
133135
&apiv1alpha1.DNSEndpoint{},
@@ -138,13 +140,13 @@ func NewCRDSource(crdClient rest.Interface, namespace, kind string, annotationFi
138140
return &sourceCrd, nil
139141
}
140142

141-
func (cs *crdSource) AddEventHandler(ctx context.Context, handler func()) {
143+
func (cs *crdSource) AddEventHandler(_ context.Context, handler func()) {
142144
if cs.informer != nil {
143145
log.Debug("Adding event handler for CRD")
144146
// Right now there is no way to remove event handler from informer, see:
145147
// https://github.com/kubernetes/kubernetes/issues/79610
146148
informer := *cs.informer
147-
informer.AddEventHandler(
149+
_, _ = informer.AddEventHandler(
148150
cache.ResourceEventHandlerFuncs{
149151
AddFunc: func(obj interface{}) {
150152
handler()
@@ -183,18 +185,18 @@ func (cs *crdSource) Endpoints(ctx context.Context) ([]*endpoint.Endpoint, error
183185
// Make sure that all endpoints have targets for A or CNAME type
184186
crdEndpoints := []*endpoint.Endpoint{}
185187
for _, ep := range dnsEndpoint.Spec.Endpoints {
186-
if (ep.RecordType == "CNAME" || ep.RecordType == "A" || ep.RecordType == "AAAA") && len(ep.Targets) < 1 {
188+
if (ep.RecordType == endpoint.RecordTypeCNAME || ep.RecordType == endpoint.RecordTypeA || ep.RecordType == endpoint.RecordTypeAAAA) && len(ep.Targets) < 1 {
187189
log.Warnf("Endpoint %s with DNSName %s has an empty list of targets", dnsEndpoint.Name, ep.DNSName)
188190
continue
189191
}
190192

191193
illegalTarget := false
192194
for _, target := range ep.Targets {
193-
if ep.RecordType != "NAPTR" && strings.HasSuffix(target, ".") {
195+
if ep.RecordType != endpoint.RecordTypeNAPTR && strings.HasSuffix(target, ".") {
194196
illegalTarget = true
195197
break
196198
}
197-
if ep.RecordType == "NAPTR" && !strings.HasSuffix(target, ".") {
199+
if ep.RecordType == endpoint.RecordTypeNAPTR && !strings.HasSuffix(target, ".") {
198200
illegalTarget = true
199201
break
200202
}
@@ -270,15 +272,10 @@ func (cs *crdSource) UpdateStatus(ctx context.Context, dnsEndpoint *apiv1alpha1.
270272

271273
// filterByAnnotations filters a list of dnsendpoints by a given annotation selector.
272274
func (cs *crdSource) filterByAnnotations(dnsendpoints *apiv1alpha1.DNSEndpointList) (*apiv1alpha1.DNSEndpointList, error) {
273-
labelSelector, err := metav1.ParseToLabelSelector(cs.annotationFilter)
275+
selector, err := annotations.ParseFilter(cs.annotationFilter)
274276
if err != nil {
275277
return nil, err
276278
}
277-
selector, err := metav1.LabelSelectorAsSelector(labelSelector)
278-
if err != nil {
279-
return nil, err
280-
}
281-
282279
// empty filter returns original list
283280
if selector.Empty() {
284281
return dnsendpoints, nil

source/crd_test.go

Lines changed: 161 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,18 +24,22 @@ import (
2424
"io"
2525
"net/http"
2626
"strings"
27+
"sync/atomic"
2728
"testing"
29+
"time"
2830

2931
"github.com/stretchr/testify/require"
3032
"github.com/stretchr/testify/suite"
3133
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
34+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
3235
"k8s.io/apimachinery/pkg/labels"
3336
"k8s.io/apimachinery/pkg/runtime"
3437
"k8s.io/apimachinery/pkg/runtime/schema"
3538
"k8s.io/apimachinery/pkg/runtime/serializer"
3639
"k8s.io/client-go/rest"
3740
"k8s.io/client-go/rest/fake"
38-
41+
"k8s.io/client-go/tools/cache"
42+
cachetesting "k8s.io/client-go/tools/cache/testing"
3943
apiv1alpha1 "sigs.k8s.io/external-dns/apis/v1alpha1"
4044
"sigs.k8s.io/external-dns/endpoint"
4145
)
@@ -83,7 +87,6 @@ func fakeRESTClient(endpoints []*endpoint.Endpoint, apiVersion, kind, namespace,
8387
codecFactory := serializer.WithoutConversionCodecFactory{
8488
CodecFactory: serializer.NewCodecFactory(scheme),
8589
}
86-
8790
client := &fake.RESTClient{
8891
GroupVersion: groupVersion,
8992
VersionedAPIPath: "/apis/" + apiVersion,
@@ -103,7 +106,10 @@ func fakeRESTClient(endpoints []*endpoint.Endpoint, apiVersion, kind, namespace,
103106
decoder := json.NewDecoder(req.Body)
104107

105108
var body apiv1alpha1.DNSEndpoint
106-
decoder.Decode(&body)
109+
err := decoder.Decode(&body)
110+
if err != nil {
111+
return nil, err
112+
}
107113
dnsEndpoint.Status.ObservedGeneration = body.Status.ObservedGeneration
108114
return &http.Response{StatusCode: http.StatusOK, Header: defaultHeader(), Body: objBody(codec, dnsEndpoint)}, nil
109115
default:
@@ -475,22 +481,22 @@ func testCRDSourceEndpoints(t *testing.T) {
475481
restClient := fakeRESTClient(ti.endpoints, ti.registeredAPIVersion, ti.registeredKind, ti.registeredNamespace, "test", ti.annotations, ti.labels, t)
476482
groupVersion, err := schema.ParseGroupVersion(ti.apiVersion)
477483
require.NoError(t, err)
484+
require.NotNil(t, groupVersion)
478485

479486
scheme := runtime.NewScheme()
480-
require.NoError(t, addKnownTypes(scheme, groupVersion))
487+
err = addKnownTypes(scheme, groupVersion)
488+
require.NoError(t, err)
481489

482490
labelSelector, err := labels.Parse(ti.labelFilter)
483491
require.NoError(t, err)
484492

485493
// At present, client-go's fake.RESTClient (used by crd_test.go) is known to cause race conditions when used
486494
// with informers: https://github.com/kubernetes/kubernetes/issues/95372
487495
// So don't start the informer during testing.
488-
startInformer := false
489-
490-
cs, err := NewCRDSource(restClient, ti.namespace, ti.kind, ti.annotationFilter, labelSelector, scheme, startInformer)
496+
cs, err := NewCRDSource(restClient, ti.namespace, ti.kind, ti.annotationFilter, labelSelector, scheme, false)
491497
require.NoError(t, err)
492498

493-
receivedEndpoints, err := cs.Endpoints(context.Background())
499+
receivedEndpoints, err := cs.Endpoints(t.Context())
494500
if ti.expectError {
495501
require.Errorf(t, err, "Received err %v", err)
496502
} else {
@@ -511,7 +517,133 @@ func testCRDSourceEndpoints(t *testing.T) {
511517
}
512518
}
513519

520+
func TestCRDSource_NoInformer(t *testing.T) {
521+
cs := &crdSource{informer: nil}
522+
called := false
523+
524+
cs.AddEventHandler(context.Background(), func() { called = true })
525+
require.False(t, called, "handler must not be called when informer is nil")
526+
}
527+
528+
func TestCRDSource_AddEventHandler_Add(t *testing.T) {
529+
ctx := t.Context()
530+
watcher, cs := helperCreateWatcherWithInformer(t)
531+
532+
var counter atomic.Int32
533+
cs.AddEventHandler(ctx, func() {
534+
counter.Add(1)
535+
})
536+
537+
obj := &unstructured.Unstructured{}
538+
obj.SetName("test")
539+
540+
watcher.Add(obj)
541+
542+
require.Eventually(t, func() bool {
543+
return counter.Load() == 1
544+
}, time.Second, 10*time.Millisecond)
545+
}
546+
547+
func TestCRDSource_AddEventHandler_Update(t *testing.T) {
548+
ctx := t.Context()
549+
watcher, cs := helperCreateWatcherWithInformer(t)
550+
551+
var counter atomic.Int32
552+
cs.AddEventHandler(ctx, func() {
553+
counter.Add(1)
554+
})
555+
556+
obj := unstructured.Unstructured{}
557+
obj.SetName("test")
558+
obj.SetNamespace("default")
559+
obj.SetUID("9be5b64e-3ee9-11f0-88ee-1eb95c6fd730")
560+
561+
watcher.Add(&obj)
562+
563+
require.Eventually(t, func() bool {
564+
return len(watcher.Items) == 1
565+
}, time.Second, 10*time.Millisecond)
566+
567+
modified := obj.DeepCopy()
568+
modified.SetLabels(map[string]string{"new-label": "this"})
569+
watcher.Modify(modified)
570+
571+
require.Eventually(t, func() bool {
572+
return len(watcher.Items) == 1
573+
}, time.Second, 10*time.Millisecond)
574+
575+
require.Eventually(t, func() bool {
576+
return counter.Load() == 2
577+
}, time.Second, 10*time.Millisecond)
578+
}
579+
580+
func TestCRDSource_AddEventHandler_Delete(t *testing.T) {
581+
ctx := t.Context()
582+
watcher, cs := helperCreateWatcherWithInformer(t)
583+
584+
var counter atomic.Int32
585+
cs.AddEventHandler(ctx, func() {
586+
counter.Add(1)
587+
})
588+
589+
obj := &unstructured.Unstructured{}
590+
obj.SetName("test")
591+
592+
watcher.Delete(obj)
593+
594+
require.Eventually(t, func() bool {
595+
return counter.Load() == 1
596+
}, time.Second, 10*time.Millisecond)
597+
}
598+
599+
func TestCRDSource_Watch(t *testing.T) {
600+
scheme := runtime.NewScheme()
601+
err := apiv1alpha1.AddToScheme(scheme)
602+
require.NoError(t, err)
603+
604+
var watchCalled bool
605+
606+
codecFactory := serializer.WithoutConversionCodecFactory{
607+
CodecFactory: serializer.NewCodecFactory(scheme),
608+
}
609+
610+
versionApiPath := fmt.Sprintf("/apis/%s", apiv1alpha1.GroupVersion.String())
611+
612+
client := &fake.RESTClient{
613+
GroupVersion: apiv1alpha1.GroupVersion,
614+
VersionedAPIPath: versionApiPath,
615+
NegotiatedSerializer: codecFactory,
616+
Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) {
617+
if req.URL.Path == fmt.Sprintf("%s/namespaces/test-ns/dnsendpoints", versionApiPath) &&
618+
req.URL.Query().Get("watch") == "true" {
619+
watchCalled = true
620+
return &http.Response{
621+
StatusCode: http.StatusOK,
622+
Header: make(http.Header),
623+
}, nil
624+
}
625+
t.Errorf("unexpected request: %v", req.URL)
626+
return nil, fmt.Errorf("unexpected request: %v", req.URL)
627+
}),
628+
}
629+
630+
cs := &crdSource{
631+
crdClient: client,
632+
namespace: "test-ns",
633+
crdResource: "dnsendpoints",
634+
codec: runtime.NewParameterCodec(scheme),
635+
}
636+
637+
opts := &metav1.ListOptions{}
638+
639+
_, err = cs.watch(t.Context(), opts)
640+
require.NoError(t, err)
641+
require.True(t, watchCalled)
642+
require.True(t, opts.Watch)
643+
}
644+
514645
func validateCRDResource(t *testing.T, src Source, expectError bool) {
646+
t.Helper()
515647
cs := src.(*crdSource)
516648
result, err := cs.List(context.Background(), &metav1.ListOptions{})
517649
if expectError {
@@ -526,3 +658,24 @@ func validateCRDResource(t *testing.T, src Source, expectError bool) {
526658
}
527659
}
528660
}
661+
662+
func helperCreateWatcherWithInformer(t *testing.T) (*cachetesting.FakeControllerSource, crdSource) {
663+
t.Helper()
664+
ctx := t.Context()
665+
666+
watcher := cachetesting.NewFakeControllerSource()
667+
668+
informer := cache.NewSharedInformer(watcher, &unstructured.Unstructured{}, 0)
669+
670+
go informer.RunWithContext(ctx)
671+
672+
require.Eventually(t, func() bool {
673+
return cache.WaitForCacheSync(ctx.Done(), informer.HasSynced)
674+
}, time.Second, 10*time.Millisecond)
675+
676+
cs := &crdSource{
677+
informer: &informer,
678+
}
679+
680+
return watcher, *cs
681+
}

0 commit comments

Comments
 (0)