Skip to content

loggerd: add unit tests and fix identified sync issues #34840

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 15 additions & 72 deletions system/loggerd/loggerd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,9 @@
#include "common/params.h"
#include "system/loggerd/encoder/encoder.h"
#include "system/loggerd/loggerd.h"
#include "system/loggerd/video_writer.h"

ExitHandler do_exit;

struct LoggerdState {
LoggerState logger;
std::atomic<double> last_camera_seen_tms{0.0};
std::atomic<int> ready_to_rotate{0}; // count of encoders ready to rotate
int max_waiting = 0;
double last_rotate_tms = 0.; // last rotate time in ms
};

void logger_rotate(LoggerdState *s) {
bool ret =s->logger.next();
assert(ret);
Expand Down Expand Up @@ -53,17 +44,6 @@ void rotate_if_needed(LoggerdState *s) {
}
}

struct RemoteEncoder {
std::unique_ptr<VideoWriter> writer;
int encoderd_segment_offset;
int current_segment = -1;
std::vector<Message *> q;
int dropped_frames = 0;
bool recording = false;
bool marked_ready_to_rotate = false;
bool seen_first_packet = false;
};

size_t write_encode_data(LoggerdState *s, cereal::Event::Reader event, RemoteEncoder &re, const EncoderInfo &encoder_info) {
auto edata = (event.*(encoder_info.get_encode_data_func))();
auto idx = edata.getIdx();
Expand Down Expand Up @@ -117,72 +97,35 @@ size_t write_encode_data(LoggerdState *s, cereal::Event::Reader event, RemoteEnc
}

int handle_encoder_msg(LoggerdState *s, Message *msg, std::string &name, struct RemoteEncoder &re, const EncoderInfo &encoder_info) {
int bytes_count = 0;

// extract the message
capnp::FlatArrayMessageReader cmsg(kj::ArrayPtr<capnp::word>((capnp::word *)msg->getData(), msg->getSize() / sizeof(capnp::word)));
auto event = cmsg.getRoot<cereal::Event>();
auto edata = (event.*(encoder_info.get_encode_data_func))();
auto idx = edata.getIdx();

// encoderd can have started long before loggerd
if (!re.seen_first_packet) {
re.seen_first_packet = true;
re.encoderd_segment_offset = idx.getSegmentNum();
LOGD("%s: has encoderd offset %d", name.c_str(), re.encoderd_segment_offset);
}
int offset_segment_num = idx.getSegmentNum() - re.encoderd_segment_offset;
auto encoder_idx = (event.*(encoder_info.get_encode_data_func))().getIdx();

if (offset_segment_num == s->logger.segment()) {
// loggerd is now on the segment that matches this packet

// if this is a new segment, we close any possible old segments, move to the new, and process any queued packets
if (re.current_segment != s->logger.segment()) {
if (re.recording) {
re.writer.reset();
re.recording = false;
}
re.current_segment = s->logger.segment();
re.marked_ready_to_rotate = false;
// we are in this segment now, process any queued messages before this one
if (!re.q.empty()) {
for (auto qmsg : re.q) {
capnp::FlatArrayMessageReader reader({(capnp::word *)qmsg->getData(), qmsg->getSize() / sizeof(capnp::word)});
bytes_count += write_encode_data(s, reader.getRoot<cereal::Event>(), re, encoder_info);
delete qmsg;
}
re.q.clear();
int bytes_count = 0;
// Synchronize segment and process if aligned
if (re.syncSegment(s, name, encoder_idx.getSegmentNum(), s->logger.segment())) {
// Process any queued messages before the current one
if (!re.q.empty()) {
for (auto qmsg : re.q) {
capnp::FlatArrayMessageReader reader({(capnp::word *)qmsg->getData(), qmsg->getSize() / sizeof(capnp::word)});
bytes_count += write_encode_data(s, reader.getRoot<cereal::Event>(), re, encoder_info);
delete qmsg;
}
re.q.clear();
}

// Process the current message
bytes_count += write_encode_data(s, event, re, encoder_info);
delete msg;
} else if (offset_segment_num > s->logger.segment()) {
// encoderd packet has a newer segment, this means encoderd has rolled over
if (!re.marked_ready_to_rotate) {
re.marked_ready_to_rotate = true;
++s->ready_to_rotate;
LOGD("rotate %d -> %d ready %d/%d for %s",
s->logger.segment(), offset_segment_num,
s->ready_to_rotate.load(), s->max_waiting, name.c_str());
}

// TODO: define this behavior, but for now don't leak
if (re.q.size() > MAIN_FPS*10) {
} else {
if (re.q.size() > MAIN_FPS * 10) {
LOGE_100("%s: dropping frame, queue is too large", name.c_str());
delete msg;
} else {
// queue up all the new segment messages, they go in after the rotate
re.q.push_back(msg);
}
} else {
LOGE("%s: encoderd packet has a older segment!!! idx.getSegmentNum():%d s->logger.segment():%d re.encoderd_segment_offset:%d",
name.c_str(), idx.getSegmentNum(), s->logger.segment(), re.encoderd_segment_offset);
// free the message, it's useless. this should never happen
// actually, this can happen if you restart encoderd
re.encoderd_segment_offset = -s->logger.segment();
delete msg;
}

return bytes_count;
}

Expand Down
73 changes: 73 additions & 0 deletions system/loggerd/loggerd.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "common/util.h"

#include "system/loggerd/logger.h"
#include "system/loggerd/video_writer.h"

constexpr int MAIN_FPS = 20;
const int MAIN_BITRATE = 1e7;
Expand Down Expand Up @@ -147,3 +148,75 @@ const LogCameraInfo stream_driver_camera_info{

const LogCameraInfo cameras_logged[] = {road_camera_info, wide_road_camera_info, driver_camera_info};
const LogCameraInfo stream_cameras_logged[] = {stream_road_camera_info, stream_wide_road_camera_info, stream_driver_camera_info};

struct LoggerdState {
LoggerState logger;
std::atomic<double> last_camera_seen_tms{0.0};
std::atomic<int> ready_to_rotate{0}; // count of encoders ready to rotate
int max_waiting = 0;
double last_rotate_tms = 0.; // last rotate time in ms
};


class RemoteEncoder {
public:
std::unique_ptr<VideoWriter> writer;
int encoder_segment_offset = 0;
int current_segment = -1;
std::vector<Message *> q;
int dropped_frames = 0;
bool recording = false;
bool marked_ready_to_rotate = false;
bool seen_first_packet = false;

bool syncSegment(LoggerdState *s, const std::string &name, int encoder_segment, int logger_segment) {
if (!seen_first_packet) {
seen_first_packet = true;
encoder_segment_offset = encoder_segment - logger_segment;
LOGD("%s: has encoderd offset %d", name.c_str(), encoder_segment_offset);
}

// Calculate adjusted segment based on offset
int adjusted_segment = encoder_segment - encoder_segment_offset;

// Case 1: Segments are synchronized
if (adjusted_segment == logger_segment) {
if (current_segment != logger_segment) {
// New segment detected; reset writer if recording
if (recording) {
writer.reset();
recording = false;
}
current_segment = logger_segment;
}
marked_ready_to_rotate = false;
return true;
}

// Case 2: Encoder is ahead (newer segment)
if (adjusted_segment > logger_segment) {
int segment_gap = adjusted_segment - logger_segment;
if (segment_gap > 1) {
LOGE("%s: encoder jumped ahead by %d segments (adj=%d, log=%d), adjusting offset",
name.c_str(), segment_gap, adjusted_segment, logger_segment);
encoder_segment_offset += segment_gap - 1;
}

if (!marked_ready_to_rotate) {
marked_ready_to_rotate = true;
++s->ready_to_rotate;
LOGD("rotate %d -> %d ready %d/%d for %s",
logger_segment, adjusted_segment,
s->ready_to_rotate.load(), s->max_waiting, name.c_str());
}

}
// Case 3: Encoder is behind (older segment)
else {
LOGE("%s: encoderd packet has a older segment!!! idx.getSegmentNum():%d s->logger.segment():%d encoder_segment_offset:%d",
name.c_str(), encoder_segment, logger_segment, encoder_segment_offset);
encoder_segment_offset = encoder_segment - logger_segment;
}
return false;
}
};
129 changes: 129 additions & 0 deletions system/loggerd/tests/test_logger.cc
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "catch2/catch.hpp"
#include "system/loggerd/logger.h"
#include "system/loggerd/loggerd.h"

typedef cereal::Sentinel::SentinelType SentinelType;

Expand Down Expand Up @@ -73,3 +74,131 @@ TEST_CASE("logger") {
verify_segment(log_root + "/" + route_name, i, segment_cnt, 1);
}
}

TEST_CASE("RemoteEncoder::syncSegment robustness", "[sync]") {
LoggerdState state;
RemoteEncoder encoder;
std::string name = "test_encoder";

SECTION("Matching segment after offset set") {
REQUIRE(encoder.syncSegment(&state, name, 5, 5) == true); // First packet sets offset
REQUIRE(encoder.encoder_segment_offset == 0); // 5 - 5 = 0
REQUIRE(encoder.current_segment == 5);
REQUIRE(encoder.seen_first_packet == true);
REQUIRE(encoder.marked_ready_to_rotate == false);
REQUIRE(state.ready_to_rotate == 0);

REQUIRE(encoder.syncSegment(&state, name, 7, 7) == true); // 7 - 0 = 7 matches 7
REQUIRE(encoder.current_segment == 7);
REQUIRE(encoder.recording == false);
REQUIRE(encoder.marked_ready_to_rotate == false);
REQUIRE(state.ready_to_rotate == 0);
}

SECTION("Encoder restarts and sends segment 0") {
REQUIRE(encoder.syncSegment(&state, name, 0, 0) == true); // Initial sync
REQUIRE(encoder.encoder_segment_offset == 0); // 1 - 1 = 0
REQUIRE(encoder.current_segment == 0);

REQUIRE(encoder.syncSegment(&state, name, 0, 2) == false); // 0 - 0 = 0 < 2 (behind)
REQUIRE(encoder.encoder_segment_offset == -2); // Adjusted to 0 - 2 = -2
REQUIRE(encoder.marked_ready_to_rotate == false);
REQUIRE(state.ready_to_rotate == 0);

REQUIRE(encoder.syncSegment(&state, name, 0, 2) == true); // 0 - (-2) = 2 matches 2
REQUIRE(encoder.current_segment == 2);
REQUIRE(encoder.marked_ready_to_rotate == false);
}

SECTION("Encoder restarts and sends segment greater than 0") {
REQUIRE(encoder.syncSegment(&state, name, 0, 0) == true); // Initial sync
REQUIRE(encoder.encoder_segment_offset == 0); // 0 - 0 = 0
REQUIRE(encoder.current_segment == 0);

REQUIRE(encoder.syncSegment(&state, name, 2, 3) == false); // 2 - 0 = 2 < 3 (behind)
REQUIRE(encoder.encoder_segment_offset == -1); // Adjusted to 2 - 3 = -1
REQUIRE(encoder.marked_ready_to_rotate == false);
REQUIRE(state.ready_to_rotate == 0);

REQUIRE(encoder.syncSegment(&state, name, 2, 3) == true); // 2 - (-1) = 3 matches 3
REQUIRE(encoder.current_segment == 3);
}

SECTION("Logger restarts to lower segment") {
REQUIRE(encoder.syncSegment(&state, name, 5, 5) == true);
REQUIRE(encoder.encoder_segment_offset == 0);
REQUIRE(encoder.current_segment == 5);

// Logger restarts to 0, encoder at 6
REQUIRE(encoder.syncSegment(&state, name, 6, 0) == false); // 6 - 0 = 6 > 0 (ahead)
REQUIRE(encoder.marked_ready_to_rotate == true);
REQUIRE(state.ready_to_rotate == 1);
REQUIRE(encoder.current_segment == 5); // Unchanged until sync

// Encoder advances to 7, logger still at 0
REQUIRE(encoder.syncSegment(&state, name, 7, 0) == false); // 7 - 0 = 7 > 0
REQUIRE(state.ready_to_rotate == 1); // No further increment
}

SECTION("Encoder is ahead by more than one segment") {
REQUIRE(encoder.syncSegment(&state, name, 0, 0) == true);
REQUIRE(encoder.encoder_segment_offset == 0);
REQUIRE(encoder.current_segment == 0);

// Encoder jumps to 2, logger at 0
REQUIRE(encoder.syncSegment(&state, name, 2, 0) == false); // 2 - 0 = 2 > 0
REQUIRE(encoder.encoder_segment_offset == 1); // Adjusted: += (2 - 1) = 1
REQUIRE(encoder.marked_ready_to_rotate == true);
REQUIRE(state.ready_to_rotate == 1);
REQUIRE(encoder.syncSegment(&state, name, 2, 0) == false);

// Logger advances to 1, encoder at 2
state.ready_to_rotate = 0;
REQUIRE(encoder.syncSegment(&state, name, 2, 1) == true); // 2 - 1 = 1 matches 1
REQUIRE(encoder.current_segment == 1);
REQUIRE(encoder.marked_ready_to_rotate == false);

REQUIRE(encoder.syncSegment(&state, name, 3, 1) == false); // 3 - 0 = 3 > 0
REQUIRE(state.ready_to_rotate == 1); // No additional increment
}

SECTION("Sync after rotation") {
REQUIRE(encoder.syncSegment(&state, name, 0, 0) == true);
REQUIRE(encoder.encoder_segment_offset == 0);

REQUIRE(encoder.syncSegment(&state, name, 1, 0) == false); // 1 - 0 = 1 > 0
REQUIRE(encoder.marked_ready_to_rotate == true);
REQUIRE(state.ready_to_rotate == 1);

// Simulate rotation: logger catches up to encoder
REQUIRE(encoder.syncSegment(&state, name, 1, 1) == true); // 1 - 0 = 1 == 1
REQUIRE(encoder.current_segment == 1);
REQUIRE(encoder.marked_ready_to_rotate == false);
REQUIRE(state.ready_to_rotate == 1); // Not decremented here
}

SECTION("Encoder catches up after being behind") {
REQUIRE(encoder.syncSegment(&state, name, 0, 0) == true);
REQUIRE(encoder.encoder_segment_offset == 0);

// Logger advances to 1, encoder sends 0
REQUIRE(encoder.syncSegment(&state, name, 0, 1) == false); // 0 - 0 = 0 < 1
REQUIRE(encoder.encoder_segment_offset == -1); // Adjusted to 0 - 1

// Logger advances to 2, encoder sends 1
REQUIRE(encoder.syncSegment(&state, name, 1, 2) == true); // 1 - (-1) = 2 == 2
REQUIRE(encoder.current_segment == 2);
REQUIRE(encoder.marked_ready_to_rotate == false);
}

SECTION("Recording reset on segment change") {
encoder.recording = true; // Simulate active recording
REQUIRE(encoder.syncSegment(&state, name, 0, 0) == true); // Initial sync
REQUIRE(encoder.encoder_segment_offset == 0);
REQUIRE(encoder.current_segment == 0);

REQUIRE(encoder.syncSegment(&state, name, 1, 1) == true); // 1 - 0 = 1 matches 1
REQUIRE(encoder.current_segment == 1);
REQUIRE(encoder.recording == false); // Recording reset on segment change
}
}
Loading