
Commit d107533

banitag1 authored and facebook-github-bot committed
Add support of request tracking (#891)
Summary:
Pull Request resolved: #891

This diff adds support for request tracking, so we can visualize request processing across threads and also monitor tail-latency behavior.

Reviewed By: zyan0

Differential Revision: D42002684

fbshipit-source-id: 07cfff683c9de092d54547704e41f680b9c3e1e6
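
The mechanism behind this change is folly's per-thread RequestContext: the thread that accepts a request saves the current context, each worker thread that later handles the request installs that saved context before doing work, and clears it afterwards so subsequent work is not misattributed. A minimal sketch of the pattern, assuming only that folly is available; the thread setup here is illustrative, not from this diff:

#include <folly/io/async/Request.h>

#include <memory>
#include <thread>

int main() {
  // Request thread: install a fresh context for this request and save it.
  folly::RequestContextScopeGuard guard;
  std::shared_ptr<folly::RequestContext> saved =
      folly::RequestContext::saveContext();

  // Worker thread: adopt the saved context so tracking tools attribute
  // this work to the same request, then unset it when the work is done.
  std::thread worker([saved] {
    folly::RequestContext::setContext(saved);
    // ... batch, pin memory, or run the model here ...
    folly::RequestContext::setContext(nullptr);
  });
  worker.join();
  return 0;
}

This is the shape of every change below: BatchingQueue::add saves the context; createBatch, pinMemory, and GPUExecutor::process adopt it; and each clears it before moving on to unrelated work.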
1 parent 5ca2a14 commit d107533

File tree

3 files changed: +23 −1 lines changed

torchrec/inference/include/torchrec/inference/Types.h

Lines changed: 2 additions & 0 deletions
@@ -62,6 +62,8 @@ struct PredictionResponse {
 struct RequestContext {
   uint32_t batchSize;
   folly::Promise<std::unique_ptr<PredictionResponse>> promise;
+  // folly request context for request tracking in crochet
+  std::shared_ptr<folly::RequestContext> follyRequestContext;
 };

 using PredictionException = std::runtime_error;
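
Storing the saved folly::RequestContext on RequestContext is what lets tracking tools follow a request across threads. For illustration, a hedged sketch of how per-request data could be attached to and read back from that context; the TrackingData type and the "request_id" token are hypothetical, not part of this diff:

#include <folly/io/async/Request.h>

#include <memory>
#include <string>

// Hypothetical payload carried on the request's folly context.
struct TrackingData : public folly::RequestData {
  explicit TrackingData(std::string id) : requestId(std::move(id)) {}
  bool hasCallback() override {
    return false; // no onSet/onUnset thread callbacks needed
  }
  std::string requestId;
};

static const folly::RequestToken kRequestIdToken("request_id");

// Called on the request thread, before the context is saved.
void tagCurrentRequest(std::string id) {
  folly::RequestContext::get()->setContextData(
      kRequestIdToken, std::make_unique<TrackingData>(std::move(id)));
}

// Called on any worker thread after setContext has installed the context.
TrackingData* currentRequestTag() {
  return dynamic_cast<TrackingData*>(
      folly::RequestContext::get()->getContextData(kRequestIdToken));
}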

torchrec/inference/src/BatchingQueue.cpp

Lines changed: 13 additions & 1 deletion
@@ -101,7 +101,10 @@ void BatchingQueue::add(
     const auto batchSize = request->batch_size;
     queue.push(QueryQueueEntry{
         std::move(request),
-        RequestContext{batchSize, std::move(promise)},
+        RequestContext{
+            batchSize,
+            std::move(promise),
+            folly::RequestContext::saveContext()},
         addedTime});
   });
 }
@@ -150,6 +153,7 @@ void BatchingQueue::createBatch() {
     }

     auto& context = contexts.emplace_back(std::move(front.context));
+    folly::RequestContext::setContext(context.follyRequestContext);
     requests.push_back(std::move(front.request));
     batchSize += requests.back()->batch_size;
     queue.pop();
@@ -178,6 +182,8 @@ void BatchingQueue::createBatch() {
       contexts.clear();
     }

+    folly::RequestContext::setContext(nullptr);
+
    if (!full) {
      /* sleep override */
      std::this_thread::sleep_for(std::chrono::milliseconds(1));
@@ -207,6 +213,9 @@ void BatchingQueue::pinMemory(int gpuIdx) {
     if (!requests.empty() || !contexts.empty()) {
       RECORD_USER_SCOPE("PinMemory");

+      if (!contexts.empty()) {
+        folly::RequestContext::setContext(contexts[0].follyRequestContext);
+      }
       // Combine data.
       size_t combinedBatchSize = 0;
       for (auto i : c10::irange(requests.size())) {
@@ -323,6 +332,9 @@ void BatchingQueue::pinMemory(int gpuIdx) {
       observer_->observeBatchCompletion(batch->size(), batch->batchSize);

       cbs_[gpuIdx](batch);
+
+      // unset request tracking
+      folly::RequestContext::setContext(nullptr);
     }
   } catch (const std::exception& ex) {
     LOG(FATAL) << "Error batching requests, ex: " << folly::exceptionStr(ex);
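
The diff pairs each setContext with a later setContext(nullptr) so the long-lived batching thread never leaks one request's context into the next iteration. Folly also provides an RAII guard that restores the previous context on scope exit, including on early return or exception; a sketch of the equivalent shape, where the function and its body are illustrative rather than from this diff:

#include <folly/io/async/Request.h>

#include <memory>

void runUnderRequestContext(
    const std::shared_ptr<folly::RequestContext>& ctx) {
  // Installs ctx now; restores the previous context when the guard
  // goes out of scope, even if an exception is thrown.
  folly::RequestContextScopeGuard guard(ctx);
  // ... combine tensors, pin memory, invoke the callback ...
}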

torchrec/inference/src/GPUExecutor.cpp

Lines changed: 8 additions & 0 deletions
@@ -20,6 +20,7 @@
 #include <folly/executors/CPUThreadPoolExecutor.h>
 #include <folly/futures/Future.h>
 #include <folly/io/IOBuf.h>
+#include <folly/io/async/Request.h>
 #include <folly/stop_watch.h>
 #include <gflags/gflags.h>
 #include <glog/logging.h>
@@ -204,6 +205,10 @@ void GPUExecutor::process(int idx) {
       continue;
     }

+    if (!batch->contexts.empty()) {
+      folly::RequestContext::setContext(batch->contexts[0].follyRequestContext);
+    }
+
     auto timeInQueue = getTimeElapsedMS(batch->enqueueTime);
     observer_->recordQueueLatency(timeInQueue.count());

@@ -324,6 +329,9 @@ void GPUExecutor::process(int idx) {
         observer->recordTotalLatency(
             getTimeElapsedMS(batch->enqueueTime).count());
       });
+
+    // reset request tracking
+    folly::RequestContext::setContext(nullptr);
   }
 }
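
Two details are worth noting in the hunks above. First, a batch merges many requests, and the executor adopts only batch->contexts[0], so the first request's context stands in for the whole batch. Second, folly futures generally carry the active RequestContext into their callbacks, which is why work chained onto another executor (like the latency observer above) keeps the request attribution. A small sketch of that propagation, assuming folly; the single-thread executor is illustrative:

#include <folly/executors/CPUThreadPoolExecutor.h>
#include <folly/futures/Future.h>
#include <folly/io/async/Request.h>

#include <iostream>

int main() {
  folly::CPUThreadPoolExecutor executor(1);

  folly::RequestContextScopeGuard guard; // install a fresh request context
  auto* ctx = folly::RequestContext::get();

  // The callback runs on the pool thread, but folly restores the
  // context that was active when the callback was attached.
  folly::via(&executor, [ctx] {
    std::cout << (folly::RequestContext::get() == ctx) << '\n'; // prints 1
  }).get();
  return 0;
}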
