Skip to content

Commit 2363616

Browse files
committed
Delete overwritten segment files when detected.
1 parent 4cd6b18 commit 2363616

File tree

4 files changed

+162
-85
lines changed

4 files changed

+162
-85
lines changed

src/ra_lib.erl

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -471,19 +471,11 @@ consult(Path) ->
471471
Err
472472
end.
473473

474+
-spec cons(term(), list()) -> list().
474475
cons(Item, List)
475476
when is_list(List) ->
476477
[Item | List].
477478

478-
tokens(Str) ->
479-
case erl_scan:string(Str) of
480-
{ok, Tokens, _EndLoc} ->
481-
erl_parse:parse_term(Tokens);
482-
{error, Err, _ErrLoc} ->
483-
{error, Err}
484-
end.
485-
486-
487479
%% raw copy of ensure_dir
488480
ensure_dir("/") ->
489481
ok;
@@ -510,6 +502,15 @@ ensure_dir(F) ->
510502
end
511503
end.
512504

505+
tokens(Str) ->
506+
case erl_scan:string(Str) of
507+
{ok, Tokens, _EndLoc} ->
508+
erl_parse:parse_term(Tokens);
509+
{error, Err, _ErrLoc} ->
510+
{error, Err}
511+
end.
512+
513+
513514
-ifdef(TEST).
514515
-include_lib("eunit/include/eunit.hrl").
515516

src/ra_log.erl

