Outils pour utilisateurs

Outils du site


python:les_modules

Modules

Liste des modules inclus :

Chemin des modules

  • Dans l'environnement SHELL:
export PYTHONPATH=/home/user/python/lib/mylib
  • Dans le code python :
#!/usr/bin/python
import os, sys
sys.path.append(os.path.join(os.path.dirname(__file__), "lib"))
#sys.path.insert(0, os.path.join(os.path.dirname(__file__), "lib"))

optparse

pynotify

http://pyinotify.sourceforge.net/

notify.py
#!/usr/bin/env python
 
import os
import pyinotify
 
wm = pyinotify.WatchManager()
mask = pyinotify.IN_DELETE | pyinotify.IN_CREATE
 
class PTmp(pyinotify.ProcessEvent):
        def process_IN_CREATE(self, event):
                print "Create: %s " % os.path.join(event.path, event.name)
 
        def process_IN_DELETE(self, event):
                  print "Delete: %s " % os.path.join(event.path, event.name)
 
notifier = pyinotify.Notifier(wm, PTmp())
 
wdd = wm.add_watch('/home/gigi', mask, rec=True)
 
while True:
        try:
                notifier.process_events()
                if notifier.check_events():
                        notifier.read_events()
        except KeyboardInterrupt:
                notifier.stop()
                break
  • tail -f avec inotify :
tail.py
#!/usr/bin/env python
### Use inotify to tail logfile (argv[1]), mimicking tail -F (follow if rotated). 
###
### Usage: ./tail-F_inotify.py /path/to/file
###
### @author: "Charlie Schluting" <charlie@schluting.com>
### Copyright: the only free-as-in-freedom one: MIT license
###
 
import sys, os, pyinotify
from optparse import OptionParser
 
parser = OptionParser()
parser.add_option("--debug", help="print debug messages", action="store_true", dest="debug")
(options, args) = parser.parse_args()
 
myfile = args[0] 
if options.debug:
    print "I am totally opening " + myfile 
 
wm = pyinotify.WatchManager()
 
# watched events on the directory, and parse $path for file_of_interest:
dirmask = pyinotify.IN_MODIFY | pyinotify.IN_DELETE | pyinotify.IN_MOVE_SELF | pyinotify.IN_CREATE
 
# open file, skip to end..
global fh 
fh = open(myfile, 'r')
fh.seek(0,2)
 
# the event handlers:
class PTmp(pyinotify.ProcessEvent):
    def process_IN_MODIFY(self, event):
        if myfile not in os.path.join(event.path, event.name):
            return
        else:
    	    print fh.readline().rstrip()
 
    def process_IN_MOVE_SELF(self, event):
        if options.debug:
            print "The file moved! Continuing to read from that, until a new one is created.."
 
    def process_IN_CREATE(self, event):
        if myfile in os.path.join(event.path, event.name):
            # yay, I exist, umm.. again!
            global fh
            fh.close
            fh = open(myfile, 'r')
            # catch up, in case lines were written during the time we were re-opening:
            if options.debug:
                print "My file was created! I'm now catching up with lines in the newly created file." 
            for line in fh.readlines():
                print line.rstrip()
            # then skip to the end, and wait for more IN_MODIFY events
            fh.seek(0,2)
        return
 
notifier = pyinotify.Notifier(wm, PTmp())
 
# watch the directory, so we can get IN_CREATE events and re-open the file when logrotate comes along.
# if you just watch the file, pyinotify errors when it moves, saying "can't track, can't trust it.. watch 
#  the directory".
index = myfile.rfind('/')
wm.add_watch(myfile[:index], dirmask)
 
while True:
    try:
        notifier.process_events()
        if notifier.check_events():
            notifier.read_events()
    except KeyboardInterrupt:
        break
 
# cleanup: stop the inotify, and close the file handle:
notifier.stop()
fh.close()
 
sys.exit(0)

Scapy

source : http://www.kbrandt.com/2011/02/creating-a-histogram-of-tcp-window-sizes-from-a-packet-capture-using-python.html

