python:programmes:multiprocessing1
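Pourquoi il faut utiliser le multiprocessing plutôt que le threading en Python pour les traitements gourmands en CPU (le GIL empêche les threads d'exécuter du code Python en parallèle) : http://blog.unix4fun.net/post/2007/09/17/python-le-GIL-pourquoi-et-comment-faire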
Module multiprocessing
Méthode avec Pool :
- multiprocessing1.py
#!/usr/bin/env python
"""Run two functions asynchronously in a process pool and collect their
results, giving up on any result that is not ready within a timeout."""
import multiprocessing
from multiprocessing import Pool, cpu_count
from time import sleep


def f(x, y, delay=2):
    """Return x*y after sleeping `delay` seconds (simulated slow work)."""
    sleep(delay)
    return x * y


def k(x, y, delay=2):
    """Return x+y after sleeping `delay` seconds (simulated slow work)."""
    sleep(delay)
    return x + y


def main(timeout=3):
    """Submit f(5, 10) and k(10, 10) to a pool and print each result.

    Each result is waited for at most `timeout` seconds; a result that is
    not ready in time is reported as "timeout...".
    """
    pool = Pool(processes=cpu_count())  # one worker process per CPU
    try:
        result = [
            pool.apply_async(f, (5, 10)),   # evaluate f(5, 10) asynchronously
            pool.apply_async(k, (10, 10)),  # evaluate k(10, 10) asynchronously
        ]
        for r in result:
            try:
                print(r.get(timeout=timeout))
            except multiprocessing.TimeoutError:
                # Only the timeout is expected here: an exception raised
                # inside f or k is re-raised by get() and must not be hidden.
                print("timeout...")
    finally:
        # Stop the workers (including a task still running after a timeout)
        # and wait for them to exit instead of leaving them to interpreter exit.
        pool.terminate()
        pool.join()


if __name__ == '__main__':
    main()
Méthode avec Queue (échanges inter-processus) : extrait de http://download.velannes.com/Python/processing.pdf
- multiprocessing2.py
#!/usr/bin/env python
"""Exchange data between processes: workers read tasks from one Queue and
write their answers to another."""
from multiprocessing import Queue, Process, current_process


def worker(tasks, results):
    """Process exactly one task: double it and report it on `results`.

    Blocks on tasks.get() until a task arrives, then puts
    [process name, task, "*", 2, task * 2] on the results queue.
    """
    # Get a task (blocks until one is available)
    t = tasks.get()
    # Do the operation
    result = t * 2
    # Put the result in the results queue
    results.put([current_process().name, t, "*", 2, result])


def main(n=100):
    """Start n worker processes, feed them n tasks and print the n results.

    Results are printed in completion order, which is not task order.
    """
    # Create the tasks and results queues
    tasks = Queue()
    results = Queue()
    # One process per task: each worker handles exactly one task
    workers = [Process(target=worker, args=(tasks, results))
               for _ in range(n)]
    # Start the processes
    for proc in workers:
        proc.start()
    # Create the tasks
    for task in range(n):
        tasks.put(task)
    # Get the results
    for _ in range(n):
        result = results.get()
        print("Res : %s" % result)
    # Reap the workers explicitly rather than leaving them to interpreter
    # exit. This happens only after every result has been read: a process
    # that still has queued data buffered may not exit, so joining it
    # earlier could deadlock.
    for proc in workers:
        proc.join()


if __name__ == '__main__':
    main()
Module threading
Exemple de benchmark d'E/S disque avec des threads (ici le threading convient : le GIL est relâché pendant les E/S bloquantes) :
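- iops.py (ne pas nommer ce script threading.py : lancé depuis son répertoire, il masquerait le module standard threading qu'il importe)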
#!/usr/bin/env python
#
# Copyright (c) 2008-2010 Benjamin Schweizer and others.
#
# Permission to use, copy, modify, and/or distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
#
#
# Abstract
# ~~~~~~~~
# Benchmark disk IOs
#
# Authors
# ~~~~~~~
# Benjamin Schweizer, http://benjamin-schweizer.de/contact
# Uwe Menges
#
# Changes
# ~~~~~~~
# 2010-08-12, benjamin/uwe: added multi-threading support
# 2010-07-22, benjamin: fixed 32bit ioctls on bsd
# 2010-07-21, benjamin: freebsd/osx support, switched to isc license
# 2009-09-16, uwe: changed formatting, fixed last block bug
# 2008-10-16, benjamin: initial release
#
# Todo
# ~~~~
# - check/add netbsd/openbsd mediasize ioctls
#

USAGE = """Copyright (c) 2008-2010 Benjamin Schweizer and others.

usage: iops [-n|--num_threads threads] [-t|--time time] <device>

    threads := number of concurrent io threads, default 1
    time    := time in seconds, default 10
    device  := some block device, like /dev/sda

example:
    iops /dev/sda
    iops -n 8 -t 2 /dev/disk0
"""

import sys
import fcntl
import array
import struct
import random
import time
import threading

# Media sizes in bytes, keyed by device path; filled lazily by mediasize().
_mediasizes = {}


def mediasize(dev):
    """Report the media size of `dev` in bytes.

    Uses ioctls on Mac OS X and FreeBSD 8; elsewhere (Linux or compatible)
    seeks to the end of the device and reads back the offset. Results are
    cached per device path.

    Raises:
        Exception: if the size comes out as 0.
        IOError: if the device cannot be opened or queried.
    """
    if dev in _mediasizes:
        return _mediasizes[dev]

    size = 0  # bytes
    if sys.platform in ['darwin']:
        # mac os x 10.5+ ioctls from sys/disk.h
        DKIOCGETBLOCKSIZE = 0x40046418
        DKIOCGETBLOCKCOUNT = 0x40086419
        with open(dev, 'rb') as fh:
            buf = array.array('B', range(0, 4))  # uint32
            fcntl.ioctl(fh.fileno(), DKIOCGETBLOCKSIZE, buf, 1)
            blocksize = struct.unpack('I', buf)[0]
            buf = array.array('B', range(0, 8))  # uint64
            fcntl.ioctl(fh.fileno(), DKIOCGETBLOCKCOUNT, buf, 1)
            blockcount = struct.unpack('Q', buf)[0]
        size = blocksize * blockcount
    elif sys.platform in ['freebsd8']:
        # freebsd 8 ioctl from sys/disk.h
        DIOCGMEDIASIZE = 0x40086481
        with open(dev, 'rb') as fh:
            buf = array.array('B', range(0, 8))  # off_t / int64
            fcntl.ioctl(fh.fileno(), DIOCGMEDIASIZE, buf, 1)
            size = struct.unpack('q', buf)[0]
    else:
        # linux or compat: seek to the end, the offset is the size
        SEEK_END = 2
        with open(dev, 'rb') as fh:
            fh.seek(0, SEEK_END)
            size = fh.tell()

    if not size:
        raise Exception("cannot determine media size")

    _mediasizes[dev] = size
    return size


def greek(value, precision=0, prefix=None):
    """Return a string representing the IEC or SI suffix of a value.

    The value is divided by the largest unit it reaches. With precision 0
    the integer part is printed 3 wide; otherwise a float with `precision`
    decimals. A truthy `prefix` selects SI (powers of 10) units, otherwise
    IEC (powers of 2) units are used.
    """
    # Copyright (c) 1999 Martin Pohl, copied from
    # http://mail.python.org/pipermail/python-list/1999-December/018519.html
    if prefix:
        # Use SI (10-based) units
        _abbrevs = [
            (10**15, 'P'),
            (10**12, 'T'),
            (10**9, 'G'),
            (10**6, 'M'),
            (10**3, 'k'),
            (1, ' '),
        ]
    else:
        # Use IEC (2-based) units
        _abbrevs = [
            (1 << 50, 'Pi'),
            (1 << 40, 'Ti'),
            (1 << 30, 'Gi'),
            (1 << 20, 'Mi'),
            (1 << 10, 'Ki'),
            (1, ' '),
        ]

    for factor, suffix in _abbrevs:
        if value >= factor:
            break

    if precision == 0:
        return "%3.d %s" % (int(value / factor), suffix)
    else:
        fmt = "%%%d.%df %%s" % (4 + precision, precision)
        return fmt % (float(value) / factor, suffix)


def iops(dev, blocksize=512, t=10):
    """Measure input/output operations per second.

    Perform random 512b-aligned reads of blocksize bytes on dev for t
    seconds.

    Returns: IOs/s
    """
    # The media size does not change during a run: look it up once instead
    # of once per IO inside the timed loop.
    last_pos = mediasize(dev) - blocksize  # need at least one block left
    count = 0
    with open(dev, 'rb') as fh:
        start = time.time()
        while time.time() < start + t:
            count += 1
            pos = random.randint(0, last_pos)
            pos &= ~0x1ff  # freebsd8: pos needs 512B sector alignment
            fh.seek(pos)
            fh.read(blocksize)
        end = time.time()
    return count / (end - start)


def parse_args(argv):
    """Parse a command line into (num_threads, t, dev).

    argv[0] is the program name and is skipped. Both --num_threads (the
    spelling shown in USAGE) and --num-threads are accepted.

    Raises:
        SystemExit: with USAGE when no device is given, an option value is
            missing or not an integer, or a thread count / time is below 1.
    """
    t = 10
    num_threads = 1
    dev = None
    args = list(argv[1:])
    try:
        while args:
            arg = args.pop(0)
            if arg in ['-n', '--num-threads', '--num_threads']:
                num_threads = int(args.pop(0))
            elif arg in ['-t', '--time']:
                t = int(args.pop(0))
            else:
                dev = arg
    except (IndexError, ValueError):
        raise SystemExit(USAGE)
    if dev is None or num_threads < 1 or t < 1:
        raise SystemExit(USAGE)
    return num_threads, t, dev


def _run_threads(dev, blocksize, t, num_threads):
    """Run iops() in num_threads concurrent threads; return the total IO/s.

    An exception raised in any thread is re-raised here, so the caller's
    error handling sees it instead of it silently lowering the total.
    """
    results = []
    errors = []

    def results_wrap():
        """collect the return value (or the exception) of one iops() run"""
        try:
            results.append(iops(dev, blocksize, t))
        except Exception as err:
            errors.append(err)

    threads = []
    for _ in range(num_threads):
        th = threading.Thread(target=results_wrap)
        # daemon threads do not keep the process alive after ctrl-c
        th.daemon = True
        th.start()
        threads.append(th)
    for th in threads:
        th.join()

    if errors:
        raise errors[0]
    return sum(results)


def main(argv=None):
    """Run the benchmark and print one stats line per block size.

    Block sizes start at 512 bytes and double while the summed IO/s stays
    above num_threads and the block size is below the media size.
    """
    if argv is None:
        argv = sys.argv
    num_threads, t, dev = parse_args(argv)

    blocksize = 512
    try:
        size = mediasize(dev)
        print("%s, %sB, %d threads:" % (dev, greek(size, 2, 'si'), num_threads))
        _iops = num_threads + 1  # initial loop: guarantees the first pass
        while _iops > num_threads and blocksize < size:
            _iops = _run_threads(dev, blocksize, t, num_threads)
            bandwidth = int(blocksize * _iops)
            print(" %sB blocks: %6.1f IO/s, %sB/s (%sbit/s)" % (
                greek(blocksize), _iops,
                greek(bandwidth, 1), greek(8 * bandwidth, 1, 'si')))
            blocksize *= 2
    except IOError as err:
        # strerror is None for IOErrors raised without an errno
        raise SystemExit(err.strerror or str(err))
    except KeyboardInterrupt:
        print("caught ctrl-c, bye.")


if __name__ == '__main__':
    main()
# eof.
python/programmes/multiprocessing1.txt · Dernière modification : 2013/03/12 23:07 de root