Outils pour utilisateurs

Outils du site


python:programmes:multiprocessing1

Différences

Ci-dessous, les différences entre deux révisions de la page.

Lien vers cette vue comparative

Les deux révisions précédentesRévision précédente
Prochaine révision
Révision précédente
python:programmes:multiprocessing1 [2012/11/11 17:45] rootpython:programmes:multiprocessing1 [2013/03/12 23:07] (Version actuelle) root
Ligne 1: Ligne 1:
-====== Multithreading (module multiprocessing) ======+Pourquoi il faut utiliser le multiprocessing et non le threading sous python : http://blog.unix4fun.net/post/2007/09/17/python-le-GIL-pourquoi-et-comment-faire
  
-<code>+====== Module multiprocessing ====== 
 + 
 +Méthode avec Pool : 
 + 
 +<code python multiprocessing1.py>
 #!/usr/bin/env python #!/usr/bin/env python
  
Ligne 26: Ligne 30:
          except:          except:
              print "timeout..."              print "timeout..."
 +</code>
 +
 +Méthode avec Queue (échanges interprocess) : 
 +Extrait de http://download.velannes.com/Python/processing.pdf
 +<code python multiprocessing2.py>
 +#!/usr/bin/env python
 +
 +from multiprocessing import Queue, Process, current_process
 +
 +def worker(tasks, results):
 + # Get a tasks
 +        t = tasks.get()
 +
 + # Do operation
 + result = t * 2
 +
 + # Put the result in results queue
 + results.put([current_process().name, t, "*", 2, result])
 +
 +if __name__ == '__main__':
 + # Number of processes
 + n = 100
 +
 + # Create my tasks and results Queue
 + myTasks = Queue()
 + myResults = Queue()
 +
 + # Create n process
 + Workers = [ Process(target=worker, args=(myTasks, myResults)) for i in range(n) ]
 +
 + # Start processes
 + for each in Workers:
 + each.start()
 +
 + # Create tasks
 + for each in range(n):
 + myTasks.put(each)
 +
 + # Get the results
 + for each in range(n):
 + result = myResults.get()
 + print("Res : %s" % result)
 +</code>
 +
 +====== Module threading ======
 +
 +Exemple de bench IO avec des threads :
 +
 +<code python threading.py>
 +#!/usr/bin/env python
 +#
 +# Copyright (c) 2008-2010 Benjamin Schweizer and others.
 +#
 +# Permission to use, copy, modify, and/or distribute this software for any
 +# purpose with or without fee is hereby granted, provided that the above
 +# copyright notice and this permission notice appear in all copies.
 +#
 +# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 +# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 +# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 +# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 +# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 +# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 +# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 +#
 +#
 +# Abstract
 +# ~~~~~~~~
 +# Benchmark disk IOs
 +#
 +# Authors
 +# ~~~~~~~
 +# Benjamin Schweizer, http://benjamin-schweizer.de/contact
 +# Uwe Menges
 +#
 +# Changes
 +# ~~~~~~~
 +# 2010-08-12, benjamin/uwe: added multi-threading support
 +# 2010-07-22, benjamin: fixed 32bit ioctls on bsd
 +# 2010-07-21, benjamin: freebsd/osx support, switched to isc license
 +# 2009-09-16, uwe: changed formatting, fixed last block bug
 +# 2008-10-16, benjamin: initial release
 +#
 +# Todo
 +# ~~~~
 +# - check/add netbsd/openbsd mediasize ioctls
 +#
 +
 +USAGE = """Copyright (c) 2008-2010 Benjamin Schweizer and others.
 +
 +usage:
 +
 +    iops [-n|--num_threads threads] [-t|--time time] <device>
 +
 +    threads := number of concurrent io threads, default 1
 +    time    := time in seconds, default 10
 +    device  := some block device, like /dev/sda
 +
 +example:
 +
 +    iops /dev/sda
 +    iops -n 8 -t 2 /dev/disk0
 +
 +"""
 +
 +import sys
 +import fcntl
 +import array
 +import struct
 +import random
 +import time
 +import threading
 +
 +def mediasize(dev):
 +    """report the media size for a device"""
 +    # caching
 +    global _mediasizes
 +    if not '_mediasizes' in globals(): _mediasizes = {}
 +    if dev in _mediasizes:
 +        return _mediasizes[dev]
 +
 +    mediasize = 0 # bytes
 +
 +    if sys.platform in ['darwin']:
 +        # mac os x 10.5+ sysctl from sys/disk.h
 +        DKIOCGETBLOCKSIZE = 0x40046418
 +        DKIOCGETBLOCKCOUNT = 0x40086419
 +
 +        fh = open(dev, 'r')
 +        buf = array.array('B', range(0,4))  # uint32
 +        r = fcntl.ioctl(fh.fileno(), DKIOCGETBLOCKSIZE, buf, 1)
 +        blocksize = struct.unpack('I', buf)[0]
 +        buf = array.array('B', range(0,8))  # uint64
 +        r = fcntl.ioctl(fh.fileno(), DKIOCGETBLOCKCOUNT, buf, 1)
 +        blockcount = struct.unpack('Q', buf)[0]
 +        fh.close()
 +        mediasize = blocksize*blockcount
 +
 +    elif sys.platform in ['freebsd8']:
 +        # freebsd 8 sysctl from sys/disk.h
 +        DIOCGMEDIASIZE = 0x40086481
 +
 +        fh = open(dev, 'r')
 +        buf = array.array('B', range(0,8))  # off_t / int64
 +        r = fcntl.ioctl(fh.fileno(), DIOCGMEDIASIZE, buf, 1)
 +        mediasize = struct.unpack('q', buf)[0]
 +        fh.close()
 +
 +    else: # linux or compat
 +        # linux 2.6 lseek from fcntl.h
 +        SEEK_SET=0
 +        SEEK_CUR=1
 +        SEEK_END=2
 +
 +        fh = open(dev, 'r')
 +        fh.seek(0,SEEK_END)
 +        mediasize = fh.tell()
 +        fh.close()
 +
 +    if not mediasize:
 +        raise Exception("cannot determine media size")
 +
 +    _mediasizes[dev] = mediasize
 +    return mediasize
 +
 +def greek(value, precision=0, prefix=None):
 +    """Return a string representing the IEC or SI suffix of a value"""
 +    # Copyright (c) 1999 Martin Pohl, copied from
 +    # http://mail.python.org/pipermail/python-list/1999-December/018519.html
 +    if prefix:
 +        # Use SI (10-based) units
 +        _abbrevs = [
 +            (10**15, 'P'),
 +            (10**12, 'T'),
 +            (10** 9, 'G'),
 +            (10** 6, 'M'),
 +            (10** 3, 'k'),
 +            (1     , ' ')
 +        ]
 +    else:
 +        # Use IEC (2-based) units
 +        _abbrevs = [
 +            (1<<50L, 'Pi'),
 +            (1<<40L, 'Ti'),
 +            (1<<30L, 'Gi'),
 +            (1<<20L, 'Mi'),
 +            (1<<10L, 'Ki'),
 +            (1     , '  ')
 +        ]
 +
 +    for factor, suffix in _abbrevs:
 +        if value >= factor:
 +            break
 +
 +    if precision == 0:
 +        return "%3.d %s" % (int(value/factor), suffix)
 +    else:
 +        fmt="%%%d.%df %%s" % (4+precision, precision)
 +        return fmt % (float(value)/factor, suffix)
 +
 +
 +def iops(dev, blocksize=512, t=10):
 +    """measure input/output operations per second
 +    Perform random 512b aligned reads of blocksize bytes on fh for t seconds
 +    and print a stats line
 +    Returns: IOs/s
 +    """
 +
 +    fh = open(dev, 'r')
 +    count = 0
 +    start = time.time()
 +    while time.time() < start+t:
 +        count += 1
 +        pos = random.randint(0, mediasize(dev) - blocksize) # need at least one block left
 +        pos &= ~0x1ff   # freebsd8: pos needs 512B sector alignment
 +        fh.seek(pos)
 +        blockdata = fh.read(blocksize)
 +    end = time.time()
 +
 +    t = end - start
 +
 +    fh.close()
 +
 +    return count/t
 +
 +
 +if __name__ == '__main__':
 +    # parse cli
 +    t = 10
 +    num_threads = 1
 +    dev = None
 +
 +    if len(sys.argv) < 2:
 +        raise SystemExit(USAGE)
 +
 +    while sys.argv:
 +        arg = sys.argv.pop(0)
 +        if arg in ['-n', '--num-threads']:
 +            num_threads = int(sys.argv.pop(0))
 +        elif arg in ['-t', '--time']:
 +            t = int(sys.argv.pop(0))
 +        else:
 +            dev = arg
 +
 +    # run benchmark
 +    blocksize = 512
 +    try:
 +        print "%s, %sB, %d threads:" % (dev, greek(mediasize(dev), 2, 'si'), num_threads)
 +        _iops = num_threads+1 # initial loop
 +        while _iops > num_threads and blocksize < mediasize(dev):
 +            # threading boilerplate
 +            threads = []
 +            results = []
 +            
 +            def results_wrap(results, func, *__args, **__kw):
 +                """collect return values from func"""
 +                result = func(*__args, **__kw)
 +                results.append(result)
 +
 +            for i in range(0, num_threads):
 +                _t = threading.Thread(target=results_wrap, args=(results, iops, dev, blocksize, t,))
 +                _t.start()
 +                threads.append(_t)
 +
 +            for _t in threads:
 +                _t.join()
 +            _iops = sum(results)
 +
 +            bandwidth = int(blocksize*_iops)
 +            print " %sB blocks: %6.1f IO/s, %sB/s (%sbit/s)" % (greek(blocksize), _iops,
 +                greek(bandwidth, 1), greek(8*bandwidth, 1, 'si'))
 +
 +            blocksize *= 2
 +    except IOError, (err_no, err_str):
 +        raise SystemExit(err_str)
 +    except KeyboardInterrupt:
 +        print "caught ctrl-c, bye."
 +
 +# eof.
 </code> </code>
python/programmes/multiprocessing1.1352655902.txt.gz · Dernière modification : 2012/11/11 17:45 de root