python:programmes:multiprocessing1
Différences
Ci-dessous, les différences entre deux révisions de la page.
Les deux révisions précédentes · Révision précédente · Prochaine révision | Révision précédente | ||
python:programmes:multiprocessing1 [2012/11/11 17:45] – root | python:programmes:multiprocessing1 [2013/03/12 23:07] (Version actuelle) – root | ||
---|---|---|---|
Ligne 1: | Ligne 1: | ||
- | ====== Multithreading (module | + | Pourquoi il faut utiliser le multiprocessing |
- | < | + | ====== Module multiprocessing ====== |
+ | |||
+ | Méthode avec Pool : | ||
+ | |||
+ | < | ||
# | # | ||
Ligne 26: | Ligne 30: | ||
| | ||
print " | print " | ||
+ | </ | ||
+ | |||
+ | Méthode avec Queue (échanges inter-processus) : | ||
+ | Extrait de http:// | ||
+ | <code python multiprocessing2.py> | ||
+ | # | ||
+ | |||
+ | from multiprocessing import Queue, Process, current_process | ||
+ | |||
+ | def worker(tasks, | ||
+ | # Get a task | ||
+ | t = tasks.get() | ||
+ | |||
+ | # Do operation | ||
+ | result = t * 2 | ||
+ | |||
+ | # Put the result in results queue | ||
+ | results.put([current_process().name, | ||
+ | |||
+ | if __name__ == ' | ||
+ | # Number of processes | ||
+ | n = 100 | ||
+ | |||
+ | # Create my tasks and results queues | ||
+ | myTasks = Queue() | ||
+ | myResults = Queue() | ||
+ | |||
+ | # Create n processes | ||
+ | Workers = [ Process(target=worker, | ||
+ | |||
+ | # Start processes | ||
+ | for each in Workers: | ||
+ | each.start() | ||
+ | |||
+ | # Create tasks | ||
+ | for each in range(n): | ||
+ | myTasks.put(each) | ||
+ | |||
+ | # Get the results | ||
+ | for each in range(n): | ||
+ | result = myResults.get() | ||
+ | print(" | ||
+ | </ | ||
+ | |||
+ | ====== Module threading ====== | ||
+ | |||
+ | Exemple de bench IO avec des threads : | ||
+ | |||
+ | <code python threading.py> | ||
+ | # | ||
+ | # | ||
+ | # Copyright (c) 2008-2010 Benjamin Schweizer and others. | ||
+ | # | ||
+ | # Permission to use, copy, modify, and/or distribute this software for any | ||
+ | # purpose with or without fee is hereby granted, provided that the above | ||
+ | # copyright notice and this permission notice appear in all copies. | ||
+ | # | ||
+ | # THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES | ||
+ | # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF | ||
+ | # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR | ||
+ | # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES | ||
+ | # WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN | ||
+ | # ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF | ||
+ | # OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. | ||
+ | # | ||
+ | # | ||
+ | # Abstract | ||
+ | # ~~~~~~~~ | ||
+ | # Benchmark disk IOs | ||
+ | # | ||
+ | # Authors | ||
+ | # ~~~~~~~ | ||
+ | # Benjamin Schweizer, http:// | ||
+ | # Uwe Menges | ||
+ | # | ||
+ | # Changes | ||
+ | # ~~~~~~~ | ||
+ | # 2010-08-12, benjamin/ | ||
+ | # 2010-07-22, benjamin: fixed 32bit ioctls on bsd | ||
+ | # 2010-07-21, benjamin: freebsd/osx support, switched to isc license | ||
+ | # 2009-09-16, uwe: changed formatting, fixed last block bug | ||
+ | # 2008-10-16, benjamin: initial release | ||
+ | # | ||
+ | # Todo | ||
+ | # ~~~~ | ||
+ | # - check/add netbsd/ | ||
+ | # | ||
+ | |||
+ | USAGE = """ | ||
+ | |||
+ | usage: | ||
+ | |||
+ | iops [-n|--num_threads threads] [-t|--time time] < | ||
+ | |||
+ | threads := number of concurrent io threads, default 1 | ||
+ | time := time in seconds, default 10 | ||
+ | device | ||
+ | |||
+ | example: | ||
+ | |||
+ | iops /dev/sda | ||
+ | iops -n 8 -t 2 /dev/disk0 | ||
+ | |||
+ | """ | ||
+ | |||
+ | import sys | ||
+ | import fcntl | ||
+ | import array | ||
+ | import struct | ||
+ | import random | ||
+ | import time | ||
+ | import threading | ||
+ | |||
+ | def mediasize(dev): | ||
+ | """ | ||
+ | # caching | ||
+ | global _mediasizes | ||
+ | if not ' | ||
+ | if dev in _mediasizes: | ||
+ | return _mediasizes[dev] | ||
+ | |||
+ | mediasize = 0 # bytes | ||
+ | |||
+ | if sys.platform in [' | ||
+ | # mac os x 10.5+ sysctl from sys/disk.h | ||
+ | DKIOCGETBLOCKSIZE = 0x40046418 | ||
+ | DKIOCGETBLOCKCOUNT = 0x40086419 | ||
+ | |||
+ | fh = open(dev, ' | ||
+ | buf = array.array(' | ||
+ | r = fcntl.ioctl(fh.fileno(), | ||
+ | blocksize = struct.unpack(' | ||
+ | buf = array.array(' | ||
+ | r = fcntl.ioctl(fh.fileno(), | ||
+ | blockcount = struct.unpack(' | ||
+ | fh.close() | ||
+ | mediasize = blocksize*blockcount | ||
+ | |||
+ | elif sys.platform in [' | ||
+ | # freebsd 8 sysctl from sys/disk.h | ||
+ | DIOCGMEDIASIZE = 0x40086481 | ||
+ | |||
+ | fh = open(dev, ' | ||
+ | buf = array.array(' | ||
+ | r = fcntl.ioctl(fh.fileno(), | ||
+ | mediasize = struct.unpack(' | ||
+ | fh.close() | ||
+ | |||
+ | else: # linux or compat | ||
+ | # linux 2.6 lseek from fcntl.h | ||
+ | SEEK_SET=0 | ||
+ | SEEK_CUR=1 | ||
+ | SEEK_END=2 | ||
+ | |||
+ | fh = open(dev, ' | ||
+ | fh.seek(0, | ||
+ | mediasize = fh.tell() | ||
+ | fh.close() | ||
+ | |||
+ | if not mediasize: | ||
+ | raise Exception(" | ||
+ | |||
+ | _mediasizes[dev] = mediasize | ||
+ | return mediasize | ||
+ | |||
+ | def greek(value, | ||
+ | """ | ||
+ | # Copyright (c) 1999 Martin Pohl, copied from | ||
+ | # http:// | ||
+ | if prefix: | ||
+ | # Use SI (10-based) units | ||
+ | _abbrevs = [ | ||
+ | (10**15, ' | ||
+ | (10**12, ' | ||
+ | (10** 9, ' | ||
+ | (10** 6, ' | ||
+ | (10** 3, ' | ||
+ | (1 , ' ') | ||
+ | ] | ||
+ | else: | ||
+ | # Use IEC (2-based) units | ||
+ | _abbrevs = [ | ||
+ | (1<< | ||
+ | (1<< | ||
+ | (1<< | ||
+ | (1<< | ||
+ | (1<< | ||
+ | (1 , ' | ||
+ | ] | ||
+ | |||
+ | for factor, suffix in _abbrevs: | ||
+ | if value >= factor: | ||
+ | break | ||
+ | |||
+ | if precision == 0: | ||
+ | return "%3.d %s" % (int(value/ | ||
+ | else: | ||
+ | fmt=" | ||
+ | return fmt % (float(value)/ | ||
+ | |||
+ | |||
+ | def iops(dev, blocksize=512, | ||
+ | """ | ||
+ | Perform random 512b aligned reads of blocksize bytes on fh for t seconds | ||
+ | and print a stats line | ||
+ | Returns: IOs/s | ||
+ | """ | ||
+ | |||
+ | fh = open(dev, ' | ||
+ | count = 0 | ||
+ | start = time.time() | ||
+ | while time.time() < start+t: | ||
+ | count += 1 | ||
+ | pos = random.randint(0, | ||
+ | pos &= ~0x1ff | ||
+ | fh.seek(pos) | ||
+ | blockdata = fh.read(blocksize) | ||
+ | end = time.time() | ||
+ | |||
+ | t = end - start | ||
+ | |||
+ | fh.close() | ||
+ | |||
+ | return count/t | ||
+ | |||
+ | |||
+ | if __name__ == ' | ||
+ | # parse cli | ||
+ | t = 10 | ||
+ | num_threads = 1 | ||
+ | dev = None | ||
+ | |||
+ | if len(sys.argv) < 2: | ||
+ | raise SystemExit(USAGE) | ||
+ | |||
+ | while sys.argv: | ||
+ | arg = sys.argv.pop(0) | ||
+ | if arg in [' | ||
+ | num_threads = int(sys.argv.pop(0)) | ||
+ | elif arg in [' | ||
+ | t = int(sys.argv.pop(0)) | ||
+ | else: | ||
+ | dev = arg | ||
+ | |||
+ | # run benchmark | ||
+ | blocksize = 512 | ||
+ | try: | ||
+ | print "%s, %sB, %d threads:" | ||
+ | _iops = num_threads+1 # initial loop | ||
+ | while _iops > num_threads and blocksize < mediasize(dev): | ||
+ | # threading boilerplate | ||
+ | threads = [] | ||
+ | results = [] | ||
+ | | ||
+ | def results_wrap(results, | ||
+ | """ | ||
+ | result = func(*__args, | ||
+ | results.append(result) | ||
+ | |||
+ | for i in range(0, num_threads): | ||
+ | _t = threading.Thread(target=results_wrap, | ||
+ | _t.start() | ||
+ | threads.append(_t) | ||
+ | |||
+ | for _t in threads: | ||
+ | _t.join() | ||
+ | _iops = sum(results) | ||
+ | |||
+ | bandwidth = int(blocksize*_iops) | ||
+ | print " %sB blocks: %6.1f IO/s, %sB/s (%sbit/ | ||
+ | greek(bandwidth, | ||
+ | |||
+ | blocksize *= 2 | ||
+ | except IOError, (err_no, err_str): | ||
+ | raise SystemExit(err_str) | ||
+ | except KeyboardInterrupt: | ||
+ | print " | ||
+ | |||
+ | # eof. | ||
</ | </ |
python/programmes/multiprocessing1.1352655902.txt.gz · Dernière modification : 2012/11/11 17:45 de root