Skip to content

Commit 6cf49f5

Browse files
committed
Add interceptor to aggregate CCFB reports
1 parent c06f448 commit 6cf49f5

11 files changed

+1323
-4
lines changed

internal/test/mock_stream.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ type RTPWithError struct {
4141
// RTCPWithError is used to send a batch of rtcp packets or an error on a channel
4242
type RTCPWithError struct {
4343
Packets []rtcp.Packet
44+
Attr interceptor.Attributes
4445
Err error
4546
}
4647

@@ -107,21 +108,21 @@ func NewMockStream(info *interceptor.StreamInfo, i interceptor.Interceptor) *Moc
107108
go func() {
108109
buf := make([]byte, 1500)
109110
for {
110-
i, _, err := s.rtcpReader.Read(buf, interceptor.Attributes{})
111+
i, attr, err := s.rtcpReader.Read(buf, interceptor.Attributes{})
111112
if err != nil {
112113
if !errors.Is(err, io.EOF) {
113-
s.rtcpInModified <- RTCPWithError{Err: err}
114+
s.rtcpInModified <- RTCPWithError{Attr: attr, Err: err}
114115
}
115116
return
116117
}
117118

118119
pkts, err := rtcp.Unmarshal(buf[:i])
119120
if err != nil {
120-
s.rtcpInModified <- RTCPWithError{Err: err}
121+
s.rtcpInModified <- RTCPWithError{Attr: attr, Err: err}
121122
return
122123
}
123124

124-
s.rtcpInModified <- RTCPWithError{Packets: pkts}
125+
s.rtcpInModified <- RTCPWithError{Attr: attr, Packets: pkts}
125126
}
126127
}()
127128
go func() {

pkg/ccfb/ccfb_receiver.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package ccfb
2+
3+
import (
4+
"time"
5+
6+
"github.com/pion/interceptor/internal/ntp"
7+
"github.com/pion/rtcp"
8+
)
9+
10+
type acknowledgement struct {
11+
seqNr uint16
12+
arrived bool
13+
arrival time.Time
14+
ecn rtcp.ECN
15+
}
16+
17+
func convertCCFB(ts time.Time, feedback *rtcp.CCFeedbackReport) (time.Time, map[uint32][]acknowledgement) {
18+
if feedback == nil {
19+
return time.Time{}, nil
20+
}
21+
result := map[uint32][]acknowledgement{}
22+
referenceTime := ntp.ToTime32(feedback.ReportTimestamp, ts)
23+
for _, rb := range feedback.ReportBlocks {
24+
result[rb.MediaSSRC] = convertMetricBlock(referenceTime, rb.BeginSequence, rb.MetricBlocks)
25+
}
26+
return referenceTime, result
27+
}
28+
29+
func convertMetricBlock(reference time.Time, seqNrOffset uint16, blocks []rtcp.CCFeedbackMetricBlock) []acknowledgement {
30+
reports := make([]acknowledgement, len(blocks))
31+
for i, mb := range blocks {
32+
if mb.Received {
33+
arrival := time.Time{}
34+
35+
/// RFC 8888 states: If the measurement is unavailable or if the
36+
//arrival time of the RTP packet is after the time represented by
37+
//the RTS field, then an ATO value of 0x1FFF MUST be reported for
38+
//the packet. In that case, we set a zero time.Time value.
39+
if mb.ArrivalTimeOffset != 0x1FFF {
40+
delta := time.Duration((float64(mb.ArrivalTimeOffset) / 1024.0) * float64(time.Second))
41+
arrival = reference.Add(-delta)
42+
}
43+
reports[i] = acknowledgement{
44+
seqNr: seqNrOffset + uint16(i),
45+
arrived: true,
46+
arrival: arrival,
47+
ecn: mb.ECN,
48+
}
49+
} else {
50+
reports[i] = acknowledgement{
51+
seqNr: seqNrOffset + uint16(i),
52+
arrived: false,
53+
arrival: time.Time{},
54+
ecn: 0,
55+
}
56+
}
57+
}
58+
return reports
59+
}

pkg/ccfb/ccfb_receiver_test.go

Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
package ccfb
2+
3+
import (
4+
"fmt"
5+
"testing"
6+
"time"
7+
8+
"github.com/pion/interceptor/internal/ntp"
9+
"github.com/pion/rtcp"
10+
"github.com/stretchr/testify/assert"
11+
)
12+
13+
func TestConvertCCFB(t *testing.T) {
14+
timeZero := time.Now()
15+
cases := []struct {
16+
ts time.Time
17+
feedback *rtcp.CCFeedbackReport
18+
expect map[uint32][]acknowledgement
19+
expectTS time.Time
20+
}{
21+
{},
22+
{
23+
ts: timeZero.Add(2 * time.Second),
24+
feedback: &rtcp.CCFeedbackReport{
25+
SenderSSRC: 1,
26+
ReportBlocks: []rtcp.CCFeedbackReportBlock{
27+
{
28+
MediaSSRC: 2,
29+
BeginSequence: 17,
30+
MetricBlocks: []rtcp.CCFeedbackMetricBlock{
31+
{
32+
Received: true,
33+
ECN: 0,
34+
ArrivalTimeOffset: 512,
35+
},
36+
},
37+
},
38+
},
39+
ReportTimestamp: ntp.ToNTP32(timeZero.Add(time.Second)),
40+
},
41+
expect: map[uint32][]acknowledgement{
42+
2: []acknowledgement{
43+
{
44+
seqNr: 17,
45+
arrived: true,
46+
arrival: timeZero.Add(500 * time.Millisecond),
47+
ecn: 0,
48+
},
49+
},
50+
},
51+
expectTS: timeZero.Add(time.Second),
52+
},
53+
}
54+
for i, tc := range cases {
55+
t.Run(fmt.Sprintf("%v", i), func(t *testing.T) {
56+
resTS, res := convertCCFB(tc.ts, tc.feedback)
57+
58+
assert.InDelta(t, tc.expectTS.UnixNano(), resTS.UnixNano(), float64(time.Millisecond.Nanoseconds()))
59+
60+
// Can't directly check equality since arrival timestamp conversions
61+
// may be slightly off due to ntp conversions.
62+
assert.Equal(t, len(tc.expect), len(res))
63+
for i, acks := range tc.expect {
64+
for j, ack := range acks {
65+
assert.Equal(t, ack.seqNr, res[i][j].seqNr)
66+
assert.Equal(t, ack.arrived, res[i][j].arrived)
67+
assert.Equal(t, ack.ecn, res[i][j].ecn)
68+
assert.InDelta(t, ack.arrival.UnixNano(), res[i][j].arrival.UnixNano(), float64(time.Millisecond.Nanoseconds()))
69+
}
70+
}
71+
})
72+
}
73+
}
74+
75+
func TestConvertMetricBlock(t *testing.T) {
76+
cases := []struct {
77+
ts time.Time
78+
reference time.Time
79+
seqNrOffset uint16
80+
blocks []rtcp.CCFeedbackMetricBlock
81+
expected []acknowledgement
82+
}{
83+
{
84+
ts: time.Time{},
85+
reference: time.Time{},
86+
seqNrOffset: 0,
87+
blocks: []rtcp.CCFeedbackMetricBlock{},
88+
expected: []acknowledgement{},
89+
},
90+
{
91+
ts: time.Time{}.Add(2 * time.Second),
92+
reference: time.Time{}.Add(time.Second),
93+
seqNrOffset: 3,
94+
blocks: []rtcp.CCFeedbackMetricBlock{
95+
{
96+
Received: true,
97+
ECN: 0,
98+
ArrivalTimeOffset: 512,
99+
},
100+
{
101+
Received: false,
102+
ECN: 0,
103+
ArrivalTimeOffset: 0,
104+
},
105+
{
106+
Received: true,
107+
ECN: 0,
108+
ArrivalTimeOffset: 0,
109+
},
110+
},
111+
expected: []acknowledgement{
112+
{
113+
seqNr: 3,
114+
arrived: true,
115+
arrival: time.Time{}.Add(500 * time.Millisecond),
116+
ecn: 0,
117+
},
118+
{
119+
seqNr: 4,
120+
arrived: false,
121+
arrival: time.Time{},
122+
ecn: 0,
123+
},
124+
{
125+
seqNr: 5,
126+
arrived: true,
127+
arrival: time.Time{}.Add(time.Second),
128+
ecn: 0,
129+
},
130+
},
131+
},
132+
{
133+
ts: time.Time{}.Add(2 * time.Second),
134+
reference: time.Time{}.Add(time.Second),
135+
seqNrOffset: 3,
136+
blocks: []rtcp.CCFeedbackMetricBlock{
137+
{
138+
Received: true,
139+
ECN: 0,
140+
ArrivalTimeOffset: 512,
141+
},
142+
{
143+
Received: false,
144+
ECN: 0,
145+
ArrivalTimeOffset: 0,
146+
},
147+
{
148+
Received: true,
149+
ECN: 0,
150+
ArrivalTimeOffset: 0,
151+
},
152+
{
153+
Received: true,
154+
ECN: 0,
155+
ArrivalTimeOffset: 0x1FFF,
156+
},
157+
},
158+
expected: []acknowledgement{
159+
{
160+
seqNr: 3,
161+
arrived: true,
162+
arrival: time.Time{}.Add(500 * time.Millisecond),
163+
ecn: 0,
164+
},
165+
{
166+
seqNr: 4,
167+
arrived: false,
168+
arrival: time.Time{},
169+
ecn: 0,
170+
},
171+
{
172+
seqNr: 5,
173+
arrived: true,
174+
arrival: time.Time{}.Add(time.Second),
175+
ecn: 0,
176+
},
177+
{
178+
seqNr: 6,
179+
arrived: true,
180+
arrival: time.Time{},
181+
ecn: 0,
182+
},
183+
},
184+
},
185+
}
186+
187+
for i, tc := range cases {
188+
t.Run(fmt.Sprintf("%v", i), func(t *testing.T) {
189+
res := convertMetricBlock(tc.reference, tc.seqNrOffset, tc.blocks)
190+
assert.Equal(t, tc.expected, res)
191+
})
192+
}
193+
}

pkg/ccfb/duplicate_ack_filter.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package ccfb
2+
3+
type DuplicateAckFilter struct {
4+
highestAckedBySSRC map[uint32]int64
5+
}
6+
7+
func NewDuplicateAckFilter() *DuplicateAckFilter {
8+
return &DuplicateAckFilter{
9+
highestAckedBySSRC: make(map[uint32]int64),
10+
}
11+
}
12+
13+
func (f *DuplicateAckFilter) Filter(reports Report) {
14+
for ssrc, prs := range reports.SSRCToPacketReports {
15+
n := 0
16+
for _, report := range prs {
17+
if highest, ok := f.highestAckedBySSRC[ssrc]; !ok || report.SeqNr > highest {
18+
f.highestAckedBySSRC[ssrc] = report.SeqNr
19+
prs[n] = report
20+
n++
21+
}
22+
}
23+
reports.SSRCToPacketReports[ssrc] = prs[:n]
24+
}
25+
}

0 commit comments

Comments
 (0)