scapy.py
from scapy.all import IP,TCP,rdpcap
from pylab import *
#Import the Capture File
a = rdpcap('trace.pcap')
#Filter the Capture File
b = [ pkt for pkt in a if IP in pkt and
      (pkt[IP].src == '10.7.0.12' or pkt[IP].dst == '10.7.0.12') ]
#Create an array of TCP Window sizes from the capture
wins = [ int(win[TCP].window) for win in b if TCP in win ]
#Create the Histogram
hist(wins, bins=100)
#Display it
ylabel('Frequency')
xlabel('TCP Window Size')
show()

Création d'une requête DNS :

>>> p = sr1(IP(dst="8.8.8.8")/UDP(sport=RandShort(),dport=53)/DNS(rd=1,qd=DNSQR(qname="velannes.com", qtype="A")), timeout=0.5)

Ou avec l'entête ethernet (noter que le nom de la fonction change) :

>>> p = srp1(Ether()/IP(dst="8.8.8.8")/UDP(sport=RandShort(),dport=53)/DNS(rd=1,qd=DNSQR(qname="velannes.com", qtype="A")))
Begin emission:
.Finished to send 1 packets.
*
Received 2 packets, got 1 answers, remaining 0 packets
>>> p
<Ether  dst=00:21:5d:5f:05:f8 src=00:12:17:16:0d:e7 type=0x800 |<IP  version=4L ihl=5L tos=0x0 len=74 id=20959 flags= frag=0L ttl=47 proto=udp chksum=0x67a4 src=8.8.8.8 dst=192.168.1.104 options=[] |<UDP  sport=domain dport=33487 len=54 chksum=0x4c65 |<DNS  id=0 qr=1L opcode=QUERY aa=0L tc=0L rd=1L ra=1L z=0L rcode=ok qdcount=1 ancount=1 nscount=0 arcount=0 qd=<DNSQR  qname='velannes.com.' qtype=A qclass=IN |> an=<DNSRR  rrname='velannes.com.' type=A rclass=IN ttl=25286 rdata='94.23.218.89' |> ns=None ar=None |>>>>
>>> p.summary()
'IP / UDP / DNS Ans "94.23.218.89" '

Fabric

  • Le fichier doit s'appeler fabfile.py.
  • fab -l pour voir les fonctions disponibles.
fabric.py
#!/usr/bin/env python
 
import sys, os, getpass
#import sys, os, getpass, multiprocessing
 
try:
        from fabric.api import *
        from fabric.colors import *
except:
        print('Fabric module must be installed !')
        sys.exit(1)
 
sys.tracebacklimit = 0
 
output['running'] = False
output['stdout'] = False
output['aborts'] = False
output['status'] = False
output['warnings'] = False
 
#env.user = 'root'
#env.password = None
 
#env.passwords = {
#    'user@host':'pass',
#    'gwuser@gwhost':'gwpass'
#}
#env.shell = "/bin/sh -c"
#env.gateway = None
env.disable_known_hosts = True
#env.parralel = True
#env.pool_size = multiprocessing.cpu_count() #Sets the number of concurrent processes to use when executing tasks in parallel.
#env.keepalive = 300
#env.timeout = 10 #Network ssh connection timeout, in seconds.
#env.command_timeout = 60
env.linewise = True
env.warn_only = True
 
env.role_default = 'adm'
 
env.roledefs = {
        'eth': ['molx1321', 'molx1322', 'molx1323', 'molx1324', 'molx1325', 'molx1326', 'molx1327', 'molx1328', 'molx1329', 'molx1330', 'molx1331', 'molx1332', 'molx1333', 'molx1334', 'molx1335', 'molx1336' ],
        'adm' : ['molx1321-adm', 'molx1322-adm', 'molx1323-adm', 'molx1324-adm', 'molx1325-adm', 'molx1326-adm', 'molx1327-adm', 'molx1328-adm', 'molx1329-adm', 'molx1330-adm', 'molx1331-adm', 'molx1332-adm', 'molx1333-adm', 'molx1334-adm', 'molx1335-adm', 'molx1336-adm' ],
        'ib' : ['molx1321-ib', 'molx1322-ib', 'molx1323-ib', 'molx1324-ib', 'molx1325-ib', 'molx1326-ib', 'molx1327-ib', 'molx1328-ib', 'molx1329-ib', 'molx1330-ib', 'molx1331-ib', 'molx1332-ib', 'molx1333-ib', 'molx1334-ib', 'molx1335-ib', 'molx1336-ib' ],
        'imm' : ['molx1321-imm', 'molx1322-imm', 'molx1323-imm', 'molx1324-imm', 'molx1325-imm', 'molx1326-imm', 'molx1327-imm', 'molx1328-imm', 'molx1329-imm', 'molx1330-imm', 'molx1331-imm', 'molx1332-imm', 'molx1333-imm', 'molx1334-imm', 'molx1335-imm', 'molx1336-imm' ],
        'web_adm' : ['sw-eth','sw-ib'],
}
 
