Iris through Erlang

Taking gen_server to the next level

Péter Szilágyi

European Institute for Innovation and Technology

Eötvös Loránd University, Budapest, Hungary

Babeş-Bolyai University, Cluj-Napoca, Romania

Note: these are the offline slides of the presentation. For executable code, please check the playground's availability at http://iris.karalabe.com/talks.

What the heck is Iris?

Decentralized messaging framework

What does it do?

It communicates, of course!

Show me the code!

Challenge #1 – Group chat

Implement an IRC style group chat system:

Solution #1 – Broadcast

% Handler loop: wait for the next {broadcast, Payload} message, print the
% payload with ~p, then go back to waiting. Never returns.
echo() ->
    receive
        {broadcast, Payload} ->
            io:format("~p~n", [Payload])
    end,
    echo().

% Connect under the app name "irc", broadcast a greeting 100 times with a
% one-second pause after each send, then close the connection.
main() ->
    % Inbound messages are delivered to a freshly spawned echo/0 process
    Printer = spawn(fun echo/0),
    {ok, Conn} = iris:connect(55555, "irc", Printer),

    % One round: broadcast the dummy message, then sleep for a second
    Announce = fun(_Round) ->
        ok = iris:broadcast(Conn, "irc", <<"Alice: Hello!">>),
        timer:sleep(1000)
    end,
    lists:foreach(Announce, lists:seq(1, 100)),
    iris:close(Conn).

Hint: Rename Alice in a second window 😉

Broadcast highlights

iris:connect(Xyz, App :: string(), Handler :: pid()) → {ok, Conn :: iris:connection()}
iris:broadcast(Conn :: iris:connection(), App :: string(), Msg :: binary())
{broadcast, Msg :: binary()}
iris:close(Conn :: iris:connection())

Solution #1 – OTP – Broadcast

-behavior(iris_server).

% Lifecycle callbacks (same as gen_server). The handler keeps no state:
% init/1 only accepts nil and returns nil as the state.
init(nil) ->
    InitialState = nil,
    {ok, InitialState}.

% Nothing to clean up; the reason and the final state are both ignored.
terminate(_Why, _FinalState) ->
    ok.

% Broadcast callback: print the received message with ~p and carry the state
% over unchanged. The link argument is not used.
handle_broadcast(Payload, Unchanged, _Link) ->
    ok = io:format("~p~n", [Payload]),
    {noreply, Unchanged}.

% Start an iris_server under the app name "irc" with this module as the
% callback, broadcast a greeting 100 times (one second apart), then stop it.
main() ->
    % Connect to the Iris network; nil is handed to init/1
    {ok, Server, Link} = iris_server:start(55555, "irc", ?MODULE, nil),

    % Send a dummy message every second
    Rounds = lists:seq(1, 100),
    _ = [begin
             ok = iris:broadcast(Link, "irc", <<"Alice: Hello, OTP!">>),
             timer:sleep(1000)
         end || _ <- Rounds],
    ok = iris_server:stop(Server).

OTP Broadcast highlights

-behavior(iris_server)
iris_server:start(Xyz, App :: string(), Callback :: atom(), Args :: term()) →
    {ok, Server :: pid(), Link :: iris:connection()}
callback:handle_broadcast(Msg :: binary(), State :: term(), Link :: iris:connection())
iris_server:stop(Server :: pid())

Challenge #2 – Web requests

Implement a system for handling web requests:

Solution #2 – Browser

% Connect under the app name "browser" with no inbound handler (none), issue
% 100 requests to "webserver" one second apart, printing each outcome, then
% close the connection.
main() ->
    % Connect to the Iris network
    {ok, Conn} = iris:connect(55555, "browser", none),

    % One attempt: send the dummy request, report the result, then pause
    Attempt = fun(_N) ->
        Outcome = iris:request(Conn, "webserver", <<"Some request">>, 1000),
        case Outcome of
            {error, Reason} -> io:format("Request failed: ~p~n", [Reason]);
            {ok, Reply}     -> io:format("Web reply ~p~n", [Reply])
        end,
        timer:sleep(1000)
    end,
    lists:foreach(Attempt, lists:seq(1, 100)),
    iris:close(Conn).

Hint: Start some webservers and check back 😉

Solution #2 – Web server

% Request loop: for every {request, From, Req} message, answer with the
% request body prefixed by this server's Id and ": ". Never returns.
serve(Id) ->
    receive
        {request, From, Req} ->
            Answer = <<Id/binary, ": ", Req/binary>>,
            ok = iris:reply(From, Answer)
    end,
    serve(Id).

% Run a web server for 100 seconds: requests are handled by a spawned serve/1
% process tagged with a random "www-N" ID, then the connection is closed.
main() ->
    % Generate a random ID (1..100) for the web server. rand seeds itself on
    % first use, so the deprecated random:seed(erlang:now()) is not needed.
    Id = list_to_binary(io_lib:format("www-~p", [rand:uniform(100)])),

    % Connect to the Iris network
    {ok, Conn} = iris:connect(55555, "webserver", spawn(fun() -> serve(Id) end)),

    % Serve a while, then quit
    io:format("Waiting for requests...~n"),
    timer:sleep(100 * 1000),
    iris:close(Conn).

