From eb9618ce2cff57c0d0f27a1575028a702830ad79 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Fri, 1 Sep 2023 15:29:13 +0200 Subject: [PATCH 1/3] feat: start port programs on demand and not always when the jq app starts --- src/jq_app.erl | 4 ---- src/jq_port.erl | 31 ++++++++++++++++++++++--------- 2 files changed, 22 insertions(+), 13 deletions(-) diff --git a/src/jq_app.erl b/src/jq_app.erl index 2e54770..f36f433 100644 --- a/src/jq_app.erl +++ b/src/jq_app.erl @@ -16,10 +16,6 @@ start(_StartType, _StartArgs) -> application:get_env(jq, jq_port_nr_of_jq_port_servers, erlang:system_info(schedulers)), jq_port:set_nr_of_jq_port_servers(NrOfJQPortServers), Res = jq_port_sup:start_link(), - %% Configure the jq port servers once they are up and running - CacheMaxSize = - application:get_env(jq, jq_filter_program_lru_cache_max_size, 500), - jq:set_filter_program_lru_cache_max_size(CacheMaxSize), Res. stop(_State) -> diff --git a/src/jq_port.erl b/src/jq_port.erl index f83260c..6532efd 100644 --- a/src/jq_port.erl +++ b/src/jq_port.erl @@ -98,13 +98,19 @@ get_filter_program_lru_cache_max_size() -> end, do_op_ensure_started(Op). +set_filter_program_lru_cache_max_size(PortServer, NewSize) -> + gen_server:call(PortServer, + {set_filter_program_lru_cache_max_size, NewSize}, + infinity). + set_filter_program_lru_cache_max_size(NewSize) when is_integer(NewSize), NewSize >= 0, NewSize < 1073741824 -> Op = fun() -> - gen_server:call(port_server(), - {set_filter_program_lru_cache_max_size, NewSize}, - infinity) + Expect = [ok || _ <- lists:seq(0, jq_port:nr_of_jq_port_servers() - 1)], + Expect = [set_filter_program_lru_cache_max_size(port_server_by_id(Id), NewSize) || + Id <- lists:seq(0, jq_port:nr_of_jq_port_servers() - 1)], + ok end, do_op_ensure_started(Op). 
@@ -137,9 +143,8 @@ remove_from_lookup_table(Id) -> init(Id) -> process_flag(trap_exit, true), - Port = start_port_program(), State = #{ - port => Port, + port => port_not_started, id => Id, processed_json_calls => 0, restart_period => application:get_env(jq, jq_port_restart_period, 1000000) @@ -198,6 +203,8 @@ is_port_alive(Port) -> error({bad_ping_response, Other}) end. +kill_port(port_not_started) -> + ok; kill_port(Port) -> Port ! {self(), {command, <<"exit\0">>}}, erlang:port_close(Port), @@ -279,6 +286,14 @@ new_state_after_process_json(State) -> State#{processed_json_calls => NrOfCalls + 1} end. +handle_call(Call, From, #{port := port_not_started} = State) -> + StateWithPort = State#{port => start_port_program()}, + %% Configure the jq port server once it is up and running + CacheMaxSize = + application:get_env(jq, jq_filter_program_lru_cache_max_size, 500), + {reply, ok, NewState} = handle_call({set_filter_program_lru_cache_max_size, CacheMaxSize}, From, StateWithPort), + %% Do the original call with the updated state + handle_call(Call, From, NewState); handle_call({jq_process_json, FilterProgram, JSONText, TimeoutMs}, _From, State) -> Port = state_port(State), try @@ -354,11 +369,9 @@ terminate(_Reason, State) -> ok. 
handle_info({'EXIT', Port, Reason}, #{port := Port} = State) -> - logger:error(io_lib:format("jq port program has died unexpectedly for reason ~p (state = ~p) \nTrying to restart...", + logger:error(io_lib:format("jq port program has died unexpectedly for reason ~p (state = ~p) \nTrying to restart on next request...", [Reason, State])), - %% Let us try to start a new port - NewPort = start_port_program(), - {noreply, State#{port => NewPort}}; + {noreply, State#{port => port_not_started}}; handle_info({'EXIT', Port, _Reason}, State) when is_port(Port) -> %% Flush message from old port {noreply, State}; From 476e2f9f24be19f8b4e5b8104afa4c2b67bddb80 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Fri, 1 Sep 2023 17:55:47 +0200 Subject: [PATCH 2/3] feat: automatically turn off port program after configurable idle time --- README.md | 8 +++++++- src/jq_port.erl | 52 +++++++++++++++++++++++++++++++++++++++++++---- test/jq_tests.erl | 23 +++++++++++++++++++++ 3 files changed, 78 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 82f595c..1c5bc73 100644 --- a/README.md +++ b/README.md @@ -68,10 +68,16 @@ application is loaded (the jq Erlang application is loaded automatically when * `jq_port_restart_period` (default value = 1000000) (only relevant for the `jq_port` option) - Use this option to set how many `jq:process_json/2` calls a port program can process before it is restarted. This is a safety option - that can be handy if it turn out that the jq port program has memory leaks. + that can be handy if it turns out that the jq port program has memory leaks. As far as we know the port program does not have any memory leaks but it is possible that it is leaky for inputs that we have not tested (please report a bug if you find any leaks). +* `jq_port_auto_turn_off_time_seconds` (default value = 300) (only relevant for + the `jq_port` option) - Use this option to set how long a port program + can be idle before it can be turned off. 
The port program will be automatically + turned on again when it is needed so this feature should be invisible from the + point of view of the user and is only meant to save resources. If this option + is set to 0 then the port programs will never be turned off automatically. ## Test with address sanitizer diff --git a/src/jq_port.erl b/src/jq_port.erl index 6532efd..6374a46 100644 --- a/src/jq_port.erl +++ b/src/jq_port.erl @@ -254,7 +254,7 @@ misbehaving_port_program(State, ErrorClass, Reason) -> end, OldPort = state_port(State), kill_port(OldPort), - NewPort = start_port_program(), + NewPort = port_not_started, %% will be started by the next call NewState = State#{port => NewPort}, {reply, Ret, NewState}. @@ -280,18 +280,43 @@ new_state_after_process_json(State) -> 0 -> OldPort = state_port(State), kill_port(OldPort), - NewPort = start_port_program(), + NewPort = port_not_started, State#{port => NewPort, processed_json_calls => NrOfCalls + 1}; _ -> State#{processed_json_calls => NrOfCalls + 1} end. +processed_json_calls_after_call({jq_process_json, _, _, _} = _Call, + #{processed_json_calls := CallCnt} = _State) -> + CallCnt + 1; +processed_json_calls_after_call(_Call, #{processed_json_calls := CallCnt}) -> + CallCnt. + +jq_port_auto_turn_off_time_ms() -> + application:get_env(jq, + jq_port_auto_turn_off_time_seconds, + 300) * 1000. + +maybe_send_idle_check_message(Port, NrOfProcessJSONCalls) -> + Timeout = jq_port_auto_turn_off_time_ms(), + case Timeout of + 0 -> ok; + _ -> + TimeoutMessage = {turn_off_port, + #{port => Port, + processed_json_calls_on_start => NrOfProcessJSONCalls}}, + {ok, _} = timer:send_after(Timeout, TimeoutMessage) + end. 
+ handle_call(Call, From, #{port := port_not_started} = State) -> - StateWithPort = State#{port => start_port_program()}, + NewPort = start_port_program(), + StateWithPort = State#{port => NewPort}, %% Configure the jq port server once it is up and running CacheMaxSize = application:get_env(jq, jq_filter_program_lru_cache_max_size, 500), {reply, ok, NewState} = handle_call({set_filter_program_lru_cache_max_size, CacheMaxSize}, From, StateWithPort), + %% Set a timer to turn off the port after a while if idle + maybe_send_idle_check_message(NewPort, processed_json_calls_after_call(Call, NewState)), %% Do the original call with the updated state handle_call(Call, From, NewState); handle_call({jq_process_json, FilterProgram, JSONText, TimeoutMs}, _From, State) -> @@ -368,6 +393,25 @@ terminate(_Reason, State) -> remove_from_lookup_table({?MODULE, Id}), ok. +handle_info({turn_off_port, #{port := PortToTurnOff, + processed_json_calls_on_start := NrOfCallsOnStart}}, + #{port := StatePort, + processed_json_calls := NrOfCallsNow} = State) + when PortToTurnOff =:= StatePort, + NrOfCallsOnStart =:= NrOfCallsNow -> + %% Port is idle, stop it, it will be started on next request + kill_port(PortToTurnOff), + {noreply, State#{port => port_not_started}}; +handle_info({turn_off_port, #{port := PortToTurnOff}}, + #{port := StatePort, + processed_json_calls := NrOfCallsNow} = State) + when PortToTurnOff =:= StatePort -> + %% Port is not idle enough to be turned off, so we set a new timer + maybe_send_idle_check_message(StatePort, NrOfCallsNow), + {noreply, State}; +handle_info({turn_off_port, _}, State) -> + %% Flush message for old port + {noreply, State}; handle_info({'EXIT', Port, Reason}, #{port := Port} = State) -> logger:error(io_lib:format("jq port program has died unexpectedly for reason ~p (state = ~p) \nTrying to restart on next request...", [Reason, State])), @@ -387,7 +431,7 @@ handle_info(UnknownMessage, State) -> code_change(_OldVsn, State, _Extra) -> OldPort = 
state_port(State), kill_port(OldPort), - NewPort = start_port_program(), + NewPort = port_not_started, %% Will be started on next request NewState = State#{port => NewPort}, {ok, NewState}. diff --git a/test/jq_tests.erl b/test/jq_tests.erl index b65f933..cb7d53d 100644 --- a/test/jq_tests.erl +++ b/test/jq_tests.erl @@ -312,6 +312,29 @@ concurrent_queries_t_() -> concurrent_queries_test_() -> wrap_setup_cleanup(concurrent_queries_t_()). -ifndef(TEST_ONLY_NIF). + +port_program_turn_off_automatically_test_() -> + {timeout, 10, + fun() -> + %% This test tries to trigger relevant code paths, but + %% at the time of writing, one has to check manually + %% that the right code is triggered. + %% TODO: Use snabbkaffe to check that the right code is triggered + jq:set_implementation_module(jq_port), + application:set_env(jq, jq_port_auto_turn_off_time_seconds, 1), + {ok, [<<"42">>]} = jq:process_json(<<"42">>, <<"42">>), + timer:sleep(1100), + %% should be off now + {ok, [<<"42">>]} = jq:process_json(<<"42">>, <<"42">>), + {ok, [<<"42">>]} = jq:process_json(<<"42">>, <<"42">>), + timer:sleep(1100), + %% Should still be on now + {ok, [<<"42">>]} = jq:process_json(<<"42">>, <<"42">>), + ok + end}.
+ + + port_program_valgrind_test_() -> {timeout, 30, fun() -> From b1f1d5d927ff10d7a53bb25dd17078b3751f7c6b Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Mon, 4 Sep 2023 10:53:40 +0200 Subject: [PATCH 3/3] Code improvement due to @thalesmg's suggestion Co-authored-by: Thales Macedo Garitezi --- src/jq_port.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/jq_port.erl b/src/jq_port.erl index 6374a46..1e6131c 100644 --- a/src/jq_port.erl +++ b/src/jq_port.erl @@ -107,7 +107,7 @@ set_filter_program_lru_cache_max_size(NewSize) when is_integer(NewSize), NewSize >= 0, NewSize < 1073741824 -> Op = fun() -> - Expect = [ok || _ <- lists:seq(0, jq_port:nr_of_jq_port_servers() - 1)], + Expect = lists:duplicate(jq_port:nr_of_jq_port_servers(), ok), Expect = [set_filter_program_lru_cache_max_size(port_server_by_id(Id), NewSize) || Id <- lists:seq(0, jq_port:nr_of_jq_port_servers() - 1)], ok