I. Message-Passing Concurrency (Ch 5) A. quiz ------------------------------------------ QUIZ FOR CHAPTER 5 [message passing] What is message passing? What about it is asynchronous? How is it used? [ports] What is a port? How does it overcome the limitations of the declarative concurrent model? [semantics] Why is a mutable store needed to describe the semantics of NewPort and Send? [port objects] What is a port object? How is it different from a port? Can port objects have state? [agents] What is an agent? [RMI] How do you simulate RMI using Send? How to simulate asynchronous RMI? [callbacks] How to implement callbacks? [Exeptions] What happens if the server encounters an exception? [Design] How do you design a multiagent system? [Design] In what way do list operations correspond to concurrency patterns? [Architecture] What if you don't want a lot of threads? [Data] How would you program a concurrent queue using ports? [Termination] How can you detect termination or other resource use in a modular way? [Erlang] What are the features of Erlang that make it interesting? [Erlang design] What is useful about Erlang's combination of features? [Erlang's model] How does the Erlang model differ from that of Oz? How are mailboxes modeled? [Erlang semantics] What does receive do? How would it be simulated in Oz? [Nondeterministic model] What is the nondeterministic concurrent model? How does it differ from the declarative concurrent model? From the message passing model? What are its problems? ------------------------------------------ II. Message-Passing Concurrency (Ch 5) A. motivation ------------------------------------------ MESSAGE-PASSING CONCURRENCY (5) def: message passing is a programming style in which a program consists of independent entities that interact by sending messages asynchronously Importance: ------------------------------------------ What is asynchronous? What's the importance of message passing? B. concepts ------------------------------------------ CONCEPTS new idea: asynchronous communication channel - named - all clients can always send - server can read all messages Fundamentally nondeterministic Port: channel with an associated stream send = append to stream read = read from stream ------------------------------------------ C. operational semantics ------------------------------------------ SYNTAX Syntax: ::= ... | {NewPort X Y} | {Send X Y} {NewPort S P} creates port named P with stream S {Send P X} appends X to stream corresponding to P ------------------------------------------ Why can't we write this in the declarative model? ------------------------------------------ MUTABLE STORE m in MutableStore = Variable -> Variable Operations: {}: MutableStore update: MutableStore -> Variable x Variable -> MutableStore lookup: MutableStore x Variable -> Variable lookup(update(m)(y,x), z) = if z == y then x else lookup(m,z) Notation: write x:y for a binding in MutableStore typical m is {p1:s1, p2:s2} update({p1:s1})(p2, s2) = {p1:s1, p2:s2} lookup({p1:s1, p2:s2}, p1) = s1 ------------------------------------------ ------------------------------------------ SEMANTICS Sequential (-d->) configurations: (ST,s,m) in State = Stack x Store x MutableStore + Message Message = String Stack = ( x Environment)* Store = Variable -> Value T = Message + { (nil,s,m) | s in Store, m in MutableStore} [NewPort call] (({NewPort X Y},E)|Rest, s, m) -d-> (Rest, s', m') where unbound(s,E(Y)) and n is a port name and n not in range(s) and s' = bind(s)(s(E(Y)),n) and m' = update(m)(E(Y), E(X)) [Send call] (({Send X Y},E)|Rest, s, m) -d-> (Rest, s'', m') where determined(s,E(X)) and s(E(X)) is a port name and lookup(m, E(X)) = z and z' = next(s) and s' = alloc(s) and m' = update(m)(E(X), z') and l = '|'(E(Y) z') and s'' = bind(s')(s'(z), l) ------------------------------------------ What should happen if we do {NewPort S P} and P is already bound? What should happen if we do {Send P Y} and P is not a port? What should happen if we do {Send P Y} and P is unbound? What should happen if we do {Send P Y} and Y is unbound? What should happen in [Send call] if m(E(X)) is not defined? Does this affect garbage collection? III. Port Objects (5.2) A. abstraction ------------------------------------------ PORT OBJECTS (5.2) def: a port object is a "combination of one or more ports and a stream object" Extends stream objects by: - allowing multiple - can be embedded in data structures Example: declare P in local S in {NewPort S P} thread {ForAll S Browse} end end ------------------------------------------ How would you make an abstraction of this kind of thing? ------------------------------------------ MAKING PORTS WITH STATE declare fun {NewPortObject Init Fun} Sin Sout in thread {FoldL Sin Fun Init Sout} end {NewPort Sin} end ------------------------------------------ How does {NewPortObject [a b c] F 0} execute? What does NewPortObject return? What happens to the final state, if any? ------------------------------------------ MAKING STATELESS PORTS declare fun {NewPortObject2 Proc} Sin in thread for Msg in Sin do {Proc Msg} end end {NewPort Sin} end ------------------------------------------ 1. example (5.2.2) ------------------------------------------ TO DO Write NewMerge: }: > such that the following code can be used: \insert 'NewMerge.oz' declare P S thread {Browse S} end P={NewMerge S} thread for I in 1..10 do {Send P I} {Delay 500} end end thread for J in 201..210 do {Send P J} {Delay 300} end end ------------------------------------------ 2. Reasoning (5.2.3) IV. Simple message protocols (5.3) What's a protocol? A. RMI (5.3.1) ------------------------------------------ RMI (= RPC) % Example: file RMI.oz proc {ServerProc Msg} case Msg of calc(X Y) then Y=X*X+2.0*X+2.0 end end Server = {NewPortObject2 ServerProc} proc {ClientProc Msg} case Msg of work(Y) then Y1 Y2 in {Send Server calc(10.0 Y1)} {Wait Y1} {Send Server calc(20.0 Y2)} {Wait Y2} Y=Y1+Y2 end end Client = {NewPortObject2 ClientProc} {Browse {Send Client work($)}} ------------------------------------------ Are these port objects symmetric? Are message executed concurrently by the server? B. Asynchronous RMI (5.3.2) How would you make the sends asynchronous? C. Callbacks (5.3.3-5) What's a callback? How would you implement callbacks? What if the client's callback has to call the server again? D. Exceptions (5.3.6) What would you do if the server can encounter exceptions? V. program design for concurrency (5.4) A. programming with concurrent compoents (5.4.1-2) ------------------------------------------ PROGRAM DESIGN (MULTIAGENT SYSTEMS) Requirements: system overall is a (set of) port(s) specify causal relations on its I/O streams Architecture model system is set of agents communicate by message passing components are procedures: when invoked, creates an instance, which is a port object that uses streams for I/O Detailed design: describe - state of each agent (if any) including state diagrams - interaction protocol - scheduling constraints Test it! ------------------------------------------ B. building blocks ------------------------------------------ LIST OPERATIONS AS CONCURRENCY PATTERNS Example: Map broadcasts queries collects replies in a list AL = {Map PL fun {$ P} {Send P query(foo $)} end} ------------------------------------------ Can you use FoldL as a concurrency pattern? VI. lift control system example (5.5) VII. Using the message-passing model directly (5.6) A. port objects that share a single thread (5.6.1) ------------------------------------------ SHARING A SINGLE THREAD Protocol: {AddPortObject PO Proc} {Call PO Msg} System/scheduler created by NewPortObjects ------------------------------------------ ------------------------------------------ NEWPORTOBJECTS declare proc {NewPortObjects ?AddPortObject ?Call} Sin P={NewPort Sin} proc {MsgLoop S1 Procs} case S1 of add(I Proc Sync)|S2 then Procs2 in Procs2={AdjoinAt Procs I Proc} Sync=unit {MsgLoop S2 Procs2} [] msg(I M)|S2 then try {Procs.I M} catch _ then skip end {MsgLoop S2 Procs} [] nil then skip end end in thread {MsgLoop Sin procs} end proc {AddPortObject I Proc} Sync in {Send P add(I Proc Sync)} {Wait Sync} end proc {Call I M} {Send P msg(I M)} end end ------------------------------------------ How does that work? What do you have to be careful of if you use this? What are the advantages and disadvantages? B. Concurrent queue How would you program a concurrent queue using ports? ------------------------------------------ CONCURRENT QUEUE NewQueue returns queue(put:PutProc get:GetProc) record % Example: \insert 'NewQueue.oz' declare Q in thread Q={NewQueue} end {Q.put 1} {Browse {Q.get $}} {Browse {Q.get $}} {Browse {Q.get $}} {Q.put 2} {Q.put 3} ------------------------------------------ C. thread termnation detection (5.6.3) ------------------------------------------ TERMINATION DETECTION OF THREADS Problem: thread S end - can create new threads inside S - want to know when they all finish - no change to interfaces Specification: {NewThread P ?SubThread} -- creates thread to run P, initializes procedure SubThread -- returns only after P and all subthreads are finished {SubThread P2} -- creates a subthread running P2 ------------------------------------------ How would you write this? Is there a relative ordering defined on sends from different threads? D. Eliminating useless sequential dependencies (5.6.4) ------------------------------------------ FILTER HAS SEQUENTIAL DEPENDENCIES declare fun {Filter L F} case L of nil then nil [] X|Lr then if {F X} then X|{Filter Lr F} else {Filter Lr F} end end end declare A B thread Out={Filter [A 5 1 B 4 0 6] fun {$ X} X>2 end} end {Browse Out} ------------------------------------------ VIII. The Erlang Language (5.7) ------------------------------------------ ERLANG (5.7) From Ericsson telecommunications - automatic memory management - hides internal representation of data - efficient threads - high-performance s/w fault tolerance - dynamically typed - first class functions - asynchronous message passing - used in Open Telecom Platform - updated while running ------------------------------------------ What other language do we know that hides the representation of data? A. computation model (5.7.1) ------------------------------------------ COMPUTATION MODEL program = process* processes = port + mailbox port = stream + recursive function pattern matching can - wait for messages - remove some from mailbox - leave others message passing asynchronous processes are independent no shared references between processes transparent distribution failure detection (linking), - sends message when another process fails replicated database ------------------------------------------ What features are useful for reliability? B. Semantics of Erlang (5.7.2) ------------------------------------------ CONCURRENCY AND MESSAGE PASSING PRIMITIVES IN ERLANG PID = spawn(M,F,A) -- creates new process with id PID running module M's function F with argument list A Pid!Msg -- sends Msg to process with id Pid receive Pattern1 [when Guard1] -> Body1; ... PatternN [when GuardN] -> BodyN; [ after Expr -> BodyT; ] -- blocks until a message matches one of the Patterns (with true guard) removes it from mailbox, binds values to pattern variables, executes the corresponding body -- the after clause is optional, it gives a timeout, after which BodyT is executed ------------------------------------------ 1. translation into Oz ------------------------------------------ TRANSLATION INTO OZ process ~~> thread + port send to process ~~> send to port mailbox ~~> receive ~~> function on 2 streams (I/O) ------------------------------------------ a. without timeout ------------------------------------------ TRANSLATION OF RECEIVE WITHOUT TIMEOUT T(B Sin Sout) ~~> B Sout=Sin where receive doesn't appear in B T(receive P1 -> B1; ... PN -> BN; Sin Sout) ~~> local fun {Loop S L#E Sout} case S of M|S1 then case M of T(P1) then E=S1 T(B1 L Sout) ... [] T(PN) then E=S1 T(BN L Sout) else E1 in E=M|E1 {Loop S1 T#E1 Sout} end end end L in {Loop Sin L#L Sout} end ------------------------------------------ What is T? What is L? What is S1 above? What is L#L doing? How would you deal with the guards? b. with timeout How would you handle a non-zero timeout? How would you handle a zero timeout? IX. Advanced Topics (5.8) A. nondeterministic concurrent model (5.8.1) ------------------------------------------ NONDETERMINISTIC CONCURRENT MODEL (5.8.1) Used in concurrent logic programming AKA: process model of logic programming = declarative concurrent model + WaitTwo {WaitTwo X Y} == 1 if X is defined {WaitTwo X Y} == 2 if Y is defined blocks otherwise ------------------------------------------ Can this be programmed in the declarative concurrent model? If we add exceptions? IsDet? Ports? What does this allow us to program? ------------------------------------------ PROBLEMS Practical programming problems: Inefficient - stream mergers require a thread Not very expressive - can't reference server directly can only reference stream ------------------------------------------