env.roledefs['web_adm'] += env.roledefs['imm']
 
if env.hosts == [] and env.roles == []:
        env.hosts = env.roledefs[env.role_default]
 
#list_all = []
#for server in env.roledefs.values():
#       list_all += server
#env.roledefs['all'] = list(set(list_all))
 
def VERIF_IPMI_PASSWORD():
        if 'IPMI_PASSWORD' in os.environ:
                globals()['IPMI_PASSWORD'] = os.environ['IPMI_PASSWORD']
 
        if 'IPMI_PASSWORD' not in globals():
                IPMI_PASSWORD = getpass.getpass(prompt='IMM password : ')
                globals()['IPMI_PASSWORD'] = IPMI_PASSWORD
 
@task
@serial
def lr():
        """ : Liste les roles utilisables avec l'option '-R'"""
        for role in env.roledefs:
                if role == env.role_default:
                        print("%s (default) : %s" % (role, ' ,'.join(env.roledefs[role])))
                else:
                        print("%s : %s" % (role, ' ,'.join(env.roledefs[role])))
        sys.exit()
 
#@parallel(pool_size = multiprocessing.cpu_count() + 1)
@task
def cmd(cmd, show = None):
        """ : Run command on a host (show='details' for details, show='status' for see if command is successful or not)"""
        #with show('stdout','running','warnings'):
        try:
                out = run(cmd)
        except Exception as err:
                sys.stdout.write(yellow("%(host)s (%(user)s) : " % env, bold = True))
                print(red(err))
                return
 
        if show == None:
                for line in out.splitlines():
                        sys.stdout.write(yellow("%(host)s (%(user)s) : " % env, bold = True))
                        print(line)
 
                if out.failed:
                        sys.stdout.write(yellow("%(host)s (%(user)s) : " % env, bold = True))
                        print(red(' KO', bold = True)) + " (return %s)" % out.return_code
        elif show == 'details' or show == 'status':
                sys.stdout.write(yellow("Executing on %(host)s as %(user)s : " % env, bold = True) + blue(cmd, bold = True))
                if out.failed:
                        print(red(' KO', bold = True))
                else:
                        print(green(' OK', bold = True))
 
                if show ==  'details':
                        print(yellow('OUT : (return code = %s)' % out.return_code))
                        print('%s\n' % out)
        else:
                print("Option %s unknow" % show)
                sys.exit(1)
 
@task
def send_file(src, dst = None, mode = None):
        """ : Send file on a host (dst=<destination>, mode=<mode>)"""
        if dst == None:
                dst = src
 
 
        if mode == None:
                put(src, dst)
        else:
                try:
                        int(mode)
                except:
                        print("'mode' value must be an integer !")
                        sys.exit(1)
 
                put(src, dst, mode=mode)
@task
def reboot(wait = 0):
        """ : Reboot a host (wait=<secondes>)"""
        if os.getlogin() != 'root':
                print('Only root must be reboot a host')
                sys.exit(1)
 
        try:
                int(wait)
        except:
                print("'wait' value must be an integer !")
                sys.exit(1)
 
        reboot(wait = wait)
 
