Skip to content

Commit 43506b7

Browse files
authored
avoid cache stampede if key is not found. (confluentinc#815)
When the key is not in cache and multiple goroutines call the Serialize method at once, many of them could start calls to SR to put in cache the same schema, this way only the first one makes the call and the rest read the cached result.
1 parent 3e65bd5 commit 43506b7

File tree

1 file changed

+61
-34
lines changed

1 file changed

+61
-34
lines changed

schemaregistry/schemaregistry_client.go

Lines changed: 61 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -265,14 +265,21 @@ func (c *client) Register(subject string, schema SchemaInfo, normalize bool) (id
265265
metadata := SchemaMetadata{
266266
SchemaInfo: schema,
267267
}
268-
err = c.restService.handleRequest(newRequest("POST", versionNormalize, &metadata, url.PathEscape(subject), normalize), &metadata)
269-
if err != nil {
270-
return -1, err
271-
}
272268
c.schemaCacheLock.Lock()
273-
c.schemaCache.Put(cacheKey, metadata.ID)
269+
// another goroutine could have already put it in cache
270+
idValue, ok = c.schemaCache.Get(cacheKey)
271+
if !ok {
272+
err = c.restService.handleRequest(newRequest("POST", versionNormalize, &metadata, url.PathEscape(subject), normalize), &metadata)
273+
if err == nil {
274+
c.schemaCache.Put(cacheKey, metadata.ID)
275+
} else {
276+
metadata.ID = -1
277+
}
278+
} else {
279+
metadata.ID = idValue.(int)
280+
}
274281
c.schemaCacheLock.Unlock()
275-
return metadata.ID, nil
282+
return metadata.ID, err
276283
}
277284

278285
// GetBySubjectAndID returns the schema identified by id
@@ -290,23 +297,29 @@ func (c *client) GetBySubjectAndID(subject string, id int) (schema SchemaInfo, e
290297
}
291298

292299
metadata := SchemaMetadata{}
293-
if len(subject) > 0 {
294-
err = c.restService.handleRequest(newRequest("GET", schemasBySubject, nil, id, url.QueryEscape(subject)), &metadata)
300+
newInfo := &SchemaInfo{}
301+
c.idCacheLock.Lock()
302+
// another goroutine could have already put it in cache
303+
infoValue, ok = c.idCache.Get(cacheKey)
304+
if !ok {
305+
if len(subject) > 0 {
306+
err = c.restService.handleRequest(newRequest("GET", schemasBySubject, nil, id, url.QueryEscape(subject)), &metadata)
307+
} else {
308+
err = c.restService.handleRequest(newRequest("GET", schemas, nil, id), &metadata)
309+
}
310+
if err == nil {
311+
newInfo = &SchemaInfo{
312+
Schema: metadata.Schema,
313+
SchemaType: metadata.SchemaType,
314+
References: metadata.References,
315+
}
316+
c.idCache.Put(cacheKey, newInfo)
317+
}
295318
} else {
296-
err = c.restService.handleRequest(newRequest("GET", schemas, nil, id), &metadata)
297-
}
298-
if err != nil {
299-
return SchemaInfo{}, err
319+
newInfo = infoValue.(*SchemaInfo)
300320
}
301-
newInfo := &SchemaInfo{
302-
Schema: metadata.Schema,
303-
SchemaType: metadata.SchemaType,
304-
References: metadata.References,
305-
}
306-
c.idCacheLock.Lock()
307-
c.idCache.Put(cacheKey, newInfo)
308321
c.idCacheLock.Unlock()
309-
return *newInfo, nil
322+
return *newInfo, err
310323
}
311324

312325
// GetID checks if a schema has been registered with the subject. Returns ID if the registration can be found
@@ -325,18 +338,25 @@ func (c *client) GetID(subject string, schema SchemaInfo, normalize bool) (id in
325338
if ok {
326339
return idValue.(int), nil
327340
}
341+
328342
metadata := SchemaMetadata{
329343
SchemaInfo: schema,
330344
}
331-
332-
err = c.restService.handleRequest(newRequest("POST", subjectsNormalize, &metadata, url.PathEscape(subject), normalize), &metadata)
333-
if err != nil {
334-
return -1, err
335-
}
336345
c.schemaCacheLock.Lock()
337-
c.schemaCache.Put(cacheKey, metadata.ID)
346+
// another goroutine could have already put it in cache
347+
idValue, ok = c.schemaCache.Get(cacheKey)
348+
if !ok {
349+
err = c.restService.handleRequest(newRequest("POST", subjectsNormalize, &metadata, url.PathEscape(subject), normalize), &metadata)
350+
if err == nil {
351+
c.schemaCache.Put(cacheKey, metadata.ID)
352+
} else {
353+
metadata.ID = -1
354+
}
355+
} else {
356+
metadata.ID = idValue.(int)
357+
}
338358
c.schemaCacheLock.Unlock()
339-
return metadata.ID, nil
359+
return metadata.ID, err
340360
}
341361

342362
// GetLatestSchemaMetadata fetches latest version registered with the provided subject
@@ -381,18 +401,25 @@ func (c *client) GetVersion(subject string, schema SchemaInfo, normalize bool) (
381401
if ok {
382402
return versionValue.(int), nil
383403
}
404+
384405
metadata := SchemaMetadata{
385406
SchemaInfo: schema,
386407
}
387-
388-
err = c.restService.handleRequest(newRequest("POST", subjectsNormalize, &metadata, url.PathEscape(subject), normalize), &metadata)
389-
if err != nil {
390-
return -1, err
391-
}
392408
c.versionCacheLock.Lock()
393-
c.versionCache.Put(cacheKey, metadata.Version)
409+
// another goroutine could have already put it in cache
410+
versionValue, ok = c.versionCache.Get(cacheKey)
411+
if !ok {
412+
err = c.restService.handleRequest(newRequest("POST", subjectsNormalize, &metadata, url.PathEscape(subject), normalize), &metadata)
413+
if err == nil {
414+
c.versionCache.Put(cacheKey, metadata.Version)
415+
} else {
416+
metadata.Version = -1
417+
}
418+
} else {
419+
metadata.Version = versionValue.(int)
420+
}
394421
c.versionCacheLock.Unlock()
395-
return metadata.Version, nil
422+
return metadata.Version, err
396423
}
397424

398425
// Fetch all Subjects registered with the schema Registry

0 commit comments

Comments
 (0)