I. distributed programming in Erlang
   A. definition and applications
------------------------------------------
    WHAT IS DISTRIBUTED PROGRAMMING?

A program is distributed when it executes on


Advantages:
 - true concurrency
 - scalability (speed, data size)
 - extensibility
 - availability
 - fault tolerance
 - access to a large set of resources
 - fits if the application is
   intrinsically distributed

Disadvantages:
 - true concurrency (synchronization is hard)
 - time lags
 - no global state
 - network partitions
 - partial failures (one node goes down...)
 - open system causes security problems
------------------------------------------
------------------------------------------
   PROBLEMS WITH DISTRIBUTED PROGRAMS

Constraints:
 - data can't be directly shared
 - network is slow (compared to memory)
 - resources can be local (devices)
 - nodes and communication can fail

For availability, so clients can always
reach a server, and stable storage,
which survives crashes of nodes,
want more than one copy of data...

Coordination:
 - giving clients the illusion of a


Examples:
 - selling seats in an airplane
 - selling an item in an auction
 - changing account information
------------------------------------------
------------------------------------------
      BYZANTINE GENERALS PROBLEM

An analogy

    hill         valley         hill
  general A      enemy        general B
  + troops       troops       + troops
------------------------------------------
------------------------------------------
 ERLANG CONCEPTS FOR DISTRIBUTED SYSTEMS

Node = a running Erlang runtime system
       (one computer can host several)
   an alive node has a name (unique atom)

def: when a node N is *monitored* by a
     process on node L (monitor_node/2),
     then if N is not responding,
     a {nodedown, N} message is sent
     to that process.
     (A monitored process P yields
      {'DOWN', Ref, process, P, Reason}.)

def: when a process P is *linked* to a
     process Q (possibly on another node),
     then when either exits abnormally,
     the other does also
     (unless it traps exits).

Message passing semantics:
 - messages from one process to another
   are delivered in the order sent
 - messages are never corrupted, and are
   not lost while the nodes stay connected
------------------------------------------
      Given what we have already seen in Erlang,
      how should we make distributed programs?
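A minimal sketch of the monitoring idea from the concepts box, runnable on a single node. The module name monitor_demo and the function watch/0 are hypothetical, chosen only for illustration. It assumes nothing beyond the standard spawn_monitor/1 BIF.

```erlang
%% monitor_demo.erl (hypothetical module name, for illustration only)
-module(monitor_demo).
-export([watch/0]).

%% Spawn a worker that exits abnormally, monitor it, and return
%% the exit reason carried by the 'DOWN' message.
-spec watch() -> {down, term()} | timeout.
watch() ->
    {Pid, Ref} = spawn_monitor(fun() -> exit(crashed) end),
    receive
        %% Ref and Pid are already bound, so only the message
        %% for this particular monitor is accepted.
        {'DOWN', Ref, process, Pid, Reason} ->
            {down, Reason}
    after 1000 ->
        timeout
    end.
```

The monitoring process keeps running after the worker dies; with a link instead of a monitor, the abnormal exit would propagate to it unless it traps exits.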
------------------------------------------
         TESTING DISTRIBUTION

Use 2 OS shells:

  $ erl -sname node1

  $ erl -sname node2

MESSAGE PASSING

  {runavg, nodeA} ! Msg

      sent to 'runavg' on nodeA
------------------------------------------
------------------------------------------
         BIFS FOR DISTRIBUTION

rpc:call/4  (library function in module rpc)
  call(Node, Mod, Function, Args)
       -> Result | {badrpc, Reason}

spawn/2
  spawn(Node, Fun) -> Pid

spawn/4
  spawn(Node, Mod, Fun, Args) -> Pid

monitor_node/2
  monitor_node(Node, Flag) -> true
       Flag = true | false

node/0
  node() -> Node
------------------------------------------
   B. fault tolerance and atomicity
------------------------------------------
        LINKING AND MONITORING

Linking:
  2 processes (possibly on different nodes)
  can be linked so that when one goes down
  abnormally, then the other does too!

  link(Pid)
  spawn_link(Node, Fun) -> Pid
  spawn_link(Node, Mod, Fun, Args) -> Pid

Monitoring:
  if process A monitors process B,
  then A is sent the message
    {'DOWN', Ref, process, B, Reason}
  when B exits or B's node leaves the net

  monitor(process, Pid) -> Ref
  spawn_monitor(Fun) -> {Pid, Ref}
  spawn_monitor(Node, Mod, Func, Args)
       -> {Pid, Ref}

  for nodes, after
  net_kernel:monitor_nodes(true),
  the caller is sent:
   - {nodeup, N}, when N joins net
   - {nodedown, N}, when N leaves net
------------------------------------------
   C. security ?
------------------------------------------
       COOKIE PROTECTION SYSTEM

Designed for systems running on a LAN.

Nodes have 1 cookie each.
Only nodes with the same cookie can talk

erlang:set_cookie/2
  set_cookie(Node, Cookie) -> true

erlang:get_cookie/0
  get_cookie() -> Cookie

Cookie: random string (stored as an atom)
        created by Erlang on the machine

Ways to get the same cookie in a system:
 - change the $HOME/.erlang.cookie file
 - use: erl -setcookie C
 - use erlang:set_cookie/2
------------------------------------------
II. multicore programming in Erlang
   A. parallelism motivation
------------------------------------------
      WHY IS PARALLELISM USEFUL?



------------------------------------------
   B. streams
------------------------------------------
               STREAMS

def: a *stream* is


Grammar:

     ::=   ,

------------------------------------------
      What kind of recursion does this grammar suggest?
      1. Producer/Consumer
------------------------------------------
 PRODUCER/CONSUMER PIPELINE WITH STREAMS

 |-----------|  3|5|7|9|...  |-----------|
 | Consumer  |<--------------| Producer  |
 |-----------|               |-----------|

Example:

 |-----------|               |-----------|
 |  Player   |<--------------|MP3 Encoder|
 |-----------|               |-----------|

Idea:


------------------------------------------
         What kinds of things would the consumer do?
      2. flow control
         What happens if the producer runs faster than the consumer?
         What happens if the consumer runs faster than the producer?
------------------------------------------
              BUFFERING

 |-----------|  3|5|7|9|...  |-----------|
 | Consumer  |<--------------| Producer  |
 |-----------|               |-----------|

Extremes of flow:

  Eager producer:

  Lazy producer:

 |-----------|               |-----------|
 | Consumer  |<---[buffer]---| Producer  |
 |-----------|               |-----------|
------------------------------------------
   C. extended example motivation
------------------------------------------
       HAILSTONE (3x+1) EXAMPLE

-spec h(pos_integer()) -> pos_integer().
h(N) ->
    if N rem 2 =:= 1 -> (3*N+1) div 2;
       true -> N div 2
    end.

% returns the trajectory of the number N:
% [N, h(N), h(h(N)), h(h(h(N))), ..., 1]
-spec trajectoryTo1(pos_integer())
        -> [pos_integer()].
trajectoryTo1(N) ->
    if N > 1 -> [N|trajectoryTo1(h(N))];
       true -> [N]
    end.
------------------------------------------
   D. buffered stream actor templates
      1. architecture
------------------------------------------
     OVERALL PROTOCOL FOR STREAMS

production is lazy:
   {From, get} message triggers production
   {Name, {put, Value}} message
        is the response

 |-----------|     Value     |-----------|
 | Consumer  |<--------------| Producer  |
 |-----------|               |-----------|

timeline:
       |    {Consumer, get}     |
       |----------------------->|
       |                        |
       |{Producer, {put,Value}} |
       |<-----------------------|
       |                        |
------------------------------------------
------------------------------------------
              BUFFERING

Approach: Erlang mailbox is the buffer

Pick a Size of buffer, N
     say 100

invariant:
   There are always N requests (get)
   sent to the producer
   and not yet received (put)
------------------------------------------
      2. producer
------------------------------------------
      GENERALIZED PRODUCER MODULE

-module(producer).
-export([start_link/3]).

% Name is registered, so it must be an atom;
% register/2 returns true.
-spec start_link(Node::atom(), Name::atom(),
                 Mod::atom()) -> true.
start_link(Node, Name, Mod) ->
    register(Name,
             spawn_link(Node,
                        fun() ->
                            loop(Name, Mod, Mod:init())
                        end)).

loop(Name, Mod, State) ->


------------------------------------------
         What is the Node argument for?
      3. pipes (in the middle)
         a. generalized mapper
------------------------------------------
          GENERALIZED MAPPER

-module(mapper).
-export([start_link/6]).

-spec start_link(atom(), atom(), atom(),
                 atom(), atom(),
                 pos_integer()) -> true.
start_link(Node, MyName, Mod,
           MyOutput, MyInput, Size) ->
    register(MyName,
             spawn_link(Node,
                        fun() ->
                            init(MyName, Mod, MyOutput,
                                 MyInput, Size)
                        end)).

% MyOutput and MyInput are registered names
-spec init(atom(), atom(), atom(), atom(),
           pos_integer()) -> no_return().
init(MyName, Mod, MyOutput, MyInput, Size) ->
    %% establish the invariant that the
    %% number of outstanding requests
    %% to MyInput is equal to Size
    lists:foreach(fun(_) ->
                      ask(MyInput, MyName)
                  end,
                  lists:seq(1, Size)),
    loop(MyName, Mod, MyOutput, MyInput,
         Mod:init()).

-spec ask(atom(), atom()) -> ok.
ask(MyInput, MyName) ->
    MyInput ! {MyName, get},
    ok.

-spec loop(atom(), atom(), atom(), atom(),
           any()) -> no_return().
loop(MyName, Mod, MyOutput, MyInput, State) ->


------------------------------------------
            What should the loop do?
         b. filterer
------------------------------------------
         GENERALIZED FILTERER

-module(filterer).
-export([start_link/6]).

start_link(Node, MyName, Mod,
           MyOutput, MyInput, Size) ->
    register(MyName,
             spawn_link(Node,
                        fun() ->
                            init(MyName, Mod, MyOutput,
                                 MyInput, Size)
                        end)).

init(MyName, Mod, MyOutput, MyInput, Size) ->
    lists:foreach(fun(_) ->
                      ask(MyInput, MyName)
                  end,
                  lists:seq(1, Size)),
    loop(MyName, Mod, MyOutput, MyInput,
         Mod:init()).

ask(MyInput, MyName) ->
    MyInput ! {MyName, get}.

loop(MyName, Mod, MyOutput, MyInput, State) ->


------------------------------------------
      4. consumer
------------------------------------------
         GENERALIZED CONSUMER

-module(consumer).
-export([start_link/4]).

% register/2 returns true
-spec start_link(atom(), atom(), atom(),
                 atom()) -> true.
start_link(Node, MyName, Mod, MyInput) ->
    register(MyName,
             spawn_link(Node,
                        fun() ->
                            loop(MyName, Mod, MyInput,
                                 Mod:init())
                        end)).

-spec loop(atom(), atom(), atom(), any())
        -> no_return().
loop(MyName, Mod, MyInput, State) ->


------------------------------------------
   E. example system design
------------------------------------------
       HAILSTONE STREAM SYSTEM

  Consumer         Mapper        Producer
 |---------|      |-------|      |------|
 |  peaks  | <--  | graph | <--- | odds |
 |    +    |      |  max  |      |------|
 |(Storage)|      | value |
 |---------|      |-------|
------------------------------------------
      1. producer
------------------------------------------
            odds_producer

-module(odds_producer).
-export([init/0,produce/1]).

-spec init() -> pos_integer().
init() ->


-spec produce(pos_integer()) ->
        {pos_integer(), pos_integer()}.
produce(Next) ->


------------------------------------------
      2. mapper
------------------------------------------
        max_value_graph_mapper

-module(max_value_graph_mapper).
-export([init/0, transform/2]).
-import(hailstone,[hailstoneMax/1]).

-spec init() -> ok.
init() -> ok.
-spec transform(State::any(),
                Value::pos_integer()) ->
        {any(), {pos_integer(), pos_integer()}}.
transform(State, N) ->


------------------------------------------
      3. consumer
------------------------------------------
       max_value_peak_consumer

-module(max_value_peak_consumer).
-export([init/0, consume/2]).

-spec init() -> {pos_integer(), pos_integer()}.
init() ->


-spec consume(
        State::{pos_integer(), pos_integer()},
        Value::{pos_integer(), pos_integer()})
        -> {pos_integer(), pos_integer()}.
consume({Arg, ArgValue}, {NewArg, NewValue}) ->


------------------------------------------
      4. overall system
------------------------------------------
       hailstone_stream_system

-module(hailstone_stream_system).
-import(producer,[start_link/3]).
-export([start/1]).

% the result is that of the last call,
% consumer:start_link/4, which returns true
-spec start(pos_integer()) -> true.
start(BuffSize) ->
    compile:file(myrpc),
    compile:file(producer),
    compile:file(odds_producer),
    compile:file(mapper),
    compile:file(max_value_graph_mapper),
    compile:file(consumer),
    compile:file(max_value_peak_consumer),
    producer:start_link(node(),
                        odds_producer,
                        odds_producer),
    mapper:start_link(node(),
                      max_value_graph_mapper,
                      max_value_graph_mapper,
                      max_value_peak_consumer,
                      odds_producer,
                      BuffSize),
    consumer:start_link(node(),
                        max_value_peak_consumer,
                        max_value_peak_consumer,
                        max_value_graph_mapper).
------------------------------------------
   F. architecture for using more distributed machines
------------------------------------------
  ARCHITECTURE FOR MORE DISTRIBUTION

coordinator + N workers

coordinator:
   tracks state of search
   assigns intervals to workers
   records results on stable storage

workers:
   ask coordinator for an interval
   search the interval
   report any results to coordinator
------------------------------------------
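The coordinator/worker architecture above can be sketched as follows. This is only one possible reading of the slide, under stated assumptions: the module name coord_sketch, the messages get_interval, interval, result, and done, and the function run/3 are all hypothetical. All processes run on the local node, and the best result is kept in the coordinator's memory rather than on stable storage. The search looks for the starting value whose trajectory (using h from the hailstone example) reaches the largest peak.

```erlang
%% coord_sketch.erl (hypothetical module; all names are assumptions)
-module(coord_sketch).
-export([run/3]).

%% Search 1..Limit in intervals of Chunk numbers with NWorkers workers.
%% Returns {Arg, Peak}: the start value with the largest trajectory peak.
-spec run(pos_integer(), pos_integer(), pos_integer()) ->
          {pos_integer(), pos_integer()}.
run(Limit, Chunk, NWorkers) ->
    Coordinator = self(),
    [spawn_link(fun() -> worker(Coordinator) end)
     || _ <- lists:seq(1, NWorkers)],
    coordinate(1, Limit, Chunk, NWorkers, {1, 1}).

%% Coordinator: tracks the next unassigned number, the count of
%% still-active workers, and the best result reported so far.
coordinate(_Next, _Limit, _Chunk, 0, Best) ->
    Best;
coordinate(Next, Limit, Chunk, Active, Best) ->
    receive
        {Worker, get_interval} when Next =< Limit ->
            Hi = min(Next + Chunk - 1, Limit),
            Worker ! {interval, Next, Hi},
            coordinate(Hi + 1, Limit, Chunk, Active, Best);
        {Worker, get_interval} ->
            %% nothing left to assign: retire this worker
            Worker ! done,
            coordinate(Next, Limit, Chunk, Active - 1, Best);
        {result, Candidate} ->
            coordinate(Next, Limit, Chunk, Active,
                       better(Best, Candidate))
    end.

%% Worker: ask for an interval, search it, report, repeat.
worker(Coordinator) ->
    Coordinator ! {self(), get_interval},
    receive
        {interval, Lo, Hi} ->
            Coordinator ! {result, search(Lo, Hi)},
            worker(Coordinator);
        done ->
            ok
    end.

search(Lo, Hi) ->
    lists:foldl(fun(N, Best) -> better(Best, {N, peak(N, N)}) end,
                {Lo, peak(Lo, Lo)},
                lists:seq(Lo, Hi)).

%% Prefer the larger peak; on a tie prefer the smaller argument,
%% so the answer does not depend on message arrival order.
better({A1, V1}, {A2, V2}) when V1 > V2; V1 =:= V2, A1 =< A2 ->
    {A1, V1};
better(_, B) ->
    B.

%% Largest value on the trajectory of N down to 1.
peak(1, Max) -> Max;
peak(N, Max) -> peak(h(N), max(N, Max)).

h(N) when N rem 2 =:= 1 -> (3*N+1) div 2;
h(N) -> N div 2.
```

A worker sends its result before its next request, and messages between one pair of processes keep their order, so every result has been folded in by the time the last worker is retired. Placing workers on other machines would presumably use spawn_link(Node, Fun) in place of spawn_link(Fun).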