Outils pour utilisateurs

Outils du site


python:programmes:multiprocessing1

Pourquoi il faut utiliser le module multiprocessing plutôt que le module threading sous Python (GIL) : http://blog.unix4fun.net/post/2007/09/17/python-le-GIL-pourquoi-et-comment-faire

Module multiprocessing

Méthode avec Pool :

multiprocessing1.py
#!/usr/bin/env python
 
from multiprocessing import Pool, cpu_count
from time import sleep
 
def f(x, y, delay=2):
    """Return the product of x and y after pausing for ``delay`` seconds.

    The pause makes the call slow enough that running it in a worker
    process is observable.

    Args:
        x: left operand.
        y: right operand.
        delay: seconds to sleep before computing; defaults to 2.

    Returns:
        ``x * y``.
    """
    sleep(delay)
    return x * y
 
def k(x, y, delay=2):
    """Return the sum of x and y after pausing for ``delay`` seconds.

    The pause makes the call slow enough that running it in a worker
    process is observable.

    Args:
        x: left operand.
        y: right operand.
        delay: seconds to sleep before computing; defaults to 2.

    Returns:
        ``x + y``.
    """
    sleep(delay)
    return x + y
 
if __name__ == '__main__':
    # Imported locally so the handler below can name the exact exception
    # raised when waiting for a result times out.
    import multiprocessing

    pool = Pool(processes=cpu_count())  # start one worker process per CPU
    try:
        result = [
            pool.apply_async(f, (5, 10)),   # evaluate "f(5, 10)" asynchronously
            pool.apply_async(k, (10, 10)),  # evaluate "k(10, 10)" asynchronously
        ]

        for r in result:
            try:
                print(r.get(timeout=3))  # wait at most 3 secs for this result
            except multiprocessing.TimeoutError:
                # Only a real timeout is reported here; an exception raised
                # inside f or k is re-raised by get() and must stay visible.
                print("timeout...")
    finally:
        # Always release the worker processes, even if a task failed.
        pool.terminate()
        pool.join()

Méthode avec Queue (échanges interprocess) : Extrait de http://download.velannes.com/Python/processing.pdf

multiprocessing2.py
#!/usr/bin/env python
 
from multiprocessing import Queue, Process, current_process
 
def worker(tasks, results):
    """Process a single task: take one value, double it, report the result.

    Args:
        tasks: queue-like object whose ``get()`` returns the value to process.
        results: queue-like object whose ``put()`` receives the report.

    The report is the list ``[process name, value, "*", 2, value * 2]``.
    """
    # Get a task
    t = tasks.get()

    # Do operation
    result = t * 2

    # Put the result in results queue
    results.put([current_process().name, t, "*", 2, result])
 
if __name__ == '__main__':
    # One worker process is created per task
    process_count = 100

    # Queue feeding the workers, and queue carrying their reports back
    task_queue = Queue()
    result_queue = Queue()

    # Build every process first ...
    processes = []
    for _ in range(process_count):
        processes.append(Process(target=worker, args=(task_queue, result_queue)))

    # ... then launch them all
    for proc in processes:
        proc.start()

    # Hand out one task per worker
    for number in range(process_count):
        task_queue.put(number)

    # Collect and display exactly as many reports as tasks were submitted
    pending = process_count
    while pending > 0:
        print("Res : %s" % result_queue.get())
        pending -= 1

Module threading

Exemple de bench IO avec des threads :

iops.py
#!/usr/bin/env python
#
# Copyright (c) 2008-2010 Benjamin Schweizer and others.
#
# Permission to use, copy, modify, and/or distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
#
#
# Abstract
# ~~~~~~~~
# Benchmark disk IOs
#
# Authors
# ~~~~~~~
# Benjamin Schweizer, http://benjamin-schweizer.de/contact
# Uwe Menges
#
# Changes
# ~~~~~~~
# 2010-08-12, benjamin/uwe: added multi-threading support
# 2010-07-22, benjamin: fixed 32bit ioctls on bsd
# 2010-07-21, benjamin: freebsd/osx support, switched to isc license
# 2009-09-16, uwe: changed formatting, fixed last block bug
# 2008-10-16, benjamin: initial release
#
# Todo
# ~~~~
# - check/add netbsd/openbsd mediasize ioctls
#
 