@serial
@task
def power(state):
        """ : IMM power manager (state=<status|on|off|cycle|reset|diag|soft>)"""
        if not env.host in env.roledefs['imm']:
                print('Use "-R imm" or "-H <host>-imm"')
                print('%s not in this table : %s' % (env.host, ','.join(env.roledefs['imm'])))
                sys.exit(1)
 
        if state not in ['status','on','off','cycle','reset','diag','soft']:
                print("%s state is not supported. Please use state <status|on|off|cycle|reset|diag|soft> !" % state)
                sys.exit(1)
 
        VERIF_IPMI_PASSWORD()
 
        out = local("ipmitool -I lanplus -C 2 -U system -P '%s' -E -H %s chassis power %s" % (globals()['IPMI_PASSWORD'], env.host, state), capture=True)
 
        if out.failed:
                sys.stdout.write(yellow("%(host)s : " % env, bold = True))
                print(red('Failed ! (return code : %s)' % out.return_code))
                print(out.stderr)
                sys.exit(1)
        else:
                for line in out.splitlines():
                        sys.stdout.write(yellow("%(host)s : " % env, bold = True))
                        print(line)
 
@serial
@task
def console(action='activate'):
        """ : IMM SOL (action=<activate|deactivate>)"""
        if len(env.hosts) != 1:
                print('Only on one host. Use option -H to precise the host !')
                sys.exit(1)
 
        if not env.host in env.roledefs['imm']:
                print('Use "-H <host>-imm"')
                print('%s not in this table : %s' % (env.host, ','.join(env.roledefs['imm'])))
                sys.exit(1)
 
        VERIF_IPMI_PASSWORD()
 
        os.system("ipmitool -I lanplus -C 2 -U system -P '%s' -E -H %s sol %s" % (globals()['IPMI_PASSWORD'], env.host, action))
 
@serial
@task
def web_adm():
        """ : Open Firefox to administrate (use -H option)"""
        if len(env.hosts) != 1:
                print('Only on one host. Use option -H to precise the host (%s)!' % ', '.join(env.roledefs['web_adm']))
                sys.exit(1)
 
        if os.getenv('DISPLAY') == None:
                print('You must have a valid display !')
                sys.exit(1)
 
        if env.host not in env.roledefs['web_adm']:
                print("%s is not a valid host ! Valid host are %s" % (env.host, ', '.join(env.roledefs['web_adm'])))
                sys.exit(1)
 
        os.system('firefox https://%s/' % env.host)

readline

readline.py
import readline, signal
prompt = 'GigiX > '
 
def signal_handler(signal, frame):
    print
    input(prompt)
    return
 
signal.signal(signal.SIGINT, signal_handler)
readline.parse_and_bind("tab: complete")
 
class completer:
        def __init__(self, tree):
            self.tree = tree
 
        def _parser(self, line, search):
            if (search is None) or (callable(search) == True):
                return []
            elif len(line) == 0:
                return [x for x in search]
            elif len(line) == 1 and readline.get_line_buffer()[-1] != ' ':
                return ['%s ' % x for x in search if x.startswith(line[0])]
            else:
                return self._parser(line[1:], search[line[0]])
            return []
 
        def complete(self, text, state):
            line = readline.get_line_buffer().split()
            results = self._parser(line, self.tree)
            return results[state]
 
def linux_sles_12():
    print('Je suis Linux SLES 12')
 
tree = {
    'unix': {
        'solaris': {
            '8':None,
            '9':None,
            '10':None,
            '11':None,
        }
    },
    'linux': {
        'debian': {
            'buzz':None,
            'rex':None,
            'bo':None,
            'hamm':None,
            'slink':None,
            'patato':None,
            'woody':None,
            'sarge':None,
            'etch':None,
            'lenny':None,
            'squeeze':None,
            'wheezy':None,
            'jessie':None,
    },
        'RHEL': {
            '3':None,
            '4':None,
            '5':None,
            '6':None,
            '7':None,
        },
        'SLES': {
            '9':None,
            '10':None,
            '11':None,
            '12':linux_sles_12,
        }
    },
    'windows': {
        'xp':None,
        'vista':None,
        'seven':None,
        '8':None,
        '10':None,
    }
}
 
readline.set_completer(completer(tree).complete)
l = ''
while l != ('quit' or 'exit'):
    try:
            l = input(prompt)
            eval("tree['"+"']['".join(l.split())+"']()")
    except EOFError:
            print('Bye')
            break
    except:
        continue

Twisted

