refactor: get ready to support multiple topics in one wolff_producers
zmstone committed Jul 20, 2024
1 parent fe084dc commit 3c4a18b
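In brief: the per-process `producers_status` field changes from a single atom to a map keyed by topic, preparing one `wolff_producers` process to own producers for several topics. A minimal sketch of code consuming the new shape (the module and `pending_topics/1` are illustrative only, not part of wolff; the status values mirror the `?initialized` and `?not_initialized(Reason)` macros changed in the diff below):

    -module(status_shape_sketch).
    -export([pending_topics/1]).

    %% New producers_status shape introduced by this commit:
    %%   #{Topic => initialized | {not_initialized, Reason}}
    %% The old shape was a bare atom for the single topic.
    -spec pending_topics(#{binary() => initialized | {not_initialized, term()}}) ->
            [binary()].
    pending_topics(Status) ->
        %% Topics whose producers still need an init (re)attempt.
        [Topic || {Topic, {not_initialized, _Reason}} <- maps:to_list(Status)].

For example, `pending_topics(#{<<"a">> => initialized, <<"b">> => {not_initialized, pending}})` returns `[<<"b">>]` — exactly the topics the retry timer will re-attempt.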
Showing 1 changed file with 111 additions and 72 deletions.
src/wolff_producers.erl: 183 changes (111 additions & 72 deletions)
@@ -69,7 +69,7 @@
-define(rediscover_client_delay, 1000).
-define(init_producers, init_producers).
-define(init_producers_delay, 1000).
- -define(not_initialized, not_initialized).
+ -define(not_initialized(Reason), {not_initialized, Reason}).
-define(initialized, initialized).
-define(partition_count_refresh_interval_seconds, 300).
-define(refresh_partition_count, refresh_partition_count).
@@ -133,12 +133,12 @@ start_supervised(ClientId, Topic, ProducerCfg) ->
ID = ?GROUPED_TOPIC(Group, Topic),
case wolff_producers_sup:ensure_present(ClientId, ID, ProducerCfg) of
{ok, Pid} ->
- case gen_server:call(Pid, get_workers, infinity) of
- ?not_initialized ->
+ case gen_server:call(Pid, get_status, infinity) of
+ #{Topic := ?not_initialized(Reason)} ->
%% This means wolff_client failed to fetch metadata
%% for this topic.
_ = wolff_producers_sup:ensure_absence(ClientId, ID),
- {error, failed_to_initialize_producers_in_time};
+ {error, Reason};
_ ->
{ok, #{client_id => ClientId,
group => Group,
@@ -251,68 +251,66 @@ init({ClientId, ?GROUPED_TOPIC(_Group, Topic), Config}) ->
self() ! ?rediscover_client,
{ok, #{client_id => ClientId,
client_pid => false,
- topic => Topic,
config => Config,
- producers_status => ?not_initialized,
+ producers_status => #{Topic => ?not_initialized(pending)},
refresh_tref => start_partition_refresh_timer(Config)
}}.

- handle_info(?refresh_partition_count, #{refresh_tref := Tref, config := Config} = St0) ->
+ handle_info(?refresh_partition_count, #{refresh_tref := Tref, config := Config} = St) ->
%% this message can be sent from anywhere,
%% so we should ensure the timer is cancelled before starting a new one
%% but we do not care to flush an already expired timer because
%% one extra refresh does no harm
ok = ensure_timer_cancelled(Tref),
- St = refresh_partition_count(St0),
+ ok = refresh_partition_count(St),
{noreply, St#{refresh_tref := start_partition_refresh_timer(Config)}};
- handle_info(?rediscover_client, #{client_id := ClientId,
- client_pid := false,
- config := Config,
- topic := Topic
- } = St0) ->
+ handle_info(?rediscover_client, #{client_pid := false, client_id := ClientId} = St0) ->
St1 = St0#{?rediscover_client_tref => false},
case wolff_client_sup:find_client(ClientId) of
{ok, Pid} ->
_ = erlang:monitor(process, Pid),
St2 = St1#{client_pid := Pid},
- St3 = maybe_init_producers(St2),
- St = maybe_restart_producers(St3),
+ St = init_producers(St2),
+ ok = maybe_restart_producers(St2),
{noreply, St};
{error, Reason} ->
log_error("failed_to_discover_client",
#{reason => Reason,
- topic => Topic,
- group => get_group(Config),
- client_id => ClientId}),
+ producer_id => producer_id(St0)}),
{noreply, ensure_rediscover_client_timer(St1)}
end;
handle_info(?init_producers, St) ->
%% this is a retry of last failure when initializing producer procs
- {noreply, maybe_init_producers(St)};
+ {noreply, init_producers(St)};
handle_info({'DOWN', _, process, Pid, Reason}, #{client_id := ClientId,
- client_pid := Pid,
- config := Config,
- topic := Topic
+ client_pid := Pid
} = St) ->
log_error("client_pid_down", #{client_id => ClientId,
- topic => Topic,
- group => get_group(Config),
client_pid => Pid,
+ producer_id => producer_id(St),
reason => Reason}),
%% client down, try to discover it after a delay
%% producers should all monitor client pid,
%% expect their 'EXIT' signals soon
{noreply, ensure_rediscover_client_timer(St#{client_pid := false})};
handle_info({'EXIT', Pid, Reason},
- #{topic := Topic,
- client_id := ClientId,
+ #{client_id := ClientId,
client_pid := ClientPid,
+ producers_status := Status,
config := Config
} = St) ->
- Group = get_group(Config),
case find_partition_by_pid(Pid) of
[] ->
%% this should not happen, hence error level
log_error("unknown_EXIT_message", #{pid => Pid, reason => Reason, topic => Topic, client_id => ClientId, group => Group});
[Partition] ->
log_error("unknown_EXIT_message",
#{pid => Pid,
reason => Reason,
producer_id => producer_id(St)});
[{Group, Topic, Partition}] ->
%% assert
Group = get_group(Config),
%% assert
true = is_map_key(Topic, Status),
case is_alive(ClientPid) of
true ->
%% wolff_producer is not designed to crash & restart
@@ -331,17 +329,13 @@ handle_info({'EXIT', Pid, Reason},
end,
{noreply, St};
handle_info(Info, St) ->
- #{client_id := ClientId, config := Config, topic := Topic} = St,
- Group = get_group(Config),
- log_error("unknown_info", #{info => Info, topic => Topic, group => Group, client_id => ClientId}),
+ log_error("unknown_info", #{info => Info, producer_id => producer_id(St)}),
{noreply, St}.

- handle_call(get_workers, _From, #{producers_status := Status} = St) ->
+ handle_call(get_status, _From, #{producers_status := Status} = St) ->
{reply, Status, St};
handle_call(Call, From, St) ->
- #{client_id := ClientId, config := Config, topic := Topic} = St,
- Group = get_group(Config),
- log_error("unknown_call", #{call => Call, from => From, topic => Topic, group => Group, client_id => ClientId}),
+ log_error("unknown_call", #{call => Call, from => From, producer_id => producer_id(St)}),
{reply, {error, unknown_call}, St}.

handle_cast(Cast, St) ->
@@ -351,9 +345,9 @@ handle_cast(Cast, St) ->
code_change(_OldVsn, St, _Extra) ->
{ok, St}.

- terminate(_, #{client_id := ClientId, topic := Topic, config := Config}) ->
- Group = get_group(Config),
- ok = cleanup_workers_table(ClientId, ?GROUPED_TOPIC(Group, Topic)).
+ terminate(_, #{client_id := ClientId} = St) ->
+ ID = supervisor_child_id(St),
+ ok = cleanup_workers_table(ClientId, ID).

ensure_rediscover_client_timer(#{?rediscover_client_tref := false} = St) ->
Tref = erlang:send_after(?rediscover_client_delay, self(), ?rediscover_client),
@@ -376,28 +370,43 @@ start_link_producers(ClientId, Topic, Connections, Config) ->
Acc#{Partition => WorkerPid}
end, #{}, Connections).

- maybe_init_producers(#{producers_status := ?not_initialized,
- topic := Topic,
- client_id := ClientId,
- config := Config
- } = St) ->
+ init_producers(#{producers_status := Status} = St) ->
+ NewStatus = init_producers(St, maps:to_list(Status), #{}, #{}),
+ St#{producers_status => NewStatus}.
+
+ init_producers(_St, [], OK, ERR) ->
+ case map_size(ERR) of
+ 0 ->
+ ok;
+ _ ->
+ erlang:send_after(?init_producers_delay, self(), ?init_producers)
+ end,
+ maps:merge(OK, ERR);
+ init_producers(St, [{Topic, ?not_initialized(pending)} | More], OK, ERR) ->
+ #{client_id := ClientId, config := Config} = St,
case start_linked_producers(ClientId, Topic, Config) of
{ok, #{workers := Workers}} ->
ok = insert_producers(ClientId, get_group(Config), Topic, Workers),
- St#{producers_status := ?initialized};
+ init_producers(St, More, OK#{Topic => ?initialized}, ERR);
{error, Reason} ->
log_error("failed_to_init_producers", #{topic => Topic, group => get_group(Config), reason => Reason}),
erlang:send_after(?init_producers_delay, self(), ?init_producers),
St
log_error("failed_to_init_producers",
#{producer_id => producer_id(St),
topic => Topic,
reason => Reason}),
init_producers(St, More, OK, ERR#{Topic => ?not_initialized(Reason)})
end;
- maybe_init_producers(St) ->
- St.

- maybe_restart_producers(#{producers_status := ?not_initialized} = St) -> St;
- maybe_restart_producers(#{client_id := ClientId,
- topic := Topic,
- config := Config
- } = St) ->
+ init_producers(St, [{Topic, ?initialized} | More], OK, ERR) ->
+ init_producers(St, More, OK#{Topic => ?initialized}, ERR).
+
+ maybe_restart_producers(#{producers_status := Status} = St) ->
+ ok = maybe_restart_producers(St, maps:to_list(Status)).
+
+ maybe_restart_producers(_St, []) ->
+ ok;
+ maybe_restart_producers(St, [{_Topic, ?not_initialized(_Reason)} | More]) ->
+ %% The producers of this topic are yet to be initialized; ignore for now
+ maybe_restart_producers(St, More);
+ maybe_restart_producers(#{client_id := ClientId, config := Config} = St, [{Topic, ?initialized} | More]) ->
Producers = find_producers_by_client_topic(ClientId, get_group(Config), Topic),
lists:foreach(
fun({Partition, Pid}) ->
@@ -406,7 +415,7 @@ maybe_restart_producers(#{client_id := ClientId,
false -> start_producer_and_insert_pid(ClientId, get_group(Config), Topic, Partition, Config)
end
end, Producers),
- St.
+ maybe_restart_producers(St, More).

-spec cleanup_workers_table(wolff:client_id(), id()) -> ok.
cleanup_workers_table(ClientId, ?GROUPED_TOPIC(GName, Topic)) ->
@@ -434,7 +443,7 @@ find_producers_by_client_topic(ClientId, Group, Topic) ->
ets:select(?WOLFF_PRODUCERS_GLOBAL_TABLE, Ms).

find_partition_by_pid(Pid) ->
- Ms = ets:fun2ms(fun({{_ClientId, _Group, _Topic, Partition}, P}) when P =:= Pid -> Partition end),
+ Ms = ets:fun2ms(fun({{_ClientId, Group, Topic, Partition}, P}) when P =:= Pid -> {Group, Topic, Partition} end),
ets:select(?WOLFF_PRODUCERS_GLOBAL_TABLE, Ms).

insert_producers(ClientId, Group, Topic, Workers0) ->
@@ -459,28 +468,32 @@ start_partition_refresh_timer(Config) ->
erlang:send_after(Interval, self(), ?refresh_partition_count)
end.

- refresh_partition_count(#{client_pid := Pid} = St) when not is_pid(Pid) ->
+ refresh_partition_count(#{client_pid := Pid}) when not is_pid(Pid) ->
%% client is to be (re)discovered
- St;
- refresh_partition_count(#{producers_status := ?not_initialized} = St) ->
+ ok;
+ refresh_partition_count(#{producers_status := Status} = St) ->
+ ok = refresh_partition_count(St, maps:to_list(Status)).
+
+ refresh_partition_count(_St, []) ->
+ ok;
+ refresh_partition_count(St, [{_Topic, ?not_initialized(_)} | More]) ->
%% to be initialized
- St;
- refresh_partition_count(#{client_pid := Pid, topic := Topic, config := Config} = St) ->
+ refresh_partition_count(St, More);
+ refresh_partition_count(#{client_pid := Pid, config := Config} = St, [{Topic, ?initialized} | More]) ->
Group = get_group(Config),
MaxPartitions = maps:get(max_partitions, Config, ?all_partitions),
case wolff_client:get_leader_connections(Pid, Group, Topic, MaxPartitions) of
{ok, Connections} ->
- start_new_producers(St, Connections);
+ start_new_producers(St, Topic, Connections);
{error, Reason} ->
log_warning("failed_to_refresh_partition_count_will_retry",
- #{topic => Topic, group => Group, reason => Reason}),
- St
- end.
+ #{topic => Topic, group => Group, reason => Reason})
+ end,
+ refresh_partition_count(St, More).

start_new_producers(#{client_id := ClientId,
- topic := Topic,
config := Config
- } = St, Connections0) ->
+ }, Topic, Connections0) ->
Group = get_group(Config),
NowCount = length(Connections0),
%% process only the newly discovered connections
@@ -502,9 +515,7 @@ start_new_producers(#{client_id := ClientId,
ok = put_partition_cnt(ClientId, Group, Topic, NowCount);
false ->
ok
- end,
- St.
-
+ end.

-if(?OTP_RELEASE >= 26).
get_partition_cnt(ClientId, Group, Topic) ->
@@ -529,3 +540,31 @@ ensure_timer_cancelled(Tref) when is_reference(Tref) ->
ok;
ensure_timer_cancelled(_) ->
ok.

+ supervisor_child_id(St) ->
+ producer_id(St, supervisor).
+
+ producer_id(St) ->
+ producer_id(St, readable).
+
+ producer_id(#{config := Config, producers_status := Status}, Format) ->
+ Group = get_group(Config),
+ Topics = maps:keys(Status),
+ ID = get_producer_id(Group, Topics),
+ case Format of
+ readable ->
+ case ID of
+ ?GROUPED_TOPIC(?NO_GROUP, Topic) ->
+ Topic;
+ {_, Topic} ->
+ <<Group/binary, $:, Topic/binary>>
+ end;
+ supervisor ->
+ ID
+ end.
+
+ get_producer_id(?NO_GROUP, Topics) ->
+ 1 = length(Topics),
+ ?GROUPED_TOPIC(?NO_GROUP, hd(Topics));
+ get_producer_id(Group, [Topic]) ->
+ ?GROUPED_TOPIC(Group, Topic).
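The new `producer_id/2` helper renders a human-readable ID for logs (the bare topic when ungrouped, `Group:Topic` otherwise) and the raw `?GROUPED_TOPIC` ID for supervisor child lookup. A standalone sketch of the readable formatting rule, assuming for illustration that `?NO_GROUP` expands to the atom `no_group` (the real macro lives in wolff's header files and is not shown in this diff):

    -module(producer_id_sketch).
    -export([readable/2]).

    %% Hypothetical stand-in for wolff's ?NO_GROUP macro.
    -define(NO_GROUP, no_group).

    %% Ungrouped producers are identified by the topic alone.
    readable(?NO_GROUP, Topic) when is_binary(Topic) ->
        Topic;
    %% Grouped producers render as <<"Group:Topic">>.
    readable(Group, Topic) when is_binary(Group), is_binary(Topic) ->
        <<Group/binary, $:, Topic/binary>>.

Here `readable(<<"g1">>, <<"t1">>)` yields `<<"g1:t1">>`, matching the `<<Group/binary, $:, Topic/binary>>` construction above.

Taken together, the new `init_producers/4` follows an accumulate-and-retry shape: one pass over the status map, successes and failures collected into separate maps, a single `?init_producers` retry armed only when something failed, and the merged map becoming the new status. A distilled, self-contained sketch of that pattern (`attempt/1` is a hypothetical stand-in for `start_linked_producers/3`; the 1000 ms delay mirrors `?init_producers_delay`):

    -module(init_loop_sketch).
    -export([run/1]).

    run(Status) ->
        loop(maps:to_list(Status), #{}, #{}).

    loop([], OK, ERR) ->
        %% Arm one retry timer only when at least one topic failed.
        case map_size(ERR) of
            0 -> ok;
            _ -> erlang:send_after(1000, self(), init_producers)
        end,
        maps:merge(OK, ERR);
    loop([{Topic, {not_initialized, pending}} | More], OK, ERR) ->
        case attempt(Topic) of
            ok -> loop(More, OK#{Topic => initialized}, ERR);
            {error, Reason} -> loop(More, OK, ERR#{Topic => {not_initialized, Reason}})
        end;
    loop([{Topic, initialized} | More], OK, ERR) ->
        %% Already-initialized topics pass through untouched.
        loop(More, OK#{Topic => initialized}, ERR).

    %% Hypothetical stand-in for wolff's start_linked_producers/3.
    attempt(_Topic) -> ok.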
