Skip to content

Commit fb5ba3a

Browse files
authored
🐛 Use syncmap for the resource cache (#1023)
* Use syncmap for the resource cache Signed-off-by: zhujian <[email protected]> * update unit tests Signed-off-by: zhujian <[email protected]> * fix unit test Signed-off-by: zhujian <[email protected]> * use sync.map directly Signed-off-by: zhujian <[email protected]> --------- Signed-off-by: zhujian <[email protected]>
1 parent 8faa1b2 commit fb5ba3a

File tree

4 files changed

+280
-4
lines changed

4 files changed

+280
-4
lines changed
Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
package apply
2+
3+
import (
4+
"fmt"
5+
"reflect"
6+
"sync"
7+
8+
corev1 "k8s.io/api/core/v1"
9+
"k8s.io/apimachinery/pkg/api/meta"
10+
"k8s.io/apimachinery/pkg/runtime"
11+
"k8s.io/apimachinery/pkg/runtime/schema"
12+
"k8s.io/klog/v2"
13+
)
14+
15+
type cachedVersionKey struct {
16+
name string
17+
namespace string
18+
kind schema.GroupKind
19+
}
20+
21+
// record of resource metadata used to determine if its safe to return early from an ApplyFoo
22+
// resourceHash is an ms5 hash of the required in an ApplyFoo that is computed in case the input changes
23+
// resourceVersion is the received resourceVersion from the apiserver in response to an update that is comparable to the GET
24+
type cachedResource struct {
25+
resourceHash, resourceVersion string
26+
}
27+
28+
type resourceCache struct {
29+
cache sync.Map // use syncmap for concurrent access
30+
}
31+
32+
// NewResourceCache creates a new resource cache instance.
33+
// TODO: currently only work agent uses this syncmap cache, consider using this in other components
34+
func NewResourceCache() *resourceCache {
35+
return &resourceCache{
36+
cache: sync.Map{},
37+
}
38+
}
39+
40+
func getResourceMetadata(obj runtime.Object) (schema.GroupKind, string, string, string, error) {
41+
if obj == nil {
42+
return schema.GroupKind{}, "", "", "", fmt.Errorf("nil object has no metadata")
43+
}
44+
metadata, err := meta.Accessor(obj)
45+
if err != nil {
46+
return schema.GroupKind{}, "", "", "", err
47+
}
48+
if metadata == nil || reflect.ValueOf(metadata).IsNil() {
49+
return schema.GroupKind{}, "", "", "", fmt.Errorf("object has no metadata")
50+
}
51+
resourceHash := hashOfResourceStruct(obj)
52+
53+
// retrieve kind, sometimes this can be done via the accesor, sometimes not (depends on the type)
54+
kind := schema.GroupKind{}
55+
gvk := obj.GetObjectKind().GroupVersionKind()
56+
if len(gvk.Kind) > 0 {
57+
kind = gvk.GroupKind()
58+
} else {
59+
if currKind := getCoreGroupKind(obj); currKind != nil {
60+
kind = *currKind
61+
}
62+
}
63+
if len(kind.Kind) == 0 {
64+
return schema.GroupKind{}, "", "", "", fmt.Errorf("unable to determine GroupKind of %T", obj)
65+
}
66+
67+
return kind, metadata.GetName(), metadata.GetNamespace(), resourceHash, nil
68+
}
69+
70+
func getResourceVersion(obj runtime.Object) (string, error) {
71+
if obj == nil {
72+
return "", fmt.Errorf("nil object has no resourceVersion")
73+
}
74+
metadata, err := meta.Accessor(obj)
75+
if err != nil {
76+
return "", err
77+
}
78+
if metadata == nil || reflect.ValueOf(metadata).IsNil() {
79+
return "", fmt.Errorf("object has no metadata")
80+
}
81+
rv := metadata.GetResourceVersion()
82+
if len(rv) == 0 {
83+
return "", fmt.Errorf("missing resourceVersion")
84+
}
85+
86+
return rv, nil
87+
}
88+
89+
func (c *resourceCache) UpdateCachedResourceMetadata(required runtime.Object, actual runtime.Object) {
90+
if c == nil {
91+
return
92+
}
93+
if required == nil || actual == nil {
94+
return
95+
}
96+
kind, name, namespace, resourceHash, err := getResourceMetadata(required)
97+
if err != nil {
98+
return
99+
}
100+
cacheKey := cachedVersionKey{
101+
name: name,
102+
namespace: namespace,
103+
kind: kind,
104+
}
105+
106+
resourceVersion, err := getResourceVersion(actual)
107+
if err != nil {
108+
klog.V(4).Infof("error reading resourceVersion %s:%s:%s %s", name, kind, namespace, err)
109+
return
110+
}
111+
112+
c.cache.Store(cacheKey, cachedResource{resourceHash, resourceVersion})
113+
klog.V(7).Infof("updated resourceVersion of %s:%s:%s %s", name, kind, namespace, resourceVersion)
114+
}
115+
116+
// in the circumstance that an ApplyFoo's 'required' is the same one which was previously
117+
// applied for a given (name, kind, namespace) and the existing resource (if any),
118+
// hasn't been modified since the ApplyFoo last updated that resource, then return true (we don't
119+
// need to reapply the resource). Otherwise return false.
120+
func (c *resourceCache) SafeToSkipApply(required runtime.Object, existing runtime.Object) bool {
121+
if c == nil {
122+
return false
123+
}
124+
if required == nil || existing == nil {
125+
return false
126+
}
127+
kind, name, namespace, resourceHash, err := getResourceMetadata(required)
128+
if err != nil {
129+
return false
130+
}
131+
cacheKey := cachedVersionKey{
132+
name: name,
133+
namespace: namespace,
134+
kind: kind,
135+
}
136+
137+
resourceVersion, err := getResourceVersion(existing)
138+
if err != nil {
139+
return false
140+
}
141+
142+
var versionMatch, hashMatch bool
143+
144+
if value, ok := c.cache.Load(cacheKey); ok {
145+
if cached, ok := value.(cachedResource); ok {
146+
versionMatch = cached.resourceVersion == resourceVersion
147+
hashMatch = cached.resourceHash == resourceHash
148+
if versionMatch && hashMatch {
149+
klog.V(4).Infof("found matching resourceVersion & manifest hash")
150+
return true
151+
}
152+
}
153+
}
154+
155+
return false
156+
}
157+
158+
// TODO find way to create a registry of these based on struct mapping or some such that forces users to get this right
159+
//
160+
// for creating an ApplyGeneric
161+
// Perhaps a struct containing the apply function and the getKind
162+
func getCoreGroupKind(obj runtime.Object) *schema.GroupKind {
163+
switch obj.(type) {
164+
case *corev1.Namespace:
165+
return &schema.GroupKind{
166+
Kind: "Namespace",
167+
}
168+
case *corev1.Service:
169+
return &schema.GroupKind{
170+
Kind: "Service",
171+
}
172+
case *corev1.Pod:
173+
return &schema.GroupKind{
174+
Kind: "Pod",
175+
}
176+
case *corev1.ServiceAccount:
177+
return &schema.GroupKind{
178+
Kind: "ServiceAccount",
179+
}
180+
case *corev1.ConfigMap:
181+
return &schema.GroupKind{
182+
Kind: "ConfigMap",
183+
}
184+
case *corev1.Secret:
185+
return &schema.GroupKind{
186+
Kind: "Secret",
187+
}
188+
default:
189+
return nil
190+
}
191+
}
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package apply
2+
3+
import (
4+
"fmt"
5+
"testing"
6+
7+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
8+
"k8s.io/apimachinery/pkg/runtime/schema"
9+
)
10+
11+
func TestCache(t *testing.T) {
12+
cache := NewResourceCache()
13+
if cache == nil {
14+
t.Fatal("expected non-nil resource cache")
15+
}
16+
17+
obj := &unstructured.Unstructured{}
18+
obj.SetGroupVersionKind(schema.GroupVersionKind{
19+
Group: "apps",
20+
Version: "v1",
21+
Kind: "Deployment",
22+
})
23+
obj.SetResourceVersion("12345")
24+
25+
obj.SetName("test")
26+
obj.SetNamespace("default")
27+
28+
// Test UpdateCachedResourceMetadata
29+
cache.UpdateCachedResourceMetadata(obj, obj)
30+
31+
// Test SafeToSkipApply
32+
if !cache.SafeToSkipApply(obj, obj) {
33+
t.Fatal("expected SafeToSkipApply to return true for identical objects")
34+
}
35+
36+
// Test SafeToSkipApply with different objects
37+
obj2 := &unstructured.Unstructured{}
38+
obj2.SetGroupVersionKind(schema.GroupVersionKind{
39+
Group: "apps",
40+
Version: "v1",
41+
Kind: "Deployment",
42+
})
43+
obj2.SetName("test2")
44+
if cache.SafeToSkipApply(obj, obj2) {
45+
t.Fatal("expected SafeToSkipApply to return false for different objects")
46+
}
47+
48+
obj3 := obj.DeepCopy()
49+
obj3.SetResourceVersion("54321")
50+
if cache.SafeToSkipApply(obj, obj3) {
51+
t.Fatal("expected SafeToSkipApply to return false for objects with different resource versions")
52+
}
53+
cache.UpdateCachedResourceMetadata(obj, obj3)
54+
if !cache.SafeToSkipApply(obj, obj3) {
55+
t.Fatal("expected SafeToSkipApply to return true after updating cache with new resource version")
56+
}
57+
}
58+
59+
func TestCurrentReadWriteCache(t *testing.T) {
60+
// cache := resourceapply.NewResourceCache()
61+
cache := NewResourceCache()
62+
if cache == nil {
63+
t.Fatal("expected non-nil resource cache")
64+
}
65+
66+
for i := range 1000 {
67+
go func() {
68+
obj := &unstructured.Unstructured{}
69+
obj.SetGroupVersionKind(schema.GroupVersionKind{
70+
Group: "apps",
71+
Version: "v1",
72+
Kind: "Deployment",
73+
})
74+
75+
obj.SetNamespace("default")
76+
obj.SetName("test")
77+
obj.SetResourceVersion(fmt.Sprintf("12345%d", i))
78+
79+
cache.UpdateCachedResourceMetadata(obj, obj)
80+
cache.SafeToSkipApply(obj, obj)
81+
}()
82+
}
83+
84+
// if the code can run here without panic, it means the cache is thread-safe
85+
t.Log("Cache operations completed without panic")
86+
}

pkg/work/spoke/apply/update_apply.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@ func NewUpdateApply(dynamicClient dynamic.Interface, kubeclient kubernetes.Inter
3636
kubeclient: kubeclient,
3737
apiExtensionClient: apiExtensionClient,
3838
// TODO we did not gc resources in cache, which may cause more memory usage. It
39-
// should be refactored using own cache implementation in the future.
40-
staticResourceCache: resourceapply.NewResourceCache(),
39+
// should be refactored in the future.
40+
staticResourceCache: NewResourceCache(),
4141
}
4242
}
4343

pkg/work/spoke/apply/update_apply_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"testing"
66

7-
"github.com/openshift/library-go/pkg/operator/resource/resourceapply"
87
corev1 "k8s.io/api/core/v1"
98
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
109
fakeapiextensions "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/fake"
@@ -325,7 +324,7 @@ func TestApplyUnstructred(t *testing.T) {
325324

326325
c.required.SetOwnerReferences([]metav1.OwnerReference{c.owner})
327326
syncContext := testingcommon.NewFakeSyncContext(t, "test")
328-
cache := resourceapply.NewResourceCache()
327+
cache := NewResourceCache()
329328
cache.UpdateCachedResourceMetadata(c.required, c.existing)
330329
_, _, err := applier.applyUnstructured(
331330
context.TODO(), c.required, c.gvr, syncContext.Recorder(), cache)

0 commit comments

Comments
 (0)