@@ -77,7 +77,7 @@ type Reader struct {
77
77
cancel context.CancelFunc
78
78
stop context.CancelFunc
79
79
done chan struct {}
80
- commits chan [] Message
80
+ commits chan commitRequest
81
81
version int64 // version holds the generation of the spawned readers
82
82
offset int64
83
83
lag int64
@@ -96,10 +96,12 @@ type Reader struct {
96
96
stats readerStats
97
97
}
98
98
99
- // useConsumerGroup indicates the Reader is part of a consumer group
100
- func (r * Reader ) useConsumerGroup () bool {
101
- return r .config .GroupID != ""
102
- }
99
+ // useConsumerGroup indicates whether the Reader is part of a consumer group.
100
+ func (r * Reader ) useConsumerGroup () bool { return r .config .GroupID != "" }
101
+
102
+ // useSyncCommits indicates whether the Reader is configured to perform sync or
103
+ // async commits.
104
+ func (r * Reader ) useSyncCommits () bool { return r .config .CommitInterval == 0 }
103
105
104
106
// membership returns the group generationID and memberID of the reader.
105
107
//
@@ -676,20 +678,16 @@ func (r *Reader) commitOffsetsWithRetry(conn offsetCommitter, offsetStash offset
676
678
type offsetStash map [string ]map [int ]int64
677
679
678
680
// merge updates the offsetStash with the offsets from the provided messages
679
- func (o offsetStash ) merge (msgs ... Message ) {
680
- if o == nil {
681
- return
682
- }
683
-
684
- for _ , m := range msgs {
685
- offsetsByPartition , ok := o [m .Topic ]
681
+ func (o offsetStash ) merge (commits []commit ) {
682
+ for _ , c := range commits {
683
+ offsetsByPartition , ok := o [c .topic ]
686
684
if ! ok {
687
685
offsetsByPartition = map [int ]int64 {}
688
- o [m . Topic ] = offsetsByPartition
686
+ o [c . topic ] = offsetsByPartition
689
687
}
690
688
691
- if offset , ok := offsetsByPartition [m . Partition ]; ! ok || m . Offset > offset {
692
- offsetsByPartition [m . Partition ] = m . Offset
689
+ if offset , ok := offsetsByPartition [c . partition ]; ! ok || c . offset > offset {
690
+ offsetsByPartition [c . partition ] = c . offset
693
691
}
694
692
}
695
693
}
@@ -701,35 +699,19 @@ func (o offsetStash) reset() {
701
699
}
702
700
}
703
701
704
- // isEmpty returns true if the offsetStash contains no entries
705
- func (o offsetStash ) isEmpty () bool {
706
- return len (o ) == 0
707
- }
708
-
709
702
// commitLoopImmediate handles each commit synchronously
710
703
func (r * Reader ) commitLoopImmediate (conn offsetCommitter , stop <- chan struct {}) {
704
+ offsetsByTopicAndPartition := offsetStash {}
705
+
711
706
for {
712
707
select {
713
708
case <- stop :
714
709
return
715
710
716
- case msgs , ok := <- r .commits :
717
- if ! ok {
718
- r .withErrorLogger (func (l * log.Logger ) {
719
- l .Println ("reader commit channel unexpectedly closed" )
720
- })
721
- return
722
- }
723
-
724
- offsetsByTopicAndPartition := offsetStash {}
725
- offsetsByTopicAndPartition .merge (msgs ... )
726
-
727
- if err := r .commitOffsetsWithRetry (conn , offsetsByTopicAndPartition , defaultCommitRetries ); err != nil {
728
- r .withErrorLogger (func (l * log.Logger ) {
729
- l .Printf ("unable to commit offset: %v" , err )
730
- })
731
- return
732
- }
711
+ case req := <- r .commits :
712
+ offsetsByTopicAndPartition .merge (req .commits )
713
+ req .errch <- r .commitOffsetsWithRetry (conn , offsetsByTopicAndPartition , defaultCommitRetries )
714
+ offsetsByTopicAndPartition .reset ()
733
715
}
734
716
}
735
717
}
@@ -740,40 +722,25 @@ func (r *Reader) commitLoopInterval(conn offsetCommitter, stop <-chan struct{})
740
722
ticker := time .NewTicker (r .config .HeartbeatInterval )
741
723
defer ticker .Stop ()
742
724
743
- defer func () {
744
- // commits any outstanding offsets on close
745
- if err := r .commitOffsetsWithRetry (conn , r .offsetStash , defaultCommitRetries ); err == nil {
725
+ commit := func () {
726
+ if err := r .commitOffsetsWithRetry (conn , r .offsetStash , defaultCommitRetries ); err != nil {
727
+ r .withErrorLogger (func (l * log.Logger ) { l .Print (err ) })
728
+ } else {
746
729
r .offsetStash .reset ()
747
730
}
748
- }()
731
+ }
749
732
750
733
for {
751
734
select {
752
735
case <- stop :
736
+ commit ()
753
737
return
754
738
755
739
case <- ticker .C :
756
- if len (r .offsetStash ) == 0 {
757
- continue
758
- }
759
-
760
- if err := r .commitOffsetsWithRetry (conn , r .offsetStash , defaultCommitRetries ); err != nil {
761
- r .withErrorLogger (func (l * log.Logger ) {
762
- l .Printf ("unable to commit offset: %v" , err )
763
- })
764
- return
765
- }
766
- r .offsetStash .reset ()
767
-
768
- case msgs , ok := <- r .commits :
769
- if ! ok {
770
- r .withErrorLogger (func (l * log.Logger ) {
771
- l .Println ("reader commit channel unexpectedly closed" )
772
- })
773
- return
774
- }
740
+ commit ()
775
741
776
- r .offsetStash .merge (msgs ... )
742
+ case req := <- r .commits :
743
+ r .offsetStash .merge (req .commits )
777
744
}
778
745
}
779
746
}
@@ -1106,7 +1073,7 @@ func NewReader(config ReaderConfig) *Reader {
1106
1073
msgs : make (chan readerMessage , config .QueueCapacity ),
1107
1074
cancel : func () {},
1108
1075
done : make (chan struct {}),
1109
- commits : make (chan [] Message ),
1076
+ commits : make (chan commitRequest ),
1110
1077
stop : stop ,
1111
1078
offset : firstOffset ,
1112
1079
stctx : stctx ,
@@ -1247,6 +1214,47 @@ func (r *Reader) FetchMessage(ctx context.Context) (Message, error) {
1247
1214
}
1248
1215
}
1249
1216
1217
+ // CommitMessages commits the list of messages passed as argument. The program
1218
+ // may pass a context to asynchronously cancel the commit operation when it was
1219
+ // configured to be blocking.
1220
+ func (r * Reader ) CommitMessages (ctx context.Context , msgs ... Message ) error {
1221
+ if ! r .useConsumerGroup () {
1222
+ return errNotAvailable
1223
+ }
1224
+
1225
+ var errch <- chan error
1226
+ var sync = r .useSyncCommits ()
1227
+ var creq = commitRequest {
1228
+ commits : makeCommits (msgs ... ),
1229
+ }
1230
+
1231
+ if sync {
1232
+ ch := make (chan error , 1 )
1233
+ errch , creq .errch = ch , ch
1234
+ }
1235
+
1236
+ select {
1237
+ case r .commits <- creq :
1238
+ case <- ctx .Done ():
1239
+ return ctx .Err ()
1240
+ case <- r .stctx .Done ():
1241
+ // This context is used to ensure we don't allow commits after the
1242
+ // reader was closed.
1243
+ return io .ErrClosedPipe
1244
+ }
1245
+
1246
+ if ! sync {
1247
+ return nil
1248
+ }
1249
+
1250
+ select {
1251
+ case <- ctx .Done ():
1252
+ return ctx .Err ()
1253
+ case err := <- errch :
1254
+ return err
1255
+ }
1256
+ }
1257
+
1250
1258
// ReadLag returns the current lag of the reader by fetching the last offset of
1251
1259
// the topic and partition and computing the difference between that value and
1252
1260
// the offset of the last message returned by ReadMessage.
@@ -1501,25 +1509,6 @@ func (r *Reader) start(offsetsByPartition map[int]int64) {
1501
1509
}
1502
1510
}
1503
1511
1504
- func (r * Reader ) CommitMessages (ctx context.Context , msgs ... Message ) error {
1505
- if len (msgs ) == 0 {
1506
- return nil
1507
- }
1508
-
1509
- if ! r .useConsumerGroup () {
1510
- return errNotAvailable
1511
- }
1512
-
1513
- select {
1514
- case <- ctx .Done ():
1515
- return ctx .Err ()
1516
- case <- r .stctx .Done ():
1517
- return r .stctx .Err ()
1518
- case r .commits <- msgs :
1519
- return nil
1520
- }
1521
- }
1522
-
1523
1512
// A reader reads messages from kafka and produces them on its channels, it's
1524
1513
// used as an way to asynchronously fetch messages while the main program reads
1525
1514
// them using the high level reader API.
0 commit comments