python:programmes:multiprocessing1
Différences
Ci-dessous, les différences entre deux révisions de la page.
Prochaine révision | Révision précédente | ||
python:programmes:multiprocessing1 [2012/02/12 12:42] – créée root | python:programmes:multiprocessing1 [2013/03/12 23:07] (Version actuelle) – root | ||
---|---|---|---|
Ligne 1: | Ligne 1: | ||
- | ====== Multithreading (module | + | Pourquoi il faut utiliser le multiprocessing |
- | ===== Simple code ===== | + | ====== Module multiprocessing ====== |
- | < | + | Méthode avec Pool : |
+ | |||
+ | < | ||
# | # | ||
- | from multiprocessing import Process | ||
- | import os | ||
- | import time | ||
- | def sleeper(name, seconds): | + | from multiprocessing import Pool, cpu_count |
- | print(' | + | from time import |
- | | + | |
- | | + | |
- | time.sleep(seconds) | + | |
- | | + | |
+ | def f(x,y): | ||
+ | sleep(2) | ||
+ | return x*y | ||
+ | |||
+ | def k(x,y): | ||
+ | sleep(2) | ||
+ | return x+y | ||
if __name__ == ' | if __name__ == ' | ||
- | print("in parent process (id %s)" % os.getpid()) | + | result=[] |
- | p = Process(target=sleeper, args=(' | + | pool = Pool(processes = cpu_count()) # start x worker processes |
- | | + | |
- | print("in parent process after child process start" | + | |
- | print(" | + | |
- | p.join() | + | for r in result: |
- | print(" | + | try: |
- | | + | print r.get(timeout=3) # timeout 3 secs |
- | | + | except: |
+ | print "timeout..." | ||
</ | </ | ||
- | ===== Avec les queues ===== | + | Méthode avec Queue (échanges interprocess) : |
+ | Extrait de http:// | ||
+ | <code python multiprocessing2.py> | ||
+ | # | ||
- | < | + | from multiprocessing import Queue, Process, current_process |
- | import | + | |
+ | def worker(tasks, | ||
+ | # Get a task | ||
+ | t = tasks.get() | ||
+ | |||
+ | # Do operation | ||
+ | result = t * 2 | ||
+ | |||
+ | # Put the result in results queue | ||
+ | results.put([current_process().name, | ||
+ | |||
+ | if __name__ == ' | ||
+ | # Number of processes | ||
+ | n = 100 | ||
+ | |||
+ | # Create my tasks and results queues | ||
+ | myTasks = Queue() | ||
+ | myResults = Queue() | ||
+ | |||
+ | # Create n processes | ||
+ | Workers = [ Process(target=worker, | ||
+ | |||
+ | # Start processes | ||
+ | for each in Workers: | ||
+ | each.start() | ||
+ | |||
+ | # Create tasks | ||
+ | for each in range(n): | ||
+ | myTasks.put(each) | ||
+ | |||
+ | # Get the results | ||
+ | for each in range(n): | ||
+ | result = myResults.get() | ||
+ | print(" | ||
+ | </code> | ||
+ | |||
+ | ====== Module threading ====== | ||
+ | |||
+ | Exemple de bench IO avec des threads : | ||
+ | |||
+ | <code python threading.py> | ||
+ | # | ||
+ | # | ||
+ | # Copyright (c) 2008-2010 Benjamin Schweizer and others. | ||
+ | # | ||
+ | # Permission to use, copy, modify, and/or distribute this software for any | ||
+ | # purpose with or without fee is hereby granted, provided that the above | ||
+ | # copyright notice and this permission notice appear in all copies. | ||
+ | # | ||
+ | # THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES | ||
+ | # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF | ||
+ | # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR | ||
+ | # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES | ||
+ | # WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN | ||
+ | # ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF | ||
+ | # OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. | ||
+ | # | ||
+ | # | ||
+ | # Abstract | ||
+ | # ~~~~~~~~ | ||
+ | # Benchmark disk IOs | ||
+ | # | ||
+ | # Authors | ||
+ | # ~~~~~~~ | ||
+ | # Benjamin Schweizer, http:// | ||
+ | # Uwe Menges | ||
+ | # | ||
+ | # Changes | ||
+ | # ~~~~~~~ | ||
+ | # 2010-08-12, benjamin/ | ||
+ | # 2010-07-22, benjamin: fixed 32bit ioctls on bsd | ||
+ | # 2010-07-21, benjamin: freebsd/osx support, switched to isc license | ||
+ | # 2009-09-16, uwe: changed formatting, fixed last block bug | ||
+ | # 2008-10-16, benjamin: initial release | ||
+ | # | ||
+ | # Todo | ||
+ | # ~~~~ | ||
+ | # - check/add netbsd/ | ||
+ | # | ||
+ | |||
+ | USAGE = """ | ||
+ | |||
+ | usage: | ||
+ | |||
+ | iops [-n|--num_threads threads] [-t|--time time] < | ||
+ | |||
+ | threads := number of concurrent io threads, default 1 | ||
+ | time := time in seconds, default 10 | ||
+ | device | ||
+ | |||
+ | example: | ||
+ | |||
+ | iops /dev/sda | ||
+ | iops -n 8 -t 2 / | ||
+ | |||
+ | """ | ||
+ | |||
+ | import | ||
+ | import fcntl | ||
+ | import array | ||
+ | import struct | ||
+ | import random | ||
import time | import time | ||
+ | import threading | ||
- | class Consumer(multiprocessing.Process): | + | def mediasize(dev): |
- | + | | |
- | | + | |
- | | + | global _mediasizes |
- | | + | if not ' |
- | self.result_queue = result_queue | + | if dev in _mediasizes: |
+ | | ||
- | | + | |
- | proc_name | + | |
- | while True: | + | |
- | next_task = self.task_queue.get() | + | |
- | if next_task is None: | + | |
- | | + | |
- | print '%s: Exiting' | + | |
- | self.task_queue.task_done() | + | |
- | break | + | |
- | print '%s: %s' % (proc_name, next_task) | + | |
- | answer = next_task() | + | |
- | self.task_queue.task_done() | + | |
- | self.result_queue.put(answer) | + | |
- | return | + | |
+ | if sys.platform in [' | ||
+ | # mac os x 10.5+ ioctls from sys/disk.h | ||
+ | DKIOCGETBLOCKSIZE = 0x40046418 | ||
+ | DKIOCGETBLOCKCOUNT = 0x40086419 | ||
- | class Task(object): | + | fh = open(dev, ' |
- | def __init__(self, a, b): | + | buf = array.array(' |
- | | + | |
- | | + | blocksize |
- | def __call__(self): | + | |
- | | + | r = fcntl.ioctl(fh.fileno(), |
- | | + | |
- | def __str__(self): | + | fh.close() |
- | | + | mediasize = blocksize*blockcount |
+ | |||
+ | elif sys.platform in [' | ||
+ | | ||
+ | DIOCGMEDIASIZE = 0x40086481 | ||
+ | |||
+ | fh = open(dev, ' | ||
+ | buf = array.array(' | ||
+ | r = fcntl.ioctl(fh.fileno(), | ||
+ | | ||
+ | fh.close() | ||
+ | |||
+ | else: # linux or compat | ||
+ | # linux 2.6 lseek from fcntl.h | ||
+ | SEEK_SET=0 | ||
+ | SEEK_CUR=1 | ||
+ | SEEK_END=2 | ||
+ | |||
+ | fh = open(dev, ' | ||
+ | fh.seek(0,SEEK_END) | ||
+ | mediasize = fh.tell() | ||
+ | fh.close() | ||
+ | |||
+ | | ||
+ | raise Exception(" | ||
+ | |||
+ | _mediasizes[dev] = mediasize | ||
+ | return mediasize | ||
+ | |||
+ | def greek(value, precision=0, | ||
+ | """ | ||
+ | # Copyright (c) 1999 Martin Pohl, copied from | ||
+ | # http:// | ||
+ | if prefix: | ||
+ | | ||
+ | _abbrevs = [ | ||
+ | (10**15, | ||
+ | (10**12, ' | ||
+ | (10** 9, ' | ||
+ | (10** 6, ' | ||
+ | (10** 3, ' | ||
+ | (1 , ' ') | ||
+ | ] | ||
+ | else: | ||
+ | # Use IEC (2-based) units | ||
+ | _abbrevs = [ | ||
+ | (1<< | ||
+ | (1<< | ||
+ | (1<< | ||
+ | (1<< | ||
+ | (1<< | ||
+ | (1 , ' | ||
+ | ] | ||
+ | |||
+ | for factor, suffix in _abbrevs: | ||
+ | if value >= factor: | ||
+ | break | ||
+ | |||
+ | if precision == 0: | ||
+ | return " | ||
+ | else: | ||
+ | fmt=" | ||
+ | return fmt % (float(value)/ | ||
+ | |||
+ | |||
+ | def iops(dev, blocksize=512, | ||
+ | """ | ||
+ | Perform random 512b aligned reads of blocksize bytes on fh for t seconds | ||
+ | and print a stats line | ||
+ | Returns: IOs/s | ||
+ | """ | ||
+ | |||
+ | fh = open(dev, ' | ||
+ | count = 0 | ||
+ | start = time.time() | ||
+ | while time.time() < start+t: | ||
+ | count += 1 | ||
+ | pos = random.randint(0, | ||
+ | pos &= ~0x1ff | ||
+ | fh.seek(pos) | ||
+ | blockdata = fh.read(blocksize) | ||
+ | end = time.time() | ||
+ | |||
+ | t = end - start | ||
+ | |||
+ | fh.close() | ||
+ | |||
+ | return count/t | ||
if __name__ == ' | if __name__ == ' | ||
- | # Establish communication queues | + | # parse cli |
- | | + | |
- | | + | num_threads = 1 |
- | + | dev = None | |
- | # Start consumers | + | |
- | | + | if len(sys.argv) < 2: |
- | print ' | + | raise SystemExit(USAGE) |
- | | + | |
- | for i in xrange(num_consumers) ] | + | |
- | for w in consumers: | + | arg = sys.argv.pop(0) |
- | w.start() | + | if arg in [' |
- | + | | |
- | # Enqueue jobs | + | elif arg in [' |
- | | + | t = int(sys.argv.pop(0)) |
- | for i in xrange(num_jobs): | + | else: |
- | tasks.put(Task(i, i)) | + | dev = arg |
- | + | ||
- | # Add a poison pill for each consumer | + | |
- | | + | blocksize = 512 |
- | | + | try: |
+ | | ||
+ | _iops = num_threads+1 # initial loop | ||
+ | while _iops > num_threads and blocksize < mediasize(dev): | ||
+ | # threading boilerplate | ||
+ | threads | ||
+ | results = [] | ||
+ | |||
+ | def results_wrap(results, func, *__args, **__kw): | ||
+ | """ | ||
+ | result = func(*__args, | ||
+ | | ||
+ | |||
+ | | ||
+ | _t = threading.Thread(target=results_wrap, | ||
+ | _t.start() | ||
+ | threads.append(_t) | ||
+ | |||
+ | | ||
+ | _t.join() | ||
+ | _iops = sum(results) | ||
+ | |||
+ | | ||
+ | print " %sB blocks: %6.1f IO/s, %sB/s (%sbit/ | ||
+ | greek(bandwidth, | ||
+ | |||
+ | | ||
+ | | ||
+ | | ||
+ | except KeyboardInterrupt: | ||
+ | print " | ||
- | | + | # eof. |
- | tasks.join() | + | |
- | + | ||
- | # Start printing results | + | |
- | while num_jobs: | + | |
- | result = results.get() | + | |
- | print ' | + | |
- | num_jobs -= 1 | + | |
</ | </ |
python/programmes/multiprocessing1.1329050573.txt.gz · Dernière modification : 2012/02/12 12:42 de root