Skip to content

Commit df0521c

Browse files
authored
continue in reader readLoop on success (#774)
if we don't continue here we hit the errcount++ at the end of the loop which forces us to sleep after successful reads.
1 parent 02a2e42 commit df0521c

File tree

1 file changed

+5
-5
lines changed

1 file changed

+5
-5
lines changed

reader.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -509,7 +509,6 @@ type ReaderConfig struct {
509509

510510
// Validate method validates ReaderConfig properties.
511511
func (config *ReaderConfig) Validate() error {
512-
513512
if len(config.Brokers) == 0 {
514513
return errors.New("cannot create a new kafka reader with an empty list of broker addresses")
515514
}
@@ -854,7 +853,7 @@ func (r *Reader) CommitMessages(ctx context.Context, msgs ...Message) error {
854853
}
855854

856855
var errch <-chan error
857-
var creq = commitRequest{
856+
creq := commitRequest{
858857
commits: makeCommits(msgs...),
859858
}
860859

@@ -1283,12 +1282,14 @@ func (r *reader) run(ctx context.Context, offset int64) {
12831282
switch offset, err = r.read(ctx, offset, conn); err {
12841283
case nil:
12851284
errcount = 0
1285+
continue
12861286
case io.EOF:
12871287
// done with this batch of messages...carry on. note that this
12881288
// block relies on the batch repackaging real io.EOF errors as
12891289
// io.UnexpectedEOF. otherwise, we would end up swallowing real
12901290
// errors here.
12911291
errcount = 0
1292+
continue
12921293
case UnknownTopicOrPartition:
12931294
r.withErrorLogger(func(log Logger) {
12941295
log.Printf("failed to read from current broker for partition %d of %s at offset %d, topic or parition not found on this broker, %v", r.partition, r.topic, offset, r.brokers)
@@ -1323,7 +1324,6 @@ func (r *reader) run(ctx context.Context, offset int64) {
13231324

13241325
case OffsetOutOfRange:
13251326
first, last, err := r.readOffsets(conn)
1326-
13271327
if err != nil {
13281328
r.withErrorLogger(func(log Logger) {
13291329
log.Printf("the kafka reader got an error while attempting to determine whether it was reading before the first offset or after the last offset of partition %d of %s: %s", r.partition, r.topic, err)
@@ -1383,7 +1383,7 @@ func (r *reader) run(ctx context.Context, offset int64) {
13831383

13841384
func (r *reader) initialize(ctx context.Context, offset int64) (conn *Conn, start int64, err error) {
13851385
for i := 0; i != len(r.brokers) && conn == nil; i++ {
1386-
var broker = r.brokers[i]
1386+
broker := r.brokers[i]
13871387
var first, last int64
13881388

13891389
t0 := time.Now()
@@ -1532,7 +1532,7 @@ func (r *reader) withErrorLogger(do func(Logger)) {
15321532
// extractTopics returns the unique list of topics represented by the set of
15331533
// provided members
15341534
func extractTopics(members []GroupMember) []string {
1535-
var visited = map[string]struct{}{}
1535+
visited := map[string]struct{}{}
15361536
var topics []string
15371537

15381538
for _, member := range members {

0 commit comments

Comments
 (0)