Cette bibliothèque est orientée vers le réseau. Elle supporte de nombreux protocoles de communication réseau : TCP, UDP, SMTP, HTTP, PROXY, SSH…

Shutil

Psutils

Ctypes

Ce module permet de charger une librairie C et de faire appelles aux fonctions de la librairies.

Aide : http://docs.python.org/release/2.7.2/library/ctypes.html

Exemple :

ctypes.py
#!/usr/bin/env python
 
import ctypes, ctypes.util
 
libc=ctypes.cdll.LoadLibrary(ctypes.util.find_library('c'))
libc.printf("uid = %d - gid = %d\n", libc.geteuid(), libc.getgid())

Si on veut faire appel à un syscall en langage C il faut connaître l'id. Ci dessous les id des systems call pour 32 et 64 bits :

ConfigParser

Permet de charger / sauvegarder une configuration

  • Sauvegarde
  • ConfigParser.ConfigParser permet de déclarer notre objet
  • cp.add_section(section) permet de rajouter une section à notre structure
  • cp.set(section, option, value) permet de rajouter la clé option à la section existante section avec la valeur value
  • cp.write(fileobject) permet de sauvegarder la structure
ConfigParser.py
import ConfigParser
 
config = ConfigParser.ConfigParser()
 
config.add_section('Section String')
config.set('Section String', 'str1', 'MonString')
config.add_section('Section Integer')
config.set('Section String', 'int1', 100)
 
config.write(open('conf.cfg','w'))
  • Chargement

On peut utiliser 4 méthodes dépendantes du type de la clé:

  • cp.get( section, option) pour les clés de type string
  • cp.getint( section, option) pour les clés de type integer
  • cp.getfloat( section, option) pour les clés de type float
  • cp.getboolean( section, option) pour les clés de type boolean
config.read('conf.cfg')	
monint = config.getint('Section Integer', 'int1')
monstring = config.get('Section String', 'str1')

fcntl

Locker un fichier :

fcntl.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
 
import fcntl, time
 
myfile = open('/tmp/gigix', 'r')
 
try:
        fcntl.flock(myfile, fcntl.LOCK_EX|fcntl.LOCK_NB)
except IOError:
        print("Can't immediately write-lock the file, waiting...")
        fcntl.flock(myfile, fcntl.LOCK_EX)
finally:
        print('Lock acquire, sleep 30 secs...')
        time.sleep(30)
        fcntl.flock(myfile, fcntl.LOCK_UN)
        myfile.close()
        print('Bye...')
  • LOCK_UN : unlock
  • LOCK_SH : acquire a shared lock
  • LOCK_EX : acquire an exclusive lock

datetime / calendar / time

Modules qui permettent de manipuler les dates :

ftplib

Module permettant la connexion à un serveur FTP :

smtplib / email

Envoyer des emails en python :

poplib / imaplib

Lire ses emails avec python :

PAM

Fourni dans le paquage python-pam : /usr/share/doc/python-pam/examples/pamtest.py

pam.py
#!/usr/bin/env python
 
import sys
import PAM
from getpass import getpass
 
def pam_conv(auth, query_list, userData):
 
        resp = []
 
        for i in range(len(query_list)):
                query, type = query_list[i]
                if type == PAM.PAM_PROMPT_ECHO_ON:
                        val = raw_input(query)
                        resp.append((val, 0))
                elif type == PAM.PAM_PROMPT_ECHO_OFF:
                        val = getpass(query)
                        resp.append((val, 0))
                elif type == PAM.PAM_PROMPT_ERROR_MSG or type == PAM.PAM_PROMPT_TEXT_INFO:
                        print query
                        resp.append(('', 0))
                else:
                        return None
 
        return resp
 
service = 'passwd'
 
if len(sys.argv) == 2:
        user = sys.argv[1]
else:
        user = None
 
auth = PAM.pam()
auth.start(service)
if user != None:
        auth.set_item(PAM.PAM_USER, user)
auth.set_item(PAM.PAM_CONV, pam_conv)
try:
        auth.authenticate()
        auth.acct_mgmt()
except PAM.error, resp:
        print('Go away! (%s)' % resp)
except:
        print('Internal error')
