Last active
March 29, 2017 08:54
-
-
Save redink/6411d0084e89a62a01a2de0104cbb5ef to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
%%%------------------------------------------------------------------- | |
%%% @author redink | |
%%% @copyright (C) , redink | |
%%% @doc | |
%%% | |
%%% @end | |
%%% Created : by redink | |
%%%------------------------------------------------------------------- | |
-module(rcursor). | |
-behaviour(gen_server). | |
%% API | |
-export([start_link/0]). | |
-export([ new_group_cursor/2 | |
, write_msg/2 | |
, read_msg/2 | |
, batch_read_msg/3 | |
, ack_msg/3 | |
]). | |
%% gen_server callbacks | |
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, | |
terminate/2, code_change/3]). | |
-define(SERVER, ?MODULE). | |
-define(HIBERNATE_TIMEOUT, 10000). | |
-record(state, {}). | |
%%%=================================================================== | |
%%% API | |
%%%=================================================================== | |
%%-------------------------------------------------------------------- | |
start_link() -> | |
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). | |
new_group_cursor(GroupID, MsgNumLimit) -> | |
gen_server:call(?SERVER, {new_group_cursor, GroupID, MsgNumLimit}). | |
write_msg(GroupID, MsgID) -> | |
case ets:lookup(?SERVER, GroupID) of | |
[] -> | |
{error, group_not_found}; | |
[{GroupID, _, MsgIDTable, MsgNumLimit}] -> | |
ets:insert(MsgIDTable, {MsgID}), | |
case ets:info(MsgIDTable, size) > MsgNumLimit of | |
true -> | |
ets:delete(MsgIDTable, ets:first(MsgIDTable)); | |
_ -> | |
ignore | |
end, | |
{ok, ets:info(MsgIDTable, size)} | |
end. | |
read_msg(GroupID, UserID) -> | |
case ets:lookup(?SERVER, GroupID) of | |
[] -> | |
{error, group_not_found}; | |
[{_, UserCursorTable, MsgIDTable, _}] -> | |
case read_msg(UserCursorTable, UserID, MsgIDTable) of | |
{ok, '$end_of_table'} -> | |
{error, empty}; | |
Any -> | |
Any | |
end | |
end. | |
batch_read_msg(GroupID, UserID, Num) -> | |
case ets:lookup(?SERVER, GroupID) of | |
[] -> | |
{error, group_not_found}; | |
[{_, UserCursorTable, MsgIDTable, _}] -> | |
case read_msg(UserCursorTable, UserID, MsgIDTable) of | |
{ok, '$end_of_table'} -> | |
[]; | |
{ok, CurrentMsgID} -> | |
batch_read_msg(MsgIDTable, Num - 1, | |
CurrentMsgID, [CurrentMsgID]) | |
end | |
end. | |
ack_msg(GroupID, UserID, MsgID) -> | |
case ets:lookup(?SERVER, GroupID) of | |
[] -> | |
{error, group_not_found}; | |
[{_, UserCursorTable, MsgIDTable, _}] -> | |
ack_msg(MsgIDTable, MsgID, UserCursorTable, UserID) | |
end. | |
%%%=================================================================== | |
%%% gen_server callbacks | |
%%%=================================================================== | |
init([]) -> | |
ets:new(?SERVER, [named_table, set, public]), | |
{ok, #state{}, ?HIBERNATE_TIMEOUT}. | |
%%-------------------------------------------------------------------- | |
handle_call({new_group_cursor, GroupID, MsgNumLimit}, _From, State) -> | |
Res = | |
case ets:lookup(?SERVER, GroupID) of | |
[] -> | |
new_group_cursor_do(GroupID, MsgNumLimit), | |
ok; | |
_ -> | |
already_exist | |
end, | |
{reply, Res, State, ?HIBERNATE_TIMEOUT}; | |
handle_call(_Request, _From, State) -> | |
{reply, ok, State, ?HIBERNATE_TIMEOUT}. | |
%%-------------------------------------------------------------------- | |
handle_cast(_Msg, State) -> | |
{noreply, State, ?HIBERNATE_TIMEOUT}. | |
%%-------------------------------------------------------------------- | |
handle_info(timeout, State) -> | |
proc_lib:hibernate(gen_server, enter_loop, [?MODULE, [], State]), | |
{noreply, State, ?HIBERNATE_TIMEOUT}; | |
handle_info(_Info, State) -> | |
{noreply, State, ?HIBERNATE_TIMEOUT}. | |
%%-------------------------------------------------------------------- | |
terminate(_Reason, _State) -> | |
ok. | |
%%-------------------------------------------------------------------- | |
code_change(_OldVsn, State, _Extra) -> | |
{ok, State}. | |
%%%=================================================================== | |
%%% Internal functions | |
%%%=================================================================== | |
read_msg(UserCursorTable, UserID, MsgIDTable) -> | |
case ets:lookup(UserCursorTable, UserID) of | |
[] -> | |
{ok, ets_first(MsgIDTable)}; | |
[{UserID, UserCursorIndex}] -> | |
{ok, ets_next(MsgIDTable, UserCursorIndex)} | |
end. | |
batch_read_msg(_MsgIDTable, 0, _, Res) -> | |
lists:reverse(Res); | |
batch_read_msg(MsgIDTable, Num, Key, Res) -> | |
case ets_next(MsgIDTable, Key) of | |
'$end_of_table' -> | |
lists:reverse(Res); | |
NewKey -> | |
batch_read_msg(MsgIDTable, Num - 1, NewKey, [NewKey | Res]) | |
end. | |
ack_msg(MsgIDTable, MsgID, UserCursorTable, UserID) -> | |
case ets:lookup(MsgIDTable, MsgID) of | |
[] -> | |
{error, msgid_not_found}; | |
_ -> | |
{ok, ets:insert(UserCursorTable, {UserID, MsgID})} | |
end. | |
new_group_cursor_do(GroupID, MsgNumLimit) -> | |
UserCursorTable = ets:new(user_cursor, [set, public]), | |
MsgIDTable = ets:new(msgid, [ordered_set, public]), | |
ets:insert(?SERVER, {GroupID, UserCursorTable, MsgIDTable, MsgNumLimit}), | |
ok. | |
-ifdef(ASCENDING). | |
ets_first(Table) -> | |
ets:first(Table). | |
ets_next(Table, Key) -> | |
ets:next(Table, Key). | |
-else. | |
ets_first(Table) -> | |
ets:last(Table). | |
ets_next(Table, Key) -> | |
ets:prev(Table, Key). | |
-endif. |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
one group include:
用于存储 {groupmember, msgidindex} 映射关系,groupmember 为group 群成员,msgidindex 为该成员最后ack 的消息ID
用于存储msgid,假设msgid 是有时序性的
所以,对于群离线消息的存储: