Erlang で memcached を作ってみました。
先日、こちらの Erlang の世界ではmemcachedとか要らない を興味深く読ませて頂きました。
たしかにクライアント側も Erlang で書かれている場合、例えばキャッシュサー バーにアクセスを行う WEB アプリケーションも Erlang で書かれていれば Erlang のプロセス間通信を使用することで簡単にキャッシュサーバを実装する ことが出来そうです。しかし、WEB アプリケーションなど、全てのシステムを Erlang で書くにはまだ私にとって勇気が要る事なので TCP/IP で memcache プ ロトコルを喋る Erlang 版 memcached を作ってみました。 その名も ememcached です。
# 見苦しい点が御座いましたらご指摘頂けると有り難いです。m(--)m
% ememcached.erl
-module(ememcached).
-export([start/0, ememcached/1, process_command/1]).
start() ->
register(ememcached, spawn(?MODULE, ememcached, [11211])).
ememcached(Port) ->
ets:new(item, [public, named_table]),
{ok, Listen} =
gen_tcp:listen(
Port, [binary, {packet, line}, {active, false}, {reuseaddr, true}]),
io:fwrite("< server listening ~p\n", [Port]),
ememcached_accept(Listen).
ememcached_accept(Listen) ->
{ok, Sock} = gen_tcp:accept(Listen),
io:fwrite("<~p new client connection\n", [Sock]),
spawn(?MODULE, process_command, [Sock]),
ememcached_accept(Listen).
process_command(Sock) ->
case gen_tcp:recv(Sock, 0) of
{ok, Line} ->
io:fwrite(">~p ~s", [Sock, Line]),
Token = string:tokens(binary_to_list(Line), " \r\n"),
case Token of
["get", Key] ->
process_get(Sock, Key);
["set", Key, Flags, Expire, Bytes] ->
inet:setopts(Sock,[{packet, raw}]),
process_set(Sock, Key, Flags, Expire, Bytes),
inet:setopts(Sock,[{packet, line}]);
["delete", Key] ->
process_delete(Sock, Key);
["quit"] -> gen_tcp:close(Sock);
_ -> gen_tcp:send(Sock, "ERROR\r\n")
end,
process_command(Sock);
{error, closed} ->
io:fwrite("<~p connection closed.\n", [Sock]);
Error ->
io:fwrite("<~p error: ~p\n", [Sock, Error])
end.
process_get(Sock, Key) ->
case ets:lookup(item, Key) of
[{_, {Value, Expire}}] ->
Diff = Expire - epoch(),
if
(Expire == 0) or (Diff > 0) ->
gen_tcp:send(Sock, io_lib:format(
"VALUE ~s 0 ~w\r\n~s\r\nEND\r\n",
[Key, size(Value), Value]));
true ->
gen_tcp:send(Sock, "END\r\n"),
io:fwrite("EXPIRED: ~s\n", [Key]),
ets:delete(item, Key)
end;
[] ->
gen_tcp:send(Sock, "END\r\n")
end.
process_set(Sock, Key, _Flags, _Expire, Bytes) ->
case gen_tcp:recv(Sock, list_to_integer(Bytes)) of
{ok, Value} ->
ets:insert(item, {Key, {Value,
case list_to_integer(_Expire) of
0 -> 0;
Expire -> epoch() + Expire
end}}),
gen_tcp:send(Sock, "STORED\r\n");
{error, closed} ->
ok;
Error ->
io:fwrite("Error: ~p\n", [Error])
end,
gen_tcp:recv(Sock, 2).
process_delete(Sock, Key) ->
case ets:lookup(item, Key) of
[{_, _}] ->
ets:delete(item, Key),
gen_tcp:send(Sock, "DELETED\r\n");
_ ->
gen_tcp:send(Sock, "NOT_FOUND\r\n")
end.
epoch() ->
{Msec, Sec, _} = now(),
Msec * 1000 + Sec.
実行方法は
% erlc ememcached.erl % erl -noshell -s ememcached start
で起動し、11211 を listen します。
僅かこれだけのコードで get と set と delete を行う Erlang 版 memcached が実装できました。 厳密なベンチマークがまだ出来ていませんがオリジナルの memcached と大差な いパフォーマンスが出ています。
続いて実装上のポイント幾つか紹介します。
ソケットオプション
ソケット通信のオプションにはデータを行単位で受信を行う line モードがあ ります。
memcache プロトコルにおいて、クライアントからのコマンドを受信する時に はこの line モードが非常に便利なのですが set コマンドを受け取った後は 改行を含むバイナリデータを受信する必要があります。
そこで通常は line モードでコマンドを受信しますが、set コマンドを受け取っ たら raw モードに切り替え、バイナリデータを受信したら line モードに戻す 様なコードになっています。
["set", Key, Flags, Expire, Bytes] ->
inet:setopts(Sock,[{packet, raw}]),
process_set(Sock, Key, Flags, Expire, Bytes),
inet:setopts(Sock,[{packet, line}]);
ストレージシステム ets
ets はキーに対応するErlang のオブジェクトを格納し、探索を行うことが出 来るストレージシステムです。ets の場合メモリ上に保持されるため再起動す れば消えてしまいますが高速です。
# 前回付箋 Web アプリで使用した dets はディスクに保持されるため永続性 があります。
ets の使い方は以下の様にとても単純です。
% テーブルを作成
> ets:new(table, [public, named_table]).
% キーと値を挿入
> ets:insert(table, {"key", "hello"}).
true
% キーに対応する値を取得
> ets:lookup(table, "key").
[{"key","hello"}]
今回はキャッシュのストレージとして ets を使用しましたが、 次回は Mnesia を使用して実装した ememcached を紹介したいと思います。 Mnesia を使用すると複数のサーバーでレプリケーションを行ったり、再起動 を行ってもデータが消えないよう永続化可能な memcached が実装できます。
