====== Modules ====== Liste des modules inclus : * [[http://docs.python.org/2/library/|Python 2]] * [[http://docs.python.org/3/library/|Python 3]] ===== Chemin des modules ===== * Dans l'environnement SHELL: export PYTHONPATH=/home/user/python/lib/mylib * Dans le code python : #!/usr/bin/python import os, sys sys.path.append(os.path.join(os.path.dirname(__file__), "lib")) #sys.path.insert(0, os.path.join(os.path.dirname(__file__), "lib")) ===== optparse ===== Doc officielle : http://docs.python.org/2/library/optparse.html ===== pynotify ===== http://pyinotify.sourceforge.net/ #!/usr/bin/env python import os import pyinotify wm = pyinotify.WatchManager() mask = pyinotify.IN_DELETE | pyinotify.IN_CREATE class PTmp(pyinotify.ProcessEvent): def process_IN_CREATE(self, event): print "Create: %s " % os.path.join(event.path, event.name) def process_IN_DELETE(self, event): print "Delete: %s " % os.path.join(event.path, event.name) notifier = pyinotify.Notifier(wm, PTmp()) wdd = wm.add_watch('/home/gigi', mask, rec=True) while True: try: notifier.process_events() if notifier.check_events(): notifier.read_events() except KeyboardInterrupt: notifier.stop() break * tail -f avec inotify : #!/usr/bin/env python ### Use inotify to tail logfile (argv[1]), mimicking tail -F (follow if rotated). ### ### Usage: ./tail-F_inotify.py /path/to/file ### ### @author: "Charlie Schluting" ### Copyright: the only free-as-in-freedom one: MIT license ### import sys, os, pyinotify from optparse import OptionParser parser = OptionParser() parser.add_option("--debug", help="print debug messages", action="store_true", dest="debug") (options, args) = parser.parse_args() myfile = args[0] if options.debug: print "I am totally opening " + myfile wm = pyinotify.WatchManager() # watched events on the directory, and parse $path for file_of_interest: dirmask = pyinotify.IN_MODIFY | pyinotify.IN_DELETE | pyinotify.IN_MOVE_SELF | pyinotify.IN_CREATE # open file, skip to end.. global fh fh = open(myfile, 'r') fh.seek(0,2) # the event handlers: class PTmp(pyinotify.ProcessEvent): def process_IN_MODIFY(self, event): if myfile not in os.path.join(event.path, event.name): return else: print fh.readline().rstrip() def process_IN_MOVE_SELF(self, event): if options.debug: print "The file moved! Continuing to read from that, until a new one is created.." def process_IN_CREATE(self, event): if myfile in os.path.join(event.path, event.name): # yay, I exist, umm.. again! global fh fh.close fh = open(myfile, 'r') # catch up, in case lines were written during the time we were re-opening: if options.debug: print "My file was created! I'm now catching up with lines in the newly created file." for line in fh.readlines(): print line.rstrip() # then skip to the end, and wait for more IN_MODIFY events fh.seek(0,2) return notifier = pyinotify.Notifier(wm, PTmp()) # watch the directory, so we can get IN_CREATE events and re-open the file when logrotate comes along. # if you just watch the file, pyinotify errors when it moves, saying "can't track, can't trust it.. watch # the directory". index = myfile.rfind('/') wm.add_watch(myfile[:index], dirmask) while True: try: notifier.process_events() if notifier.check_events(): notifier.read_events() except KeyboardInterrupt: break # cleanup: stop the inotify, and close the file handle: notifier.stop() fh.close() sys.exit(0) ===== Scapy ===== * [[http://wiki.spiritofhack.net/index.php/Scapy-usage|Tuto Scapy 1]] * [[http://repo.zenk-security.com/Protocoles_reseaux_securisation/Les%20Fourberies%20de%20Scapy.pdf|Tuto scapy 2]] * [[http://www.siteduzero.com/tutoriel-3-568537-manipulez-les-paquets-reseau-avec-scapy.html|Tuto scapy 3]] * [[http://libpfb.so/uploads/media/Cours_scapy.pdf|Tuto Scapy 4]] (le plus complet) source : http://www.kbrandt.com/2011/02/creating-a-histogram-of-tcp-window-sizes-from-a-packet-capture-using-python.html from scapy.all import IP,TCP,rdpcap from pylab import * #Import the Capture File a = rdpcap('trace.pcap') #Filter the Capture File b = [ pkt for pkt in a if IP in pkt and (pkt[IP].src == '10.7.0.12' or pkt[IP].dst == '10.7.0.12') ] #Create an array of TCP Window sizes from the capture wins = [ int(win[TCP].window) for win in b if TCP in win ] #Create the Histogram hist(wins, bins=100) #Display it ylabel('Frequency') xlabel('TCP Window Size') show() Création d'une requête DNS : >>> p = sr1(IP(dst="8.8.8.8")/UDP(sport=RandShort(),dport=53)/DNS(rd=1,qd=DNSQR(qname="velannes.com", qtype="A")), timeout=0.5) Ou avec l'entête ethernet (noter que le nom de la fonction change) : >>> p = srp1(Ether()/IP(dst="8.8.8.8")/UDP(sport=RandShort(),dport=53)/DNS(rd=1,qd=DNSQR(qname="velannes.com", qtype="A"))) Begin emission: .Finished to send 1 packets. * Received 2 packets, got 1 answers, remaining 0 packets >>> p an= ns=None ar=None |>>>> >>> p.summary() 'IP / UDP / DNS Ans "94.23.218.89" ' ===== Fabric ===== * [[http://docs.fabfile.org/en/latest/index.html|Fabric - Pour coder des scripts de déploiement]] * [[http://docs.fabfile.org/en/latest/tutorial.html|Fabric - Tuto]] * [[http://www.unixgarden.com/index.php/gnu-linux-magazine/fabric-administration-distante-dun-parc-dynamique-de-serveursmachines/3|Exemple]] * Le fichier doit s'appeler **fabfile.py**. * **fab -l** pour voir les fonctions disponibles. #!/usr/bin/env python import sys, os, getpass #import sys, os, getpass, multiprocessing try: from fabric.api import * from fabric.colors import * except: print('Fabric module must be installed !') sys.exit(1) sys.tracebacklimit = 0 output['running'] = False output['stdout'] = False output['aborts'] = False output['status'] = False output['warnings'] = False #env.user = 'root' #env.password = None #env.passwords = { # 'user@host':'pass', # 'gwuser@gwhost':'gwpass' #} #env.shell = "/bin/sh -c" #env.gateway = None env.disable_known_hosts = True #env.parralel = True #env.pool_size = multiprocessing.cpu_count() #Sets the number of concurrent processes to use when executing tasks in parallel. #env.keepalive = 300 #env.timeout = 10 #Network ssh connection timeout, in seconds. #env.command_timeout = 60 env.linewise = True env.warn_only = True env.role_default = 'adm' env.roledefs = { 'eth': ['molx1321', 'molx1322', 'molx1323', 'molx1324', 'molx1325', 'molx1326', 'molx1327', 'molx1328', 'molx1329', 'molx1330', 'molx1331', 'molx1332', 'molx1333', 'molx1334', 'molx1335', 'molx1336' ], 'adm' : ['molx1321-adm', 'molx1322-adm', 'molx1323-adm', 'molx1324-adm', 'molx1325-adm', 'molx1326-adm', 'molx1327-adm', 'molx1328-adm', 'molx1329-adm', 'molx1330-adm', 'molx1331-adm', 'molx1332-adm', 'molx1333-adm', 'molx1334-adm', 'molx1335-adm', 'molx1336-adm' ], 'ib' : ['molx1321-ib', 'molx1322-ib', 'molx1323-ib', 'molx1324-ib', 'molx1325-ib', 'molx1326-ib', 'molx1327-ib', 'molx1328-ib', 'molx1329-ib', 'molx1330-ib', 'molx1331-ib', 'molx1332-ib', 'molx1333-ib', 'molx1334-ib', 'molx1335-ib', 'molx1336-ib' ], 'imm' : ['molx1321-imm', 'molx1322-imm', 'molx1323-imm', 'molx1324-imm', 'molx1325-imm', 'molx1326-imm', 'molx1327-imm', 'molx1328-imm', 'molx1329-imm', 'molx1330-imm', 'molx1331-imm', 'molx1332-imm', 'molx1333-imm', 'molx1334-imm', 'molx1335-imm', 'molx1336-imm' ], 'web_adm' : ['sw-eth','sw-ib'], } env.roledefs['web_adm'] += env.roledefs['imm'] if env.hosts == [] and env.roles == []: env.hosts = env.roledefs[env.role_default] #list_all = [] #for server in env.roledefs.values(): # list_all += server #env.roledefs['all'] = list(set(list_all)) def VERIF_IPMI_PASSWORD(): if 'IPMI_PASSWORD' in os.environ: globals()['IPMI_PASSWORD'] = os.environ['IPMI_PASSWORD'] if 'IPMI_PASSWORD' not in globals(): IPMI_PASSWORD = getpass.getpass(prompt='IMM password : ') globals()['IPMI_PASSWORD'] = IPMI_PASSWORD @task @serial def lr(): """ : Liste les roles utilisables avec l'option '-R'""" for role in env.roledefs: if role == env.role_default: print("%s (default) : %s" % (role, ' ,'.join(env.roledefs[role]))) else: print("%s : %s" % (role, ' ,'.join(env.roledefs[role]))) sys.exit() #@parallel(pool_size = multiprocessing.cpu_count() + 1) @task def cmd(cmd, show = None): """ : Run command on a host (show='details' for details, show='status' for see if command is successful or not)""" #with show('stdout','running','warnings'): try: out = run(cmd) except Exception as err: sys.stdout.write(yellow("%(host)s (%(user)s) : " % env, bold = True)) print(red(err)) return if show == None: for line in out.splitlines(): sys.stdout.write(yellow("%(host)s (%(user)s) : " % env, bold = True)) print(line) if out.failed: sys.stdout.write(yellow("%(host)s (%(user)s) : " % env, bold = True)) print(red(' KO', bold = True)) + " (return %s)" % out.return_code elif show == 'details' or show == 'status': sys.stdout.write(yellow("Executing on %(host)s as %(user)s : " % env, bold = True) + blue(cmd, bold = True)) if out.failed: print(red(' KO', bold = True)) else: print(green(' OK', bold = True)) if show == 'details': print(yellow('OUT : (return code = %s)' % out.return_code)) print('%s\n' % out) else: print("Option %s unknow" % show) sys.exit(1) @task def send_file(src, dst = None, mode = None): """ : Send file on a host (dst=, mode=)""" if dst == None: dst = src if mode == None: put(src, dst) else: try: int(mode) except: print("'mode' value must be an integer !") sys.exit(1) put(src, dst, mode=mode) @task def reboot(wait = 0): """ : Reboot a host (wait=)""" if os.getlogin() != 'root': print('Only root must be reboot a host') sys.exit(1) try: int(wait) except: print("'wait' value must be an integer !") sys.exit(1) reboot(wait = wait) @serial @task def power(state): """ : IMM power manager (state=)""" if not env.host in env.roledefs['imm']: print('Use "-R imm" or "-H -imm"') print('%s not in this table : %s' % (env.host, ','.join(env.roledefs['imm']))) sys.exit(1) if state not in ['status','on','off','cycle','reset','diag','soft']: print("%s state is not supported. Please use state !" % state) sys.exit(1) VERIF_IPMI_PASSWORD() out = local("ipmitool -I lanplus -C 2 -U system -P '%s' -E -H %s chassis power %s" % (globals()['IPMI_PASSWORD'], env.host, state), capture=True) if out.failed: sys.stdout.write(yellow("%(host)s : " % env, bold = True)) print(red('Failed ! (return code : %s)' % out.return_code)) print(out.stderr) sys.exit(1) else: for line in out.splitlines(): sys.stdout.write(yellow("%(host)s : " % env, bold = True)) print(line) @serial @task def console(action='activate'): """ : IMM SOL (action=)""" if len(env.hosts) != 1: print('Only on one host. Use option -H to precise the host !') sys.exit(1) if not env.host in env.roledefs['imm']: print('Use "-H -imm"') print('%s not in this table : %s' % (env.host, ','.join(env.roledefs['imm']))) sys.exit(1) VERIF_IPMI_PASSWORD() os.system("ipmitool -I lanplus -C 2 -U system -P '%s' -E -H %s sol %s" % (globals()['IPMI_PASSWORD'], env.host, action)) @serial @task def web_adm(): """ : Open Firefox to administrate (use -H option)""" if len(env.hosts) != 1: print('Only on one host. Use option -H to precise the host (%s)!' % ', '.join(env.roledefs['web_adm'])) sys.exit(1) if os.getenv('DISPLAY') == None: print('You must have a valid display !') sys.exit(1) if env.host not in env.roledefs['web_adm']: print("%s is not a valid host ! Valid host are %s" % (env.host, ', '.join(env.roledefs['web_adm']))) sys.exit(1) os.system('firefox https://%s/' % env.host) ===== readline ===== import readline, signal prompt = 'GigiX > ' def signal_handler(signal, frame): print input(prompt) return signal.signal(signal.SIGINT, signal_handler) readline.parse_and_bind("tab: complete") class completer: def __init__(self, tree): self.tree = tree def _parser(self, line, search): if (search is None) or (callable(search) == True): return [] elif len(line) == 0: return [x for x in search] elif len(line) == 1 and readline.get_line_buffer()[-1] != ' ': return ['%s ' % x for x in search if x.startswith(line[0])] else: return self._parser(line[1:], search[line[0]]) return [] def complete(self, text, state): line = readline.get_line_buffer().split() results = self._parser(line, self.tree) return results[state] def linux_sles_12(): print('Je suis Linux SLES 12') tree = { 'unix': { 'solaris': { '8':None, '9':None, '10':None, '11':None, } }, 'linux': { 'debian': { 'buzz':None, 'rex':None, 'bo':None, 'hamm':None, 'slink':None, 'patato':None, 'woody':None, 'sarge':None, 'etch':None, 'lenny':None, 'squeeze':None, 'wheezy':None, 'jessie':None, }, 'RHEL': { '3':None, '4':None, '5':None, '6':None, '7':None, }, 'SLES': { '9':None, '10':None, '11':None, '12':linux_sles_12, } }, 'windows': { 'xp':None, 'vista':None, 'seven':None, '8':None, '10':None, } } readline.set_completer(completer(tree).complete) l = '' while l != ('quit' or 'exit'): try: l = input(prompt) eval("tree['"+"']['".join(l.split())+"']()") except EOFError: print('Bye') break except: continue ===== Twisted ===== * Site : http://twistedmatrix.com * Doc : http://twistedmatrix.com/documents/current/api/moduleIndex.html * Exemples : http://twistedmatrix.com/documents/current/ Cette bibliothèque est orientée vers le réseau. Elle supporte de nombreux protocoles de communication réseau : TCP, UDP, SMTP, HTTP, PROXY, SSH... ===== Shutil===== * [[http://python.developpez.com/cours/docs.python.org/2.6/library/shutil.php|Copie/Suppression de fichiers/dossiers (récurssif) ]] ===== Psutils ===== * [[http://code.google.com/p/psutil/|Gestion des processus / mémoire / disque / réseau]] ===== Ctypes ===== Ce module permet de charger une librairie C et de faire appelles aux fonctions de la librairies. Aide : http://docs.python.org/release/2.7.2/library/ctypes.html Exemple : #!/usr/bin/env python import ctypes, ctypes.util libc=ctypes.cdll.LoadLibrary(ctypes.util.find_library('c')) libc.printf("uid = %d - gid = %d\n", libc.geteuid(), libc.getgid()) Si on veut faire appel à un syscall en langage C il faut connaître l'id. Ci dessous les id des systems call pour 32 et 64 bits : * [[http://www.acsu.buffalo.edu/~charngda/linux_syscalls_32bit.html|Syscalls 32 bits]] * [[http://www.acsu.buffalo.edu/~charngda/linux_syscalls_64bit.html|Syscalls 64 bits]] ===== ConfigParser ===== Permet de charger / sauvegarder une configuration * __**Sauvegarde**__ * ConfigParser.ConfigParser permet de déclarer notre objet * cp.add_section(section) permet de rajouter une section à notre structure * cp.set(section, option, value) permet de rajouter la clé option à la section existante section avec la valeur value * cp.write(fileobject) permet de sauvegarder la structure import ConfigParser config = ConfigParser.ConfigParser() config.add_section('Section String') config.set('Section String', 'str1', 'MonString') config.add_section('Section Integer') config.set('Section String', 'int1', 100) config.write(open('conf.cfg','w')) * __**Chargement**__ On peut utiliser 4 méthodes dépendantes du type de la clé: * cp.get( section, option) pour les clés de type string *cp.getint( section, option) pour les clés de type integer *cp.getfloat( section, option) pour les clés de type float * cp.getboolean( section, option) pour les clés de type boolean config.read('conf.cfg') monint = config.getint('Section Integer', 'int1') monstring = config.get('Section String', 'str1') ===== fcntl ===== Locker un fichier : #!/usr/bin/env python # -*- coding: utf-8 -*- import fcntl, time myfile = open('/tmp/gigix', 'r') try: fcntl.flock(myfile, fcntl.LOCK_EX|fcntl.LOCK_NB) except IOError: print("Can't immediately write-lock the file, waiting...") fcntl.flock(myfile, fcntl.LOCK_EX) finally: print('Lock acquire, sleep 30 secs...') time.sleep(30) fcntl.flock(myfile, fcntl.LOCK_UN) myfile.close() print('Bye...') * **LOCK_UN** : unlock * **LOCK_SH** : acquire a shared lock * **LOCK_EX** : acquire an exclusive lock ===== datetime / calendar / time ===== Modules qui permettent de manipuler les dates : * [[http://docs.python.org/2/library/datetime.html|datetime]] * [[http://docs.python.org/2/library/calendar.html#module-calendar|calendar]] * [[http://docs.python.org/2/library/time.html#module-time|time]] ===== ftplib ===== Module permettant la connexion à un serveur FTP : * [[http://docs.python.org/2/library/ftplib.html|Site Officiel]] * [[http://www.siteduzero.com/informatique/tutoriels/utiliser-le-module-ftp-de-python/toutes-les-fonctions-une-a-une|Autre Tuto]] ===== smtplib / email ===== Envoyer des emails en python : * [[http://docs.python.org/2/library/email-examples.html|Exemples]] ===== poplib / imaplib ===== Lire ses emails avec python : * [[http://docs.python.org/2/library/poplib.html|Protocol pop]] * [[http://docs.python.org/2/library/imaplib.html|Protocole imap]] ===== PAM ===== Fourni dans le paquage **python-pam** : /usr/share/doc/python-pam/examples/pamtest.py #!/usr/bin/env python import sys import PAM from getpass import getpass def pam_conv(auth, query_list, userData): resp = [] for i in range(len(query_list)): query, type = query_list[i] if type == PAM.PAM_PROMPT_ECHO_ON: val = raw_input(query) resp.append((val, 0)) elif type == PAM.PAM_PROMPT_ECHO_OFF: val = getpass(query) resp.append((val, 0)) elif type == PAM.PAM_PROMPT_ERROR_MSG or type == PAM.PAM_PROMPT_TEXT_INFO: print query resp.append(('', 0)) else: return None return resp service = 'passwd' if len(sys.argv) == 2: user = sys.argv[1] else: user = None auth = PAM.pam() auth.start(service) if user != None: auth.set_item(PAM.PAM_USER, user) auth.set_item(PAM.PAM_CONV, pam_conv) try: auth.authenticate() auth.acct_mgmt() except PAM.error, resp: print('Go away! (%s)' % resp) except: print('Internal error') else: print('Good to go!') ===== sqlalchemy ===== sqlalchemy est un ORM permet de se connecter aux BDD de manière objet plusieurs connecteurs (postgres, mysql, sqlite...[[http://docs.sqlalchemy.org/en/rel_0_8/core/engines.html|info connecteurs]]) : * sqlalchemy ([[http://docs.sqlalchemy.org/en/latest/orm/|Tuto Officiel]] / [[http://www.rmunn.com/sqlalchemy-tutorial/tutorial.html|Tuto 2]]) ===== itertools===== * Tuto : http://www.siteduzero.com/informatique/tutoriels/pratiques-avancees-et-meconnues-en-python/le-module-itertools * combinations : >>> list(itertools.combinations(['A','B','C','D'],3)) [('A', 'B', 'C'), ('A', 'B', 'D'), ('A', 'C', 'D'), ('B', 'C', 'D')] * imap, ifilter, ifilterfalse : >>> from itertools import imap, ifilter, ifilterfalse >>> pair = lambda n: n % 2 == 0 >>> carre = lambda n: n ** 2 >>> imap(carre, xrange(10)) # Renvoie un générateur >>> for elem in imap(carre, xrange(10)): print elem ... 0 1 4 9 16 25 36 49 64 81 >>> list(ifilter(pair, xrange(10))) [0, 2, 4, 6, 8] >>> list(ifilterfalse(pair, xrange(10))) [1, 3, 5, 7, 9] * chain : >>> from itertools import chain >>> list(chain(xrange(11), xrange(15, 21), xrange(30, 46))) [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 15, 16, 17, 18, 19, 20, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45] * takewhile, dropwhile : >>> from itertools import takewhile, dropwhile >>> list(takewhile(lambda i: i < 10, xrange(20))) # On prend tant que i < 10 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] >>> list(dropwhile(lambda i: i < 10, xrange(20))) # On jette tant que i < 10 [10, 11, 12, 13, 14, 15, 16, 17, 18, 19] * groupby : >>> from itertools import groupby >>> liste = list('aaaaabbbcddddee') >>> for element, iterateur in groupby(liste): ... # element est l'élément répété une ou plusieurs fois ... # iterateur est un itérateur sur la répétition de cet élément ; on l'utilise particulièrement pour savoir combien de fois il est répété ... print "%s est répété %d fois" % (element, len(list(iterateur))) ... a est répété 5 fois b est répété 3 fois c est répété 1 fois d est répété 4 fois e est répété 2 fois * izip : >>> for i in itertools.izip([1, 2, 3], ['a', 'b', 'c']): ... i ... (1, 'a') (2, 'b') (3, 'c') >>> for i in itertools.izip(itertools.count(5), ['a', 'b', 'c']): i ... (5, 'a') (6, 'b') (7, 'c') * cycle : >>> for i in itertools.cycle(['a', 'b', 'c']):i 'a' 'b' 'c' 'a' 'b' 'c' 'a' 'b' 'c' 'a' 'b' 'c' ... ... ===== string ===== >>> string.ascii_letters 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ' >>> string.digits '0123456789' >>> string.printable '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~ \t\n\r\x0b\x0c' >>> string.punctuation '!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~' >>> string.whitespace '\t\n\x0b\x0c\r ' ===== zip ===== >>> zip(['a', 'b', 'c'], ['d', 'e', 'f']) [('a', 'd'), ('b', 'e'), ('c', 'f')] ===== operator ===== Trie par rapport a un index : >>> import operator >>> d = dict(a=1, b=2, c=1, d=2, e=1, f=2, g=3) >>> sorted(d.iteritems(), key=operator.itemgetter(1)) [('a', 1), ('c', 1), ('e', 1), ('b', 2), ('d', 2), ('f', 2), ('g', 3)] ===== dbus ===== Dbus permet d’interagir avec les autres programmes identifiés par DBUS comme knotify, le service de notifications KDE. Pour voir la liste des services DBUS, lancer la commande qdbus : gigi@debian:~$ qdbus| grep knotify org.kde.knotify #!/usr/bin/env python import sys, dbus knotify = dbus.SessionBus().get_object("org.kde.knotify", "/Notify") try: title, text = sys.argv[1:3] except: print 'Usage: knotify.py title text' sys.exit(1) knotify.event("warning", "kde", [], title, text, [], [], 0, 0,dbus_interface="org.kde.KNotify")