python:programmes:multiprocessing1
Différences
Ci-dessous, les différences entre deux révisions de la page.
| Les deux révisions précédentesRévision précédenteProchaine révision | Révision précédente | ||
| python:programmes:multiprocessing1 [2012/02/12 14:04] – root | python:programmes:multiprocessing1 [2013/03/12 23:07] (Version actuelle) – root | ||
|---|---|---|---|
| Ligne 1: | Ligne 1: | ||
| - | ====== Multithreading (module | + | Pourquoi il faut utiliser le multiprocessing |
| - | Doc : http:// | + | ====== Module |
| - | ===== Simple code ===== | + | Méthode avec Pool : |
| - | < | + | < |
| # | # | ||
| - | from multiprocessing import Process | ||
| - | import os | ||
| - | import time | ||
| - | def sleeper(name, seconds): | + | from multiprocessing import Pool, cpu_count |
| - | print(' | + | from time import |
| - | | + | |
| - | | + | |
| - | time.sleep(seconds) | + | |
| - | | + | |
| + | def f(x,y): | ||
| + | sleep(2) | ||
| + | return x*y | ||
| + | |||
| + | def k(x,y): | ||
| + | sleep(2) | ||
| + | return x+y | ||
| if __name__ == ' | if __name__ == ' | ||
| - | print("in parent process (id %s)" % os.getpid()) | + | result=[] |
| - | p = Process(target=sleeper, args=(' | + | pool = Pool(processes = cpu_count()) # start x worker processes |
| - | | + | |
| - | print("in parent process after child process start" | + | |
| - | print(" | + | |
| - | p.join() | + | for r in result: |
| - | print(" | + | try: |
| - | | + | print r.get(timeout=3) # timeout 3 secs |
| - | | + | except: |
| + | print "timeout..." | ||
| </ | </ | ||
| - | ===== Avec les queues ===== | + | Méthode avec Queue (échanges interprocess) : |
| + | Extrait de http:// | ||
| + | <code python multiprocessing2.py> | ||
| + | # | ||
| - | < | + | from multiprocessing import Queue, Process, current_process |
| - | import | + | |
| + | def worker(tasks, | ||
| + | # Get a tasks | ||
| + | t = tasks.get() | ||
| + | |||
| + | # Do operation | ||
| + | result = t * 2 | ||
| + | |||
| + | # Put the result in results queue | ||
| + | results.put([current_process().name, | ||
| + | |||
| + | if __name__ == ' | ||
| + | # Number of processes | ||
| + | n = 100 | ||
| + | |||
| + | # Create my tasks and results Queue | ||
| + | myTasks = Queue() | ||
| + | myResults = Queue() | ||
| + | |||
| + | # Create n process | ||
| + | Workers = [ Process(target=worker, | ||
| + | |||
| + | # Start processes | ||
| + | for each in Workers: | ||
| + | each.start() | ||
| + | |||
| + | # Create tasks | ||
| + | for each in range(n): | ||
| + | myTasks.put(each) | ||
| + | |||
| + | # Get the results | ||
| + | for each in range(n): | ||
| + | result = myResults.get() | ||
| + | print(" | ||
| + | </code> | ||
| + | |||
| + | ====== Module threading ====== | ||
| + | |||
| + | Exemple de bench IO avec des threads : | ||
| + | |||
| + | <code python threading.py> | ||
| + | # | ||
| + | # | ||
| + | # Copyright (c) 2008-2010 Benjamin Schweizer and others. | ||
| + | # | ||
| + | # Permission to use, copy, modify, and/or distribute this software for any | ||
| + | # purpose with or without fee is hereby granted, provided that the above | ||
| + | # copyright notice and this permission notice appear in all copies. | ||
| + | # | ||
| + | # THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES | ||
| + | # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF | ||
| + | # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR | ||
| + | # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES | ||
| + | # WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN | ||
| + | # ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF | ||
| + | # OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. | ||
| + | # | ||
| + | # | ||
| + | # Abstract | ||
| + | # ~~~~~~~~ | ||
| + | # Benchmark disk IOs | ||
| + | # | ||
| + | # Authors | ||
| + | # ~~~~~~~ | ||
| + | # Benjamin Schweizer, http:// | ||
| + | # Uwe Menges | ||
| + | # | ||
| + | # Changes | ||
| + | # ~~~~~~~ | ||
| + | # 2010-08-12, benjamin/ | ||
| + | # 2010-07-22, benjamin: fixed 32bit ioctls on bsd | ||
| + | # 2010-07-21, benjamin: freebsd/osx support, switched to isc license | ||
| + | # 2009-09-16, uwe: changed formatting, fixed last block bug | ||
| + | # 2008-10-16, benjamin: initial release | ||
| + | # | ||
| + | # Todo | ||
| + | # ~~~~ | ||
| + | # - check/add netbsd/ | ||
| + | # | ||
| + | |||
| + | USAGE = """ | ||
| + | |||
| + | usage: | ||
| + | |||
| + | iops [-n|--num_threads threads] [-t|--time time] < | ||
| + | |||
| + | threads := number of concurrent io threads, default 1 | ||
| + | time := time in seconds, default 10 | ||
| + | device | ||
| + | |||
| + | example: | ||
| + | |||
| + | iops /dev/sda | ||
| + | iops -n 8 -t 2 / | ||
| + | |||
| + | """ | ||
| + | |||
| + | import | ||
| + | import fcntl | ||
| + | import array | ||
| + | import struct | ||
| + | import random | ||
| import time | import time | ||
| + | import threading | ||
| - | class Consumer(multiprocessing.Process): | + | def mediasize(dev): |
| - | + | | |
| - | | + | |
| - | | + | global _mediasizes |
| - | | + | if not ' |
| - | self.result_queue = result_queue | + | if dev in _mediasizes: |
| + | | ||
| - | | + | |
| - | proc_name | + | |
| - | while True: | + | |
| - | next_task = self.task_queue.get() | + | |
| - | if next_task is None: | + | |
| - | | + | |
| - | print '%s: Exiting' | + | |
| - | self.task_queue.task_done() | + | |
| - | break | + | |
| - | print '%s: %s' % (proc_name, next_task) | + | |
| - | answer = next_task() | + | |
| - | self.task_queue.task_done() | + | |
| - | self.result_queue.put(answer) | + | |
| - | return | + | |
| + | if sys.platform in [' | ||
| + | # mac os x 10.5+ sysctl from sys/disk.h | ||
| + | DKIOCGETBLOCKSIZE = 0x40046418 | ||
| + | DKIOCGETBLOCKCOUNT = 0x40086419 | ||
| - | class Task(object): | + | fh = open(dev, ' |
| - | def __init__(self, a, b): | + | buf = array.array(' |
| - | | + | |
| - | | + | blocksize |
| - | def __call__(self): | + | |
| - | | + | r = fcntl.ioctl(fh.fileno(), |
| - | | + | |
| - | def __str__(self): | + | fh.close() |
| - | | + | mediasize = blocksize*blockcount |
| + | |||
| + | elif sys.platform in [' | ||
| + | | ||
| + | DIOCGMEDIASIZE = 0x40086481 | ||
| + | |||
| + | fh = open(dev, ' | ||
| + | buf = array.array(' | ||
| + | r = fcntl.ioctl(fh.fileno(), | ||
| + | | ||
| + | fh.close() | ||
| + | |||
| + | else: # linux or compat | ||
| + | # linux 2.6 lseek from fcntl.h | ||
| + | SEEK_SET=0 | ||
| + | SEEK_CUR=1 | ||
| + | SEEK_END=2 | ||
| + | |||
| + | fh = open(dev, ' | ||
| + | fh.seek(0,SEEK_END) | ||
| + | mediasize = fh.tell() | ||
| + | fh.close() | ||
| + | |||
| + | | ||
| + | raise Exception(" | ||
| + | |||
| + | _mediasizes[dev] = mediasize | ||
| + | return mediasize | ||
| + | |||
| + | def greek(value, precision=0, | ||
| + | """ | ||
| + | # Copyright (c) 1999 Martin Pohl, copied from | ||
| + | # http:// | ||
| + | if prefix: | ||
| + | | ||
| + | _abbrevs = [ | ||
| + | (10**15, | ||
| + | (10**12, ' | ||
| + | (10** 9, ' | ||
| + | (10** 6, ' | ||
| + | (10** 3, ' | ||
| + | (1 , ' ') | ||
| + | ] | ||
| + | else: | ||
| + | # Use IEC (2-based) units | ||
| + | _abbrevs = [ | ||
| + | (1<< | ||
| + | (1<< | ||
| + | (1<< | ||
| + | (1<< | ||
| + | (1<< | ||
| + | (1 , ' | ||
| + | ] | ||
| + | |||
| + | for factor, suffix in _abbrevs: | ||
| + | if value >= factor: | ||
| + | break | ||
| + | |||
| + | if precision == 0: | ||
| + | return " | ||
| + | else: | ||
| + | fmt=" | ||
| + | return fmt % (float(value)/ | ||
| + | |||
| + | |||
| + | def iops(dev, blocksize=512, | ||
| + | """ | ||
| + | Perform random 512b aligned reads of blocksize bytes on fh for t seconds | ||
| + | and print a stats line | ||
| + | Returns: IOs/s | ||
| + | """ | ||
| + | |||
| + | fh = open(dev, ' | ||
| + | count = 0 | ||
| + | start = time.time() | ||
| + | while time.time() < start+t: | ||
| + | count += 1 | ||
| + | pos = random.randint(0, | ||
| + | pos &= ~0x1ff | ||
| + | fh.seek(pos) | ||
| + | blockdata = fh.read(blocksize) | ||
| + | end = time.time() | ||
| + | |||
| + | t = end - start | ||
| + | |||
| + | fh.close() | ||
| + | |||
| + | return count/t | ||
| if __name__ == ' | if __name__ == ' | ||
| - | # Establish communication queues | + | # parse cli |
| - | | + | |
| - | | + | num_threads = 1 |
| - | + | dev = None | |
| - | # Start consumers | + | |
| - | | + | if len(sys.argv) < 2: |
| - | print ' | + | raise SystemExit(USAGE) |
| - | | + | |
| - | for i in xrange(num_consumers) ] | + | |
| - | for w in consumers: | + | arg = sys.argv.pop(0) |
| - | w.start() | + | if arg in [' |
| - | + | | |
| - | # Enqueue jobs | + | elif arg in [' |
| - | | + | t = int(sys.argv.pop(0)) |
| - | for i in xrange(num_jobs): | + | else: |
| - | tasks.put(Task(i, i)) | + | dev = arg |
| - | + | ||
| - | # Add a poison pill for each consumer | + | |
| - | | + | blocksize = 512 |
| - | | + | try: |
| + | | ||
| + | _iops = num_threads+1 # initial loop | ||
| + | while _iops > num_threads and blocksize < mediasize(dev): | ||
| + | # threading boilerplate | ||
| + | threads | ||
| + | results = [] | ||
| + | |||
| + | def results_wrap(results, func, *__args, **__kw): | ||
| + | """ | ||
| + | result = func(*__args, | ||
| + | | ||
| + | |||
| + | | ||
| + | _t = threading.Thread(target=results_wrap, | ||
| + | _t.start() | ||
| + | threads.append(_t) | ||
| + | |||
| + | | ||
| + | _t.join() | ||
| + | _iops = sum(results) | ||
| + | |||
| + | | ||
| + | print " %sB blocks: %6.1f IO/s, %sB/s (%sbit/ | ||
| + | greek(bandwidth, | ||
| + | |||
| + | | ||
| + | | ||
| + | | ||
| + | except KeyboardInterrupt: | ||
| + | print " | ||
| - | | + | # eof. |
| - | tasks.join() | + | |
| - | + | ||
| - | # Start printing results | + | |
| - | while num_jobs: | + | |
| - | result = results.get() | + | |
| - | print ' | + | |
| - | num_jobs -= 1 | + | |
| </ | </ | ||
python/programmes/multiprocessing1.1329055460.txt.gz · Dernière modification : de root
