Skip to content

Commit 8cf4e11

Browse files
committed
Delete overwritten segment files when detected.
1 parent 4cd6b18 commit 8cf4e11

File tree

3 files changed

+149
-71
lines changed

3 files changed

+149
-71
lines changed

src/ra_log.erl

Lines changed: 64 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -230,9 +230,17 @@ init(#{uid := UId,
230230
SegRefs = my_segrefs(UId, SegWriter),
231231
Reader = ra_log_reader:init(UId, Dir, MaxOpen, AccessPattern, SegRefs,
232232
Names, Counter),
233-
SegRange = ra_log_reader:range(Reader),
233+
SegmentRange = ra_log_reader:range(Reader),
234234
%% TODO: check ra_range:add/2 actually performas the correct logic we expect
235-
Range = ra_range:add(MtRange, SegRange),
235+
Range = ra_range:add(MtRange, SegmentRange),
236+
237+
%% TODO: review this
238+
[begin
239+
?DEBUG("~ts: deleting overwritten segment ~w",
240+
[LogId, SR]),
241+
catch prim_file:delete(filename:join(Dir, F))
242+
end
243+
|| {_, F} = SR <- SegRefs -- ra_log_reader:segment_refs(Reader)],
236244

237245
%% assert there is no gap between the snapshot
238246
%% and the first index in the log
@@ -298,10 +306,10 @@ init(#{uid := UId,
298306
{_, LI} ->
299307
fetch_term(LI, State0)
300308
end,
301-
LastSegRefIdx = case SegRefs of
302-
[] ->
309+
LastSegRefIdx = case SegmentRange of
310+
undefined ->
303311
-1;
304-
[{{_, L}, _} | _] ->
312+
{_, L} ->
305313
L
306314
end,
307315
LastWrittenIdx = case ra_log_wal:last_writer_seq(Wal, UId) of
@@ -311,7 +319,8 @@ init(#{uid := UId,
311319
{ok, Idx} ->
312320
max(Idx, LastSegRefIdx);
313321
{error, wal_down} ->
314-
?ERROR("~ts: ra_log:init/1 cannot complete as wal process is down.",
322+
?ERROR("~ts: ra_log:init/1 cannot complete as wal"
323+
" process is down.",
315324
[State2#?MODULE.cfg#cfg.log_id]),
316325
exit(wal_down)
317326
end,
@@ -332,9 +341,7 @@ init(#{uid := UId,
332341
State = maybe_append_first_entry(State4),
333342
?DEBUG("~ts: ra_log:init recovered last_index_term ~w"
334343
" snapshot_index_term ~w, last_written_index_term ~w",
335-
[State#?MODULE.cfg#cfg.log_id,
336-
last_index_term(State),
337-
{SnapIdx, SnapTerm},
344+
[LogId, last_index_term(State), {SnapIdx, SnapTerm},
338345
State#?MODULE.last_written_index_term
339346
]),
340347
assert(State).
@@ -767,11 +774,15 @@ handle_event({written, Term, WrittenSeq},
767774
end
768775
end;
769776
handle_event({segments, TidRanges, NewSegs},
770-
#?MODULE{cfg = #cfg{uid = UId, names = Names} = Cfg,
777+
#?MODULE{cfg = #cfg{uid = UId,
778+
log_id = LogId,
779+
directory = Dir,
780+
names = Names} = Cfg,
771781
reader = Reader0,
772782
pending = Pend0,
773783
mem_table = Mt0} = State0) ->
774-
Reader = ra_log_reader:update_segments(NewSegs, Reader0),
784+
{Reader, OverwrittenSegRefs} = ra_log_reader:update_segments(NewSegs, Reader0),
785+
775786
put_counter(Cfg, ?C_RA_SVR_METRIC_NUM_SEGMENTS,
776787
ra_log_reader:segment_ref_count(Reader)),
777788
%% the tid ranges arrive in the reverse order they were written
@@ -782,6 +793,7 @@ handle_event({segments, TidRanges, NewSegs},
782793
ok = ra_log_ets:execute_delete(Names, UId, Spec),
783794
Acc
784795
end, Mt0, TidRanges),
796+
785797
%% it is theoretically possible that the segment writer flush _could_
786798
%% over take WAL notifications
787799
%%
@@ -795,26 +807,35 @@ handle_event({segments, TidRanges, NewSegs},
795807
State = State0#?MODULE{reader = Reader,
796808
pending = Pend,
797809
mem_table = Mt},
798-
{State, []};
810+
Fun = fun () ->
811+
[begin
812+
?DEBUG("~ts: deleting overwritten segment ~w",
813+
[LogId, SR]),
814+
catch prim_file:delete(filename:join(Dir, F))
815+
end
816+
|| {_, F} = SR <- OverwrittenSegRefs],
817+
ok
818+
end,
819+
{State, [{bg_work, Fun, fun (_Err) -> ok end}]};
799820
handle_event({segments_to_be_deleted, SegRefs},
800821
#?MODULE{cfg = #cfg{uid = UId,
801822
log_id = LogId,
802823
directory = Dir,
803824
counter = Counter,
804825
names = Names},
805826
reader = Reader} = State) ->
806-
Fun = fun () ->
807-
[prim_file:delete(filename:join(Dir, F))
808-
|| {_, F} <- SegRefs],
809-
ok
810-
end,
811827
ActiveSegs = ra_log_reader:segment_refs(Reader) -- SegRefs,
812828
#{max_size := MaxOpenSegments} = ra_log_reader:info(Reader),
813829
% close all open segments
814830
ok = ra_log_reader:close(Reader),
815831
?DEBUG("~ts: ~b obsolete segments - remaining: ~b",
816832
[LogId, length(SegRefs), length(ActiveSegs)]),
817833
%% open a new segment with the new max open segment value
834+
Fun = fun () ->
835+
[prim_file:delete(filename:join(Dir, F))
836+
|| {_, F} <- SegRefs],
837+
ok
838+
end,
818839
{State#?MODULE{reader = ra_log_reader:init(UId, Dir, MaxOpenSegments,
819840
random,
820841
ActiveSegs, Names, Counter)},
@@ -824,8 +845,6 @@ handle_event({snapshot_written, {SnapIdx, _} = Snap, LiveIndexes, SnapKind},
824845
#?MODULE{cfg = #cfg{uid = UId,
825846
names = Names} = Cfg,
826847
range = {FstIdx, _} = Range,
827-
% first_index = FstIdx,
828-
% last_index = LstIdx,
829848
mem_table = Mt0,
830849
pending = Pend0,
831850
last_written_index_term = {LastWrittenIdx, _} = LWIdxTerm0,
@@ -1307,8 +1326,9 @@ release_resources(MaxOpenSegments,
13071326
%%% Local functions
13081327

13091328

1310-
schedule_compaction(SnapIdx, #?MODULE{cfg = #cfg{},
1311-
snapshot_state = SnapState,
1329+
schedule_compaction(SnapIdx, #?MODULE{cfg = #cfg{uid = _UId,
1330+
segment_writer = _SegWriter},
1331+
live_indexes = LiveIndexes,
13121332
reader = Reader0}) ->
13131333
case ra_log_reader:segment_refs(Reader0) of
13141334
[] ->
@@ -1320,29 +1340,34 @@ schedule_compaction(SnapIdx, #?MODULE{cfg = #cfg{},
13201340
SegRefs = lists:takewhile(fun ({{_Start, End}, _}) ->
13211341
End =< SnapIdx
13221342
end, lists:reverse(Compactable)),
1323-
SnapDir = ra_snapshot:current_snapshot_dir(SnapState),
1343+
% SnapDir = ra_snapshot:current_snapshot_dir(SnapState),
13241344

13251345
%% TODO: minor compactions should also delete / truncate
13261346
%% segments with completely overwritten indexes
13271347

13281348
Self = self(),
1329-
Fun = fun () ->
1330-
{ok, Indexes} = ra_snapshot:indexes(SnapDir),
1331-
{Delete, _} = lists:foldl(
1332-
fun ({Range, _} = S, {Del, Keep}) ->
1333-
case ra_seq:in_range(Range, Indexes) of
1334-
[] ->
1335-
{[S | Del], Keep};
1336-
_ ->
1337-
{Del, [S | Keep]}
1338-
end
1339-
end, {[], []}, SegRefs),
1340-
%% need to update the ra_servers list of seg refs _before_
1341-
%% the segments can actually be deleted
1342-
Self ! {ra_log_event,
1343-
{segments_to_be_deleted, Delete}},
1344-
ok
1345-
end,
1349+
Fun =
1350+
fun () ->
1351+
% {ok, Indexes} = ra_snapshot:indexes(SnapDir),
1352+
1353+
%% get all current segrefs
1354+
% AllSegRefs = my_segrefs(UId, SegWriter),
1355+
Delete = lists:foldl(
1356+
fun({Range, _} = S, Del) ->
1357+
case ra_seq:in_range(Range,
1358+
LiveIndexes) of
1359+
[] ->
1360+
[S | Del];
1361+
_ ->
1362+
Del
1363+
end
1364+
end, [], SegRefs),
1365+
%% need to update the ra_servers list of seg refs _before_
1366+
%% the segments can actually be deleted
1367+
Self ! {ra_log_event,
1368+
{segments_to_be_deleted, Delete}},
1369+
ok
1370+
end,
13461371

13471372
[{bg_work, Fun, fun (_Err) ->
13481373
?WARN("bgwork err ~p", [_Err]), ok

src/ra_log_reader.erl

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -103,14 +103,17 @@ close(#?STATE{open_segments = Open}) ->
103103
_ = ra_flru:evict_all(Open),
104104
ok.
105105

106-
-spec update_segments([segment_ref()], state()) -> state().
106+
-spec update_segments([segment_ref()], state()) ->
107+
{state(), OverwrittenSegments :: [segment_ref()]}.
107108
update_segments(NewSegmentRefs,
108-
#?STATE{open_segments = Open0,
109+
#?STATE{cfg = _Cfg,
110+
open_segments = Open0,
109111
segment_refs = SegRefs0} = State) ->
110112

111113
SegmentRefs0 = ra_lol:to_list(SegRefs0),
112114
%% TODO: capture segrefs removed by compact_segrefs/2 and delete them
113115
SegmentRefsComp = compact_segrefs(NewSegmentRefs, SegmentRefs0),
116+
OverwrittenSegments = NewSegmentRefs -- SegmentRefsComp,
114117
SegmentRefsCompRev = lists:reverse(SegmentRefsComp),
115118
SegRefs = ra_lol:from_list(fun seg_ref_gt/2, SegmentRefsCompRev),
116119
Range = case SegmentRefsComp of
@@ -129,9 +132,10 @@ update_segments(NewSegmentRefs,
129132
error -> Acc0
130133
end
131134
end, Open0, NewSegmentRefs),
132-
State#?MODULE{segment_refs = SegRefs,
135+
{State#?MODULE{segment_refs = SegRefs,
133136
range = Range,
134-
open_segments = Open}.
137+
open_segments = Open},
138+
OverwrittenSegments}.
135139

136140
-record(log_compaction_result,
137141
{%range :: ra:range(),

test/ra_log_2_SUITE.erl

Lines changed: 77 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,9 @@ all_tests() ->
6767
updated_segment_can_be_read,
6868
open_segments_limit,
6969
write_config,
70-
sparse_write
70+
sparse_write,
71+
overwritten_segment_is_cleared,
72+
overwritten_segment_is_cleared_on_init
7173
].
7274

7375
groups() ->
@@ -665,11 +667,7 @@ updated_segment_can_be_read(Config) ->
665667
% this should return all entries
666668
{Entries1, _} = ra_log_take(1, 15, Log4),
667669
?assertEqual(15, length(Entries1)),
668-
ct:pal("Entries: ~p", [Entries]),
669-
ct:pal("Entries1: ~p", [Entries1]),
670-
ct:pal("Counters ~p", [ra_counters:overview(?FUNCTION_NAME)]),
671670
?assertEqual(15, length(Entries1)),
672-
% l18 = length(Entries1),
673671
ok.
674672

675673
cache_overwrite_then_take(Config) ->
@@ -759,7 +757,6 @@ last_index_reset_before_written(Config) ->
759757
end),
760758
4 = ra_log:next_index(Log3),
761759
{3, 1} = ra_log:last_index_term(Log3),
762-
% #{cache_size := 0} = ra_log:overview(Log3),
763760
ok.
764761

765762
recovery(Config) ->
@@ -1606,6 +1603,53 @@ sparse_write(Config) ->
16061603
{9, _, _}], ResReInit),
16071604
ok.
16081605

1606+
overwritten_segment_is_cleared(Config) ->
1607+
Log0 = ra_log_init(Config, #{}),
1608+
% write a few entries
1609+
Log1 = write_and_roll(1, 256, 1, Log0),
1610+
Log2 = assert_log_events(Log1,
1611+
fun(L) ->
1612+
#{num_segments := N} = ra_log:overview(L),
1613+
N == 2
1614+
end),
1615+
Log3 = write_and_roll(128, 256 + 128, 2, Log2),
1616+
UId = ?config(uid, Config),
1617+
Log = assert_log_events(Log3,
1618+
fun(L) ->
1619+
#{num_segments := N} = ra_log:overview(L),
1620+
N == 3 andalso
1621+
3 == length(ra_log_segment_writer:my_segments(ra_log_segment_writer, UId))
1622+
end),
1623+
1624+
ct:pal("Log overview ~p", [ra_log:overview(Log)]),
1625+
ok.
1626+
1627+
overwritten_segment_is_cleared_on_init(Config) ->
1628+
Log0 = ra_log_init(Config, #{}),
1629+
% write a few entries
1630+
Log1 = write_and_roll(1, 256, 1, Log0),
1631+
Log2 = assert_log_events(Log1,
1632+
fun(L) ->
1633+
#{num_segments := N} = ra_log:overview(L),
1634+
N == 2
1635+
end),
1636+
Log3 = write_n(128, 256 + 128, 2, Log2),
1637+
ok = ra_log_wal:force_roll_over(ra_log_wal),
1638+
ra_log:close(Log3),
1639+
% _Log3 = write_and_roll(128, 256 + 128, 2, Log2),
1640+
UId = ?config(uid, Config),
1641+
timer:sleep(1000),
1642+
flush(),
1643+
Log = ra_log_init(Config, #{}),
1644+
1645+
ct:pal("my segments ~p",
1646+
[ra_log_segment_writer:my_segments(ra_log_segment_writer, UId)]),
1647+
ct:pal("Log overview ~p", [ra_log:overview(Log)]),
1648+
?assertEqual(3, length(
1649+
ra_log_segment_writer:my_segments(ra_log_segment_writer, UId))),
1650+
1651+
ok.
1652+
16091653
validate_fold(From, To, Term, Log0) ->
16101654
{Entries0, Log} = ra_log:fold(From, To,
16111655
fun (E, A) -> [E | A] end,
@@ -1728,7 +1772,11 @@ deliver_all_log_events(Log0, Timeout) ->
17281772
P ! E,
17291773
Acc;
17301774
({next_event, {ra_log_event, E}}, Acc0) ->
1731-
{Acc, _} = ra_log:handle_event(E, Acc0),
1775+
{Acc, Effs} = ra_log:handle_event(E, Acc0),
1776+
run_effs(Effs),
1777+
Acc;
1778+
({bg_work, Fun, _}, Acc) ->
1779+
Fun(),
17321780
Acc;
17331781
(_, Acc) ->
17341782
Acc
@@ -1743,29 +1791,30 @@ assert_log_events(Log0, AssertPred) ->
17431791
assert_log_events(Log0, AssertPred, 2000).
17441792

17451793
assert_log_events(Log0, AssertPred, Timeout) ->
1746-
receive
1747-
{ra_log_event, Evt} ->
1748-
ct:pal("log evt: ~p", [Evt]),
1749-
{Log1, Effs} = ra_log:handle_event(Evt, Log0),
1750-
run_effs(Effs),
1751-
%% handle any next events
1752-
Log = lists:foldl(
1753-
fun ({next_event, {ra_log_event, E}}, Acc0) ->
1754-
{Acc, Effs} = ra_log:handle_event(E, Acc0),
1755-
run_effs(Effs),
1756-
Acc;
1757-
(_, Acc) ->
1758-
Acc
1759-
end, Log1, Effs),
1760-
case AssertPred(Log) of
1761-
true ->
1762-
Log;
1763-
false ->
1794+
case AssertPred(Log0) of
1795+
true ->
1796+
Log0;
1797+
false ->
1798+
receive
1799+
{ra_log_event, Evt} ->
1800+
ct:pal("log evt: ~p", [Evt]),
1801+
{Log1, Effs} = ra_log:handle_event(Evt, Log0),
1802+
run_effs(Effs),
1803+
%% handle any next events
1804+
Log = lists:foldl(
1805+
fun ({next_event, {ra_log_event, E}}, Acc0) ->
1806+
{Acc, Effs} = ra_log:handle_event(E, Acc0),
1807+
run_effs(Effs),
1808+
Acc;
1809+
(_, Acc) ->
1810+
Acc
1811+
end, Log1, Effs),
17641812
assert_log_events(Log, AssertPred, Timeout)
1813+
1814+
after Timeout ->
1815+
flush(),
1816+
exit({assert_log_events_timeout, Log0})
17651817
end
1766-
after Timeout ->
1767-
flush(),
1768-
exit({assert_log_events_timeout, Log0})
17691818
end.
17701819

17711820
wait_for_segments(Log0, Timeout) ->

0 commit comments

Comments
 (0)