diff --git a/deps/rabbit/src/rabbit_msg_store.erl b/deps/rabbit/src/rabbit_msg_store.erl index c5b02f6eb9c4..5af31777d872 100644 --- a/deps/rabbit/src/rabbit_msg_store.erl +++ b/deps/rabbit/src/rabbit_msg_store.erl @@ -16,7 +16,7 @@ -export([compact_file/2, truncate_file/4, delete_file/2]). %% internal --export([scan_file_for_valid_messages/1]). %% salvage tool +-export([scan_file_for_valid_messages/1, scan_file_for_valid_messages/2]). %% salvage tool -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, prioritise_call/4, prioritise_cast/3, @@ -1472,15 +1472,17 @@ list_sorted_filenames(Dir, Ext) -> -define(SCAN_BLOCK_SIZE, 4194304). %% 4MB -scan_file_for_valid_messages(Dir, FileName) -> - scan_file_for_valid_messages(form_filename(Dir, FileName)). - +%% Exported as a salvage tool. Not as accurate as node recovery +%% because it doesn't have the queue index. scan_file_for_valid_messages(Path) -> + scan_file_for_valid_messages(Path, fun(_) -> valid end). + +scan_file_for_valid_messages(Path, Fun) -> case file:open(Path, [read, binary, raw]) of {ok, Fd} -> {ok, FileSize} = file:position(Fd, eof), {ok, _} = file:position(Fd, bof), - Messages = scan(<<>>, Fd, 0, FileSize, #{}, []), + Messages = scan(<<>>, Fd, Fun, 0, FileSize, #{}, []), ok = file:close(Fd), case Messages of [] -> @@ -1496,7 +1498,7 @@ scan_file_for_valid_messages(Path) -> Reason}} end. -scan(Buffer, Fd, Offset, FileSize, MsgIdsFound, Acc) -> +scan(Buffer, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc) -> case file:read(Fd, ?SCAN_BLOCK_SIZE) of eof -> Acc; @@ -1505,12 +1507,12 @@ scan(Buffer, Fd, Offset, FileSize, MsgIdsFound, Acc) -> <<>> -> Data0; _ -> <> end, - scan_data(Data, Fd, Offset, FileSize, MsgIdsFound, Acc) + scan_data(Data, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc) end. %% Message might have been found. scan_data(<> = Data, - Fd, Offset, FileSize, MsgIdsFound, Acc) + Fd, Fun, Offset, FileSize, MsgIdsFound, Acc) when Size >= 16 -> <> = MsgIdAndMsg, case MsgIdsFound of @@ -1519,26 +1521,34 @@ scan_data(<> = Data, %% simply be a coincidence. Try the next byte. #{MsgIdInt := true} -> <<_, Rest2/bits>> = Data, - scan_data(Rest2, Fd, Offset + 1, FileSize, MsgIdsFound, Acc); + scan_data(Rest2, Fd, Fun, Offset + 1, FileSize, MsgIdsFound, Acc); %% Data looks to be a message. _ -> %% Avoid sub-binary construction. MsgId = <>, TotalSize = Size + 9, - scan_data(Rest, Fd, Offset + TotalSize, FileSize, - MsgIdsFound#{MsgIdInt => true}, - [{MsgId, TotalSize, Offset}|Acc]) + Tuple = {MsgId, TotalSize, Offset}, + case Fun(Tuple) of + %% Confirmed to be a message by the provided fun. + valid -> + scan_data(Rest, Fd, Fun, Offset + TotalSize, FileSize, + MsgIdsFound#{MsgIdInt => true}, [Tuple|Acc]); + %% Not a message, try the next byte. + invalid -> + <<_, Rest2/bits>> = Data, + scan_data(Rest2, Fd, Fun, Offset + 1, FileSize, MsgIdsFound, Acc) + end end; %% This might be the start of a message. -scan_data(<> = Data, Fd, Offset, FileSize, MsgIdsFound, Acc) +scan_data(<> = Data, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc) when byte_size(Rest) < Size + 1, Size < FileSize - Offset -> - scan(Data, Fd, Offset, FileSize, MsgIdsFound, Acc); -scan_data(Data, Fd, Offset, FileSize, MsgIdsFound, Acc) + scan(Data, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc); +scan_data(Data, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc) when byte_size(Data) < 8 -> - scan(Data, Fd, Offset, FileSize, MsgIdsFound, Acc); + scan(Data, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc); %% This is definitely not a message. Try the next byte. -scan_data(<<_, Rest/bits>>, Fd, Offset, FileSize, MsgIdsFound, Acc) -> - scan_data(Rest, Fd, Offset + 1, FileSize, MsgIdsFound, Acc). +scan_data(<<_, Rest/bits>>, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc) -> + scan_data(Rest, Fd, Fun, Offset + 1, FileSize, MsgIdsFound, Acc). %%---------------------------------------------------------------------------- %% Ets index @@ -1742,39 +1752,37 @@ build_index(false, {MsgRefDeltaGen, MsgRefDeltaGenInit}, build_index_worker(Gatherer, #msstate { index_ets = IndexEts, dir = Dir }, File, Files) -> - FileName = filenum_to_name(File), + Path = form_filename(Dir, filenum_to_name(File)), rabbit_log:debug("Rebuilding message location index from ~ts (~B file(s) remaining)", - [form_filename(Dir, FileName), length(Files)]), + [Path, length(Files)]), %% The scan function already dealt with duplicate messages - %% within the file. We then get messages in reverse order. - {ok, Messages, FileSize} = - scan_file_for_valid_messages(Dir, FileName), - %% Valid messages are in file order so the last message is - %% the last message from the list. - {ValidMessages, ValidTotalSize} = - lists:foldl( - fun (Obj = {MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) -> - %% Fan-out may result in the same message data in multiple - %% files so we have to guard against it. - case index_lookup(IndexEts, MsgId) of - #msg_location { file = undefined } = StoreEntry -> - ok = index_update(IndexEts, StoreEntry #msg_location { - file = File, offset = Offset, - total_size = TotalSize }), - {[Obj | VMAcc], VTSAcc + TotalSize}; - _ -> - {VMAcc, VTSAcc} - end - end, {[], 0}, Messages), + %% within the file, and only returns valid messages (we do + %% the index lookup in the fun). But we get messages in reverse order. + {ok, Messages, FileSize} = scan_file_for_valid_messages(Path, + fun ({MsgId, TotalSize, Offset}) -> + %% Fan-out may result in the same message data in multiple + %% files so we have to guard against it. + case index_lookup(IndexEts, MsgId) of + #msg_location { file = undefined } = StoreEntry -> + ok = index_update(IndexEts, StoreEntry #msg_location { + file = File, offset = Offset, + total_size = TotalSize }), + valid; + _ -> + invalid + end + end), + ValidTotalSize = lists:foldl(fun({_, TotalSize, _}, Acc) -> Acc + TotalSize end, 0, Messages), FileSize1 = case Files of %% if it's the last file, we'll truncate to remove any %% rubbish above the last valid message. This affects the %% file size. - [] -> case ValidMessages of + [] -> case Messages of [] -> 0; - _ -> {_MsgId, TotalSize, Offset} = - lists:last(ValidMessages), + %% Messages is in reverse order so the first in the list + %% is the last message in the file. + [{_, TotalSize, Offset}|_] -> Offset + TotalSize end; [_|_] -> FileSize @@ -1933,7 +1941,7 @@ compact_file(File, State = #gc_state { index_ets = IndexEts, %% Load the messages. It's possible to get 0 messages here; %% that's OK. That means we have little to do as the file is %% about to be deleted. - {Messages, _} = scan_and_vacuum_message_file(File, State), + Messages = scan_and_vacuum_message_file(File, State), %% Blank holes. We must do this first otherwise the file is left %% with data that may confuse the code (for example data that looks %% like a message, isn't a message, but spans over a real message). @@ -2076,9 +2084,9 @@ truncate_file(File, Size, ThresholdTimestamp, #gc_state{ file_summary_ets = File -spec delete_file(non_neg_integer(), gc_state()) -> ok | defer. -delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts, - file_handles_ets = FileHandlesEts, - dir = Dir }) -> +delete_file(File, #gc_state { file_summary_ets = FileSummaryEts, + file_handles_ets = FileHandlesEts, + dir = Dir }) -> case ets:match_object(FileHandlesEts, {{'_', File}, '_'}, 1) of {[_|_], _Cont} -> rabbit_log:debug("Asked to delete file ~p but it has active readers. Deferring.", @@ -2087,7 +2095,8 @@ delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts, _ -> [#file_summary{ valid_total_size = 0, file_size = FileSize }] = ets:lookup(FileSummaryEts, File), - {[], 0} = scan_and_vacuum_message_file(File, State), +%% @todo What do? +% [] = scan_and_vacuum_message_file(File, State), ok = file:delete(form_filename(Dir, filenum_to_name(File))), true = ets:delete(FileSummaryEts, File), rabbit_log:debug("Deleted empty file number ~tp; reclaimed ~tp bytes", [File, FileSize]), @@ -2096,28 +2105,28 @@ delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts, scan_and_vacuum_message_file(File, #gc_state{ index_ets = IndexEts, dir = Dir }) -> %% Messages here will be end-of-file at start-of-list - {ok, Messages, _FileSize} = - scan_file_for_valid_messages(Dir, filenum_to_name(File)), - %% foldl will reverse so will end up with msgs in ascending offset order - lists:foldl( - fun ({MsgId, TotalSize, Offset}, Acc = {List, Size}) -> - case index_lookup(IndexEts, MsgId) of - #msg_location { file = File, total_size = TotalSize, - offset = Offset, ref_count = 0 } = Entry -> - index_delete_object(IndexEts, Entry), - Acc; - #msg_location { file = File, total_size = TotalSize, - offset = Offset } = Entry -> - {[ Entry | List ], TotalSize + Size}; - %% Fan-out may remove the entry but also write a new - %% entry in a different file when it needs to write - %% a message and the existing reference is in a file - %% that's about to be deleted. So we explicitly accept - %% these cases and ignore this message. - #msg_location { file = OtherFile, total_size = TotalSize } - when File =/= OtherFile -> - Acc; - not_found -> - Acc - end - end, {[], 0}, Messages). + Path = form_filename(Dir, filenum_to_name(File)), + {ok, Messages, _FileSize} = scan_file_for_valid_messages(Path, + fun ({MsgId, TotalSize, Offset}) -> + case index_lookup(IndexEts, MsgId) of + #msg_location { file = File, total_size = TotalSize, + offset = Offset, ref_count = 0 } = Entry -> + index_delete_object(IndexEts, Entry), + invalid; + #msg_location { file = File, total_size = TotalSize, + offset = Offset } -> + valid; + %% Fan-out may remove the entry but also write a new + %% entry in a different file when it needs to write + %% a message and the existing reference is in a file + %% that's about to be deleted. So we explicitly accept + %% these cases and ignore this message. + #msg_location { file = OtherFile, total_size = TotalSize } + when File =/= OtherFile -> + invalid; + not_found -> + invalid + end + end), + %% @todo Do we really need to reverse messages? + lists:reverse(Messages). diff --git a/deps/rabbit/test/backing_queue_SUITE.erl b/deps/rabbit/test/backing_queue_SUITE.erl index 10129201b9dc..363bfacfccd6 100644 --- a/deps/rabbit/test/backing_queue_SUITE.erl +++ b/deps/rabbit/test/backing_queue_SUITE.erl @@ -629,6 +629,22 @@ msg_store_file_scan1(Config) -> %% Messages with no content. ok = Scan([{bin, <<0:64, "deadbeefdeadbeef", 255>>}]), ok = Scan([{msg, gen_id(), <<>>}]), + %% Tricky messages. + %% + %% These only get properly detected when the index is populated. + %% In this test case we simulate the index with a fun. + TrickyScan = fun (Blocks, Expected, Fun) -> + Path = gen_msg_file(Config, Blocks), + Result = rabbit_msg_store:scan_file_for_valid_messages(Path, Fun), + case Result of + Expected -> ok; + _ -> {expected, Expected, got, Result} + end + end, + ok = TrickyScan( + [{bin, <<0, 0:48, 17, 17, "idididididididid", 255, 0:4352/unit:8, 255>>}], + {ok, [{<<"idididididididid">>, 4378, 1}], 4379}, + fun({<<"idididididididid">>, 4378, 1}) -> valid; (_) -> invalid end), %% All good!! passed.