Skip to content

fix(core): add retry to alter and run function #252

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion alter_v25.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ func (d *Dgraph) SetSchema(ctx context.Context, nsName string, schema string) er

func (d *Dgraph) doAlter(ctx context.Context, req *apiv25.AlterRequest) error {
_, err := doWithRetryLogin(ctx, d, func(dc apiv25.DgraphClient) (*apiv25.AlterResponse, error) {
return dc.Alter(d.getContext(ctx), req)
return RetryWithExponentialBackoff(func() (*apiv25.AlterResponse, error) {
return dc.Alter(d.getContext(ctx), req)
})
})
return err
}
8 changes: 6 additions & 2 deletions client.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/*

Check failure on line 1 in client.go

View check run for this annotation

Trunk.io / Trunk Check

gofmt

Incorrect formatting, autoformat by running 'trunk fmt'
* SPDX-FileCopyrightText: © Hypermode Inc. <[email protected]>
* SPDX-License-Identifier: Apache-2.0
*/
Expand Down Expand Up @@ -180,12 +180,16 @@
// Deprecated: use DropAllNamespaces, DropAll, DropData, DropPredicate, DropType, SetSchema instead.
func (d *Dgraph) Alter(ctx context.Context, op *api.Operation) error {
dc := d.anyClient()
_, err := dc.Alter(d.getContext(ctx), op)
_, err := RetryWithExponentialBackoff(func() (*api.Payload, error) {
return dc.Alter(d.getContext(ctx), op)
})
if isJwtExpired(err) {
if err := d.retryLogin(ctx); err != nil {
return err
}
_, err = dc.Alter(d.getContext(ctx), op)
_, err = RetryWithExponentialBackoff(func() (*api.Payload, error) {
return dc.Alter(d.getContext(ctx), op)
})
}
return err
}
Expand Down
4 changes: 3 additions & 1 deletion ns_v25.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ func (d *Dgraph) RunDQLWithVars(ctx context.Context, nsName string, q string,
req := &apiv25.RunDQLRequest{NsName: nsName, DqlQuery: q, Vars: vars,
ReadOnly: topts.readOnly, BestEffort: topts.bestEffort}
return doWithRetryLogin(ctx, d, func(dc apiv25.DgraphClient) (*apiv25.RunDQLResponse, error) {
return dc.RunDQL(d.getContext(ctx), req)
return RetryWithExponentialBackoff(func() (*apiv25.RunDQLResponse, error) {
return dc.RunDQL(d.getContext(ctx), req)
})
})
}

Expand Down
38 changes: 38 additions & 0 deletions retry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package dgo

Check failure on line 1 in retry.go

View check run for this annotation

Trunk.io / Trunk Check

gofmt

Incorrect formatting, autoformat by running 'trunk fmt'

import (
"strings"
"time"
)

// IsRetryable determines if an error is retryable. Replace this with your own logic.
func IsRetryable(err error) bool {
return strings.Contains(err.Error(), "504 (Gateway Timeout)")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Golang errors.Is and and errors.As are the preferred way to compare errors. Comparing error strings is very error prone. When using GRPC we can use its helper functions, so to achieve the same result we can do the following:

import (
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

func IsRetryable(err error) bool {
	return status.Code(err) == codes.Unavailable
}

}

const (
MaxAttempts = 5
InitialBackoff = 100 * time.Millisecond
)

// RetryWithExponentialBackoff retries the given operation with exponential backoff.
// Uses package-level defaults for isRetryable, maxAttempts, and initialBackoff.
// The operation should return a value of type T and an error.
func RetryWithExponentialBackoff[T any](op func() (T, error)) (T, error) {
var zero T
backoff := InitialBackoff
for attempt := 1; attempt <= MaxAttempts; attempt++ {
result, err := op()
if err == nil {
return result, nil
}
if !IsRetryable(err) {
return zero, err
}
if attempt < MaxAttempts {
time.Sleep(backoff)
backoff *= 2
}
}
return op() // last attempt
}
8 changes: 6 additions & 2 deletions txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,9 @@ func (txn *Txn) Do(ctx context.Context, req *api.Request) (*api.Response, error)
}

var responseHeaders metadata.MD
resp, err := txn.dc.Query(ctx, req, grpc.Header(&responseHeaders))
resp, err := RetryWithExponentialBackoff(func() (*api.Response, error) {
return txn.dc.Query(ctx, req, grpc.Header(&responseHeaders))
})
appendHdr(&responseHeaders, resp)

if isJwtExpired(err) {
Expand All @@ -187,7 +189,9 @@ func (txn *Txn) Do(ctx context.Context, req *api.Request) (*api.Response, error)

ctx = txn.dg.getContext(ctx)
var responseHeaders metadata.MD
resp, err = txn.dc.Query(ctx, req, grpc.Header(&responseHeaders))
resp, err = RetryWithExponentialBackoff(func() (*api.Response, error) {
return txn.dc.Query(ctx, req, grpc.Header(&responseHeaders))
})
appendHdr(&responseHeaders, resp)
}

Expand Down
Loading