# Help text shown when the script is started without arguments.
# The long option is spelled "--num-threads", which is the spelling the
# command-line parser of this script matches.
USAGE = """Copyright (c) 2008-2010 Benjamin Schweizer and others.
 
usage:
 
    iops [-n|--num-threads threads] [-t|--time time] <device>
 
    threads := number of concurrent io threads, default 1
    time    := time in seconds, default 10
    device  := some block device, like /dev/sda
 
example:
 
    iops /dev/sda
    iops -n 8 -t 2 /dev/disk0
 
"""
 
import sys
import fcntl
import array
import struct
import random
import time
import threading
 
def mediasize(dev):
    """Report the media size of a device, in bytes.

    The result is cached per device in the module-level ``_mediasizes``
    dict, so the device is probed only on the first call for a given path.

    Args:
        dev: path of the device (or file) to inspect.

    Returns:
        The media size in bytes.

    Raises:
        Exception: if the probed size is zero ("cannot determine media size").
        Errors from ``open()``, ``ioctl()`` or ``seek()`` propagate unchanged.
    """
    # caching: create the module-level cache on first use
    cache = globals().setdefault('_mediasizes', {})
    if dev in cache:
        return cache[dev]

    size = 0  # bytes

    # Every branch opens the device in binary mode inside a ``with`` block so
    # the handle is closed even when the probe raises.
    if sys.platform in ['darwin']:
        # mac os x 10.5+ ioctl request codes from sys/disk.h
        DKIOCGETBLOCKSIZE = 0x40046418
        DKIOCGETBLOCKCOUNT = 0x40086419

        with open(dev, 'rb') as fh:
            buf = array.array('B', range(0, 4))  # uint32
            fcntl.ioctl(fh.fileno(), DKIOCGETBLOCKSIZE, buf, 1)
            blocksize = struct.unpack('I', buf)[0]
            buf = array.array('B', range(0, 8))  # uint64
            fcntl.ioctl(fh.fileno(), DKIOCGETBLOCKCOUNT, buf, 1)
            blockcount = struct.unpack('Q', buf)[0]
        size = blocksize * blockcount

    elif sys.platform in ['freebsd8']:
        # freebsd 8 ioctl request code from sys/disk.h
        # NOTE(review): only the exact platform string 'freebsd8' is matched;
        # other FreeBSD releases fall through to the seek-based branch.
        DIOCGMEDIASIZE = 0x40086481

        with open(dev, 'rb') as fh:
            buf = array.array('B', range(0, 8))  # off_t / int64
            fcntl.ioctl(fh.fileno(), DIOCGMEDIASIZE, buf, 1)
            size = struct.unpack('q', buf)[0]

    else:  # linux or compat
        # seek to the end of the device and read back the offset
        SEEK_END = 2

        with open(dev, 'rb') as fh:
            fh.seek(0, SEEK_END)
            size = fh.tell()

    if not size:
        raise Exception("cannot determine media size")

    cache[dev] = size
    return size
 
def greek(value, precision=0, prefix=None):
    """Return a string representing the IEC or SI suffix of a value.

    Args:
        value: the number to format.
        precision: number of decimals; 0 formats the scaled value as an
            integer.
        prefix: any true value selects SI (powers of 10) units, otherwise
            IEC (powers of 2) units are used.

    Returns:
        The scaled value followed by a space and the unit suffix, e.g.
        ``"  1 Ki"`` for ``greek(1024)``.
    """
    # Copyright (c) 1999 Martin Pohl, copied from
    # http://mail.python.org/pipermail/python-list/1999-December/018519.html
    if prefix:
        # Use SI (10-based) units
        _abbrevs = [
            (10 ** 15, 'P'),
            (10 ** 12, 'T'),
            (10 ** 9, 'G'),
            (10 ** 6, 'M'),
            (10 ** 3, 'k'),
            (1, ' '),
        ]
    else:
        # Use IEC (2-based) units; plain int literals are used because the
        # "L" long suffix is a syntax error on Python 3.
        _abbrevs = [
            (1 << 50, 'Pi'),
            (1 << 40, 'Ti'),
            (1 << 30, 'Gi'),
            (1 << 20, 'Mi'),
            (1 << 10, 'Ki'),
            (1, '  '),
        ]

    # Pick the largest unit not exceeding value; when nothing matches
    # (value < 1) the loop ends on the last entry, i.e. the unscaled unit.
    for factor, suffix in _abbrevs:
        if value >= factor:
            break

    if precision == 0:
        return "%3.d %s" % (int(value / factor), suffix)

    fmt = "%%%d.%df %%s" % (4 + precision, precision)
    return fmt % (float(value) / factor, suffix)
 
 
