Skip to content

Commit 1072695

Browse files
committed
Add fallback for missing TWCC header extension
1 parent b8e4acd commit 1072695

File tree

2 files changed

+47
-3
lines changed

2 files changed

+47
-3
lines changed

pkg/ccfb/interceptor.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"time"
77

88
"github.com/pion/interceptor"
9+
"github.com/pion/logging"
910
"github.com/pion/rtcp"
1011
"github.com/pion/rtp"
1112
)
@@ -88,6 +89,7 @@ func (f *InterceptorFactory) NewInterceptor(_ string) (interceptor.Interceptor,
8889
i := &Interceptor{
8990
NoOp: interceptor.NoOp{},
9091
lock: sync.Mutex{},
92+
log: logging.NewDefaultLoggerFactory().NewLogger("ccfb_interceptor"),
9193
timestamp: time.Now,
9294
convertCCFB: convertCCFB,
9395
convertTWCC: convertTWCC,
@@ -117,6 +119,7 @@ func (f *InterceptorFactory) NewInterceptor(_ string) (interceptor.Interceptor,
117119
type Interceptor struct {
118120
interceptor.NoOp
119121
lock sync.Mutex
122+
log logging.LeveledLogger
120123
timestamp func() time.Time
121124
convertCCFB func(ts time.Time, feedback *rtcp.CCFeedbackReport) (time.Time, map[uint32][]acknowledgement)
122125
convertTWCC func(feedback *rtcp.TransportLayerCC) (time.Time, map[uint32][]acknowledgement)
@@ -157,12 +160,16 @@ func (i *Interceptor) BindLocalStream(info *interceptor.StreamInfo, writer inter
157160
ssrc := header.SSRC
158161
seqNr := header.SequenceNumber
159162
if useTWCC {
160-
ssrc = 0
161163
var twccHdrExt rtp.TransportCCExtension
162164
if err := twccHdrExt.Unmarshal(header.GetExtension(twccHdrExtID)); err != nil {
163-
return 0, err
165+
i.log.Warnf("CCFB configured for TWCC, but failed to get TWCC header extension from outgoing packet. Falling back to saving history for CCFB feedback reports. err: %v", err)
166+
if _, ok := i.ssrcToHistory[ssrc]; !ok {
167+
i.ssrcToHistory[ssrc] = i.historyFactory(i.historySize)
168+
}
169+
} else {
170+
seqNr = twccHdrExt.TransportSequence
171+
ssrc = 0
164172
}
165-
seqNr = twccHdrExt.TransportSequence
166173
}
167174
if err := i.ssrcToHistory[ssrc].add(seqNr, header.MarshalSize()+len(payload), i.timestamp()); err != nil {
168175
return 0, err

pkg/ccfb/interceptor_test.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,43 @@ func TestInterceptor(t *testing.T) {
137137
}
138138
})
139139

140+
t.Run("missingTWCCHeaderExtension", func(t *testing.T) {
141+
mt := func() time.Time {
142+
return mockTimestamp
143+
}
144+
mh := &mockHistory{
145+
added: []mockHistoryAddEntry{},
146+
}
147+
f, err := NewInterceptor(
148+
historyFactory(func(_ int) history {
149+
return mh
150+
}),
151+
timeFactory(mt),
152+
)
153+
assert.NoError(t, err)
154+
155+
i, err := f.NewInterceptor("")
156+
assert.NoError(t, err)
157+
158+
info := &interceptor.StreamInfo{}
159+
info.RTPHeaderExtensions = append(info.RTPHeaderExtensions, interceptor.RTPHeaderExtension{
160+
URI: transportCCURI,
161+
ID: 2,
162+
})
163+
stream := test.NewMockStream(info, i)
164+
165+
err = stream.WriteRTP(&rtp.Packet{
166+
Header: rtp.Header{SequenceNumber: 3},
167+
Payload: []byte{},
168+
})
169+
assert.NoError(t, err)
170+
assert.Equal(t, mh.added, []mockHistoryAddEntry{{
171+
seqNr: 3,
172+
size: 12,
173+
departure: mockTimestamp,
174+
}})
175+
})
176+
140177
t.Run("readRTCP", func(t *testing.T) {
141178
cases := []struct {
142179
mh *mockHistory

0 commit comments

Comments
 (0)