diff --git a/src/couch_replicator/src/couch_replicator_worker.erl b/src/couch_replicator/src/couch_replicator_worker.erl index d8f87238892..46e4a6e943c 100644 --- a/src/couch_replicator/src/couch_replicator_worker.erl +++ b/src/couch_replicator/src/couch_replicator_worker.erl @@ -297,17 +297,25 @@ queue_fetch_loop(#fetch_st{} = St) -> {changes, ChangesManager, Changes, ReportSeq} -> % Find missing revisions (POST to _revs_diff) {IdRevs, RdSt1} = find_missing(Changes, Target, Parent, RdSt), - {Docs, BgSt1} = bulk_get(UseBulkGet, Source, IdRevs, Parent, BgSt), - % Documents without attachments can be uploaded right away - BatchFun = fun({_, #doc{} = Doc}) -> - ok = gen_server:call(Parent, {batch_doc, Doc}, infinity) + % Filter out and handle design docs individually + DDocFilter = fun + ({<>, _Rev}, _PAs) -> true; + ({_Id, _Rev}, _PAs) -> false end, - lists:foreach(BatchFun, lists:sort(maps:to_list(Docs))), - % Fetch individually if _bulk_get failed or there are attachments + DDocIdRevs = maps:filter(DDocFilter, IdRevs), FetchFun = fun({Id, Rev}, PAs) -> ok = gen_server:call(Parent, {fetch_doc, {Id, [Rev], PAs}}, infinity) end, - maps:map(FetchFun, maps:without(maps:keys(Docs), IdRevs)), + maps:map(FetchFun, DDocIdRevs), + % IdRevs1 is all the docs without design docs. Bulk get those. + IdRevs1 = maps:without(maps:keys(DDocIdRevs), IdRevs), + {Docs, BgSt1} = bulk_get(UseBulkGet, Source, IdRevs1, Parent, BgSt), + BatchFun = fun({_, #doc{} = Doc}) -> + ok = gen_server:call(Parent, {batch_doc, Doc}, infinity) + end, + lists:foreach(BatchFun, lists:sort(maps:to_list(Docs))), + % Invidually upload docs with attachments. + maps:map(FetchFun, maps:without(maps:keys(Docs), IdRevs1)), {ok, Stats} = gen_server:call(Parent, flush, infinity), ok = report_seq_done(Cp, ReportSeq, Stats), couch_log:debug("Worker reported completion of seq ~p", [ReportSeq]), diff --git a/src/couch_replicator/test/eunit/couch_replicator_bulk_get_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_bulk_get_tests.erl index 2ecd0f4ee91..f0d9569db43 100644 --- a/src/couch_replicator/test/eunit/couch_replicator_bulk_get_tests.erl +++ b/src/couch_replicator/test/eunit/couch_replicator_bulk_get_tests.erl @@ -26,7 +26,11 @@ bulk_get_test_() -> fun couch_replicator_test_helper:test_teardown/1, [ ?TDEF_FE(use_bulk_get), + ?TDEF_FE(use_bulk_get_with_ddocs), + ?TDEF_FE(use_bulk_get_with_attachments), ?TDEF_FE(dont_use_bulk_get), + ?TDEF_FE(dont_use_bulk_get_ddocs), + ?TDEF_FE(dont_use_bulk_get_attachments), ?TDEF_FE(job_enable_overrides_global_disable), ?TDEF_FE(global_disable_works) ] @@ -39,7 +43,33 @@ use_bulk_get({_Ctx, {Source, Target}}) -> replicate(Source, Target, true), BulkGets = meck:num_calls(couch_replicator_api_wrap, bulk_get, 3), JustGets = meck:num_calls(couch_replicator_api_wrap, open_doc_revs, 6), + DocUpdates = meck:num_calls(couch_replicator_api_wrap, update_doc, 4), ?assertEqual(0, JustGets), + ?assertEqual(0, DocUpdates), + ?assert(BulkGets >= 1), + compare_dbs(Source, Target). + +use_bulk_get_with_ddocs({_Ctx, {Source, Target}}) -> + populate_db_ddocs(Source, ?DOC_COUNT), + meck:new(couch_replicator_api_wrap, [passthrough]), + replicate(Source, Target, true), + BulkGets = meck:num_calls(couch_replicator_api_wrap, bulk_get, 3), + JustGets = meck:num_calls(couch_replicator_api_wrap, open_doc_revs, 6), + DocUpdates = meck:num_calls(couch_replicator_api_wrap, update_doc, 4), + ?assertEqual(?DOC_COUNT, JustGets), + ?assertEqual(?DOC_COUNT, DocUpdates), + ?assert(BulkGets >= 1), + compare_dbs(Source, Target). + +use_bulk_get_with_attachments({_Ctx, {Source, Target}}) -> + populate_db_atts(Source, ?DOC_COUNT), + meck:new(couch_replicator_api_wrap, [passthrough]), + replicate(Source, Target, true), + BulkGets = meck:num_calls(couch_replicator_api_wrap, bulk_get, 3), + JustGets = meck:num_calls(couch_replicator_api_wrap, open_doc_revs, 6), + DocUpdates = meck:num_calls(couch_replicator_api_wrap, update_doc, 4), + ?assertEqual(?DOC_COUNT, JustGets), + ?assertEqual(?DOC_COUNT, DocUpdates), ?assert(BulkGets >= 1), compare_dbs(Source, Target). @@ -49,10 +79,36 @@ dont_use_bulk_get({_Ctx, {Source, Target}}) -> replicate(Source, Target, false), BulkGets = meck:num_calls(couch_replicator_api_wrap, bulk_get, 3), JustGets = meck:num_calls(couch_replicator_api_wrap, open_doc_revs, 6), + DocUpdates = meck:num_calls(couch_replicator_api_wrap, update_doc, 4), ?assertEqual(0, BulkGets), + ?assertEqual(0, DocUpdates), ?assertEqual(?DOC_COUNT, JustGets), compare_dbs(Source, Target). +dont_use_bulk_get_ddocs({_Ctx, {Source, Target}}) -> + populate_db_ddocs(Source, ?DOC_COUNT), + meck:new(couch_replicator_api_wrap, [passthrough]), + replicate(Source, Target, false), + BulkGets = meck:num_calls(couch_replicator_api_wrap, bulk_get, 3), + JustGets = meck:num_calls(couch_replicator_api_wrap, open_doc_revs, 6), + DocUpdates = meck:num_calls(couch_replicator_api_wrap, update_doc, 4), + ?assertEqual(0, BulkGets), + ?assertEqual(?DOC_COUNT, JustGets), + ?assertEqual(?DOC_COUNT, DocUpdates), + compare_dbs(Source, Target). + +dont_use_bulk_get_attachments({_Ctx, {Source, Target}}) -> + populate_db_atts(Source, ?DOC_COUNT), + meck:new(couch_replicator_api_wrap, [passthrough]), + replicate(Source, Target, false), + BulkGets = meck:num_calls(couch_replicator_api_wrap, bulk_get, 3), + JustGets = meck:num_calls(couch_replicator_api_wrap, open_doc_revs, 6), + DocUpdates = meck:num_calls(couch_replicator_api_wrap, update_doc, 4), + ?assertEqual(0, BulkGets), + ?assertEqual(?DOC_COUNT, JustGets), + ?assertEqual(?DOC_COUNT, DocUpdates), + compare_dbs(Source, Target). + job_enable_overrides_global_disable({_Ctx, {Source, Target}}) -> populate_db(Source, ?DOC_COUNT), Persist = false, @@ -78,10 +134,31 @@ global_disable_works({_Ctx, {Source, Target}}) -> compare_dbs(Source, Target). populate_db(DbName, DocCount) -> - Fun = fun(Id, Acc) -> [#doc{id = integer_to_binary(Id)} | Acc] end, + IdFun = fun(Id) -> integer_to_binary(Id) end, + Fun = fun(Id, Acc) -> [#doc{id = IdFun(Id)} | Acc] end, + Docs = lists:foldl(Fun, [], lists:seq(1, DocCount)), + {ok, _} = fabric:update_docs(DbName, Docs, [?ADMIN_CTX]). + +populate_db_ddocs(DbName, DocCount) -> + IdFun = fun(Id) -> <<"_design/", (integer_to_binary(Id))/binary>> end, + Fun = fun(Id, Acc) -> [#doc{id = IdFun(Id)} | Acc] end, Docs = lists:foldl(Fun, [], lists:seq(1, DocCount)), {ok, _} = fabric:update_docs(DbName, Docs, [?ADMIN_CTX]). +populate_db_atts(DbName, DocCount) -> + IdFun = fun(Id) -> integer_to_binary(Id) end, + Fun = fun(Id, Acc) -> [#doc{id = IdFun(Id), atts = [att(<<"a">>)]} | Acc] end, + Docs = lists:foldl(Fun, [], lists:seq(1, DocCount)), + {ok, _} = fabric:update_docs(DbName, Docs, [?ADMIN_CTX]). + +att(Name) when is_binary(Name) -> + couch_att:new([ + {name, Name}, + {att_len, 1}, + {type, <<"app/binary">>}, + {data, <<"x">>} + ]). + compare_dbs(Source, Target) -> couch_replicator_test_helper:cluster_compare_dbs(Source, Target).