Lines changed: 64 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -230,9 +230,17 @@ init(#{uid := UId,
230230
SegRefs = my_segrefs(UId, SegWriter),
231231
Reader = ra_log_reader:init(UId, Dir, MaxOpen, AccessPattern, SegRefs,
232232
Names, Counter),
233-
SegRange = ra_log_reader:range(Reader),
233+
SegmentRange = ra_log_reader:range(Reader),
234234
%% TODO: check ra_range:add/2 actually performas the correct logic we expect
235-
Range = ra_range:add(MtRange, SegRange),
235+
Range = ra_range:add(MtRange, SegmentRange),
236+
237+
%% TODO: review this
238+
[begin
239+
?DEBUG("~ts: deleting overwritten segment ~w",
240+
[LogId, SR]),
241+
catch prim_file:delete(filename:join(Dir, F))
242+
end
243+
|| {_, F} = SR <- SegRefs -- ra_log_reader:segment_refs(Reader)],
236244

237245
%% assert there is no gap between the snapshot
238246
%% and the first index in the log
@@ -298,10 +306,10 @@ init(#{uid := UId,
298306
{_, LI} ->
299307
fetch_term(LI, State0)
300308
end,
301-
LastSegRefIdx = case SegRefs of
302-
[] ->
309+
LastSegRefIdx = case SegmentRange of
310+
undefined ->
303311
-1;
304-
[{{_, L}, _} | _] ->
312+
{_, L} ->
305313
L
306314
end,
307315
LastWrittenIdx = case ra_log_wal:last_writer_seq(Wal, UId) of
@@ -311,7 +319,8 @@ init(#{uid := UId,
311319
{ok, Idx} ->
312320
max(Idx, LastSegRefIdx);
313321
{error, wal_down} ->
314-
?ERROR("~ts: ra_log:init/1 cannot complete as wal process is down.",
322+
?ERROR("~ts: ra_log:init/1 cannot complete as wal"
323+
" process is down.",
315324
[State2#?MODULE.cfg#cfg.log_id]),
316325
exit(wal_down)
317326
end,
@@ -332,9 +341,7 @@ init(#{uid := UId,
332341
State = maybe_append_first_entry(State4),
333342
?DEBUG("~ts: ra_log:init recovered last_index_term ~w"
334343
" snapshot_index_term ~w, last_written_index_term ~w",
335-
[State#?MODULE.cfg#cfg.log_id,
336-
last_index_term(State),
337-
{SnapIdx, SnapTerm},
344+
[LogId, last_index_term(State), {SnapIdx, SnapTerm},
338345
State#?MODULE.last_written_index_term
339346
]),
340347
assert(State).
@@ -767,11 +774,15 @@ handle_event({written, Term, WrittenSeq},
767774
end
768775
end;
769776
handle_event({segments, TidRanges, NewSegs},
770-
#?MODULE{cfg = #cfg{uid = UId, names = Names} = Cfg,
777+
#?MODULE{cfg = #cfg{uid = UId,
778+
log_id = LogId,
779+
directory = Dir,
780+
names = Names} = Cfg,
771781
reader = Reader0,
772782
pending = Pend0,
773783
mem_table = Mt0} = State0) ->
774-
Reader = ra_log_reader:update_segments(NewSegs, Reader0),
784+
{Reader, OverwrittenSegRefs} = ra_log_reader:update_segments(NewSegs, Reader0),
785+
775786
put_counter(Cfg, ?C_RA_SVR_METRIC_NUM_SEGMENTS,
776787
ra_log_reader:segment_ref_count(Reader)),
777788
%% the tid ranges arrive in the reverse order they were written
@@ -782,6 +793,7 @@ handle_event({segments, TidRanges, NewSegs},
782793
ok = ra_log_ets:execute_delete(Names, UId, Spec),
783794
Acc
784795
end, Mt0, TidRanges),
796+
785797
%% it is theoretically possible that the segment writer flush _could_
786798
%% over take WAL notifications
787799
%%
@@ -795,26 +807,35 @@ handle_event({segments, TidRanges, NewSegs},
795807
State = State0#?MODULE{reader = Reader,
796808
pending = Pend,
797809
mem_table = Mt},
798-
{State, []};
810+
Fun = fun () ->
811+
[begin
812+
?DEBUG("~ts: deleting overwritten segment ~w",
813+
[LogId, SR]),
814+
catch prim_file:delete(filename:join(Dir, F))
815+
end
816+
|| {_, F} = SR <- OverwrittenSegRefs],
817+
ok
818+
end,
819+
{State, [{bg_work, Fun, fun (_Err) -> ok end}]};
799820
handle_event({segments_to_be_deleted, SegRefs},
800821
#?MODULE{cfg = #cfg{uid = UId,
801822
log_id = LogId,
802823
directory = Dir,
803824
counter = Counter,
804825
names = Names},
805826
reader = Reader} = State) ->
806-
Fun = fun () ->
807-
[prim_file:delete(filename:join(Dir, F))
808-
|| {_, F} <- SegRefs],
809-
ok
810-
end,
811827
ActiveSegs = ra_log_reader:segment_refs(Reader) -- SegRefs,
812828
#{max_size := MaxOpenSegments} = ra_log_reader:info(Reader),
813829
% close all open segments
814830
ok = ra_log_reader:close(Reader),
815831
?DEBUG("~ts: ~b obsolete segments - remaining: ~b",
816832
[LogId, length(SegRefs), length(ActiveSegs)]),
817833
%% open a new segment with the new max open segment value
834+
Fun = fun () ->
835+
[prim_file:delete(filename:join(Dir, F))
836+
|| {_, F} <- SegRefs],
837+
ok
838+
end,
818839
{State#?MODULE{reader = ra_log_reader:init(UId, Dir, MaxOpenSegments,
819840
random,
820841
ActiveSegs, Names, Counter)},
@@ -824,8 +845,6 @@ handle_event({snapshot_written, {SnapIdx, _} = Snap, LiveIndexes, SnapKind},
824845
#?MODULE{cfg = #cfg{uid = UId,
825846
names = Names} = Cfg,
826847
range = {FstIdx, _} = Range,
827-
% first_index = FstIdx,
828-
% last_index = LstIdx,
829848
mem_table = Mt0,
830849
pending = Pend0,
831850
last_written_index_term = {LastWrittenIdx, _} = LWIdxTerm0,
@@ -1307,8 +1326,9 @@ release_resources(MaxOpenSegments,
13071326
%%% Local functions
13081327

13091328

1310-
schedule_compaction(SnapIdx, #?MODULE{cfg = #cfg{},
1311-
snapshot_state = SnapState,
1329+
schedule_compaction(SnapIdx, #?MODULE{cfg = #cfg{uid = _UId,
1330+
segment_writer = _SegWriter},
1331+
live_indexes = LiveIndexes,
13121332
reader = Reader0}) ->
13131333
case ra_log_reader:segment_refs(Reader0) of
13141334
[] ->
@@ -1320,29 +1340,34 @@ schedule_compaction(SnapIdx, #?MODULE{cfg = #cfg{},
13201340
SegRefs = lists:takewhile(fun ({{_Start, End}, _}) ->
13211341
End =< SnapIdx
13221342
end, lists:reverse(Compactable)),
1323-
SnapDir = ra_snapshot:current_snapshot_dir(SnapState),
1343+
% SnapDir = ra_snapshot:current_snapshot_dir(SnapState),
13241344

13251345
%% TODO: minor compactions should also delete / truncate
13261346
%% segments with completely overwritten indexes
13271347

13281348
Self = self(),
1329-
Fun = fun () ->
1330-
{ok, Indexes} = ra_snapshot:indexes(SnapDir),
1331-
{Delete, _} = lists:foldl(
1332-
fun ({Range, _} = S, {Del, Keep}) ->
1333-
case ra_seq:in_range(Range, Indexes) of
1334-
[] ->
1335-
{[S | Del], Keep};
1336-
_ ->
1337-
{Del, [S | Keep]}
1338-
end
1339-
end, {[], []}, SegRefs),
1340-
%% need to update the ra_servers list of seg refs _before_
1341-
%% the segments can actually be deleted
1342-
Self ! {ra_log_event,
1343-
{segments_to_be_deleted, Delete}},
1344-
ok
1345-
end,
1349+
Fun =
1350+
fun () ->
1351+
% {ok, Indexes} = ra_snapshot:indexes(SnapDir),
1352+
1353+
%% get all current segrefs
1354+
% AllSegRefs = my_segrefs(UId, SegWriter),
1355+
Delete = lists:foldl(
1356+
fun({Range, _} = S, Del) ->
1357+
case ra_seq:in_range(Range,
1358+
LiveIndexes) of
1359+
[] ->
1360+
[S | Del];
1361+
_ ->
1362+
Del
1363+
end
1364+
end, [], SegRefs),
1365+
%% need to update the ra_servers list of seg refs _before_
1366+
%% the segments can actually be deleted
1367+
Self ! {ra_log_event,
1368+
{segments_to_be_deleted, Delete}},
1369+
ok
1370+
end,
13461371

13471372
[{bg_work, Fun, fun (_Err) ->
13481373
?WARN("bgwork err ~p", [_Err]), ok

src/ra_log_reader.erl

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -103,14 +103,17 @@ close(#?STATE{open_segments = Open}) ->
103103
_ = ra_flru:evict_all(Open),
104104
ok.
105105

106-
-spec update_segments([segment_ref()], state()) -> state().
106+
-spec update_segments([segment_ref()], state()) ->
107+
{state(), OverwrittenSegments :: [segment_ref()]}.
107108
update_segments(NewSegmentRefs,
108-
#?STATE{open_segments = Open0,
109+
#?STATE{cfg = _Cfg,
110+
open_segments = Open0,
109111
segment_refs = SegRefs0} = State) ->
110112

111113
SegmentRefs0 = ra_lol:to_list(SegRefs0),
112114
%% TODO: capture segrefs removed by compact_segrefs/2 and delete them
113115
SegmentRefsComp = compact_segrefs(NewSegmentRefs, SegmentRefs0),
116+
OverwrittenSegments = NewSegmentRefs -- SegmentRefsComp,
114117
SegmentRefsCompRev = lists:reverse(SegmentRefsComp),
115118
SegRefs = ra_lol:from_list(fun seg_ref_gt/2, SegmentRefsCompRev),
116119
Range = case SegmentRefsComp of
@@ -129,9 +132,10 @@ update_segments(NewSegmentRefs,
129132
error -> Acc0
130133
end
131134
end, Open0, NewSegmentRefs),
132-
State#?MODULE{segment_refs = SegRefs,
135+
{State#?MODULE{segment_refs = SegRefs,
133136
range = Range,
134-
open_segments = Open}.
137+
open_segments = Open},
138+
OverwrittenSegments}.
135139

136140
-record(log_compaction_result,
137141
{%range :: ra:range(),

0 commit comments

Comments
 (0)