I. Message-Passing Concurrency (Ch 5)
 A. motivation
------------------------------------------
   MESSAGE-PASSING CONCURRENCY (5)

def: message passing is a programming style
     in which a program consists of
     independent entities (agents) that
     interact by sending messages
     asynchronously

Importance:

------------------------------------------
        What is asynchronous?
        What's the importance of message passing?
 B. concepts
------------------------------------------
               CONCEPTS

new idea: asynchronous communication channel
     - named
     - all clients can always send
     - server can read all messages

Fundamentally nondeterministic

Port: channel with an associated stream
     send = append to stream
     read = take head from stream
------------------------------------------
        What does it mean to be "named"?
        Do we still have referential transparency?
 C. semantics by example
------------------------------------------
          SEMANTICS BY EXAMPLE

% Basic semantics of NewPort and Send
declare Strm Port in
{Browse Port}
{Browse Strm}
% NewPort takes a variable representing a stream
% and initializes the Port argument
{NewPort Strm Port}
% Sending on the port adds to the stream
thread {Delay 1000} {Send Port 3} end
thread {Delay 999} {Send Port 4} end
thread {Delay 3001} {Send Port 5} end
thread {Delay 3000} {Send Port 6} end
------------------------------------------
        How would you do this in Java?
        Why is a mutation needed to describe the meaning of Send?
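
One way to think about the question of doing this in another language is the following minimal sketch, written in Python rather than Java or Oz. The class `Port`, its methods, and the shortened delays are all assumptions made for illustration; they are not part of the Oz library. The sketch only models the idea that a send appends to a shared stream, and that the append is a mutation that has to be serialized.

```python
import threading
import time


class Port:
    """Hypothetical stand-in for an Oz port: many senders, one append-only stream."""

    def __init__(self):
        self._stream = []              # messages in arrival order
        self._lock = threading.Lock()  # serializes concurrent sends

    def send(self, msg):
        # The mutation: the end of the stream moves by one element.
        with self._lock:
            self._stream.append(msg)

    def stream(self):
        # Snapshot of everything sent so far.
        with self._lock:
            return list(self._stream)


def delayed_send(port, delay_s, msg):
    time.sleep(delay_s)
    port.send(msg)


def run_example():
    port = Port()
    # Delays are scaled down from the slide (assumption: seconds, not milliseconds).
    plan = [(0.10, 3), (0.02, 4), (0.30, 5), (0.20, 6)]
    threads = [
        threading.Thread(target=delayed_send, args=(port, delay, msg))
        for delay, msg in plan
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return port.stream()
```

Calling `run_example()` returns the four messages in arrival order, which depends on the delays and the scheduler and not on the order in which the threads were created. Unlike an Oz stream, the list here is not a dataflow value: a reader sees a snapshot and cannot block on the unbound tail.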
------------------------------------------
          NEWPORT AS A FUNCTION

declare S2 P2 U1 U2 in
P2 = {NewPort S2}
% You can send anything to a port
{Browse S2}
{Delay 3000}
thread {Send P2 7} end
thread {Send P2 unit} end
thread {Send P2 true} end
% You can send undetermined store variables
thread {Send P2 U1} end
% You can send partial values
{Send P2 hmmm(x:U2)}
thread U1 = 4020 end
------------------------------------------
------------------------------------------
     ALL SENDS TO A PORT ARE MERGED

declare P S
{Browse S}
P={NewPort S}
thread
   for I in 1..10 do
      {Send P I}
      {Delay 500}
   end
end
thread
   for J in 201..210 do
      {Send P J}
      {Delay 300}
   end
end
------------------------------------------
------------------------------------------
   THE STREAM IS ONLY READABLE AFTER

declare S3 P3 in
P3 = {NewPort S3}
{Browse S3}
local Z in S3 = 1|Z end  % suspends
{Browse ok}
------------------------------------------
------------------------------------------
  NO SHARING OF STREAMS BETWEEN PORTS

declare S4 P4a P4b in
P4a = {NewPort S4}
{Browse gotHere}
P4b = {NewPort S4}  % suspends
{Browse P4a == P4b}
------------------------------------------
------------------------------------------
MUST GIVE NEWPORT AN UNDETERMINED STORE VARIABLE

declare S5 S5a P5 in
{Browse S5}
{Browse S5a}
S5=5|S5a
{Browse 'calling NewPort with S5...'}
P5 = {NewPort S5}  % suspends
{Browse pastIt}
------------------------------------------
 D. operational semantics (skip formal semantics details)
------------------------------------------
               SYNTAX

Syntax:

<s> ::= ...
     |  {NewPort <x> <y>}
     |  {Send <x> <y>}

{NewPort S P} creates port named P with stream S
{Send P X} appends X to stream corresponding to P
------------------------------------------
        Why can't we write this in the declarative model?
------------------------------------------
            MUTABLE STORE

m in MutableStore = Variable -> Variable

Operations:
   {}: MutableStore
   update: MutableStore -> Variable x Variable -> MutableStore
   lookup: MutableStore x Variable -> Variable

   lookup(update(m)(y,x), z) = if z == y then x else lookup(m,z)

Notation: write x:y for a binding in MutableStore
   typical m is {p1:s1, p2:s2}
   update({p1:s1})(p2, s2) = {p1:s1, p2:s2}
   lookup({p1:s1, p2:s2}, p1) = s1
------------------------------------------
------------------------------------------
              SEMANTICS

Sequential (-d->) configurations:

(ST,s,m) in State = Stack x Store x MutableStore + Message
Message = String
Stack = (<s> x Environment)*
Store = Variable -> Value
T = Message + { (nil,s,m) | s in Store, m in MutableStore}

[NewPort call]
   (({NewPort X Y},E)|Rest, s, m)
   -d-> (Rest, s', m')
      where undetermined(s,E(Y))
      and n is a port name
      and n not in range(s)
      and s' = bind(s)(s(E(Y)),n)
      and m' = update(m)(E(Y), E(X))

[Send call]
   (({Send X Y},E)|Rest, s, m)
   -d-> (Rest, s'', m')
      where determined(s,E(X))
      and s(E(X)) is a port name
      and lookup(m, E(X)) = z
      and z'#s' = alloc(s)
      and m' = update(m)(E(X), z')
      and l = '|'(E(Y) z')
      and s'' = bind(s')(s'(z), l)
------------------------------------------
        What should happen if we do {NewPort S P} and P is already determined?
        What should happen if we do {Send P Y} and P is not a port?
        What should happen if we do {Send P Y} and P is not determined?
        What should happen if we do {Send P Y} and Y is not determined?
        What should happen in [Send call] if m(E(X)) is not defined?
        Does this affect garbage collection?
II. Port Objects or Agents or Active Objects (5.2)
 A. definition
        For review, what's a stream object?
------------------------------------------
 PORT OBJECTS / AGENTS / ACTIVE OBJECTS (5.2)

def: a port object is a "combination of
     one or more ports and a stream object"

Extends stream objects:
   - allows multiple sends
   - can be embedded in data structures
------------------------------------------
------------------------------------------
       EXAMPLES OF PORT OBJECTS

declare
fun {SumPortMaker}
   Strm % declaring the input stream
   fun {Loop Nums N}
      case Nums of
         X|Xs then {Loop Xs N+X}
      end
   end
in
   thread _={Loop Strm 0} end
   {NewPort Strm}
end
% clients can do...
PSum = {SumPortMaker}
{Send PSum 3}
{Send PSum 7}
------------------------------------------
        How could we get information out of such a port object?
 B. stateful port objects
------------------------------------------
  A MORE USEFUL PORT OBJECT WITH MESSAGES

declare
fun {SumAgentMaker}
   Strm % declaring the input stream
   fun {Loop MsgStrm N}
      case MsgStrm of
         add(X)|Msgs then {Loop Msgs N+X}
      [] get(Z)|Msgs then Z=N {Loop Msgs N}
      end
   end
in
   thread _={Loop Strm 0} end
   {NewPort Strm}
end
% clients can do...
declare
SumPObj = {SumAgentMaker}
{Send SumPObj add(3)}
{Send SumPObj add(7)}
local Z in
   {Send SumPObj get(Z)}
   {Test Z '==' 10}
end
{Send SumPObj add(27)}
{Test {Send SumPObj get($)} '==' 37}
------------------------------------------
        What's another example that would be similar?
    1. abstraction: NewPortObject function
        How would you make an abstraction of this kind of thing?
------------------------------------------
 ABSTRACTION FOR MAKING PORTS WITH STATE (5.2.1)

declare
fun {NewPortObject Init Fun}
   Sin % declaring the input stream
in
   thread _={FoldL Sin Fun Init} end
   {NewPort Sin}
end
------------------------------------------
        What's the first argument passed to Fun? The second?
        How would you write SumAgentMaker using NewPortObject?
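
The NewPortObject abstraction above can be mimicked outside Oz. The following is a rough Python sketch under stated assumptions: the names `new_port_object`, `sum_agent_maker`, and `_STOP` are invented for illustration, a `queue.Queue` stands in for the port's stream, and a `concurrent.futures.Future` stands in for the unbound variable carried by a get(Z) message. It is an analogy, not a translation with Oz dataflow semantics.

```python
import queue
import threading
from concurrent.futures import Future

_STOP = object()  # hypothetical sentinel so the sketch can stop its thread


def new_port_object(init, fun):
    """Start a thread that folds fun over incoming messages, starting from init."""
    inbox = queue.Queue()

    def loop():
        state = init
        while True:
            msg = inbox.get()
            if msg is _STOP:
                return
            state = fun(state, msg)  # like FoldL: old state and message give new state

    threading.Thread(target=loop, daemon=True).start()
    return inbox.put  # plays the role of the port: calling it sends a message


def sum_agent_maker():
    def step(total, msg):
        tag, arg = msg
        if tag == "add":
            return total + arg
        if tag == "get":
            arg.set_result(total)  # plays the role of binding Z in get(Z)
            return total
        raise ValueError(f"unknown message: {msg!r}")

    return new_port_object(0, step)


def demo():
    send = sum_agent_maker()
    send(("add", 3))
    send(("add", 7))
    first = Future()
    send(("get", first))
    first_value = first.result(timeout=5)
    send(("add", 27))
    second = Future()
    send(("get", second))
    second_value = second.result(timeout=5)
    send(_STOP)
    return first_value, second_value
```

The design point carried over from the slide is that the state never leaves the agent's thread: clients only see it through replies to messages, so no lock is needed around `state`.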
------------------------------------------
          USING NEWPORTOBJECT

\insert 'NewPortObject.oz'
declare
fun {SumAgentMaker}
   {NewPortObject 0
    fun {$ N Msg}
       case Msg of
          add(X) then N+X
       [] get(Z) then Z=N N
       end
    end}
end
------------------------------------------
        After executing
             P = {NewPortObject I0 F}
             {Send P a}
             {Send P b}
             {Send P c}
        what is the state inside the port object?
        What does NewPortObject return?
        What happens to the final state?
    2. example
------------------------------------------
        FINITE STATE PORT OBJECTS

Key idea: Encode a finite state machine
          as a port object using NewPortObject

Example:

NewBankAccount: <fun {$}: <Port>>

Messages (records)
   inquire(WhatAmt)
   deposit(Dollars)
   withdraw(Dollars)
   close

declare
Acct = {NewBankAccount}
{Test {Send Acct inquire($)} '==' 0}
{Send Acct deposit(10)}
{Test {Send Acct inquire($)} '==' 10}
{Send Acct deposit(10)}
{Test {Send Acct inquire($)} '==' 20}
{Send Acct withdraw(5)}
{Test {Send Acct inquire($)} '==' 15}
{Send Acct withdraw(15)}
{Test {Send Acct inquire($)} '==' 0}
{Send Acct close}
------------------------------------------
------------------------------------------
      IMPLEMENTING THE BANK ACCOUNT

Key idea: identify states and transitions

State Machine Diagram:

                                       deposit(M)
                                       /-------\
                                       |       |
                                       |       |
                                        \     /
                    deposit(N)           \   v
  |------------| ----------------->|-----------|
  |  Opened    |                   |  Opened   |
  | Balance==0 |                   | Balance>0 |
  |------------|<----------------- |-----------|
        |           withdraw(N)        ^    |
        | close     & N==Balance       |    |
        v                              \----/
  |------------|                    withdraw(M)
  |  Closed    |                    & M ...

... ) getValue()
and that acts like a dataflow variable.
------------------------------------------
 C. stateless port objects
    1. Stateless port object abstraction
------------------------------------------
 MAKING STATELESS PORT OBJECTS (5.2.1)

declare
fun {NewPortObject2 Proc}
   Sin
in
   thread
      for Msg in Sin do {Proc Msg} end
   end
   {NewPort Sin}
end
------------------------------------------
    2. stateless example (5.2.2)
------------------------------------------
               TO DO

Write

MinServer: <fun {$}: <Port>>

such that the following tests work.

declare
MinP = {MinServer}
{Test {Send MinP min(3 7 $)} '==' 3}
{Test {Send MinP min(7 3 $)} '==' 3}
{Test {Send MinP min(6 4 $)} '==' 4}
{Test {Send MinP min(0 94 $)} '==' 0}
------------------------------------------
    3. Reasoning (5.2.3)
       a. For stateless port objects (NewPortObject2)
------------------------------------------
 REASONING ABOUT STATELESS PORT OBJECTS

------------------------------------------
       b. For stateful port objects (NewPortObject)
------------------------------------------
  REASONING ABOUT STATEFUL PORT OBJECTS

------------------------------------------
       c. For the entire system of agents
------------------------------------------
   REASONING ABOUT SYSTEMS OF AGENTS

------------------------------------------
III. Simple message protocols (5.3)
        What's a protocol?
------------------------------------------
           PROTOCOLS (5.3)

def: a *protocol* is

------------------------------------------
 A. RMI (5.3.1)
------------------------------------------
     PROTOCOL EXAMPLE: RMI (= RPC)

% Example: file RMI.oz
proc {ServerProc Msg}
   case Msg of
      calc(X Y) then Y=X*X+2.0*X+2.0
   end
end
Server = {NewPortObject2 ServerProc}

proc {ClientProc Msg}
   case Msg of
      work(Y) then Y1 Y2 in
         {Send Server calc(10.0 Y1)}
         {Wait Y1}
         {Send Server calc(20.0 Y2)}
         {Wait Y2}
         Y=Y1+Y2
   end
end
Client = {NewPortObject2 ClientProc}
{Browse {Send Client work($)}}
------------------------------------------
        Are these port objects symmetric?
        Are messages executed concurrently by the server?
        So is there concurrency inside the server?
 B. Asynchronous RMI (5.3.2)
        How would you make the sends asynchronous?
 C. Callbacks (5.3.3-5)
------------------------------------------
       CALLBACKS (5.3.3-5.3.5)

def: a *callback* occurs when

------------------------------------------
        What's a callback?
        How would you implement callbacks?
        What if the client's callback has to call the server again?
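
The difference between the RMI client in A and the asynchronous variant asked about in B can be sketched as follows. This is a Python analogy under stated assumptions, not the course's Oz code: `new_port_object2`, `server_proc`, `work_sync`, and `work_async` are names chosen for illustration, and a `concurrent.futures.Future` plays the role of the dataflow variable that the server binds.

```python
import queue
import threading
from concurrent.futures import Future


def new_port_object2(proc):
    """Stateless port object: one thread applies proc to each message in turn."""
    inbox = queue.Queue()

    def loop():
        while True:
            proc(inbox.get())

    threading.Thread(target=loop, daemon=True).start()
    return inbox.put  # calling the returned function sends a message


def server_proc(msg):
    tag, x, reply = msg
    if tag == "calc":
        reply.set_result(x * x + 2.0 * x + 2.0)  # like binding Y in calc(X Y)


def work_sync(server):
    # RMI: wait for each reply before sending the next request.
    y1 = Future()
    server(("calc", 10.0, y1))
    a = y1.result(timeout=5)
    y2 = Future()
    server(("calc", 20.0, y2))
    b = y2.result(timeout=5)
    return a + b


def work_async(server):
    # Asynchronous RMI: send both requests first, wait only where values are used.
    y1, y2 = Future(), Future()
    server(("calc", 10.0, y1))
    server(("calc", 20.0, y2))
    return y1.result(timeout=5) + y2.result(timeout=5)
```

Both functions compute the same sum. The asynchronous version differs only in when the client waits, which matters when the server is slow or remote. In Oz the explicit `result` calls are unnecessary because the addition itself suspends until both variables are bound.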
 D. Exceptions (5.3.6)
        What would you do if the server can encounter exceptions?
IV. program design for concurrency (5.4)
 A. programming with concurrent components (5.4.1-2)
------------------------------------------
    PROGRAM DESIGN (MULTIAGENT SYSTEMS)

Requirements:
   system overall is a (set of) port(s)
   specify causal relations on its I/O streams

         |-----------------|
    I1-->|                 |-->O1
         |     System      |
    I2-->|                 |-->O2
    I3-->|                 |
     /-->|                 |-\
     |   |-----------------| |
     |                       |
     \----------<------------/

Architecture model
   system is set of agents
   communicate by message passing
   components are procedures:
      when invoked, creates an instance,
      which is a port object
      that uses streams for I/O

Detailed design: describe
   - state of each agent (if any)
     including state diagrams
   - interaction protocol
   - scheduling constraints

Test it!
------------------------------------------
 B. building blocks
------------------------------------------
 LIST OPERATIONS AS CONCURRENCY PATTERNS

Example: Map
   broadcasts queries
   collects replies in a list

AL = {Map PL
      fun {$ P} {Send P query(foo $)} end}
------------------------------------------
        Can you use FoldL as a concurrency pattern?
V. lift control system example (5.5)
VI. Using the message-passing model directly (5.6)
 A. port objects that share a single thread (5.6.1)
------------------------------------------
        SHARING A SINGLE THREAD

Protocol:
   {AddPortObject PO Proc}
   {Call PO Msg}

System/scheduler created by NewPortObjects
------------------------------------------
------------------------------------------
       NEWPORTOBJECTS (FIG 5.14)

declare
proc {NewPortObjects ?AddPortObject ?Call}
   Sin P={NewPort Sin}
   proc {MsgLoop S1 Procs}
      case S1
      of add(I Proc Sync)|S2 then Procs2 in
         Procs2={AdjoinAt Procs I Proc}
         Sync=unit
         {MsgLoop S2 Procs2}
      [] msg(I M)|S2 then
         try {Procs.I M} catch _ then skip end
         {MsgLoop S2 Procs}
      [] nil then skip
      end
   end
in
   thread {MsgLoop Sin procs} end
   proc {AddPortObject I Proc}
      Sync
   in
      {Send P add(I Proc Sync)}
      {Wait Sync}
   end
   proc {Call I M}
      {Send P msg(I M)}
   end
end
------------------------------------------
        How does that work?
        What do you have to be careful of if you use this?
        What are the advantages and disadvantages?
 B. Concurrent queue
        How would you program a concurrent queue using ports?
------------------------------------------
          CONCURRENT QUEUE

NewQueue returns
   queue(put:PutProc get:GetProc) record

% Example: in QueueTest.oz
\insert 'NewQueue.oz'
declare Q in
thread Q={NewQueue} end
{Q.put 1}
{Browse {Q.get $}}
{Browse {Q.get $}}
{Browse {Q.get $}}
{Q.put 2}
{Q.put 3}
------------------------------------------
 C. thread termination detection (5.6.3) (skip?)
------------------------------------------
   TERMINATION DETECTION OF THREADS

Problem:
   thread S end
   - can create new threads inside S
   - want to know when they all finish
   - no change to interfaces

Specification:
   {NewThread P ?SubThread}
      -- creates thread to run P,
         initializes procedure SubThread
      -- returns only after P and
         all subthreads are finished

   {SubThread P2}
      -- creates a subthread running P2
------------------------------------------
        How would you write this?
        Is there a relative ordering defined on sends from different threads?
 D. Eliminating useless sequential dependencies (5.6.4) (skip)
------------------------------------------
  FILTER HAS SEQUENTIAL DEPENDENCIES

declare
fun {Filter L F}
   case L
   of nil then nil
   [] X|Lr then
      if {F X} then X|{Filter Lr F}
      else {Filter Lr F}
      end
   end
end
declare A B
thread
   Out={Filter [A 5 1 B 4 0 6]
        fun {$ X} X>2 end}
end
{Browse Out}
------------------------------------------
VII. The Erlang Language (5.7)
------------------------------------------
            ERLANG (5.7)

From Ericsson telecommunications

   - automatic memory management
   - hides internal representation of data
   - efficient threads
   - high-performance s/w fault tolerance
   - dynamically typed
   - first class functions
   - asynchronous message passing
   - used in Open Telecom Platform
   - updated while running
------------------------------------------
        What other language do we know that hides the representation of data?
 A. computation model (5.7.1)
------------------------------------------
          COMPUTATION MODEL

program = process*
processes = port + mailbox
port = stream + recursive function

pattern matching can
   - wait for messages
   - remove some from mailbox
   - leave others

message passing asynchronous

processes are independent
   no shared references between processes

transparent distribution

failure detection (linking),
   - sends message when another process fails

replicated database
------------------------------------------
        What features are useful for reliability?
 B. Semantics of Erlang (5.7.2)
------------------------------------------
  CONCURRENCY AND MESSAGE PASSING
       PRIMITIVES IN ERLANG

PID = spawn(M,F,A)
   -- creates new process with id PID
      running module M's function F
      with argument list A

Pid!Msg
   -- sends Msg to process with id Pid

receive
   Pattern1 [when Guard1] -> Body1;
   ...
   PatternN [when GuardN] -> BodyN
   [ after Expr -> BodyT ]
end
   -- blocks until a message matches one of
      the Patterns (with true guard)
      removes it from mailbox,
      binds values to pattern variables,
      executes the corresponding body
   -- the after clause is optional,
      it gives a timeout,
      after which BodyT is executed
------------------------------------------
------------------------------------------
 COMPARISON OF RECEIVE WITH CASE IN OZ

------------------------------------------
        How are the parts of Erlang's "receive" like "case" in Oz?
    1. translation into Oz
------------------------------------------
         TRANSLATION INTO OZ

process         ~~> thread + port
send to process ~~> send to port
mailbox         ~~>
receive         ~~> function on 2 streams (I/O)
------------------------------------------
       a. without timeout
------------------------------------------
 TRANSLATION OF RECEIVE WITHOUT TIMEOUT

Let B, B1, B2 be Erlang body statements

We simulate the effect of B on a list
representing the Erlang mailbox's
current contents:

For a sequence B1 B2:

T((B1 B2) Sin Sout)
~~>
local Sout2 in
   T(B1 Sin Sout2)
   T(B2 Sout2 Sout)
end

For a simple (atomic) statement,
where receive doesn't appear in B:

T(B Sin Sout)
~~>
B
Sout=Sin

For a receive statement:

T(receive P1 -> B1; ... PN -> BN; Sin Sout)
~~>
local
   fun {Loop S L#E Sout}
      case S of M|S1 then
         case M
         of T(P1) then E=S1 T(B1 L Sout)
         ...
         [] T(PN) then E=S1 T(BN L Sout)
         else E1 in
            E=M|E1 {Loop S1 L#E1 Sout}
         end
      end
   end
   L
in
   {Loop Sin L#L Sout}
end
------------------------------------------
        What is T?
        What is L?
        What is S1 above?
        What is L#L doing?
        How would you deal with the guards?
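
The effect of the receive translation on a mailbox can be sketched with ordinary lists. The Python below is an illustration under stated assumptions, not the Oz translation itself: `receive` and `tagged` are hypothetical names, patterns are modeled as predicate functions, and an ordinary list takes the place of the difference list L#E. A guard can be folded into the predicate.

```python
def receive(mailbox, clauses):
    """Selective receive over a list-based mailbox.

    clauses is a list of (matches, body) pairs, tried in order on each message.
    Returns (result, remaining_mailbox), or None when no message matches,
    which is where an Erlang process would block.
    """
    skipped = []  # messages passed over so far, in order (the role of L#E)
    for i, msg in enumerate(mailbox):
        for matches, body in clauses:
            if matches(msg):
                rest = mailbox[i + 1:]            # S1 in the translation
                return body(msg), skipped + rest  # E=S1 closes the skipped prefix
        skipped.append(msg)  # E=M|E1: keep M for a later receive
    return None


def tagged(tag):
    # Hypothetical helper: matches two-element tuples such as ("ones", 4).
    return lambda msg: isinstance(msg, tuple) and len(msg) == 2 and msg[0] == tag


clauses = [
    (tagged("ones"), lambda m: m[1]),
    (tagged("tens"), lambda m: m[1] * 10),
]
```

With a mailbox of `[("noise", 0), ("tens", 4), ("ones", 2)]`, the first message matches no clause and is kept, the second matches the tens clause, and the remaining mailbox preserves the original order of the unmatched messages.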
------------------------------------------
       EXAMPLE TRANSLATION TO OZ

Consider the Erlang

   receive
      {ones, X} -> R=X;
      {tens, Y} -> R=Y*10;
   receive
      {factor, Z} -> Ans=Z*R

The translation of this into Oz
using Sin and Sout would be:

T(receive
     {ones, X} -> R=X;
     {tens, Y} -> R=Y*10;
  receive
     {factor, Z} -> Ans=Z*R
  Sin Sout)

~~> (by rule for sequencing)

local Sout2 in
   T(receive
        {ones, X} -> R=X;
        {tens, Y} -> R=Y*10
     Sin Sout2)
   T(receive
        {factor, Z} -> Ans=Z*R
     Sout2 Sout)
end

Let's focus on the first one of these:

T(receive
     {ones, X} -> R=X;
     {tens, Y} -> R=Y*10
  Sin Sout2)

~~> (by rule for receive, translating patterns)

local
   fun {Loop S L#E Sout}
      case S of M|S1 then
         case M
         of ones(X) then E=S1 T(R=X L Sout)
         [] tens(Y) then E=S1 T(R=Y*10 L Sout)
         else E1 in
            E=M|E1 {Loop S1 L#E1 Sout}
         end
      end
   end
   L
in
   {Loop Sin L#L Sout2}
end

~~> (by rule for atomic statements, twice)

local
   fun {Loop S L#E Sout}
      case S of M|S1 then
         case M
         of ones(X) then E=S1 R=X Sout=L
         [] tens(Y) then E=S1 R=Y*10 Sout=L
         else E1 in
            E=M|E1 {Loop S1 L#E1 Sout}
         end
      end
   end
   L
in
   {Loop Sin L#L Sout2}
end
------------------------------------------
------------------------------------------
          EXAMPLE WITH DATA

Suppose that initially the mailbox is
(in Oz list notation):

   factor(7)|tens(4)|END

What happens when we execute this with

   Sin = factor(7)|tens(4)|END
   Sout2 = _
------------------------------------------
       b. with timeout
        How would you handle a non-zero timeout?
        How would you handle a zero timeout?
VIII. Advanced Topics (5.8) (skip)
 A. nondeterministic concurrent model (5.8.1)
------------------------------------------
 NONDETERMINISTIC CONCURRENT MODEL (5.8.1)

Used in concurrent logic programming

AKA: process model of logic programming

= declarative concurrent model + WaitTwo

{WaitTwo X Y} == 1 if X is defined
{WaitTwo X Y} == 2 if Y is defined
blocks otherwise
------------------------------------------
        Can this be programmed in the declarative concurrent model?
        If we add exceptions? IsDet? Ports?
        What does this allow us to program?
------------------------------------------
              PROBLEMS

Practical programming problems:

Inefficient
   - stream mergers require a thread

Not very expressive
   - can't reference server directly
     can only reference stream
------------------------------------------