This repository was archived by the owner on Mar 15, 2024. It is now read-only.

Commit d71ffd9

hannahhoward authored and willscott committed
feat(eventrecorder): complete aggregated event work
1 parent 6a8b9a4 commit d71ffd9

4 files changed: 235 additions, 40 deletions


eventrecorder/event.go

Lines changed: 38 additions & 2 deletions
@@ -120,15 +120,28 @@ func (e EventBatch) Validate() error {
 	return nil
 }
 
+type RetrievalAttempt struct {
+	Error           string `json:"error,omitempty"`
+	TimeToFirstByte string `json:"timeToFirstByte,omitempty"`
+}
+
 type AggregateEvent struct {
-	RetrievalID       string    `json:"retrievalId"`                 // The unique ID of the retrieval
 	InstanceID        string    `json:"instanceId"`                  // The ID of the Lassie instance generating the event
+	RetrievalID       string    `json:"retrievalId"`                 // The unique ID of the retrieval
 	StorageProviderID string    `json:"storageProviderId,omitempty"` // The ID of the storage provider that served the retrieval content
-	TimeToFirstByte   int64     `json:"timeToFirstByte,omitempty"`   // The time it took to receive the first byte in milliseconds
+	TimeToFirstByte   string    `json:"timeToFirstByte,omitempty"`   // The time it took to receive the first byte in milliseconds
 	Bandwidth         uint64    `json:"bandwidth,omitempty"`         // The bandwidth of the retrieval in bytes per second
+	BytesTransferred  uint64    `json:"bytesTransferred,omitempty"`  // The total transmitted deal size
 	Success           bool      `json:"success"`                     // Whether or not the retrieval ended with a success event
 	StartTime         time.Time `json:"startTime"`                   // The time the retrieval started
 	EndTime           time.Time `json:"endTime"`                     // The time the retrieval ended
+
+	TimeToFirstIndexerResult  string                       `json:"timeToFirstIndexerResult,omitempty"` // time it took to receive our first "CandidateFound" event
+	IndexerCandidatesReceived int                          `json:"indexerCandidatesReceived"`          // The number of candidates received from the indexer
+	IndexerCandidatesFiltered int                          `json:"indexerCandidatesFiltered"`          // The number of candidates that made it through the filtering stage
+	ProtocolsAllowed          []string                     `json:"protocolsAllowed,omitempty"`         // The available protocols that could be used for this retrieval
+	ProtocolsAttempted        []string                     `json:"protocolsAttempted,omitempty"`       // The protocols that were used to attempt this retrieval
+	RetrievalAttempts         map[string]*RetrievalAttempt `json:"retrievalAttempts,omitempty"`        // All of the retrieval attempts, indexed by their SP ID
 }
 
 func (e AggregateEvent) Validate() error {
@@ -144,6 +157,29 @@ func (e AggregateEvent) Validate() error {
 	case e.EndTime.Before(e.StartTime):
 		return errors.New("property endTime cannot be before startTime")
 	default:
+		if e.TimeToFirstByte != "" {
+			_, err := time.ParseDuration(e.TimeToFirstByte)
+			if err != nil {
+				return err
+			}
+		}
+		if e.TimeToFirstIndexerResult != "" {
+			_, err := time.ParseDuration(e.TimeToFirstIndexerResult)
+			if err != nil {
+				return err
+			}
+		}
+		for _, retrievalAttempt := range e.RetrievalAttempts {
+			if retrievalAttempt == nil {
+				return errors.New("all retrieval attempts should have values")
+			}
+			if retrievalAttempt.TimeToFirstByte != "" {
+				_, err := time.ParseDuration(retrievalAttempt.TimeToFirstByte)
+				if err != nil {
+					return err
+				}
+			}
+		}
 		return nil
 	}
 }
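Note on the new event shape: time-to-first-byte and time-to-first-indexer-result now travel as Go duration strings (for example "240ms") and are checked with time.ParseDuration, and per-storage-provider attempts ride along in a map keyed by SP ID. The sketch below is illustrative only and not part of this commit; it assumes the module path github.com/filecoin-project/lassie-event-recorder and uses made-up IDs and values to show how a client might build, validate, and serialize one AggregateEvent.

package main

import (
	"encoding/json"
	"fmt"
	"time"

	// Assumed module path for this repository.
	"github.com/filecoin-project/lassie-event-recorder/eventrecorder"
)

func main() {
	now := time.Now()
	event := eventrecorder.AggregateEvent{
		InstanceID:               "lassie-instance-1",                    // illustrative
		RetrievalID:              "b9a64f20-3a51-4c13-95ea-4b7f9d6a6a01", // illustrative
		StorageProviderID:        "f01234",                               // illustrative
		TimeToFirstByte:          "240ms",                                // duration string, checked via time.ParseDuration
		TimeToFirstIndexerResult: "35ms",
		Bandwidth:                1_048_576, // bytes per second
		BytesTransferred:         4_194_304,
		Success:                  true,
		StartTime:                now.Add(-3 * time.Second),
		EndTime:                  now,
		ProtocolsAttempted:       []string{"graphsync"},
		RetrievalAttempts: map[string]*eventrecorder.RetrievalAttempt{
			"f01234": {TimeToFirstByte: "240ms"},
			"f05678": {Error: "failed to dial"}, // a failed attempt
		},
	}

	if err := event.Validate(); err != nil {
		fmt.Println("invalid event:", err)
		return
	}

	body, _ := json.MarshalIndent(event, "", "  ")
	fmt.Println(string(body))
}

Running it prints a JSON body whose field names match the json struct tags declared above.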

eventrecorder/recorder.go

Lines changed: 82 additions & 8 deletions
@@ -3,6 +3,7 @@ package eventrecorder
 import (
 	"context"
 	"fmt"
+	"time"
 
 	"github.com/filecoin-project/lassie/pkg/types"
 	"github.com/ipfs/go-log/v2"
@@ -105,47 +106,120 @@ func (r *EventRecorder) RecordAggregateEvents(ctx context.Context, events []Aggr
 	totalLogger := logger.With("total", len(events))
 
 	var batchQuery pgx.Batch
+	var batchRetrievalAttempts pgx.Batch
 	for _, event := range events {
+		var timeToFirstByte time.Duration
+		if event.TimeToFirstByte != "" {
+			timeToFirstByte, _ = time.ParseDuration(event.TimeToFirstByte)
+		}
+		var timeToFirstIndexerResult time.Duration
+		if event.TimeToFirstIndexerResult != "" {
+			timeToFirstIndexerResult, _ = time.ParseDuration(event.TimeToFirstIndexerResult)
+		}
 		query := `
			INSERT INTO aggregate_retrieval_events(
				instance_id,
				retrieval_id,
				storage_provider_id,
-				time_to_first_byte_ms,
+				time_to_first_byte,
				bandwidth_bytes_sec,
+				bytes_transferred,
				success,
				start_time,
-				end_time
+				end_time,
+				time_to_first_indexer_result,
+				indexer_candidates_received,
+				indexer_candidates_filtered,
+				protocols_allowed,
+				protocols_attempted
			)
-			VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
+			VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
		`
 		batchQuery.Queue(query,
 			event.InstanceID,
 			event.RetrievalID,
 			event.StorageProviderID,
-			event.TimeToFirstByte,
+			timeToFirstByte,
 			event.Bandwidth,
+			event.BytesTransferred,
 			event.Success,
 			event.StartTime,
 			event.EndTime,
+			timeToFirstIndexerResult,
+			event.IndexerCandidatesReceived,
+			event.IndexerCandidatesFiltered,
+			event.ProtocolsAllowed,
+			event.ProtocolsAttempted,
 		).Exec(func(ct pgconn.CommandTag) error {
 			rowsAffected := ct.RowsAffected()
 			switch rowsAffected {
 			case 0:
-				totalLogger.Warnw("Retrieval event insertion did not affect any rows", "event", event, "rowsAffected", rowsAffected)
+				totalLogger.Warnw("Aggregated event insertion did not affect any rows", "event", event, "rowsAffected", rowsAffected)
 			default:
-				totalLogger.Debugw("Inserted event successfully", "event", event, "rowsAffected", rowsAffected)
+				totalLogger.Debugw("Inserted aggregated event successfully", "event", event, "rowsAffected", rowsAffected)
 			}
 			return nil
 		})
+		attempts := make(map[string]string, len(event.RetrievalAttempts))
+		for storageProviderID, retrievalAttempt := range event.RetrievalAttempts {
+			attempts[storageProviderID] = retrievalAttempt.Error
+			var timeToFirstByte time.Duration
+			if retrievalAttempt.TimeToFirstByte != "" {
+				timeToFirstByte, _ = time.ParseDuration(retrievalAttempt.TimeToFirstByte)
+			}
+			query := `
+				INSERT INTO retrieval_attempts(
+					retrieval_id,
+					storage_provider_id,
+					time_to_first_byte,
+					error
+				)
+				VALUES ($1, $2, $3, $4)
+			`
+			batchRetrievalAttempts.Queue(query,
+				event.RetrievalID,
+				storageProviderID,
+				timeToFirstByte,
+				retrievalAttempt.Error,
+			).Exec(func(ct pgconn.CommandTag) error {
+				rowsAffected := ct.RowsAffected()
+				switch rowsAffected {
+				case 0:
+					totalLogger.Warnw("Retrieval attempt insertion did not affect any rows", "retrievalID", event.RetrievalID, "retrievalAttempt", retrievalAttempt, "storageProviderID", storageProviderID, "rowsAffected", rowsAffected)
+				default:
+					totalLogger.Debugw("Inserted retrieval attempt successfully", "retrievalID", event.RetrievalID, "retrievalAttempt", retrievalAttempt, "storageProviderID", storageProviderID, "rowsAffected", rowsAffected)
+				}
+				return nil
+			})
+		}
+		if r.cfg.metrics != nil {
+			r.cfg.metrics.HandleAggregatedEvent(ctx,
+				timeToFirstIndexerResult,
+				timeToFirstByte,
+				event.Success,
+				event.StorageProviderID,
+				event.StartTime,
+				event.EndTime,
+				int64(event.Bandwidth),
+				int64(event.BytesTransferred),
+				int64(event.IndexerCandidatesReceived),
+				int64(event.IndexerCandidatesFiltered),
+				attempts,
+			)
+		}
 	}
 	batchResult := r.db.SendBatch(ctx, &batchQuery)
 	err := batchResult.Close()
 	if err != nil {
-		totalLogger.Errorw("At least one retrieval event insertion failed", "err", err)
+		totalLogger.Errorw("At least one aggregated event insertion failed", "err", err)
+		return err
+	}
+	batchResult = r.db.SendBatch(ctx, &batchRetrievalAttempts)
+	err = batchResult.Close()
+	if err != nil {
+		totalLogger.Errorw("At least one retrieval attempt insertion failed", "err", err)
 		return err
 	}
-
 	totalLogger.Info("Successfully submitted batch event insertion")
 	return nil
 }
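The recorder now parses those duration strings into time.Duration values, queues the aggregate-event INSERTs and the per-attempt INSERTs into two separate pgx batches, and hands the parsed values to the metrics handler. For readers unfamiliar with the pgx batching used here, this is a minimal, self-contained sketch of the Queue/Exec/SendBatch/Close pattern against a hypothetical table; it is not code from this repository.

package main

import (
	"context"
	"log"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgconn"
	"github.com/jackc/pgx/v5/pgxpool"
)

func main() {
	ctx := context.Background()

	// Hypothetical DSN; point this at a real database.
	db, err := pgxpool.New(ctx, "postgres://user:pass@localhost:5432/events")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	var batch pgx.Batch
	for _, name := range []string{"alpha", "beta"} {
		// Queue one INSERT per item; demo_events is a made-up table.
		qq := batch.Queue("INSERT INTO demo_events(name) VALUES ($1)", name)
		// The Exec callback runs for this statement when the batch results are drained.
		qq.Exec(func(ct pgconn.CommandTag) error {
			log.Printf("rows affected: %d", ct.RowsAffected())
			return nil
		})
	}

	// SendBatch submits every queued statement in one round trip;
	// Close drains the results and returns the first error, if any.
	if err := db.SendBatch(ctx, &batch).Close(); err != nil {
		log.Fatal(err)
	}
}

Queuing a per-row Exec callback is what lets the recorder log rows-affected for each event while still submitting a single batch per table.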

metrics/events.go

Lines changed: 96 additions & 25 deletions
@@ -31,31 +31,7 @@ func (m *Metrics) HandleFailureEvent(ctx context.Context, id types.RetrievalID,
 		if storageProviderID != types.BitswapIndentifier {
 			m.graphsyncRetrievalFailureCount.Add(ctx, 1, attribute.String("sp_id", storageProviderID))
 		}
-		var errorMetricMatches = map[string]instrument.Int64Counter{
-			"response rejected":                 m.retrievalErrorRejectedCount,
-			"Too many retrieval deals received": m.retrievalErrorTooManyCount,
-			"Access Control":                    m.retrievalErrorACLCount,
-			"Under maintenance, retry later":    m.retrievalErrorMaintenanceCount,
-			"miner is not accepting online retrieval deals":                         m.retrievalErrorNoOnlineCount,
-			"unconfirmed block transfer":                                            m.retrievalErrorUnconfirmedCount,
-			"timeout after ":                                                        m.retrievalErrorTimeoutCount,
-			"there is no unsealed piece containing payload cid":                     m.retrievalErrorNoUnsealedCount,
-			"getting pieces for cid":                                                m.retrievalErrorDAGStoreCount,
-			"graphsync request failed to complete: request failed - unknown reason": m.retrievalErrorGraphsyncCount,
-			"failed to dial":                                                        m.retrievalErrorFailedToDialCount,
-		}
-
-		var matched bool
-		for substr, metric := range errorMetricMatches {
-			if strings.Contains(msg, substr) {
-				metric.Add(ctx, 1, attribute.String("protocol", protocol(storageProviderID)))
-				matched = true
-				break
-			}
-		}
-		if !matched {
-			m.retrievalErrorOtherCount.Add(ctx, 1, attribute.String("protocol", protocol(storageProviderID)))
-		}
+		m.matchErrorMessage(ctx, msg, storageProviderID)
 	}
 }
 
@@ -158,6 +134,101 @@ func (m *Metrics) HandleSuccessEvent(ctx context.Context, id types.RetrievalID,
 	m.failedRetrievalsPerRequestCount.Record(ctx, int64(finalDetails.FailedCount))
 }
 
+func (m *Metrics) HandleAggregatedEvent(ctx context.Context,
+	timeToFirstIndexerResult time.Duration,
+	timeToFirstByte time.Duration,
+	success bool,
+	storageProviderID string,
+	startTime time.Time,
+	endTime time.Time,
+	bandwidth int64,
+	bytesTransferred int64,
+	indexerCandidates int64,
+	indexerFiltered int64,
+	attempts map[string]string) {
+	m.totalRequestCount.Add(ctx, 1)
+	failureCount := 0
+	var recordedGraphSync, recordedBitswap bool
+	for storageProviderID, msg := range attempts {
+		if storageProviderID == types.BitswapIndentifier {
+			if !recordedBitswap {
+				recordedBitswap = true
+				m.requestWithBitswapAttempt.Add(ctx, 1)
+			}
+		} else {
+			if !recordedGraphSync {
+				recordedGraphSync = true
+				m.requestWithGraphSyncAttempt.Add(ctx, 1, attribute.String("sp_id", storageProviderID))
+			}
+		}
+		if msg != "" {
+			if storageProviderID != types.BitswapIndentifier {
+				m.graphsyncRetrievalFailureCount.Add(ctx, 1, attribute.String("sp_id", storageProviderID))
+			}
+			m.matchErrorMessage(ctx, msg, storageProviderID)
+			failureCount += 0
+		}
+	}
+	if timeToFirstIndexerResult > 0 {
+		m.timeToFirstIndexerResult.Record(ctx, timeToFirstIndexerResult.Seconds())
+	}
+	if indexerCandidates > 0 {
+		m.requestWithIndexerCandidatesCount.Add(ctx, 1)
+	}
+	if indexerFiltered > 0 {
+		m.requestWithIndexerCandidatesFilteredCount.Add(ctx, 1)
+	}
+	if timeToFirstByte > 0 {
+		m.requestWithFirstByteReceivedCount.Add(ctx, 1, attribute.String("protocol", protocol(storageProviderID)))
+		m.timeToFirstByte.Record(ctx, timeToFirstByte.Seconds(), attribute.String("protocol", protocol(storageProviderID)))
+	}
+	if success {
+		m.requestWithSuccessCount.Add(ctx, 1)
+		if storageProviderID == types.BitswapIndentifier {
+			m.requestWithBitswapSuccessCount.Add(ctx, 1)
+		} else {
+			m.requestWithGraphSyncSuccessCount.Add(ctx, 1, attribute.String("sp_id", storageProviderID))
+		}
+
+		m.retrievalDealDuration.Record(ctx, endTime.Sub(startTime).Seconds(), attribute.String("protocol", protocol(storageProviderID)))
+		m.retrievalDealSize.Record(ctx, bytesTransferred, attribute.String("protocol", protocol(storageProviderID)))
+		m.bandwidthBytesPerSecond.Record(ctx, bandwidth, attribute.String("protocol", protocol(storageProviderID)))
+
+		m.indexerCandidatesPerRequestCount.Record(ctx, indexerCandidates)
+		m.indexerCandidatesFilteredPerRequestCount.Record(ctx, indexerFiltered)
+		m.failedRetrievalsPerRequestCount.Record(ctx, int64(failureCount))
+	} else if len(attempts) == 0 {
+		m.requestWithIndexerFailures.Add(ctx, 1)
+	}
+}
+
+func (m *Metrics) matchErrorMessage(ctx context.Context, msg string, storageProviderID string) {
+	var errorMetricMatches = map[string]instrument.Int64Counter{
+		"response rejected":                 m.retrievalErrorRejectedCount,
+		"Too many retrieval deals received": m.retrievalErrorTooManyCount,
+		"Access Control":                    m.retrievalErrorACLCount,
+		"Under maintenance, retry later":    m.retrievalErrorMaintenanceCount,
+		"miner is not accepting online retrieval deals":                         m.retrievalErrorNoOnlineCount,
+		"unconfirmed block transfer":                                            m.retrievalErrorUnconfirmedCount,
+		"timeout after ":                                                        m.retrievalErrorTimeoutCount,
+		"there is no unsealed piece containing payload cid":                     m.retrievalErrorNoUnsealedCount,
+		"getting pieces for cid":                                                m.retrievalErrorDAGStoreCount,
+		"graphsync request failed to complete: request failed - unknown reason": m.retrievalErrorGraphsyncCount,
+		"failed to dial":                                                        m.retrievalErrorFailedToDialCount,
+	}
+
+	var matched bool
+	for substr, metric := range errorMetricMatches {
+		if strings.Contains(msg, substr) {
+			metric.Add(ctx, 1, attribute.String("protocol", protocol(storageProviderID)))
+			matched = true
+			break
+		}
+	}
+	if !matched {
+		m.retrievalErrorOtherCount.Add(ctx, 1, attribute.String("protocol", protocol(storageProviderID)))
+	}
+}
 func protocol(storageProviderId string) string {
 	if storageProviderId == types.BitswapIndentifier {
 		return "bitswap"

schema.sql

Lines changed: 19 additions & 5 deletions
@@ -11,12 +11,26 @@ create table if not exists retrieval_events(
 );
 
 create table if not exists aggregate_retrieval_events(
-    retrieval_id uuid not null,
+    retrieval_id uuid not null UNIQUE,
     instance_id character varying(64) not null,
     storage_provider_id character varying(256),
-    time_to_first_byte_ms integer,
-    bandwidth_bytes_sec integer,
+    time_to_first_byte int8,
+    bandwidth_bytes_sec int8,
+    bytes_transferred int8,
     success boolean not null,
     start_time timestamp with time zone not null,
-    end_time timestamp with time zone not null
-);
+    end_time timestamp with time zone not null,
+    time_to_first_indexer_result int8,
+    indexer_candidates_received integer,
+    indexer_candidates_filtered integer,
+    protocols_allowed varchar[256][],
+    protocols_attempted varchar[256][]
+);
+
+create table if not exists retrieval_attempts(
+    retrieval_id uuid not null,
+    storage_provider_id character varying(256),
+    time_to_first_byte int8,
+    error character varying(256),
+    FOREIGN KEY (retrieval_id) REFERENCES aggregate_retrieval_events (retrieval_id)
+);
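Schema-wise, retrieval_id on aggregate_retrieval_events is now UNIQUE so that the new retrieval_attempts table can reference it as a foreign key, giving one row per storage provider attempted per retrieval. Below is a hypothetical read-side query joining the two tables, not part of this commit, written with pgx against an assumed local DSN:

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"
)

func main() {
	ctx := context.Background()

	// Hypothetical DSN; point this at the events database.
	db, err := pgxpool.New(ctx, "postgres://user:pass@localhost:5432/events")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// One row per storage provider attempted for each aggregated event
	// recorded in the last 24 hours; the uuid is cast to text for scanning.
	rows, err := db.Query(ctx, `
		SELECT e.retrieval_id::text, e.success, a.storage_provider_id, a.error
		FROM aggregate_retrieval_events e
		JOIN retrieval_attempts a ON a.retrieval_id = e.retrieval_id
		WHERE e.start_time > $1
	`, time.Now().Add(-24*time.Hour))
	if err != nil {
		log.Fatal(err)
	}
	defer rows.Close()

	for rows.Next() {
		var (
			retrievalID      string
			success          bool
			spID, attemptErr *string // nullable columns
		)
		if err := rows.Scan(&retrievalID, &success, &spID, &attemptErr); err != nil {
			log.Fatal(err)
		}
		fmt.Println(retrievalID, success, strOrEmpty(spID), strOrEmpty(attemptErr))
	}
	if err := rows.Err(); err != nil {
		log.Fatal(err)
	}
}

// strOrEmpty renders a nullable text column as an empty string when NULL.
func strOrEmpty(s *string) string {
	if s == nil {
		return ""
	}
	return *s
}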
