''' ========================================================================== Python for Parallelism in Introductory Computer Science Education SC '13 HPC Educators Program Steven Bogaerts, Wittenberg University Joshua Stough, Washington and Lee http://www.joshuastough.com/SC13 MIT License: see README_LICENSE.txt file: applications.py author: bogaerts Summary: Various applications, like K-Medoid Clustering and pipelined floating point addition. ========================================================================== ''' ############################################################################ ''' ---------------------- Timing Message Passing ---------------------- This example allows students to experiment with the time it takes to pass messages of various sizes. Students may be surprised to learn that even very long strings can be passed quite quickly. In fact, the execution time is dominated by the time to construct the message, not pass it. Prerequisites: - Simple for loops - Accumulation on a string - Writing and using multiple functions, including returning from a function ''' import time def sender(m, q, numMessages): for i in range(numMessages): q.put(m) def receiver(q, numMessages): for i in range(numMessages): q.get() def makeMessage(messageSize): s = "" for i in range(messageSize): s = s + "a" return s def timeMessagePassing(): messageSize = 10000000 # Play with this number and observe the results print "Making message...", m = makeMessage(messageSize) print "done!" numMessages = 10 send = True q = Queue() p1 = Process(target=sender, args=(m, q, numMessages)) p2 = Process(target=receiver, args=(q, numMessages)) startTime = time.time() p1.start() p2.start() p1.join() p2.join() timeElapsed = time.time() - startTime print "Time elapsed:", timeElapsed print "Average per message:", timeElapsed/numMessages ############################################################################## ''' ----------------- Quadratic Formula ----------------- The quadratic formula, while too simple to be usefully parallelized, is simple and familiar to students. Furthermore, there are many ways it can be parallelized, leaving many options for students to explore. Here is one possible implementation. If students do not actually program multiple quadratic formula implementations, a useful written exercise might be for them to outline multiple parallelization strategies. Prerequisites: - Writing and using multiple functions - If statements ''' import math from multiprocessing import * def computeDisc(a,b,c,q): q.put(b*b - 4*a*c) def compute2a(a,q): q.put(2*a) ''' mult is 1 or -1, for the "plus or minus" part of the quadratic formula ''' def computeQuad(b, disc, twoA, mult, q): q.put((-b + mult*math.sqrt(disc))/twoA) def quadPar(): a,b,c = input("a,b,c: ") q1 = Queue() q2 = Queue() p1 = Process(target=computeDisc, args=(a,b,c,q1)) p2 = Process(target=compute2a, args=(a,q2)) p1.start() p2.start() p1.join() p2.join() disc = q1.get() twoA = q2.get() if disc >= 0: p1 = Process(target=computeQuad, args=(b, disc, twoA, 1, q1)) p2 = Process(target=computeQuad, args=(b, disc, twoA, -1, q2)) p1.start() p2.start() p1.join() p2.join() print "The roots are", q1.get(), "and", q2.get() else: print "No real roots." # ============================================================================ ''' ------------------------- Finding the Nearest Point ------------------------- This exercise statement is based on p. 92, exercise 3.15 of "Parallel Programming in C with MPI and OpenMP" by Michael J. Quinn. You are given an array of n records, each containing the x and y coordinates of a house. You are also given the x and y coordinates of a railroad station. Design a parallel algorithm to find the house closest to the railroad station (as the crow flies). Prerequisites: - Lists - Writing and using multiple functions with return values - Simple for loops - If statements - Accumulation in a list - ''' from multiprocessing import * import math import time import random ''' Compute the distance between two points. ''' def distance(p1, p2): d0 = p1[0] - p2[0] d1 = p1[1] - p2[1] return math.sqrt(d0*d0 + d1*d1) ''' ------------------------------------------------------------------------ Find the home nearest to the station, sequentially. ''' def findNearestSeq(houses, station): closestID = findNearestSeqHelper(houses, station) print "(Seq) The house at", houses[closestID], "is closest to", station def findNearestSeqHelper(houses, station): closestID = 0 closestDist = distance(houses[closestID], station) for currID in range(len(houses)): currDist = distance(houses[currID], station) if currDist < closestDist: closestDist = currDist closestID = currID return closestID ''' ------------------------------------------------------------------------ Find the home nearest to the station, in parallel. ''' def findNearestPar(houses, station): q1 = Queue() q2 = Queue() p1 = Process(target = findNearestParHelper, args = (houses[:len(houses)/2], station, q1)) p2 = Process(target = findNearestParHelper, args = (houses[len(houses)/2:], station, q2)) p1.start() p2.start() p1.join() p2.join() houseID1 = q1.get() houseID2 = q2.get() + len(houses)/2 if distance(houses[houseID1], station) < distance(houses[houseID2], station): closestID = houseID1 else: closestID = houseID2 print "(Par) The house at", houses[closestID], "is closest to", station def findNearestParHelper(houses, station, q): closestID = 0 closestDist = distance(houses[closestID], station) for currID in range(len(houses)): currDist = distance(houses[currID], station) if currDist < closestDist: closestDist = currDist closestID = currID q.put(closestID) ''' ------------------------------------------------------------------------ Compare the performance of the sequential and parallel algorithms for finding the home nearest to the station. ''' def findNearestCompareSeqPar(): houses, station = generateHousesAndStation(1000) print station print houses print startTime = time.time() findNearestSeq(houses, station) elapsedTime = time.time() - startTime print "Time:", elapsedTime print startTime = time.time() findNearestPar(houses, station) elapsedTime = time.time() - startTime print "Time:", elapsedTime def generateHousesAndStation(numHouses): # houses = [[5,6],[6,5],[7,5],[5,8],[7,9],[20,19]] # station = [18,17] houses = [] for i in range(numHouses): houses.append([random.randint(0, 100), random.randint(0, 100)]) station = [random.randint(0, 100), random.randint(0, 100)] return houses, station # ======================================================================= ''' ----------------------- Finding the Largest Key ----------------------- This program finds the largest key from a distinct-valued list, (no duplicates) by splitting up the list and distributing to spawned processes. Two algorithms for generating the distinct-valued list are provided. ''' from multiprocessing import * import random ''' Generates a list with size integers, between min and max, inclusive. This is done by re-generating random numbers until unique ones are obtained. ''' def generateDistinctValuedList1(min, max, size): if max-min < size: return None # Not enough values to make a distinct-valued list! ls = [] while len(ls) < size: num = random.randint(min, max) if num not in ls: ls.append(num) return ls ''' Generates a list with size integers, between min and max, inclusive. This is done by generating a list of all numbers in that range, and then randomly removing items from it, to be put in a second list. ''' def generateDistinctValuedList2(min, max, size): if max-min < size: return None # Not enough values to make a distinct-valued list! result = [] remainingNums = range(min, max) for i in range(size): pos = random.randint(0, len(remainingNums)-1) result.append(remainingNums.pop(pos)) return result def findLargestInSublist(sublist, q): print "Find largest:", sublist largest = sublist[0] for i in range(1, len(sublist)): if largest < sublist[i]: largest = sublist[i] q.put(largest) def findLargest(): ls = generateDistinctValuedList2(1,100,50) numProcesses = 3 # Must be at least 2 numsPerProc = len(ls) / numProcesses # It's ok if this doesn't divide evenly queue = Queue() # Create and start the processes procs = [] for procID in range(numProcesses-1): procs.append(Process(target = findLargestInSublist, args = (ls[procID*numsPerProc: (procID+1)*numsPerProc], queue))) procs[procID].start() # Put all remaining in last processes (in case uneven amount distributed among processes) procID = procID + 1 procs.append(Process(target = findLargestInSublist, args = (ls[procID*numsPerProc:], queue))) procs[procID].start() # Wait for the processes to finish for procID in range(numProcesses): procs[procID].join() # Get the results from the queues and report largest = ls[0] for i in range(numProcesses): num = queue.get() print "Largest in a group:", num if num > largest: largest = num print "Largest:", largest # Do the following, just to verify ls.sort() print ls # ======================================================================== ''' ---------------- Pattern Matching ---------------- Find pattern in a large piece of text, by splitting the text up into separate pieces, one per process. The separate pieces must overlap slightly, in order to catch any pattern matches that straddle a border between two pieces. Prerequisites: - Nested for loops - More complex string indexing and slicing - If statements ''' from multiprocessing import * ''' Search for the pattern in the given (sub)text ''' def findStringP(pattern, text, q): foundLocations = [] for i in range(len(text)): if text[i:i+len(pattern)] == pattern: foundLocations.append(i) q.put(foundLocations) def findString(): pattern = "gct" text = "gcagcatcgagctagctagagcttcagctactgatcgacgatcgacctagtcgctactgctcttcgaacgctaactgctaa" numProcesses = 3 # There must be some overlap between the text of the different processes, to handle pattern # matches that straddle a border overflowTextAmount = len(pattern) - 1 textPerProc = len(text)/numProcesses # Won't necessarily divide evenly, but that's ok procs = [] queues = [] # Create the queues, and create and start the processes for procID in range(numProcesses): queues.append(Queue()) procs.append(Process(target = findStringP, args = (pattern, text[procID*textPerProc: (procID+1)*textPerProc+overflowTextAmount], queues[procID]))) procs[procID].start() # Wait for the processes to finish for procID in range(numProcesses): procs[procID].join() # Get the results from the queues, merge together, and report foundLocations = [] for procID in range(numProcesses): foundFromCurrProc = queues[procID].get() for loc in foundFromCurrProc: foundLocations.append(loc + procID * textPerProc) print foundLocations ############################################################################ ''' ------------------- K-Medoid Clustering ------------------- This example demonstrates a parallel clustering algorithm. The pseudocode for the algorithm is as follows: - Arbitrarily select k points to be the "centers" of k groups. - Repeat - Assign every non-center point to whichever group has the closest center. - Within each group, find the point with the minimum average distance to all other points in the group. Make this point the new center. - Until no changes to group centers There are many opportunities for parallelism in this algorithm. In this example, the process of selecting a new center for each group is parallelized. The data to cluster can be obtained from the students. For example, students can fill out a survey: From 1 to 10, 1 being not at all interested, 10 being extremely interested, how interested are you in: 1) Sports 2) Music This will give a collection of 2-dimensional data points to cluster. Prerequisites: - For loops - If statements - Writing and using multiple functions, including returning from a function - Accumulation of a sum - Lists of lists ''' import math ''' Compute the distance between two points. ''' def distance(p1, p2): d0 = p1[0] - p2[0] d1 = p1[1] - p2[1] return math.sqrt(d0*d0 + d1*d1) ''' Add the given dataPoint to the proper group with the given centers. Questions about addToNearestGroup(): Question: How might the process of adding data points to groups be parallelized? Answer: A process could be spawned for each point (or for disjoint subsets of points), which is then added to the correct group. Question: Why might this be a little more challenging than other parallelization strategies we've seen? Answer: This could be more challenging because it requires each process to share the groups data structure. ''' def addToNearestGroup(groups, centers, dataPoint): minDist = 999 minGroupID = None for currGroupID in range(len(groups)): currDist = distance(centers[currGroupID], dataPoint) if currDist < minDist: minDist = currDist minGroup = currGroupID groups[minGroup].append(dataPoint) ''' Compute the average distance of all points in a group to the given center. ''' def averageDistance(group, center): sum = 0 for pt in group: sum = sum + distance(pt, center) return float(sum) / len(group) ''' Of all points in a group, choose one to be the center. Actually, this operation could be parallelized as well. ''' def chooseNewCenterForGroup(group, center, q): minDist = 999 minCenter = None for currPt in group: # For each possible center in a group currDist = averageDistance(group, currPt) if currDist < minDist: minDist = currDist minCenter = currPt q.put(minCenter) ''' Choose a new center for each group, in parallel. ''' def chooseNewCenters(groups, centers): changes = False procs = [] queues = [] # Start one process for each group for i in range(len(groups)): # For each group queues.append(Queue()) procs.append(Process(target=chooseNewCenterForGroup, args=(groups[i], centers[i], queues[i]))) procs[i].start() # Wait for processes to finish for p in procs: p.join() # Get results for i in range(len(queues)): newCenter = queues[i].get() # Get result for process/group i if centers[i] != newCenter: # If they're not already equal changes = True centers[i] = newCenter return changes ''' Below are sequential versions of the above functions, for reference. ''' ''' def chooseNewCenterForGroup(group, center): minDist = 999 minCenter = None for currPt in group: # For each possible center in a group currDist = averageDistance(group, currPt) if currDist < minDist: minDist = currDist minCenter = currPt return minCenter def chooseNewCenters(groups, centers): changes = False for i in range(len(groups)): # For each group newCenter = chooseNewCenterForGroup(groups[i], centers[i]) if centers[i] != newCenter: # If they're not already equal changes = True centers[i] = newCenter return changes ''' ''' Create the groups data structure. ''' def makeEmptyGroups(k): groupMembers = [] for i in range(k): groupMembers.append([]) return groupMembers ''' The main clustering algorithm. ''' def cluster(): allData = [[1,1], [2,3], [8,7], [5,1], [5,2], [8,9], [1,2], [8,8], [6,1], [9,9], [7,9]] k = 3 # The number of groups # Choose initial centers arbitrarily centers = allData[:k] changes = True # Go into loop at least once while(changes): # Loop until no changes in center of each group groupMembers = makeEmptyGroups(k) # Reset the groups data structure # Put each point in the appropriate group for dataPoint in allData: addToNearestGroup(groupMembers, centers, dataPoint) # Now, groupMembers organizes data points (including centers) by group, # and centers has the center for each group # Choose new centers for the now-populated groups changes = chooseNewCenters(groupMembers, centers) print groupMembers print print centers return groupMembers, centers ''' Shows a graphical representation of the points, with centers colored in red. Uses John Zelle's graphics module. from graphics import * def drawCluster(): groupMembers, centers = cluster() win = GraphWin("Cluster Graph") win.setCoords(0, 0, 11, 11) for groupID in range(len(groupMembers)): for pt in groupMembers[groupID]: Point(pt[0], pt[1]).draw(win) cPoint = Point(centers[groupID][0], centers[groupID][1]) cPoint.setFill("red") cPoint.draw(win) x = input("Press enter to exit") ''' ''' ============================================================================= A Pipeline for Floating-Point Addition This example is adapted from "Parallel Programming on the Wittenberg Advanced Research Processors (WARP) Cluster" by Jim Giuliani and J. L. Noyes. This example illustrates pipeline parallelism, applied to the adding of pairs of positive floating-point numbers. Each floating-point number is represented as .d1d2d3...*B^p. For example, .5432 * 10^2. The d's are the mantissa digits. The p is the exponent. The multi-step process of adding a pair of floating-point numbers is shown below: 1) Modify the mantissa and exponent of the number with the smaller exponent, so that both exponents are the same. 2) Add the mantissas together. It may also be necessary to normalize the sum so that the most significant digit is strictly greater than 0. 3) Round the sum to produce the final result. For example: .5432*10^2 + .9876*10^3 1) Modify: .05432*10^3 and .98760*10^3 2) Add: .05432*10^3 + .98760*10^3 = 1.04192*10^3 Normalize: .104192*10^4 3) Round: .1042*10^4 These steps can be used to form a pipeline to add many pairs of floating point numbers. This is a lengthy example. It can be broken down into parts, perhaps with students given some of the code ahead of time. Alternatively, students could be given all the code for the pieces, and simply be instructed to construct a pipeline from the pieces. Once students are familiar with writing classes, a nice exercise for this example might be to write a very simple FloatNumber class that encapsulates the mantissa and exponent along with some basic methods, rather than exposing all of this through the use of strings. Prerequisites: - Managing comparatively large programs, black-box thinking - Writing and using multiple functions, - Returning multiple values from a function and multiple assignment - For loops - If statements - String indexing and slicing - Accumulation of a string - Accumulation in a list - Lists of lists ''' ''' --------------------------------------------------------------------------- 1) Modify m1 is the mantissa, and e1 the exponent, for the first floating point number. m2 is the mantissa, and e2 the exponent, for the second floating point number. ''' def modify(m1, e1, m2, e2): # Get the difference in the exponent expDiff = abs(e1-e2) if e1 < e2: # add zeroes to left of m1, and change exponent to match m1 = addZeroesLeft(m1, expDiff) e1 = e2 # add zeroes to right of m2 (doesn't change number at all) m2 = addZeroesRight(m2, expDiff) else: # add zeroes to left of m2, and change exponent to match m2 = addZeroesLeft(m2, expDiff) e2 = e1 # add zeroes to right of m1 (doesn't change number at all) m1 = addZeroesRight(m1, expDiff) return m1, e1, m2, e2 ''' The following two functions provide students a good example of the subtle differences in two accumulation actions. ''' def addZeroesLeft(m, amount): for i in range(amount): m = "0" + m return m def addZeroesRight(m, amount): for i in range(amount): m = m + "0" return m ''' --------------------------------------------------------------------------- 2) Add Add the corresponding digits of two mantissas, a and b, each of which has the same exponent, e. ''' def add(m1, m2, e): # m1 and m2 should be the same length result = "" carry = 0 for i in range(len(m1)-1, -1, -1): # Start at the right end of the strings digitSum = carry + int(m1[i]) + int(m2[i]) # Add corresponding digits if digitSum < 10: # No need to carry result = str(digitSum) + result # concatenate sum to the result string carry = 0 else: # Carry a 1 result = str(digitSum - 10) + result # concatenate sum to the result string carry = 1 # Handle possibility of one last carry, left of the leftmost digits if carry == 1: result = str(carry) + result return result, e+1 # Normalize, because of this final carry else: return result, e ''' --------------------------------------------------------------------------- 3) Round Round the value m * 10^e to get the desired number of digits. ''' def round(m, e, goalDigits): if len(m) == goalDigits: # No rounding required return m, e else: mostSig = m[:goalDigits] # Get the most significant digits # Check next digit if int(m[goalDigits]) < 5: # Round down (so no changes in numbers) return mostSig, e else: # Make a string of all 0's, with a one at the end, # to add to addIn = "" for i in range(goalDigits-1): addIn = addIn + "0" addIn = addIn + "1" mostSig, e = add(mostSig, addIn, e) # Might have to normalize even from rounding up! return mostSig, e ''' --------------------------------------------------------------------------- A test function to add a single pair of floating-point numbers. ''' def floatAdd(): ''' m1 = "5432" e1 = 2 m2 = "9876" e2 = 3 # ans: 1042 4 ''' ''' m1 = "456" e1 = 3 m2 = "999" e2 = 5 # ans: 100 6 ''' m1 = '50002' e1 = 4 m2 = '44078' e2 = 4 numDigits = len(m1) m1, e1, m2, e2 = modify(m1, e1, m2, e2) print m1, m2 # Now, e1 == e2, and m1 and m2 still have the same number of digits sum, e = add(m1, m2, e1) print sum, e # Now, sum is the mantissa of the sum, and e is the exponent sum, e = round(sum, e, numDigits) print sum, e # Now, the mantissa has been properly rounded import random ''' --------------------------------------------------------------------------- Functions to generate random numbers to add. Makes a random string of numbers to represent the mantissa ''' def makeRandomMantissa(numDigits): m = "" # First digit can't be 0 m = str(random.randint(1, 9)) # Possibly more random digits for i in range(numDigits-1): m = m + str(random.randint(0,9)) # can be 0 now return m ''' Makes two lists of floating-point numbers, where the corresponding numbers' mantissas have the same length. ''' def makeRandomFloatLists(n): # Some hardcoded lists may be useful for debugging and study. # Just make sure the length of the list matches n. # ls1 = [('804', 2), ('66866', 8), ('925', 10), ('8', 0), ('83607', 8)] # ls2 = [('407', 5), ('49118', 9), ('280', 6), ('5', 9), ('50833', 10)] # ls1 = [('804', 2), ('50002', 4), ('66866', 8), ('66866', 8), ('66866', 8)] # ls2 = [('407', 5), ('44078', 4), ('49118', 9), ('49118', 9), ('49118', 9)] # return ls1, ls2 ls1 = [] ls2 = [] for i in range(n): # Determine length of this pair of numbers numDigits = random.randint(1,7) # Determine mantissa for first number m = makeRandomMantissa(numDigits) # Determine exponent for first number e = random.randint(0,10) # Append generated number, as a tuple, onto list ls1.append((m, e)) # Determine mantissa for second number m = makeRandomMantissa(numDigits) # Determine exponent for second number e = random.randint(0,10) # Append generated number, as a tuple, onto list ls2.append((m, e)) return ls1, ls2 ''' --------------------------------------------------------------------------- Functions for Sequential Addition ''' ''' Sequentially adds corresponding floating-point numbers in two lists. ''' def manyFloatAddSeq(): numberOfNumbers = 5 # Make two lists of random floating-point numbers ls1, ls2 = makeRandomFloatLists(numberOfNumbers) print ls1 print ls2 addInSequence(ls1, ls2) ''' Adds two given lists of numbers, sequentially. ''' def addInSequence(ls1, ls2): for i in range(len(ls1)): numDigits = len(ls1[i][0]) m1, e1, m2, e2 = modify(ls1[i][0], ls1[i][1], ls2[i][0], ls2[i][1]) sum, e = add(m1, m2, e1) sum, e = round(sum, e, numDigits) print (sum, e) ''' --------------------------------------------------------------------------- Functions for Parallel Addition ''' ''' Adds to lists of numbers, in parallel. ''' def manyFloatAddPar(): numberOfNumbers = 5 # Make two lists of random floating-point numbers ls1, ls2 = makeRandomFloatLists(numberOfNumbers) print ls1 print ls2 addInParallel(ls1, ls2) ''' Adds two given lists of numbers, in parallel. It does this by setting up a pipeline for doing the floating-point addition. Main idea: pModify does the Modify step, and passes each result to pAdd via qToAdd. pAdd checks qToAdd for a Modify result, does the Add step, and passes the result to pRound via qToRound. pRound checks qToRound for an Add result, does the Round step, and passes the result to qToResult. The main process checks qToResult and prints the result. ''' def addInParallel(ls1, ls2): numberOfNumbers = len(ls1) # Set up the queues to pass results between processes qToAdd = Queue() qToRound = Queue() qToResult = Queue() # Set up the processes pModify = Process(target = modifyAll, args = (ls1, ls2, qToAdd)) pAdd = Process(target = addAll, args = (numberOfNumbers, qToAdd, qToRound)) # Below, ls1 is passed so that we know to what length each resulting mantissa should be rounded pRound = Process(target = roundAll, args = (numberOfNumbers, ls1, qToRound, qToResult)) # Start the processes, and then wait for them to finish. pModify.start() pAdd.start() pRound.start() pModify.join() pAdd.join() pRound.join() # Print the results for i in range(numberOfNumbers): print qToResult.get() ''' The target of the "modify" spawned process. Modify each pair of numbers, and pass the result via the queue. ''' def modifyAll(ls1, ls2, qSendResults): for i in range(len(ls1)): # Put results of the modify, in the form (m1, e1, m2, e2) qSendResults.put(modify(ls1[i][0], ls1[i][1], ls2[i][0], ls2[i][1])) ''' The target of the "add" spawned process. Add each pair of numbers from qArgs, and pass the result via qSendResults. ''' def addAll(numItems, qArgs, qSendResults): # Each item in qArgs is a tuple (m1, e1, m2, e2) for i in range(numItems): # Get results of the modify m1, e1, m2, e2 = qArgs.get() # Put results of the add, in the form (sum, e) qSendResults.put(add(m1, m2, e1)) ''' The target of the "round" spawned process. Round each number from qArgs, and pass the result via qSendResults. ''' def roundAll(numItems, ls1, qArgs, qSendResults): for i in range(numItems): # Get results of the add sum, e = qArgs.get() # Determine how many digits this result mantissa should be goalDigits = len(ls1[i][0]) # Put results of the round, in the form (sum, e) qSendResults.put(round(sum, e, goalDigits)) ''' ------------------------------------------------------------------ Compares time performance of the sequential and parallel algorithms. ''' def compareAddSeqPar(): numberOfNumbers = 5 # Make two lists of random floating-point numbers ls1, ls2 = makeRandomFloatLists(numberOfNumbers) print "Lists to be added:" print ls1 print ls2 print print "Parallel results:" startTime = time.time() addInParallel(ls1, ls2) timeElapsed = time.time() - startTime print "Time:", timeElapsed print print "Sequential results:" startTime = time.time() addInSequence(ls1, ls2) timeElapsed = time.time() - startTime print "Time:", timeElapsed # ======================================================================== #if __name__ == '__main__': # sequentialDS() # parallelShareDS1() # parallelShareDS2() # cluster() # drawCluster() # compareAddSeqPar() # findNearestCompareSeqPar() # findString() # findLargest()