'''
==========================================================================
Python for Parallelism in Introductory Computer Science Education
SC '13 HPC Educators Program
Steven Bogaerts, Wittenberg University
Joshua Stough, Washington and Lee
http://www.joshuastough.com/SC13
MIT License: see README_LICENSE.txt

file: communication.py
author: bogaerts

Summary: These examples demonstrate the use of a queue to pass information
between processes.
==========================================================================
'''

from multiprocessing import *
import time

'''
-------------------------
Communicating Via a Queue
-------------------------
This simple example shows one process receiving data from another via a
queue.
'''

def greet(q):
    print("(child process) Waiting for name...")
    name = q.get()
    print("(child process) Well, hi", name)

def sendName():
    q = Queue()

    p1 = Process(target=greet, args=(q,))
    p1.start()

    time.sleep(5)  # wait for 5 seconds

    print("(main process) Ok, I'll send the name")
    q.put("Jimmy")

'''
KEY IDEAS

- A queue is like standing in line. The first thing in is the first thing
  out.
- put and get can be used by any processes to communicate by sending and
  receiving data.
- If a process tries to get from an empty queue, it will wait until some
  other process puts something on the queue.
'''

'''
Exercise: Copy the code above as a basis for greet2 and sendName2. Modify
the code so that greet2 expects to receive 5 names, which are sent by
sendName2. Each function should accomplish this by sending/receiving one
name at a time, in a loop.
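
Before starting, it may help to see the queue behaviors from the KEY IDEAS
above in isolation: items come out in the order they were put in, and
get() waits for data. This is a minimal sketch assuming Python 3; the names
producer, collect_in_order, and get_or_none are hypothetical helpers, not
part of this module. get(timeout=...) is shown as one way to avoid waiting
forever on an empty queue.

```python
from multiprocessing import Process, Queue
import queue  # only for the queue.Empty exception


def producer(q, names):
    # Child side: put each name on the queue, in order.
    for name in names:
        q.put(name)


def collect_in_order(names):
    # Parent side: get() blocks until the child has put something.
    q = Queue()
    p = Process(target=producer, args=(q, names))
    p.start()
    received = [q.get() for _ in names]
    p.join()  # safe here: everything the child sent has been consumed
    return received


def get_or_none(q, seconds):
    # Hypothetical helper: wait at most `seconds`, then give up.
    try:
        return q.get(timeout=seconds)
    except queue.Empty:
        return None


if __name__ == "__main__":
    print(collect_in_order(["Ann", "Bob", "Cy"]))  # prints ['Ann', 'Bob', 'Cy']
    print(get_or_none(Queue(), 0.5))               # prints None
```

Order is preserved here because a single process does all the putting. The
multiprocessing documentation notes that when several processes put items
on the same queue, items from different processes may arrive interleaved.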
'''

'''
def greet2():
    for 5 times
        get name from queue
        say hello

def sendName2():
    queue
    make a child process, give it the queue
    start it
    for 5 times
        sleep for a bit
        put another name in the queue
'''

from random import randint
from time import sleep

def greet2(q):
    for i in range(5):
        print()
        print("(child process) Waiting for name", i)
        name = q.get()
        print("(child process) Well, hi", name)

def sendName2():
    q = Queue()

    p1 = Process(target=greet2, args=(q,))
    p1.start()

    for i in range(5):
        sleep(randint(1, 4))
        print("(main process) Ok, I'll send the name")
        q.put("George" + str(i))

'''
KEY IDEAS

- Sometimes you can get by without using locks if processes are already
  waiting for other reasons. Here the child is blocked in q.get() while the
  parent prints, and the parent is sleeping while the child prints, so in
  practice their output does not interleave.
- import moduleName means you have to write moduleName.functionName (as
  with time.sleep in sendName). from moduleName import functionName means
  you can write just functionName (as with sleep and randint in sendName2).
'''

'''
This example illustrates the join method.
'''

def slowpoke(lock):
    sleep(10)
    lock.acquire()
    print("Slowpoke: Ok, I'm coming")
    lock.release()

def haveToWait():
    lock = Lock()
    p1 = Process(target=slowpoke, args=(lock,))
    p1.start()
    print("Waiter: Any day now...")

    p1.join()
    print("Waiter: Finally! Geez.")

'''
KEY IDEAS

- p.join() makes the currently-running process wait for p to finish. Be
  careful! It's not the other way around.
- join will slow things down (since it makes a process wait), but it can be
  used to enforce a particular order of activity.
- Anonymous processes can't be used with join: you must both start a
  process and join it, so you need a variable that refers to it.
'''

'''
Exercise: Consider two functions: addTwoNumbers and addTwoPar.
addTwoNumbers takes two numbers as arguments, adds them, and places the
result on a queue (which was also passed as an argument). addTwoPar asks
the user to enter two numbers, passes them and a queue to addTwoNumbers in
a new process, waits for the result, and then prints it. Finish addTwoPar.
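
One ordering detail matters whenever you combine a queue with join. The
multiprocessing programming guidelines note that a child which has put
items on a queue will not terminate until those items have been fed into
the underlying pipe, so joining the child before the parent has consumed
them can deadlock when the data is large. A minimal sketch of the safe
order, assuming Python 3 (square_all and squares_par are hypothetical
names):

```python
from multiprocessing import Process, Queue


def square_all(nums, q):
    # Child: send back a single list of results.
    q.put([n * n for n in nums])


def squares_par(nums):
    q = Queue()
    p = Process(target=square_all, args=(nums, q))
    p.start()
    result = q.get()  # consume the child's data first...
    p.join()          # ...then wait for the child to exit
    return result


if __name__ == "__main__":
    print(squares_par([1, 2, 3, 4]))  # prints [1, 4, 9, 16]
```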
'''

def addTwoNumbers(a, b, q):
    # sleep(5) # In case you want to slow things down to see what is happening.
    q.put(a + b)

def addTwoPar():
    # In Python 3, input() returns a string, so convert it to a number.
    x = float(input("Enter first number: "))
    y = float(input("Enter second number: "))

    q = Queue()
    p1 = Process(target=addTwoNumbers, args=(x, y, q))
    p1.start()

    # p1.join()
    result = q.get()
    print("The sum is:", result)

'''
KEY IDEAS

- This illustrates the bigger concept of making a child process to do some
  (possibly complex) task while the parent goes on to other things. You
  could even have multiple child processes working on parts of the problem.
- You don't need the join here, because q.get() will wait until the child
  puts something on the queue anyway.
'''

'''
--------------------------------------------
Using a Queue to Merge Child Process Results
--------------------------------------------
This example demonstrates child processes sending information back to the
parent in order to obtain a final answer.
'''

from random import randint
import time

def addManyNumbers(numNumbers, q):
    s = 0
    for i in range(numNumbers):
        s = s + randint(1, 100)
    q.put(s)

def addManyPar():
    totalNumNumbers = 1000000

    q = Queue()
    # Integer division (//) so that range() in the child receives an int.
    p1 = Process(target=addManyNumbers, args=(totalNumNumbers // 2, q))
    p2 = Process(target=addManyNumbers, args=(totalNumNumbers // 2, q))

    startTime = time.time()
    p1.start()
    p2.start()

    # p1.join()
    # p2.join()

    answerA = q.get()
    answerB = q.get()

    endTime = time.time()
    timeElapsed = endTime - startTime
    print("Time:", timeElapsed)
    print("Sum:", answerA + answerB)

'''
KEY IDEAS

- This demonstrates the idea of splitting a task evenly among multiple
  processes. Each one reports back to the parent process via a queue.
'''

'''
------------------------
Sharing a Data Structure
------------------------
This example illustrates that memory is not shared between processes, so
mutation of arguments does not occur in the same way as in a
single-process program.
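
Why? Each child works on its own copy of the list. With the "fork" start
method the child gets a copy of the parent's memory; with "spawn" and
"forkserver" the Process arguments are pickled in the parent and unpickled
in the child. The single-process sketch below imitates the pickling case
so you can see the effect without starting any processes. It assumes
Python 3; add_item and run_like_spawned_child are hypothetical names, and
the sketch imitates the effect rather than reproducing multiprocessing's
internals.

```python
import pickle
import random


def add_item(ls):
    # Same idea as addItem below: mutate the list that was passed in.
    ls.append(random.randint(1, 100))


def run_like_spawned_child(func, arg):
    # Imitate a spawned child: it receives an unpickled copy of the
    # argument, never the parent's original object.
    child_copy = pickle.loads(pickle.dumps(arg))
    func(child_copy)
    return child_copy


if __name__ == "__main__":
    ls = []
    child_view = run_like_spawned_child(add_item, ls)
    print(ls)               # prints [] because the parent's list is untouched
    print(len(child_view))  # prints 1 because only the copy was mutated
```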
'''

import random

'''
Sequential program showing mutation of a parameter, to serve as a reminder
to students.
'''

def addItem(ls):
    ls.append(random.randint(1, 100))

def sequentialDS():
    ls = []
    addItem(ls)
    addItem(ls)
    print(ls)

'''
Attempting to do the same thing in parallel does not produce the same
results.
'''

# Doesn't work as you might hope! Each child appends to its own copy of ls,
# so the parent still prints an empty list.
def parallelShareDS1():
    ls = []
    p1 = Process(target=addItem, args=(ls,))
    p2 = Process(target=addItem, args=(ls,))

    p1.start()
    p2.start()

    p1.join()
    p2.join()

    print(ls)

'''
Communication between processes must occur via special structures, such as
a queue.
'''

def addItem2(ls, q):
    # ls is not modified here; the new number goes back through the queue.
    q.put(random.randint(1, 100))

def parallelShareDS2():
    ls = []
    q = Queue()
    p1 = Process(target=addItem2, args=(ls, q))
    p2 = Process(target=addItem2, args=(ls, q))

    p1.start()
    p2.start()

    # Now do whatever we need to with the results from the spawned processes
    ls.append(q.get())
    ls.append(q.get())

    print(ls)

'''
-------------
Shared Memory
-------------
The multiprocessing documentation advises avoiding shared state between
processes where possible. When you do need it, a Manager can hold objects
such as lists that several processes can read and modify. The data
actually lives in a separate manager process, and each process works with
it through a proxy.
'''

def setVar(var):
    var[0] = "Jimmy"

def sharingVariables():
    manager = Manager()
    var = [0]
    var = manager.list(var)

    p = Process(target=setVar, args=(var,))
    p.start()
    p.join()

    print(var[0])

if __name__ == '__main__':
    # Uncomment the example you want to run. The main guard is required on
    # platforms that start child processes with "spawn" (the default on
    # Windows and macOS).
    # sendName()
    # sendName2()
    # haveToWait()
    # addTwoPar()
    # addManyPar()
    # sequentialDS()
    # parallelShareDS1()
    # parallelShareDS2()
    # sharingVariables()
    pass
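
'''
The Manager in sharingVariables keeps the list in a separate server process
and hands out proxies to it. For simple numbers and arrays, multiprocessing
also provides Value and Array, which are allocated in shared memory. A
minimal sketch of a shared counter, assuming Python 3 (add_many and
shared_counter are hypothetical names). Because counter.value += 1 is a
read followed by a write, each increment holds the lock that Value creates
by default.

```python
from multiprocessing import Process, Value


def add_many(counter, times):
    for _ in range(times):
        with counter.get_lock():  # without this, updates can be lost
            counter.value += 1


def shared_counter(num_procs=4, times=1000):
    counter = Value("i", 0)  # a C int in shared memory, starting at 0
    procs = [Process(target=add_many, args=(counter, times))
             for _ in range(num_procs)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return counter.value


if __name__ == "__main__":
    print(shared_counter())  # prints 4000
```
'''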