Created
September 29, 2022 03:33
-
-
Save shino/f8716ac57d28a4a5b483674ed4b3e1c3 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-module(repro). | |
-export([main/1]). | |
-export([start_server/1, | |
setup_logger/0]). | |
%% -behaviour(ra_machine). | |
-export([init/1, | |
apply/3]). | |
-export([write/3, | |
get_all/1]). | |
-include_lib("kernel/include/logger.hrl"). | |
-include_lib("common_test/include/ct.hrl"). | |
-define(CLUSTER_NAME, g). | |
-define(HOST, "127.0.0.1"). | |
%% Preparation: git clone ra and make, set the path to this argument | |
%% e.g. $ erlc repro.erl && erl -s repro main "/path/to/ra" -s init stop | |
main([PathToRa]) -> | |
{ok, _Pid} = net_kernel:start(['[email protected]', longnames]), | |
true = erlang:set_cookie(node(), spam), | |
_ = setup_logger(), | |
[ _ = file:del_dir_r(atom_to_list(Short) ++ "@" ++ ?HOST) || Short <- [n1, n2] ], | |
Args = pa_args(PathToRa), | |
{Pid1, N1} = start_node(n1, Args), | |
{Pid2, N2} = start_node(n2, Args), | |
{Pid3, N3} = start_node(n3, Args), | |
ok = erpc_call(N1, ra, start, []), | |
{ok, Started1, Failed1} = erpc_call(N1, ra, start_cluster, [default, ?CLUSTER_NAME, machine_config(), [id(N1)]]), | |
?LOG_INFO("ra start_cluster() at n1: started=~p, failed=~p", [Started1, Failed1]), | |
_ = add_member(N2, N1), | |
_ = timer:sleep(1000), | |
_ = add_member(N3, N1), | |
?LOG_INFO("ra:members() at n1: result=~p", [erpc_call(N1, ra, members, [id(N1)])]), | |
?LOG_INFO("ra:members() at n3: result=~p", [erpc_call(N3, ra, members, [id(N3)])]), | |
Writer = spawn_writer(N3, 50, 100), | |
_WriterMonitor = monitor(process, Writer), | |
%% Wait for writer do half jobs | |
receive | |
go -> | |
ok | |
end, | |
%% Stop n1 and join new node n4 | |
ok = stop_node(N1, Pid1), | |
{Pid4, N4} = start_node(n4, Args), | |
_ = add_member(N4, N2), | |
?LOG_INFO("ra:members() at n3: result=~p", [erpc_call(N3, ra, members, [id(N3)])]), | |
?LOG_INFO("ra:members() at n4: result=~p", [erpc_call(N4, ra, members, [id(N4)])]), | |
%% Wait for writer finish | |
receive | |
done -> | |
ok; | |
{'DOWN', _, _, _, _} = Message -> | |
?LOG_ERROR("writer process down, message=~p", [Message]) | |
end, | |
%% ?LOG_INFO("get_all() at n1: result=~p", [erpc_call(N1, ?MODULE, get_all, [N1])]), | |
%% ?LOG_INFO("get_all() at n3: result=~p", [erpc_call(N3, ?MODULE, get_all, [N3])]), | |
%% clean up | |
ok = peer:stop(Pid2), | |
ok = peer:stop(Pid3), | |
ok = peer:stop(Pid4), | |
?LOG_INFO("Finished!!", []), | |
timer:sleep(100), | |
ok. | |
add_member(NodeToAdd, Seed) -> | |
ok = erpc_call(NodeToAdd, ra, start, []), | |
AddRes = {ok, _, _} = erpc_call(NodeToAdd, ra, add_member, [id(Seed), id(NodeToAdd)]), | |
?LOG_INFO("ra:add_member() ~p to ~p: result=~p", [NodeToAdd, Seed, AddRes]), | |
StartServerRes = erpc_call(NodeToAdd, ?MODULE, start_server, [[NodeToAdd]]), | |
?LOG_INFO("ra:start_server() at ~p: result=~p", [NodeToAdd, StartServerRes]), | |
StartServerRes. | |
spawn_writer(Node, MidCount, Count) -> | |
Parent = self(), | |
spawn(fun() -> | |
Start = erlang:system_time(microsecond), | |
[ begin | |
WriteRes = erpc:call(Node, ?MODULE, write, [Node, I, I]), | |
[ ?LOG_INFO("Writer i=~p, result=~p", [I, WriteRes]) || WriteRes =/= okaaa ], | |
case I =:= MidCount of | |
true -> | |
Parent ! go; | |
_ -> | |
ok | |
end | |
end || I <- lists:seq(1, Count) ], | |
Duration = erlang:system_time(microsecond) - Start, | |
?LOG_INFO("Writer process DONE, ~p [msec] = ~p [usec]", [Duration div 1000, Duration]), | |
Parent ! done | |
end). | |
erpc_call(Node, Mod, Fun, Args) -> | |
?LOG_INFO("erpc:call(~p) | ~p:~p()", [Node, Mod, Fun]), | |
erpc:call(Node, Mod, Fun, Args). | |
pa_args(PathToRa) -> | |
DepsPath = filename:join([PathToRa, "deps"]), | |
{ok, DepsDirs0} = file:list_dir(DepsPath), | |
DepsDirs1 = [ filename:join([DepsPath, Dir]) || Dir <- DepsDirs0 ], | |
lists:append([ ["-pa", filename:join(Dir, "ebin")] || Dir <- [PathToRa | DepsDirs1] ]). | |
start_server(Node) -> | |
ra:start_server(default, ?CLUSTER_NAME, id(node()), machine_config(), [id(Node)]). | |
start_node(ShortName, Args) -> | |
{ok, PeerPid, Node} = ?CT_PEER(#{name => ShortName, host => ?HOST, args => Args}), | |
_ = erpc_call(Node, ?MODULE, setup_logger, []), | |
{PeerPid, Node}. | |
stop_node(Node, PeerPid) -> | |
StopServer = erpc_call(Node, ra, stop_server, [id(Node)]), | |
?LOG_INFO("ra:stop_server() at ~p: result=~p", [Node, StopServer]), | |
ok = peer:stop(PeerPid). | |
machine_config() -> | |
MachineInitialConfig = #{}, | |
{module, ?MODULE, MachineInitialConfig}. | |
id(Node) -> | |
{?MODULE, Node}. | |
setup_logger() -> | |
ok = logger:set_primary_config(level, debug), | |
CustomFilter = fun(#{msg := {report, #{label := {application_controller, Type}}}}, _) | |
when Type =:= progress; | |
Type =:= exit -> | |
stop; | |
(#{meta := #{domain := [otp, sasl]}}, _) -> | |
stop; | |
(#{meta := Meta0} = Msg, _) -> | |
Msg#{meta => Meta0#{node => node()}} | |
end, | |
ok = logger:add_primary_filter(custom, {CustomFilter, no_filter_arg}), | |
ok = logger:remove_handler_filter(default, remote_gl), | |
ok = logger:set_handler_config(default, filter_default, log), | |
ok = logger:update_formatter_config(default, single_line, false), | |
ok = logger:update_formatter_config(default, legacy_header, false), | |
ok = logger:update_formatter_config(default, | |
template, | |
["[", node, "] ", time, " [", level, "] ", mfa, "\n ", msg, "\n"]), | |
ok. | |
%% ra_machine | |
write(Node, Key, Value) -> | |
try | |
%% ?LOG_INFO("write begin: node=~p, value=~p", [Node, Value]), | |
case ra:process_command(id(Node), {write, Key, Value}, 100) of | |
{ok, ok, _Leader} -> | |
ok; | |
{ok, {error, Reason}, _Leader} -> | |
{error, Reason}; | |
{error, Reason} -> | |
{error, {ra_process_command_error, Reason}}; | |
{timeout, _} -> | |
{error, timeout} | |
end | |
catch | |
C:E:ST -> | |
?LOG_ERROR("!!!!!!! write error by exception !!!!!!!!!: ~p", [{C, E, ST}]), | |
{error, {exception, {C, E, ST}}} | |
end. | |
get_all(Node) -> | |
case ra:consistent_query(id(Node), | |
fun(State) -> | |
State | |
end, | |
1000) of | |
{ok, State, _} -> | |
State; | |
{timeout, _} -> | |
{error, timeout}; | |
{error, Reason} -> | |
{error, Reason} | |
end. | |
init(Config) -> | |
?LOG_INFO("ra_machine callback init() called: config=~p", [Config]), | |
#{}. | |
apply(_Meta, {write, Node, Value} = Cmd, State) -> | |
?LOG_INFO("ra_machine callback apply() called: cmd=~p", [Cmd]), | |
{State#{Node => Value}, ok, []}; | |
apply(_Meta, Cmd, State) -> | |
?LOG_INFO("ra_machine callback apply() called: cmd=~p", [Cmd]), | |
{State, {error, not_implemeted}, []}. |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment