@@ -133,9 +133,11 @@ const (
133
133
ReadCommitted IsolationLevel = 1
134
134
)
135
135
136
- // DefaultClientID is the default value used as ClientID of kafka
137
- // connections.
138
- var DefaultClientID string
136
+ var (
137
+ // DefaultClientID is the default value used as ClientID of kafka
138
+ // connections.
139
+ DefaultClientID string
140
+ )
139
141
140
142
func init () {
141
143
progname := filepath .Base (os .Args [0 ])
@@ -261,12 +263,10 @@ func (c *Conn) Controller() (broker Broker, err error) {
261
263
}
262
264
for _ , brokerMeta := range res .Brokers {
263
265
if brokerMeta .NodeID == res .ControllerID {
264
- broker = Broker {
265
- ID : int (brokerMeta .NodeID ),
266
+ broker = Broker {ID : int (brokerMeta .NodeID ),
266
267
Port : int (brokerMeta .Port ),
267
268
Host : brokerMeta .Host ,
268
- Rack : brokerMeta .Rack ,
269
- }
269
+ Rack : brokerMeta .Rack }
270
270
break
271
271
}
272
272
}
@@ -322,6 +322,7 @@ func (c *Conn) findCoordinator(request findCoordinatorRequestV0) (findCoordinato
322
322
err := c .readOperation (
323
323
func (deadline time.Time , id int32 ) error {
324
324
return c .writeRequest (findCoordinator , v0 , id , request )
325
+
325
326
},
326
327
func (deadline time.Time , size int ) error {
327
328
return expectZeroSize (func () (remain int , err error ) {
@@ -751,8 +752,9 @@ func (c *Conn) ReadBatch(minBytes, maxBytes int) *Batch {
751
752
// ReadBatchWith in every way is similar to ReadBatch. ReadBatch is configured
752
753
// with the default values in ReadBatchConfig except for minBytes and maxBytes.
753
754
func (c * Conn ) ReadBatchWith (cfg ReadBatchConfig ) * Batch {
755
+
754
756
var adjustedDeadline time.Time
755
- maxFetch : = int (c .fetchMaxBytes )
757
+ var maxFetch = int (c .fetchMaxBytes )
756
758
757
759
if cfg .MinBytes < 0 || cfg .MinBytes > maxFetch {
758
760
return & Batch {err : fmt .Errorf ("kafka.(*Conn).ReadBatch: minBytes of %d out of [1,%d] bounds" , cfg .MinBytes , maxFetch )}
@@ -857,7 +859,11 @@ func (c *Conn) ReadBatchWith(cfg ReadBatchConfig) *Batch {
857
859
858
860
var msgs * messageSetReader
859
861
if err == nil {
860
- msgs , err = newMessageSetReader (& c .rbuf , remain )
862
+ if highWaterMark == offset {
863
+ msgs = & messageSetReader {empty : true }
864
+ } else {
865
+ msgs , err = newMessageSetReader (& c .rbuf , remain )
866
+ }
861
867
}
862
868
if err == errShortRead {
863
869
err = checkTimeoutErr (adjustedDeadline )
@@ -953,6 +959,7 @@ func (c *Conn) readOffset(t int64) (offset int64, err error) {
953
959
// connection. If there are none, the method fetches all partitions of the kafka
954
960
// cluster.
955
961
func (c * Conn ) ReadPartitions (topics ... string ) (partitions []Partition , err error ) {
962
+
956
963
if len (topics ) == 0 {
957
964
if len (c .topic ) != 0 {
958
965
defaultTopics := [... ]string {c .topic }
@@ -1181,6 +1188,7 @@ func (c *Conn) writeCompressedMessages(codec CompressionCodec, msgs ...Message)
1181
1188
}
1182
1189
return size , err
1183
1190
}
1191
+
1184
1192
})
1185
1193
if err != nil {
1186
1194
return size , err
@@ -1548,7 +1556,7 @@ func (c *Conn) saslAuthenticate(data []byte) ([]byte, error) {
1548
1556
return nil , err
1549
1557
}
1550
1558
if version == v1 {
1551
- request : = saslAuthenticateRequestV0 {Data : data }
1559
+ var request = saslAuthenticateRequestV0 {Data : data }
1552
1560
var response saslAuthenticateResponseV0
1553
1561
1554
1562
err := c .writeOperation (
0 commit comments