Outils pour utilisateurs

Outils du site


python:programmes:multiprocessing1

Différences

Ci-dessous, les différences entre deux révisions de la page.

Lien vers cette vue comparative

Les deux révisions précédentesRévision précédente
Prochaine révision
Révision précédente
python:programmes:multiprocessing1 [2012/02/12 14:04] rootpython:programmes:multiprocessing1 [2013/03/12 23:07] (Version actuelle) root
Ligne 1: Ligne 1:
-====== Multithreading (module multiprocessing) ======+Pourquoi il faut utiliser le multiprocessing et non le threading sous python : http://blog.unix4fun.net/post/2007/09/17/python-le-GIL-pourquoi-et-comment-faire
  
-Doc : http://www.doughellmann.com/PyMOTW/multiprocessing/communication.html+====== Module multiprocessing ======
  
-===== Simple code =====+Méthode avec Pool :
  
-<code>+<code python multiprocessing1.py>
 #!/usr/bin/env python #!/usr/bin/env python
-from multiprocessing import Process 
-import os 
-import time 
  
-def sleeper(nameseconds): +from multiprocessing import Poolcpu_count 
-   print('starting child process with id: ', os.getpid()) +from time import sleep
-   print('parent process:', os.getppid()) +
-   print('sleeping for %s ' % seconds) +
-   time.sleep(seconds) +
-   print("Done sleeping")+
  
 +def f(x,y):
 +    sleep(2)
 +    return x*y
 +
 +def k(x,y):
 +    sleep(2)
 +    return x+y
  
 if __name__ == '__main__': if __name__ == '__main__':