The presentation supports only one active demo process per window. Open new tab?

Request / Reply highlights

iris:request(Conn :: iris:connection(), App :: string(), Req :: binary(), Timeout :: integer()) →
    {ok, Reply :: binary()}
{request, From :: iris:sender(), Req :: binary()}

Solution #2 – OTP – Web server

% Request callback: reply with the request body prefixed by the server ID
% (which is the callback state) and ": ". The state is returned unchanged.
handle_request(Req, _From, Id, _Link) ->
    Answer = <<Id/binary, ": ", Req/binary>>,
    {reply, Answer, Id}.

% Run an OTP-style web server for 100 seconds: an iris_server is started with
% this module as the callback and a random "www-N" ID as its initial argument.
main() ->
    % Generate a random ID (1..100) for the web server. rand seeds itself on
    % first use, so the deprecated random:seed(erlang:now()) is not needed.
    Id = list_to_binary(io_lib:format("www-~p", [rand:uniform(100)])),

    % Connect to the Iris network
    {ok, Server, _Link} = iris_server:start(55555, "webserver", ?MODULE, Id),

    % Serve a while, then quit
    io:format("Waiting for requests...~n"),
    timer:sleep(100 * 1000),
    ok = iris_server:stop(Server).

The presentation supports only one active demo process per window. Open new tab?

OTP Request / Reply highlights

callback:handle_request(Req :: binary(), From :: iris:sender(), State :: term(), Link) → Result
Result = {reply, Reply :: binary(), State :: term()}
Result = {noreply, State :: term()}

Challenge #3 – Aperture Science Enrichment Center 😈

Implement the comlink for Aperture Laboratories¹:

¹ http://en.wikipedia.org/wiki/Portal_(video_game)

Solution #3 – GLaDOS

% Connect as "GLaDOS" with no inbound handler and publish a randomly chosen
% wish to the "official" topic every five seconds, 100 times in total.
% count/0 and wishes/0 are defined outside this snippet.
main() ->
    % Connect to the Iris network
    {ok, Conn} = iris:connect(55555, "GLaDOS", none),

    % Publish the nice wishes
    io:format("GLaDOS is online...~n"),
    lists:foreach(fun(_) ->
        % rand seeds itself per process; the unseeded random module would
        % replay the same sequence of wishes on every run.
        Wish = lists:nth(rand:uniform(count()), wishes()),
        ok = iris:publish(Conn, "official", <<"GLaDOS: ", Wish/binary>>),
        timer:sleep(5000)
    end, lists:seq(1, 100)),
    iris:close(Conn).

Hint: Boot GLaDOS and let the experiment begin 😉

Solution #3 – Chell

% Subscriber loop: for each {publish, Topic, Event} message, print the event
% body as text followed by a blank line; the topic is ignored. Never returns.
echo() ->
    receive
        {publish, _Topic, Event} ->
            Text = binary_to_list(Event),
            io:format("~s~n~n", [Text])
    end,
    echo().

% Connect as "Chell" with an echo/0 process as the inbound handler, subscribe
% to the "official" and "portal" topics, listen for 100 seconds, then close.
main() ->
    % Connect to the Iris network
    Listener = spawn(fun echo/0),
    {ok, Conn} = iris:connect(55555, "Chell", Listener),

    % Subscribe to some topics (each subscription must succeed)
    io:format("Tuning in to Aperture channels...~n"),
    Channels = ["official", "portal"],
    _ = [ok = iris:subscribe(Conn, Channel) || Channel <- Channels],

    % Wait a bit, then terminate
    timer:sleep(100 * 1000),
    iris:close(Conn).

Hint: Maybe there is an "unofficial" channel? 😉

Publish / Subscribe highlights

iris:subscribe(Conn :: iris:connection(), Topic :: string())
iris:publish(Conn :: iris:connection(), Topic :: string(), Msg :: binary())
{publish, Topic :: string(), Event :: binary()}

Solution #3 – OTP – Chell

% Publish callback: print the event body as text followed by a blank line and
% keep the state as is. Neither the topic nor the link is used.
handle_publish(_Topic, Event, State, _Link) ->
    Text = binary_to_list(Event),
    ok = io:format("~s~n~n", [Text]),
    {noreply, State}.

% Start an iris_server as "Chell" with this module as the callback, subscribe
% to the "official" and "portal" topics, listen for 100 seconds, then stop.
main() ->
    % Connect to the Iris network
    {ok, Server, Link} = iris_server:start(55555, "Chell", ?MODULE, nil),

    % Subscribe to some topics
    io:format("Tuning in to Aperture channels...~n"),
    lists:foreach(fun(Topic) ->
        ok = iris:subscribe(Link, Topic)
    end, ["official", "portal"]),

    % Wait a bit, then terminate. A server started with iris_server:start/4
    % is shut down with iris_server:stop/1, as in the other OTP demos.
    timer:sleep(100 * 1000),
    ok = iris_server:stop(Server).

