Skip to content

Commit 294fbdb

Browse files
enable errorlint linter and fix issues (#914)
1 parent fb1504a commit 294fbdb

19 files changed

+153
-129
lines changed

.golangci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
linters:
22
enable:
33
- bodyclose
4+
- errorlint
45
- goconst
56
- godot
67
- gofmt
@@ -10,5 +11,4 @@ linters:
1011
disable:
1112
# Temporarily disabling so it can be addressed in a dedicated PR.
1213
- errcheck
13-
- errorlint
1414
- goerr113

client_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package kafka
33
import (
44
"bytes"
55
"context"
6+
"errors"
67
"io"
78
"math/rand"
89
"net"
@@ -262,7 +263,7 @@ func TestClientProduceAndConsume(t *testing.T) {
262263
for {
263264
r, err := res.Records.ReadRecord()
264265
if err != nil {
265-
if err != io.EOF {
266+
if !errors.Is(err, io.EOF) {
266267
t.Fatal(err)
267268
}
268269
break

compress/snappy/go-xerial-snappy/snappy_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package snappy
22

33
import (
44
"bytes"
5+
"errors"
56
"testing"
67
)
78

@@ -92,7 +93,7 @@ func TestSnappyDecodeMalformedTruncatedHeader(t *testing.T) {
9293
for i := 0; i < len(xerialHeader); i++ {
9394
buf := make([]byte, i)
9495
copy(buf, xerialHeader[:i])
95-
if _, err := Decode(buf); err != ErrMalformed {
96+
if _, err := Decode(buf); !errors.Is(err, ErrMalformed) {
9697
t.Errorf("expected ErrMalformed got %v", err)
9798
}
9899
}
@@ -104,7 +105,7 @@ func TestSnappyDecodeMalformedTruncatedSize(t *testing.T) {
104105
for _, size := range sizes {
105106
buf := make([]byte, size)
106107
copy(buf, xerialHeader)
107-
if _, err := Decode(buf); err != ErrMalformed {
108+
if _, err := Decode(buf); !errors.Is(err, ErrMalformed) {
108109
t.Errorf("expected ErrMalformed got %v", err)
109110
}
110111
}
@@ -116,7 +117,7 @@ func TestSnappyDecodeMalformedBNoData(t *testing.T) {
116117
copy(buf, xerialHeader)
117118
// indicate that there's one byte of data to be read
118119
buf[len(buf)-1] = 1
119-
if _, err := Decode(buf); err != ErrMalformed {
120+
if _, err := Decode(buf); !errors.Is(err, ErrMalformed) {
120121
t.Errorf("expected ErrMalformed got %v", err)
121122
}
122123
}
@@ -128,7 +129,7 @@ func TestSnappyMasterDecodeFailed(t *testing.T) {
128129
buf[len(buf)-2] = 1
129130
// A payload which will not decode
130131
buf[len(buf)-1] = 1
131-
if _, err := Decode(buf); err == ErrMalformed || err == nil {
132+
if _, err := Decode(buf); errors.Is(err, ErrMalformed) || err == nil {
132133
t.Errorf("unexpected err: %v", err)
133134
}
134135
}

conn.go

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1231,11 +1231,10 @@ func (c *Conn) writeRequest(apiKey apiKey, apiVersion apiVersion, correlationID
12311231

12321232
func (c *Conn) readResponse(size int, res interface{}) error {
12331233
size, err := read(&c.rbuf, size, res)
1234-
switch err.(type) {
1235-
case Error:
1236-
var e error
1237-
if size, e = discardN(&c.rbuf, size, size); e != nil {
1238-
err = e
1234+
if err != nil {
1235+
var kafkaError Error
1236+
if errors.As(err, &kafkaError) {
1237+
size, err = discardN(&c.rbuf, size, size)
12391238
}
12401239
}
12411240
return expectZeroSize(size, err)
@@ -1294,9 +1293,8 @@ func (c *Conn) do(d *connDeadline, write func(time.Time, int32) error, read func
12941293
}
12951294

12961295
if err = read(deadline, size); err != nil {
1297-
switch err.(type) {
1298-
case Error:
1299-
default:
1296+
var kafkaError Error
1297+
if !errors.As(err, &kafkaError) {
13001298
c.conn.Close()
13011299
}
13021300
}

conn_test.go

Lines changed: 26 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -389,9 +389,11 @@ func testConnWrite(t *testing.T, conn *Conn) {
389389
func testConnCloseAndWrite(t *testing.T, conn *Conn) {
390390
conn.Close()
391391

392-
switch _, err := conn.Write([]byte("Hello World!")); err.(type) {
393-
case *net.OpError:
394-
default:
392+
_, err := conn.Write([]byte("Hello World!"))
393+
394+
// expect a network error
395+
var netOpError *net.OpError
396+
if !errors.As(err, &netOpError) {
395397
t.Error(err)
396398
}
397399
}
@@ -489,7 +491,7 @@ func testConnSeekDontCheck(t *testing.T, conn *Conn) {
489491
t.Error("bad offset:", offset)
490492
}
491493

492-
if _, err := conn.ReadMessage(1024); err != OffsetOutOfRange {
494+
if _, err := conn.ReadMessage(1024); !errors.Is(err, OffsetOutOfRange) {
493495
t.Error("unexpected error:", err)
494496
}
495497
}
@@ -659,13 +661,15 @@ func waitForCoordinator(t *testing.T, conn *Conn, groupID string) {
659661
_, err := conn.findCoordinator(findCoordinatorRequestV0{
660662
CoordinatorKey: groupID,
661663
})
662-
switch err {
663-
case nil:
664+
if err != nil {
665+
if errors.Is(err, GroupCoordinatorNotAvailable) {
666+
time.Sleep(250 * time.Millisecond)
667+
continue
668+
} else {
669+
t.Fatalf("unable to find coordinator for group: %v", err)
670+
}
671+
} else {
664672
return
665-
case GroupCoordinatorNotAvailable:
666-
time.Sleep(250 * time.Millisecond)
667-
default:
668-
t.Fatalf("unable to find coordinator for group: %v", err)
669673
}
670674
}
671675

@@ -690,15 +694,18 @@ func createGroup(t *testing.T, conn *Conn, groupID string) (generationID int32,
690694
},
691695
},
692696
})
693-
switch err {
694-
case nil:
697+
if err != nil {
698+
if errors.Is(err, NotCoordinatorForGroup) {
699+
time.Sleep(250 * time.Millisecond)
700+
continue
701+
} else {
702+
t.Fatalf("bad joinGroup: %s", err)
703+
}
704+
} else {
695705
return
696-
case NotCoordinatorForGroup:
697-
time.Sleep(250 * time.Millisecond)
698-
default:
699-
t.Fatalf("bad joinGroup: %s", err)
700706
}
701707
}
708+
702709
return
703710
}
704711

@@ -742,12 +749,11 @@ func testConnFindCoordinator(t *testing.T, conn *Conn) {
742749
}
743750
response, err := conn.findCoordinator(findCoordinatorRequestV0{CoordinatorKey: groupID})
744751
if err != nil {
745-
switch err {
746-
case GroupCoordinatorNotAvailable:
752+
if errors.Is(err, GroupCoordinatorNotAvailable) {
747753
continue
748-
default:
749-
t.Fatalf("bad findCoordinator: %s", err)
750754
}
755+
756+
t.Fatalf("bad findCoordinator: %s", err)
751757
}
752758

753759
if response.Coordinator.NodeID == 0 {

consumergroup.go

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -523,19 +523,21 @@ func (g *Generation) partitionWatcher(interval time.Duration, topic string) {
523523
return
524524
case <-ticker.C:
525525
ops, err := g.conn.readPartitions(topic)
526-
switch err {
527-
case nil, UnknownTopicOrPartition:
526+
switch {
527+
case err == nil, errors.Is(err, UnknownTopicOrPartition):
528528
if len(ops) != oParts {
529529
g.log(func(l Logger) {
530530
l.Printf("Partition changes found, reblancing group: %v.", g.GroupID)
531531
})
532532
return
533533
}
534+
534535
default:
535536
g.logError(func(l Logger) {
536537
l.Printf("Problem getting partitions while checking for changes, %v", err)
537538
})
538-
if _, ok := err.(Error); ok {
539+
var kafkaError Error
540+
if errors.As(err, &kafkaError) {
539541
continue
540542
}
541543
// other errors imply that we lost the connection to the coordinator, so we
@@ -724,20 +726,24 @@ func (cg *ConsumerGroup) run() {
724726
// to the next generation. it will be non-nil in the case of an error
725727
// joining or syncing the group.
726728
var backoff <-chan time.Time
727-
switch err {
728-
case nil:
729+
730+
switch {
731+
case err == nil:
729732
// no error...the previous generation finished normally.
730733
continue
731-
case ErrGroupClosed:
734+
735+
case errors.Is(err, ErrGroupClosed):
732736
// the CG has been closed...leave the group and exit loop.
733737
_ = cg.leaveGroup(memberID)
734738
return
735-
case RebalanceInProgress:
739+
740+
case errors.Is(err, RebalanceInProgress):
736741
// in case of a RebalanceInProgress, don't leave the group or
737742
// change the member ID, but report the error. the next attempt
738743
// to join the group will then be subject to the rebalance
739744
// timeout, so the broker will be responsible for throttling
740745
// this loop.
746+
741747
default:
742748
// leave the group and report the error if we had gotten far
743749
// enough so as to have a member ID. also clear the member id
@@ -984,7 +990,7 @@ func (cg *ConsumerGroup) makeJoinGroupRequestV1(memberID string) (joinGroupReque
984990
for _, balancer := range cg.config.GroupBalancers {
985991
userData, err := balancer.UserData()
986992
if err != nil {
987-
return joinGroupRequestV1{}, fmt.Errorf("unable to construct protocol metadata for member, %v: %v", balancer.ProtocolName(), err)
993+
return joinGroupRequestV1{}, fmt.Errorf("unable to construct protocol metadata for member, %v: %w", balancer.ProtocolName(), err)
988994
}
989995
request.GroupProtocols = append(request.GroupProtocols, joinGroupRequestGroupProtocolV1{
990996
ProtocolName: balancer.ProtocolName(),
@@ -1050,7 +1056,7 @@ func (cg *ConsumerGroup) makeMemberProtocolMetadata(in []joinGroupResponseMember
10501056
metadata := groupMetadata{}
10511057
reader := bufio.NewReader(bytes.NewReader(item.MemberMetadata))
10521058
if remain, err := (&metadata).readFrom(reader, len(item.MemberMetadata)); err != nil || remain != 0 {
1053-
return nil, fmt.Errorf("unable to read metadata for member, %v: %v", item.MemberID, err)
1059+
return nil, fmt.Errorf("unable to read metadata for member, %v: %w", item.MemberID, err)
10541060
}
10551061

10561062
members = append(members, GroupMember{

createtopics.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package kafka
33
import (
44
"bufio"
55
"context"
6+
"errors"
67
"fmt"
78
"net"
89
"time"
@@ -384,12 +385,14 @@ func (c *Conn) CreateTopics(topics ...TopicConfig) error {
384385
_, err := c.createTopics(createTopicsRequestV0{
385386
Topics: requestV0Topics,
386387
})
388+
if err != nil {
389+
if errors.Is(err, TopicAlreadyExists) {
390+
// ok
391+
return nil
392+
}
387393

388-
switch err {
389-
case TopicAlreadyExists:
390-
// ok
391-
return nil
392-
default:
393394
return err
394395
}
396+
397+
return nil
395398
}

dialer_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"crypto/tls"
66
"crypto/x509"
7+
"errors"
78
"fmt"
89
"io"
910
"net"
@@ -294,7 +295,7 @@ func TestDialerConnectTLSHonorsContext(t *testing.T) {
294295
defer cancel()
295296

296297
_, err := d.connectTLS(ctx, conn, d.TLS)
297-
if context.DeadlineExceeded != err {
298+
if !errors.Is(err, context.DeadlineExceeded) {
298299
t.Errorf("expected err to be %v; got %v", context.DeadlineExceeded, err)
299300
t.FailNow()
300301
}

discard_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package kafka
33
import (
44
"bufio"
55
"bytes"
6+
"errors"
67
"io"
78
"testing"
89
)
@@ -52,7 +53,7 @@ func TestDiscardN(t *testing.T) {
5253
scenario: "discard more than available",
5354
function: func(t *testing.T, r *bufio.Reader, sz int) {
5455
remain, err := discardN(r, sz, sz+1)
55-
if err != errShortRead {
56+
if !errors.Is(err, errShortRead) {
5657
t.Errorf("Expected errShortRead, got %v", err)
5758
}
5859
if remain != 0 {
@@ -64,7 +65,7 @@ func TestDiscardN(t *testing.T) {
6465
scenario: "discard returns error",
6566
function: func(t *testing.T, r *bufio.Reader, sz int) {
6667
remain, err := discardN(r, sz+2, sz+1)
67-
if err != io.EOF {
68+
if !errors.Is(err, io.EOF) {
6869
t.Errorf("Expected EOF, got %v", err)
6970
}
7071
if remain != 2 {
@@ -76,7 +77,7 @@ func TestDiscardN(t *testing.T) {
7677
scenario: "errShortRead doesn't mask error",
7778
function: func(t *testing.T, r *bufio.Reader, sz int) {
7879
remain, err := discardN(r, sz+1, sz+2)
79-
if err != io.EOF {
80+
if !errors.Is(err, io.EOF) {
8081
t.Errorf("Expected EOF, got %v", err)
8182
}
8283
if remain != 1 {

example_consumergroup_test.go

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package kafka_test
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"os"
78

@@ -42,18 +43,20 @@ func ExampleGeneration_Start_consumerGroupParallelReaders() {
4243
reader.SetOffset(offset)
4344
for {
4445
msg, err := reader.ReadMessage(ctx)
45-
switch err {
46-
case kafka.ErrGenerationEnded:
47-
// generation has ended. commit offsets. in a real app,
48-
// offsets would be committed periodically.
49-
gen.CommitOffsets(map[string]map[int]int64{"my-topic": {partition: offset + 1}})
50-
return
51-
case nil:
52-
fmt.Printf("received message %s/%d/%d : %s\n", msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
53-
offset = msg.Offset
54-
default:
46+
if err != nil {
47+
if errors.Is(err, kafka.ErrGenerationEnded) {
48+
// generation has ended. commit offsets. in a real app,
49+
// offsets would be committed periodically.
50+
gen.CommitOffsets(map[string]map[int]int64{"my-topic": {partition: offset + 1}})
51+
return
52+
}
53+
5554
fmt.Printf("error reading message: %+v\n", err)
55+
return
5656
}
57+
58+
fmt.Printf("received message %s/%d/%d : %s\n", msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
59+
offset = msg.Offset
5760
}
5861
})
5962
}

message_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"bufio"
55
"bytes"
66
"encoding/hex"
7+
"errors"
78
"fmt"
89
"io"
910
"math/rand"
@@ -551,7 +552,7 @@ func TestMessageSetReaderEmpty(t *testing.T) {
551552
if headers != nil {
552553
t.Errorf("expected nil headers, got %v", headers)
553554
}
554-
if err != RequestTimedOut {
555+
if !errors.Is(err, RequestTimedOut) {
555556
t.Errorf("expected RequestTimedOut, got %v", err)
556557
}
557558

0 commit comments

Comments
 (0)