-   print("in parent process (id %s)" % os.getpid()) +    result=[] 
-   p = Process(target=sleeperargs=('bob'5)) +    pool = Pool(processes = cpu_count())  # start x worker processes 
-   p.start() +    result.append(pool.apply_async(f, (5,10,)))  # evaluate "f(10)asynchronously 
-   print("in parent process after child process start") +    result.append(pool.apply_async(k, (10,10,)))  # evaluate "f(10)asynchronously 
-   print("parent process about to join child process"+ 
-   p.join() +    for r in result: 
-   print("in parent process after child process join") +         try: 
-   print("parent process exiting with id ", os.getpid()) +             print r.get(timeout=3                    # timeout 3 secs 
-   print("The parent's parent process:", os.getppid())+         except: 
 +             print "timeout..."
 </code> </code>
  
-===== Avec les queues =====+Méthode avec Queue (échanges interprocess) :  
 +Extrait de http://download.velannes.com/Python/processing.pdf 
 +<code python multiprocessing2.py> 
 +#!/usr/bin/env python
  
-<code> +from multiprocessing import Queue, Process, current_process 
-import multiprocessing+ 
 +def worker(tasks, results): 
 + # Get a tasks 
 +        t = tasks.get() 
 + 
 + # Do operation 
 + result = t * 2 
 + 
 + # Put the result in results queue 
 + results.put([current_process().name, t, "*", 2, result]) 
 + 
 +if __name__ == '__main__': 
 + # Number of processes 
 + n = 100 
 + 
 + # Create my tasks and results Queue 
 + myTasks = Queue() 
 + myResults = Queue() 
 + 
 + # Create n process 
 + Workers = [ Process(target=worker, args=(myTasks, myResults)) for i in range(n) ] 
 + 
 + # Start processes 
 + for each in Workers: 
 + each.start() 
 + 
 + # Create tasks 
 + for each in range(n): 
 + myTasks.put(each) 
 + 
 + # Get the results 
 + for each in range(n): 
 + result = myResults.get() 
 + print("Res : %s" % result) 
 +</code> 
 + 
 +====== Module threading ====== 
 + 
 +Exemple de bench IO avec des threads : 
 + 
 +<code python threading.py> 
 +#!/usr/bin/env python 
 +
 +# Copyright (c) 2008-2010 Benjamin Schweizer and others. 
 +
 +# Permission to use, copy, modify, and/or distribute this software for any 
 +# purpose with or without fee is hereby granted, provided that the above 
 +# copyright notice and this permission notice appear in all copies. 
 +
 +# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 
 +# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 
 +# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 
 +# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 
 +# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 
 +# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 
 +# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 
 +
 +
 +# Abstract 
 +# ~~~~~~~~ 
 +# Benchmark disk IOs 
 +
 +# Authors 
 +# ~~~~~~~ 
 +# Benjamin Schweizer, http://benjamin-schweizer.de/contact 
 +# Uwe Menges 
 +
 +# Changes 
 +# ~~~~~~~ 
 +# 2010-08-12, benjamin/uwe: added multi-threading support 
 +# 2010-07-22, benjamin: fixed 32bit ioctls on bsd 
 +# 2010-07-21, benjamin: freebsd/osx support, switched to isc license 
 +# 2009-09-16, uwe: changed formatting, fixed last block bug 
 +# 2008-10-16, benjamin: initial release 
 +
 +# Todo 
 +# ~~~~ 
 +# - check/add netbsd/openbsd mediasize ioctls 
 +
 + 
 +USAGE = """Copyright (c) 2008-2010 Benjamin Schweizer and others. 
 + 
 +usage: 
 + 
 +    iops [-n|--num_threads threads] [-t|--time time] <device> 
 + 
 +    threads := number of concurrent io threads, default 1 
 +    time    := time in seconds, default 10 
 +    device  := some block device, like /dev/sda 
 + 
 +example: 
 + 
 +    iops /dev/sda 
 +    iops -n 8 -t 2 /dev/disk0 
 + 
 +""" 
 + 
 +import sys 
 +import fcntl 
 +import array 
 +import struct 
 +import random
 import time import time
 +import threading
  
-class Consumer(multiprocessing.Process): +def mediasize(dev): 
-     +    """report the media size for a device""" 
-    def __init__(self, task_queue, result_queue): +    # caching 
-        multiprocessing.Process.__init__(self) +    global _mediasizes 
-        self.task_queue = task_queue +    if not '_mediasizes' in globals(): _mediasizes = {} 
-        self.result_queue = result_queue+    if dev in _mediasizes: 
 +        return _mediasizes[dev]
  
-    def run(self): +    mediasize bytes
-        proc_name self.name +
-        while True: +
-            next_task = self.task_queue.get() +
-            if next_task is None: +
-                Poison pill means shutdown +
-                print '%s: Exiting' % proc_name +
-                self.task_queue.task_done() +
-                break +
-            print '%s: %s' % (proc_name, next_task) +
-            answer = next_task() +
-            self.task_queue.task_done() +
-            self.result_queue.put(answer) +
-        return+
  
 +    if sys.platform in ['darwin']:
 +        # mac os x 10.5+ sysctl from sys/disk.h
 +        DKIOCGETBLOCKSIZE = 0x40046418
 +        DKIOCGETBLOCKCOUNT = 0x40086419
  
-class Task(object): +        fh = open(dev, 'r'
-    def __init__(selfab): +        buf = array.array('B'range(0,4))  # uint32 
-        self.a +        r = fcntl.ioctl(fh.fileno(), DKIOCGETBLOCKSIZE, buf, 1) 
-        self.b +        blocksize struct.unpack('I', buf)[0] 
-    def __call__(self): +        buf = array.array('B', range(0,8))  # uint64 
-        time.sleep(0.1) # pretend to take some time to do the work +        r fcntl.ioctl(fh.fileno(), DKIOCGETBLOCKCOUNT, buf, 1) 
-        return '%s * %s %s(self.aself.bself.a * self.b+        blockcount = struct.unpack('Q', buf)[0] 
-    def __str__(self): +        fh.close() 
-        return '%s * %s% (self.a, self.b)+        mediasize = blocksize*blockcount 
 + 
 +    elif sys.platform in ['freebsd8']
 +        # freebsd 8 sysctl from sys/disk.
 +        DIOCGMEDIASIZE = 0x40086481 
 + 
 +        fh = open(dev, 'r'
 +        buf = array.array('B', range(0,8))  # off_t / int64 
 +        r = fcntl.ioctl(fh.fileno(), DIOCGMEDIASIZE, buf, 1) 
 +        mediasize struct.unpack('q', buf)[0] 
 +        fh.close(
 + 
 +    else: # linux or compat 
 +        # linux 2.6 lseek from fcntl.h 
 +        SEEK_SET=0 
 +        SEEK_CUR=1 
 +        SEEK_END=2 
 + 
 +        fh = open(dev'r'
 +        fh.seek(0,SEEK_END) 
 +        mediasize = fh.tell() 
 +        fh.close() 
 + 
 +    if not mediasize: 
 +        raise Exception("cannot determine media size"
 + 
 +    _mediasizes[dev] = mediasize 
 +    return mediasize 
 + 
 +def greek(value, precision=0, prefix=None)
 +    """Return a string representing the IEC or SI suffix of a value""" 
 +    # Copyright (c) 1999 Martin Pohl, copied from 
 +    # http://mail.python.org/pipermail/python-list/1999-December/018519.html 
 +    if prefix
 +        # Use SI (10-based) units 
 +        _abbrevs = [ 
 +            (10**15, 'P'), 
 +            (10**12, 'T'), 
 +            (10** 9, 'G'), 
 +            (10** 6, 'M'), 
 +            (10** 3, 'k'), 
 +            (1     , ' ') 
 +        ] 
 +    else: 
 +        # Use IEC (2-based) units 
 +        _abbrevs = [ 
 +            (1<<50L, 'Pi'), 
 +            (1<<40L, 'Ti'), 
 +            (1<<30L, 'Gi'), 
 +            (1<<20L, 'Mi'), 
 +            (1<<10L, 'Ki'), 
 +            (1     , '  ') 
 +        ] 
 + 
 +    for factor, suffix in _abbrevs: 
 +        if value >= factor: 
 +            break 
 + 
 +    if precision == 0: 
 +        return "%3.d %s% (int(value/factor), suffix) 
 +    else: 
 +        fmt="%%%d.%df %%s" % (4+precision, precision) 
 +        return fmt % (float(value)/factor, suffix) 
 + 
 + 
 +def iops(dev, blocksize=512, t=10): 
 +    """measure input/output operations per second 
 +    Perform random 512b aligned reads of blocksize bytes on fh for t seconds 
 +    and print stats line 
 +    Returns: IOs/s 
 +    """ 
 + 
 +    fh = open(dev'r'
 +    count = 0 
 +    start = time.time() 
 +    while time.time() < start+t: 
 +        count += 1 
 +        pos = random.randint(0, mediasize(dev) - blocksize) # need at least one block left 
 +        pos &= ~0x1ff   # freebsd8: pos needs 512B sector alignment 
 +        fh.seek(pos) 
 +        blockdata = fh.read(blocksize) 
 +    end = time.time() 
 + 
 +    t = end - start 
 + 
 +    fh.close() 
 + 
 +    return count/t
  
  
 if __name__ == '__main__': if __name__ == '__main__':
-    # Establish communication queues +    # parse cli 
-    tasks multiprocessing.JoinableQueue() +    10 
-    results multiprocessing.Queue() +    num_threads = 1 
-     +    dev = None 
-    # Start consumers + 
-    num_consumers multiprocessing.cpu_count() * 2 +    if len(sys.argv) < 2: 
-    print 'Creating %d consumers' num_consumers +        raise SystemExit(USAGE) 
-    consumers = [ Consumer(tasks, results) + 
-                  for i in xrange(num_consumers] +    while sys.argv: 
-    for in consumers+        arg sys.argv.pop(0
-        w.start() +        if arg in ['-n', '--num-threads']: 
-     +            num_threads = int(sys.argv.pop(0)) 
-    # Enqueue jobs +        elif arg in ['-t', '--time']: 
-    num_jobs 10 +            t int(sys.argv.pop(0)) 
-    for i in xrange(num_jobs): +        else: 
-        tasks.put(Task(ii)) +            dev = arg 
-     + 
-    # Add a poison pill for each consumer +    # run benchmark 
-    for i in xrange(num_consumers): +    blocksize = 512 
-        tasks.put(None)+    try: 
 +        print "%s, %sB, %d threads:" (dev, greek(mediasize(dev), 2, 'si'), num_threads) 
 +        _iops = num_threads+1 # initial loop 
 +        while _iops > num_threads and blocksize < mediasize(dev): 
 +            # threading boilerplate 
 +            threads = [
 +            results = [] 
 +             
 +            def results_wrap(resultsfunc, *__args, **__kw): 
 +                """collect return values from func""" 
 +                result = func(*__args, **__kw) 
 +                results.append(result
 + 
 +            for i in range(0, num_threads): 
 +                _t = threading.Thread(target=results_wrap, args=(results, iops, dev, blocksize, t,)) 
 +                _t.start() 
 +                threads.append(_t) 
 + 
 +            for _t in threads
 +                _t.join() 
 +            _iops = sum(results) 
 + 
 +            bandwidth int(blocksize*_iops) 
 +            print " %sB blocks%6.1f IO/s, %sB/s (%sbit/s)"(greek(blocksize)_iops, 
 +                greek(bandwidth, 1), greek(8*bandwidth, 1, 'si')) 
 + 
 +            blocksize *= 2 
 +    except IOError, (err_no, err_str): 
 +        raise SystemExit(err_str) 
 +    except KeyboardInterrupt: 
 +        print "caught ctrl-c, bye."
  
-    Wait for all of the tasks to finish +eof.
-    tasks.join() +
-     +
-    # Start printing results +
-    while num_jobs: +
-        result = results.get() +
-        print 'Result:', result +
-        num_jobs -= 1+
 </code> </code>
python/programmes/multiprocessing1.1329055460.txt.gz · Dernière modification : 2012/02/12 14:04 de root