Hint: Maybe there is an "unofficial" channel? 😉

OTP Publish / Subscribe highlights

callback:handle_publish(Topic :: string(), Event :: binary(), State :: term(), Link) → Result
Result = {noreply, NewState :: term()}
Result = {stop, Reason :: term(), NewState :: term()}

Challenge #4 – Transactions

Implement a distributed transaction system:

Solution #4 – Client

    % Open an outbound tunnel to a database.
    % Conn is bound earlier, outside this excerpt. If the tunnel cannot be
    % opened, the reason is printed and the whole VM is halted.
    Tun = case iris:tunnel(Conn, "database", 1000) of
        {ok, Tunnel}    -> Tunnel;
        {error, Reason} -> io:format("Tunneling failed: ~p~n", [Reason]), erlang:halt()
    end,

    % Send a multi-part transaction: ten messages "Part #1" .. "Part #10".
    % All parts are sent before any reply is read; each send must return ok.
    lists:foreach(fun(Idx) ->
        Msg = list_to_binary(io_lib:format("Part #~p", [Idx])),
        ok = iris:send(Tun, Msg, 1000)
    end, lists:seq(1, 10)),

    % Retrieve the database replies: exactly ten are expected, and any result
    % other than {ok, Msg} crashes with a badmatch.
    lists:foreach(fun(_) ->
        {ok, Msg} = iris:recv(Tun, 1000),
        io:format("~p~n", [Msg])
    end, lists:seq(1, 10)),

    % Tear down the tunnel once all replies have been printed
    iris:close(Tun),

Hint: Start some database processes 😉

Solution #4 – Database

% Serve one tunnel: every received part is sent back prefixed with the
% database ID and the transaction number. The loop ends, closing the tunnel,
% as soon as a receive returns an error.
echo(Tun, Id, Tx) ->
    Received = iris:recv(Tun, 1000),
    case Received of
        {error, _Reason} ->
            ok = iris:close(Tun);
        {ok, Part} ->
            Text = binary_to_list(Part),
            Formatted = io_lib:format("db-~p (tx-~p): ~s", [Id, Tx, Text]),
            ok = iris:send(Tun, list_to_binary(Formatted), 1000),
            echo(Tun, Id, Tx)
    end.

% Tunnel acceptor loop: wait for a {tunnel, Tun} message, serve that tunnel
% to completion with echo/3, then wait for the next one with the transaction
% counter incremented. Tunnels are therefore handled one at a time.
handle(Id, Transaction) ->
    Inbound = receive
        {tunnel, Tun} -> Tun
    end,
    echo(Inbound, Id, Transaction),
    handle(Id, Transaction + 1).

The presentation supports only one active demo process per window. Open new tab?

Tunnel highlights

iris:tunnel(Conn :: iris:connection(), App :: string(), Timeout :: integer()) → {ok, Tunnel :: iris:tunnel()}
{tunnel, Tunnel :: iris:tunnel()}
iris:send(Tunnel :: iris:tunnel(), Msg :: binary(), Timeout :: integer())
iris:recv(Tunnel :: iris:tunnel(), Timeout :: integer()) → {ok, Msg :: binary()}

Solution #4 – OTP – Database

% Per-tunnel worker: echo each received part back, tagged with the database
% ID and transaction number, until a receive fails; then close the tunnel.
echo(Tun, Id, Tx) ->
    case iris:recv(Tun, 1000) of
        {error, _Reason} ->
            % Nothing more to read (or the receive failed): release the tunnel
            ok = iris:close(Tun);
        {ok, Part} ->
            Body = binary_to_list(Part),
            Line = io_lib:format("db-~p (tx-~p): ~s", [Id, Tx, Body]),
            Answer = list_to_binary(Line),
            ok = iris:send(Tun, Answer, 1000),
            echo(Tun, Id, Tx)
    end.

% Tunnel callback: hand the new tunnel to a separately spawned echo/3 worker
% so the server is not blocked, and bump the transaction counter kept in the
% {Id, Transaction} state.
handle_tunnel(Tunnel, {Id, Transaction}, _Link) ->
    WorkerArgs = [Tunnel, Id, Transaction],
    _Worker = spawn(?MODULE, echo, WorkerArgs),
    NextState = {Id, Transaction + 1},
    {noreply, NextState}.

The presentation supports only one active demo process per window. Open new tab?

OTP Tunnel highlights

callback:handle_tunnel(Tunnel :: iris:tunnel(), State :: term(), Link) → Result
Result = {noreply, NewState :: term()}
Result = {stop, Reason :: term(), NewState :: term()}

So how does this all work?

Sneak behind the scenes

Iris nodes do the heavy lifting (one/host):

Thin clients bathe in the glory:

Finally, my promise (xyz in iris:connect):

Thank you

Péter Szilágyi

European Institute for Innovation and Technology

Eötvös Loránd University, Budapest, Hungary

Babeş-Bolyai University, Cluj-Napoca, Romania