else:
        print('Good to go!')

sqlalchemy

sqlalchemy est un ORM permet de se connecter aux BDD de manière objet plusieurs connecteurs (postgres, mysql, sqlite…info connecteurs) :

itertools

  • combinations :
>>> list(itertools.combinations(['A','B','C','D'],3))
[('A', 'B', 'C'), ('A', 'B', 'D'), ('A', 'C', 'D'), ('B', 'C', 'D')]
  • imap, ifilter, ifilterfalse :
>>> from itertools import imap, ifilter, ifilterfalse
>>> pair = lambda n: n % 2 == 0
>>> carre = lambda n: n ** 2
>>> imap(carre, xrange(10)) # Renvoie un générateur
<itertools.imap object at 0x7f4ab174fc10>
>>> for elem in imap(carre, xrange(10)): print elem
...
0
1
4
9
16
25
36
49
64
81
>>> list(ifilter(pair, xrange(10)))
[0, 2, 4, 6, 8]
>>> list(ifilterfalse(pair, xrange(10)))
[1, 3, 5, 7, 9]
  • chain :
>>> from itertools import chain
>>> list(chain(xrange(11), xrange(15, 21), xrange(30, 46)))
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 15, 16, 17, 18, 19, 20, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45]
  • takewhile, dropwhile :
>>> from itertools import takewhile, dropwhile
>>> list(takewhile(lambda i: i < 10, xrange(20))) # On prend tant que i < 10
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> list(dropwhile(lambda i: i < 10, xrange(20))) # On jette tant que i < 10
[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
  • groupby :
>>> from itertools import groupby
>>> liste = list('aaaaabbbcddddee')
>>> for element, iterateur in groupby(liste):
...     # element est l'élément répété une ou plusieurs fois
...     # iterateur est un itérateur sur la répétition de cet élément ; on l'utilise particulièrement pour savoir combien de fois il est répété
...     print "%s est répété %d fois" % (element, len(list(iterateur)))
...
a est répété 5 fois
b est répété 3 fois
c est répété 1 fois
d est répété 4 fois
e est répété 2 fois
  • izip :
>>> for i in itertools.izip([1, 2, 3], ['a', 'b', 'c']):
...  i
...
(1, 'a')
(2, 'b')
(3, 'c')
>>> for i in itertools.izip(itertools.count(5), ['a', 'b', 'c']): i
...
(5, 'a')
(6, 'b')
(7, 'c')
  • cycle :
>>> for i in itertools.cycle(['a', 'b', 'c']):i
'a'
'b'
'c'
'a'
'b'
'c'
'a'
'b'
'c'
'a'
'b'
'c'
...
...

string

>>> string.ascii_letters
'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'
>>> string.digits
'0123456789'
>>> string.printable
'0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~ \t\n\r\x0b\x0c'
>>> string.punctuation
'!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~'
>>> string.whitespace
'\t\n\x0b\x0c\r '

zip

>>> zip(['a', 'b', 'c'], ['d', 'e', 'f'])
[('a', 'd'), ('b', 'e'), ('c', 'f')]

operator

Trie par rapport a un index :

>>> import operator
>>> d = dict(a=1, b=2, c=1, d=2, e=1, f=2, g=3)
>>> sorted(d.iteritems(), key=operator.itemgetter(1))
[('a', 1), ('c', 1), ('e', 1), ('b', 2), ('d', 2), ('f', 2), ('g', 3)]

dbus

Dbus permet d’interagir avec les autres programmes identifiés par DBUS comme knotify, le service de notifications KDE.

Pour voir la liste des services DBUS, lancer la commande qdbus :

gigi@debian:~$ qdbus| grep knotify
 org.kde.knotify
knotify.py
#!/usr/bin/env python
import sys, dbus
 
knotify = dbus.SessionBus().get_object("org.kde.knotify", "/Notify")
try:
    title, text = sys.argv[1:3]
except:
    print 'Usage: knotify.py title text'
    sys.exit(1)
 
knotify.event("warning", "kde", [], title, text, [], [], 0, 0,dbus_interface="org.kde.KNotify")
python/les_modules.txt · Dernière modification : 2017/06/03 13:08 de root