def iops(dev, blocksize=512, t=10):
    """Measure input/output operations per second.

    Performs reads of ``blocksize`` bytes at random 512-byte-aligned
    offsets of ``dev`` for about ``t`` seconds.

    Args:
        dev: path of the device to benchmark.
        blocksize: number of bytes requested by each read.
        t: benchmark duration in seconds.

    Returns:
        The number of reads per second (IOs/s), or 0.0 if no time elapsed.
    """
    # The media size does not change during the run: look it up once.
    size = mediasize(dev)

    count = 0
    # Binary mode: the data is raw device content, not text to decode.
    # The ``with`` block closes the handle even if a read fails.
    with open(dev, 'rb') as fh:
        start = time.time()
        while time.time() < start + t:
            count += 1
            pos = random.randint(0, size - blocksize)  # need at least one block left
            pos &= ~0x1ff   # freebsd8: pos needs 512B sector alignment
            fh.seek(pos)
            fh.read(blocksize)
        end = time.time()

    elapsed = end - start
    if elapsed <= 0:
        # Avoid a division by zero when the loop took no measurable time.
        return 0.0

    return count / elapsed
 
 
if __name__ == '__main__':
    # parse cli
    t = 10
    num_threads = 1
    dev = None

    # Work on a copy without argv[0]: the script name is not an argument and
    # must never be mistaken for the device.
    cli_args = sys.argv[1:]
    if not cli_args:
        raise SystemExit(USAGE)

    try:
        while cli_args:
            arg = cli_args.pop(0)
            # '--num_threads' is accepted as an alias of '--num-threads'
            if arg in ['-n', '--num-threads', '--num_threads']:
                num_threads = int(cli_args.pop(0))
            elif arg in ['-t', '--time']:
                t = int(cli_args.pop(0))
            else:
                dev = arg
    except (IndexError, ValueError):
        # an option without its value, or a value that is not an integer
        raise SystemExit(USAGE)

    if dev is None:
        raise SystemExit(USAGE)

    def results_wrap(results, func, *func_args, **func_kw):
        """collect return values from func"""
        results.append(func(*func_args, **func_kw))

    # run benchmark: the block size doubles each round until the total IO
    # rate drops to num_threads or below, or the block size reaches the
    # media size
    blocksize = 512
    try:
        print("%s, %sB, %d threads:" % (dev, greek(mediasize(dev), 2, 'si'), num_threads))
        _iops = num_threads + 1  # initial loop
        while _iops > num_threads and blocksize < mediasize(dev):
            # threading boilerplate
            threads = []
            results = []

            for i in range(0, num_threads):
                _t = threading.Thread(target=results_wrap,
                                      args=(results, iops, dev, blocksize, t))
                _t.start()
                threads.append(_t)

            for _t in threads:
                _t.join()
            _iops = sum(results)

            bandwidth = int(blocksize * _iops)
            print(" %sB blocks: %6.1f IO/s, %sB/s (%sbit/s)" % (
                greek(blocksize), _iops,
                greek(bandwidth, 1), greek(8 * bandwidth, 1, 'si')))

            blocksize *= 2
    except IOError as err:
        # exit with the OS error text, falling back to the full message
        raise SystemExit(err.strerror or str(err))
    except KeyboardInterrupt:
        print("caught ctrl-c, bye.")
 
# eof.
python/programmes/multiprocessing1.txt · Dernière modification : 2013/03/12 23:07 de root