Table des matières
Modules
Chemin des modules
- Dans l'environnement SHELL:
export PYTHONPATH=/home/user/python/lib/mylib
- Dans le code python :
#!/usr/bin/python import os, sys sys.path.append(os.path.join(os.path.dirname(__file__), "lib")) #sys.path.insert(0, os.path.join(os.path.dirname(__file__), "lib"))
optparse
Doc officielle : http://docs.python.org/2/library/optparse.html
pyinotify
http://pyinotify.sourceforge.net/
- notify.py
#!/usr/bin/env python
"""Print a message whenever an entry is created or deleted under a directory tree."""
import os

import pyinotify

# Directory watched (recursively) by this example.
WATCHED_DIR = '/home/gigi'

# Only creation and deletion events are requested.
mask = pyinotify.IN_DELETE | pyinotify.IN_CREATE


class PTmp(pyinotify.ProcessEvent):
    """Event handler with one ``process_<EVENT>`` method per watched event."""

    def process_IN_CREATE(self, event):
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print("Create: %s " % os.path.join(event.path, event.name))

    def process_IN_DELETE(self, event):
        print("Delete: %s " % os.path.join(event.path, event.name))


wm = pyinotify.WatchManager()
notifier = pyinotify.Notifier(wm, PTmp())
wdd = wm.add_watch(WATCHED_DIR, mask, rec=True)

while True:
    try:
        # Dispatch what was queued by the previous read, then read more.
        notifier.process_events()
        if notifier.check_events():
            notifier.read_events()
    except KeyboardInterrupt:
        # Ctrl-C: stop watching and leave the loop.
        notifier.stop()
        break
- tail -f avec inotify :
- tail.py
#!/usr/bin/env python
### Use inotify to tail logfile (argv[1]), mimicking tail -F (follow if rotated).
###
### Usage: ./tail-F_inotify.py /path/to/file
###
### @author: "Charlie Schluting" <charlie@schluting.com>
### Copyright: the only free-as-in-freedom one: MIT license
###
import os
import sys
from optparse import OptionParser

import pyinotify

parser = OptionParser(usage="%prog [--debug] /path/to/file")
parser.add_option("--debug", help="print debug messages", action="store_true", dest="debug")
(options, args) = parser.parse_args()

# Fail with a usage message instead of an IndexError when no file is given.
if len(args) != 1:
    parser.error("exactly one file to follow is required")

# Use an absolute path so that (a) the parent directory can always be derived,
# even for a bare file name, and (b) event paths can be compared exactly.
myfile = os.path.abspath(args[0])

if options.debug:
    print("I am totally opening " + myfile)

wm = pyinotify.WatchManager()

# watched events on the directory, and parse $path for file_of_interest:
dirmask = pyinotify.IN_MODIFY | pyinotify.IN_DELETE | pyinotify.IN_MOVE_SELF | pyinotify.IN_CREATE

# open file, skip to end..
fh = open(myfile, 'r')
fh.seek(0, 2)


def _is_myfile(event):
    """Return True when *event* concerns exactly the followed file.

    An exact comparison is used: a substring test would also match rotated
    siblings such as ``<file>.1``.
    """
    return os.path.join(event.path, event.name) == myfile


# the event handlers:
class PTmp(pyinotify.ProcessEvent):
    """Print new lines of the followed file and re-open it after rotation."""

    def process_IN_MODIFY(self, event):
        if not _is_myfile(event):
            return
        # Drain everything that is available: one write may add several
        # lines, and reading a single line per event would lag behind.
        for line in fh.readlines():
            print(line.rstrip())

    def process_IN_MOVE_SELF(self, event):
        if options.debug:
            print("The file moved! Continuing to read from that, until a new one is created..")

    def process_IN_CREATE(self, event):
        global fh
        if not _is_myfile(event):
            return
        # yay, I exist, umm.. again!
        fh.close()
        fh = open(myfile, 'r')
        # catch up, in case lines were written during the time we were re-opening:
        if options.debug:
            print("My file was created! I'm now catching up with lines in the newly created file.")
        for line in fh.readlines():
            print(line.rstrip())
        # then skip to the end, and wait for more IN_MODIFY events
        fh.seek(0, 2)


notifier = pyinotify.Notifier(wm, PTmp())

# watch the directory, so we can get IN_CREATE events and re-open the file when logrotate comes along.
# if you just watch the file, pyinotify errors when it moves, saying "can't track, can't trust it.. watch
# the directory".
wm.add_watch(os.path.dirname(myfile), dirmask)

while True:
    try:
        notifier.process_events()
        if notifier.check_events():
            notifier.read_events()
    except KeyboardInterrupt:
        break

# cleanup: stop the inotify, and close the file handle:
notifier.stop()
fh.close()
sys.exit(0)
Scapy
- Tuto Scapy 4 (le plus complet)
- scapy.py
"""Plot a histogram of the TCP window sizes exchanged with one host in a capture."""
# NOTE: do not save this script as scapy.py -- a file with that name would
# shadow the scapy package and break the import below.
from scapy.all import IP, TCP, rdpcap
# Explicit names instead of ``from pylab import *``, which floods the
# namespace and can silently shadow other names.
from pylab import hist, show, xlabel, ylabel

CAPTURE_FILE = 'trace.pcap'
HOST = '10.7.0.12'

# Import the capture file
packets = rdpcap(CAPTURE_FILE)

# Keep only the IP packets sent by or to HOST
host_packets = [
    pkt for pkt in packets
    if IP in pkt and HOST in (pkt[IP].src, pkt[IP].dst)
]

# Collect the TCP window size of every TCP packet that was kept
wins = [int(pkt[TCP].window) for pkt in host_packets if TCP in pkt]

# Create the histogram and display it
hist(wins, bins=100)
ylabel('Frequency')
xlabel('TCP Window Size')
show()
Création d'une requête DNS :
>>> p = sr1(IP(dst="8.8.8.8")/UDP(sport=RandShort(),dport=53)/DNS(rd=1,qd=DNSQR(qname="velannes.com", qtype="A")), timeout=0.5)
Ou avec l'entête ethernet (noter que le nom de la fonction change) :
>>> p = srp1(Ether()/IP(dst="8.8.8.8")/UDP(sport=RandShort(),dport=53)/DNS(rd=1,qd=DNSQR(qname="velannes.com", qtype="A"))) Begin emission: .Finished to send 1 packets. * Received 2 packets, got 1 answers, remaining 0 packets >>> p <Ether dst=00:21:5d:5f:05:f8 src=00:12:17:16:0d:e7 type=0x800 |<IP version=4L ihl=5L tos=0x0 len=74 id=20959 flags= frag=0L ttl=47 proto=udp chksum=0x67a4 src=8.8.8.8 dst=192.168.1.104 options=[] |<UDP sport=domain dport=33487 len=54 chksum=0x4c65 |<DNS id=0 qr=1L opcode=QUERY aa=0L tc=0L rd=1L ra=1L z=0L rcode=ok qdcount=1 ancount=1 nscount=0 arcount=0 qd=<DNSQR qname='velannes.com.' qtype=A qclass=IN |> an=<DNSRR rrname='velannes.com.' type=A rclass=IN ttl=25286 rdata='94.23.218.89' |> ns=None ar=None |>>>>
>>> p.summary() 'IP / UDP / DNS Ans "94.23.218.89" '
Fabric
- Le fichier doit s'appeler fabfile.py.
- fab -l pour voir les fonctions disponibles.
- fabric.py
#!/usr/bin/env python
"""Fabric 1.x fabfile: run commands, push files, reboot and drive the IMM (IPMI) of cluster nodes.

The task docstrings are kept verbatim because ``fab -l`` displays them.
"""
import getpass
import os
import sys
#import multiprocessing

try:
    # Explicit names instead of ``import *``: Fabric's own ``reboot`` must not
    # collide with the ``reboot`` task defined below.
    from fabric.api import env, local, output, put, run, serial, task
    from fabric.api import reboot as fabric_reboot
    from fabric.colors import blue, green, red, yellow
except ImportError:
    print('Fabric module must be installed !')
    sys.exit(1)

sys.tracebacklimit = 0

# Turn off Fabric's own output levels: every task prints its own report.
for _level in ('running', 'stdout', 'aborts', 'status', 'warnings'):
    output[_level] = False

#env.user = 'root'
#env.password = None
#env.passwords = {
#    'user@host':'pass',
#    'gwuser@gwhost':'gwpass'
#}
#env.shell = "/bin/sh -c"
#env.gateway = None
env.disable_known_hosts = True
#env.parallel = True
#env.pool_size = multiprocessing.cpu_count() #Sets the number of concurrent processes to use when executing tasks in parallel.
#env.keepalive = 300
#env.timeout = 10 #Network ssh connection timeout, in seconds.
#env.command_timeout = 60
env.linewise = True
env.warn_only = True
env.role_default = 'adm'

# Nodes molx1321 .. molx1336; each role is the same list with a suffix that
# selects the network interface.
_NODES = ['molx%d' % number for number in range(1321, 1337)]

env.roledefs = {
    'eth': list(_NODES),
    'adm': [node + '-adm' for node in _NODES],
    'ib': [node + '-ib' for node in _NODES],
    'imm': [node + '-imm' for node in _NODES],
    'web_adm': ['sw-eth', 'sw-ib'],
}
env.roledefs['web_adm'] += env.roledefs['imm']

# Fall back to the default role when neither hosts nor roles were given.
if not env.hosts and not env.roles:
    env.hosts = env.roledefs[env.role_default]

#list_all = []
#for server in env.roledefs.values():
#    list_all += server
#env.roledefs['all'] = list(set(list_all))


def VERIF_IPMI_PASSWORD():
    """Ensure the module-level ``IPMI_PASSWORD`` global is set.

    The environment variable IPMI_PASSWORD wins when present; otherwise the
    password is asked once and remembered for the following hosts.
    """
    if 'IPMI_PASSWORD' in os.environ:
        globals()['IPMI_PASSWORD'] = os.environ['IPMI_PASSWORD']
    if 'IPMI_PASSWORD' not in globals():
        globals()['IPMI_PASSWORD'] = getpass.getpass(prompt='IMM password : ')


@task
@serial
def lr():
    """ : Liste les roles utilisables avec l'option '-R'"""
    # Lists the roles usable with '-R', then exits so that the listing is not
    # repeated for every host.
    for role in env.roledefs:
        members = ', '.join(env.roledefs[role])
        if role == env.role_default:
            print("%s (default) : %s" % (role, members))
        else:
            print("%s : %s" % (role, members))
    sys.exit()


#@parallel(pool_size = multiprocessing.cpu_count() + 1)
@task
def cmd(cmd, show=None):
    """ : Run command on a host (show='details' for details, show='status' for see if command is successful or not)"""
    # show=None   : prefix each output line with the host, add KO on failure
    # show=status : one OK/KO line per host
    # show=details: OK/KO line followed by the return code and raw output
    prefix = yellow("%(host)s (%(user)s) : " % env, bold=True)
    try:
        out = run(cmd)
    except Exception as err:
        # Connection or execution error: report it for this host and go on.
        sys.stdout.write(prefix)
        print(red(err))
        return

    if show is None:
        for line in out.splitlines():
            sys.stdout.write(prefix)
            print(line)
        if out.failed:
            sys.stdout.write(prefix)
            # Build the whole message before printing: adding a string to the
            # result of print(...) fails on Python 3.
            print(red(' KO', bold=True) + " (return %s)" % out.return_code)
    elif show in ('details', 'status'):
        sys.stdout.write(yellow("Executing on %(host)s as %(user)s : " % env, bold=True) + blue(cmd, bold=True))
        if out.failed:
            print(red(' KO', bold=True))
        else:
            print(green(' OK', bold=True))
        if show == 'details':
            print(yellow('OUT : (return code = %s)' % out.return_code))
            print('%s\n' % out)
    else:
        print("Option %s unknow" % show)
        sys.exit(1)


@task
def send_file(src, dst=None, mode=None):
    """ : Send file on a host (dst=<destination>, mode=<mode>)"""
    if dst is None:
        dst = src
    if mode is None:
        put(src, dst)
        return
    try:
        # Task arguments arrive as strings; a file mode such as "755" is
        # octal, so convert it explicitly instead of discarding the result.
        mode = int(str(mode), 8)
    except ValueError:
        print("'mode' value must be an integer !")
        sys.exit(1)
    put(src, dst, mode=mode)


@task
def reboot(wait=0):
    """ : Reboot a host (wait=<secondes>)"""
    if os.getlogin() != 'root':
        print('Only root must be reboot a host')
        sys.exit(1)
    try:
        # Keep the converted value: the command line provides a string.
        wait = int(wait)
    except (TypeError, ValueError):
        print("'wait' value must be an integer !")
        sys.exit(1)
    # Call Fabric's reboot operation; calling ``reboot`` here would invoke
    # this very task again and recurse forever.
    fabric_reboot(wait=wait)


@serial
@task
def power(state):
    """ : IMM power manager (state=<status|on|off|cycle|reset|diag|soft>)"""
    if env.host not in env.roledefs['imm']:
        print('Use "-R imm" or "-H <host>-imm"')
        print('%s not in this table : %s' % (env.host, ','.join(env.roledefs['imm'])))
        sys.exit(1)
    if state not in ['status', 'on', 'off', 'cycle', 'reset', 'diag', 'soft']:
        print("%s state is not supported. Please use state <status|on|off|cycle|reset|diag|soft> !" % state)
        sys.exit(1)
    VERIF_IPMI_PASSWORD()
    # NOTE(review): the password is placed on the command line (-P) and is
    # therefore visible in the process list; -E is also given -- confirm
    # whether passing it through the environment alone would be enough.
    out = local("ipmitool -I lanplus -C 2 -U system -P '%s' -E -H %s chassis power %s"
                % (globals()['IPMI_PASSWORD'], env.host, state), capture=True)
    if out.failed:
        sys.stdout.write(yellow("%(host)s : " % env, bold=True))
        print(red('Failed ! (return code : %s)' % out.return_code))
        print(out.stderr)
        sys.exit(1)
    else:
        for line in out.splitlines():
            sys.stdout.write(yellow("%(host)s : " % env, bold=True))
            print(line)


@serial
@task
def console(action='activate'):
    """ : IMM SOL (action=<activate|deactivate>)"""
    if len(env.hosts) != 1:
        print('Only on one host. Use option -H to precise the host !')
        sys.exit(1)
    if env.host not in env.roledefs['imm']:
        print('Use "-H <host>-imm"')
        print('%s not in this table : %s' % (env.host, ','.join(env.roledefs['imm'])))
        sys.exit(1)
    # The action ends up in a shell command line: accept only known values.
    if action not in ('activate', 'deactivate'):
        print("%s action is not supported. Please use action <activate|deactivate> !" % action)
        sys.exit(1)
    VERIF_IPMI_PASSWORD()
    os.system("ipmitool -I lanplus -C 2 -U system -P '%s' -E -H %s sol %s"
              % (globals()['IPMI_PASSWORD'], env.host, action))


@serial
@task
def web_adm():
    """ : Open Firefox to administrate (use -H option)"""
    if len(env.hosts) != 1:
        print('Only on one host. Use option -H to precise the host (%s)!'
              % ', '.join(env.roledefs['web_adm']))
        sys.exit(1)
    if os.getenv('DISPLAY') is None:
        print('You must have a valid display !')
        sys.exit(1)
    # The host is checked against the known list before it reaches the shell.
    if env.host not in env.roledefs['web_adm']:
        print("%s is not a valid host ! Valid host are %s" % (env.host, ', '.join(env.roledefs['web_adm'])))
        sys.exit(1)
    os.system('firefox https://%s/' % env.host)
readline
- readline.py
"""Interactive prompt with tab completion driven by a tree of nested dicts."""
# NOTE: do not save this script as readline.py -- it would shadow the
# standard readline module imported below.
import readline
import signal

prompt = 'GigiX > '


def signal_handler(signum, frame):
    """On Ctrl-C, move to a new line and abort the pending input().

    The main loop catches the KeyboardInterrupt and shows a fresh prompt, so
    no line is read (and thrown away) from inside the handler.
    """
    print('')
    raise KeyboardInterrupt


class completer:
    """readline completer that walks ``tree`` following the words already typed."""

    def __init__(self, tree):
        self.tree = tree

    def _parser(self, line, search):
        """Return the completion candidates.

        line   -- words of the input line that remain to be matched
        search -- subtree reached so far (a dict), or a leaf (None/callable)
        """
        # A leaf has nothing below it to complete.
        if search is None or callable(search):
            return []
        # Every typed word is consumed: offer all the keys of this level.
        if not line:
            return list(search)
        # Last word still being typed (no trailing space): prefix match.
        if len(line) == 1 and not readline.get_line_buffer().endswith(' '):
            return ['%s ' % key for key in search if key.startswith(line[0])]
        # Otherwise descend one level; an unknown word yields no candidate.
        return self._parser(line[1:], search.get(line[0]))

    def complete(self, text, state):
        """readline entry point: return candidate number *state*, or None when exhausted."""
        line = readline.get_line_buffer().split()
        results = self._parser(line, self.tree)
        if state < len(results):
            return results[state]
        return None


def linux_sles_12():
    print('Je suis Linux SLES 12')


# Command tree: inner dicts are menus, leaves are either None (nothing to
# run) or a callable executed when the full path is entered.
tree = {
    'unix': {
        'solaris': {
            '8': None,
            '9': None,
            '10': None,
            '11': None,
        },
    },
    'linux': {
        'debian': {
            'buzz': None,
            'rex': None,
            'bo': None,
            'hamm': None,
            'slink': None,
            'patato': None,
            'woody': None,
            'sarge': None,
            'etch': None,
            'lenny': None,
            'squeeze': None,
            'wheezy': None,
            'jessie': None,
        },
        'RHEL': {
            '3': None,
            '4': None,
            '5': None,
            '6': None,
            '7': None,
        },
        'SLES': {
            '9': None,
            '10': None,
            '11': None,
            '12': linux_sles_12,
        },
    },
    'windows': {
        'xp': None,
        'vista': None,
        'seven': None,
        '8': None,
        '10': None,
    },
}


def run_command(line):
    """Run the callable found by following the words of *line* through ``tree``.

    Returns True when a callable leaf was reached and executed, False for an
    unknown path or a leaf with nothing to run. The tree is walked explicitly
    so that user input is never passed to eval().
    """
    node = tree
    for word in line.split():
        if not isinstance(node, dict) or word not in node:
            return False
        node = node[word]
    if not callable(node):
        return False
    node()
    return True


def main():
    """Read commands until 'quit', 'exit' or end of input."""
    signal.signal(signal.SIGINT, signal_handler)
    readline.parse_and_bind("tab: complete")
    readline.set_completer(completer(tree).complete)
    while True:
        try:
            line = input(prompt)
        except EOFError:
            print('Bye')
            break
        except KeyboardInterrupt:
            # Ctrl-C: forget the current line and prompt again.
            continue
        # Both words end the session ('quit' or 'exit' alone evaluates to
        # just 'quit').
        if line.strip() in ('quit', 'exit'):
            break
        try:
            run_command(line)
        except Exception as err:
            # A failing command must not end the session, but say why.
            print('Error: %s' % err)


if __name__ == '__main__':
    main()
Twisted
- Site : http://twistedmatrix.com
- Exemples : http://twistedmatrix.com/documents/current/
Cette bibliothèque est orientée vers le réseau. Elle supporte de nombreux protocoles de communication réseau : TCP, UDP, SMTP, HTTP, PROXY, SSH…
Shutil
psutil
Ctypes
Ce module permet de charger une bibliothèque C et de faire appel aux fonctions de cette bibliothèque.
Aide : http://docs.python.org/release/2.7.2/library/ctypes.html
Exemple :
- ctypes.py
#!/usr/bin/env python
"""Call C library functions (printf, geteuid, getgid) through ctypes."""
# NOTE: do not save this script as ctypes.py -- it would shadow the standard
# ctypes module imported below.
import ctypes
import ctypes.util

# Locate and load the C library.
libc = ctypes.cdll.LoadLibrary(ctypes.util.find_library('c'))

# The format string must be bytes: ctypes passes bytes as char*, whereas a
# Python 3 str is passed as wchar_t* and printf would print only "u".
# NOTE(review): this shows the *effective* uid next to the *real* gid --
# confirm that this mix is intended.
libc.printf(b"uid = %d - gid = %d\n", libc.geteuid(), libc.getgid())
Si on veut faire appel à un syscall en langage C, il faut en connaître l'id. Ci-dessous les id des appels système pour 32 et 64 bits :
ConfigParser
Permet de charger / sauvegarder une configuration
- Sauvegarde
- ConfigParser.ConfigParser permet de déclarer notre objet
- cp.add_section(section) permet de rajouter une section à notre structure
- cp.set(section, option, value) permet de rajouter la clé option à la section existante section avec la valeur value
- cp.write(fileobject) permet de sauvegarder la structure
- ConfigParser.py
"""Build a configuration with two sections and save it to conf.cfg."""
# NOTE: do not save this script as ConfigParser.py -- on Python 2 it would
# shadow the standard module imported below.
try:
    import ConfigParser  # Python 2
except ImportError:
    import configparser as ConfigParser  # the module was renamed in Python 3

config = ConfigParser.ConfigParser()

config.add_section('Section String')
config.set('Section String', 'str1', 'MonString')

config.add_section('Section Integer')
# Store int1 in the section created for it (it is read back from
# 'Section Integer'). Values are stored as text; getint() converts them back.
config.set('Section Integer', 'int1', str(100))

# The context manager guarantees the file is flushed and closed.
with open('conf.cfg', 'w') as cfg_file:
    config.write(cfg_file)
- Chargement
On peut utiliser 4 méthodes dépendantes du type de la clé:
- cp.get( section, option) pour les clés de type string
- cp.getint( section, option) pour les clés de type integer
- cp.getfloat( section, option) pour les clés de type float
- cp.getboolean( section, option) pour les clés de type boolean
# ``config`` is a ConfigParser.ConfigParser() instance.
# read() silently skips files it cannot open and returns the list of files it
# actually parsed: check it to fail early with a clear message instead of a
# later "no section" error.
if not config.read('conf.cfg'):
    raise IOError("cannot read configuration file 'conf.cfg'")

monint = config.getint('Section Integer', 'int1')
monstring = config.get('Section String', 'str1')
fcntl
Locker un fichier :
- fcntl.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Take an exclusive lock on a file, hold it for 30 seconds, then release it."""
# NOTE: do not save this script as fcntl.py -- it would shadow the standard
# fcntl module imported below.
import fcntl
import time

LOCK_PATH = '/tmp/gigix'
HOLD_SECONDS = 30

# The file is closed on every exit path, including errors.
with open(LOCK_PATH, 'r') as myfile:
    try:
        # First try without blocking, to be able to tell the user we wait.
        fcntl.flock(myfile, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except IOError:
        print("Can't immediately write-lock the file, waiting...")
        fcntl.flock(myfile, fcntl.LOCK_EX)

    # Reached only once the lock is really held; any other locking error
    # propagates instead of being reported as a success.
    try:
        print('Lock acquire, sleep 30 secs...')
        time.sleep(HOLD_SECONDS)
    finally:
        fcntl.flock(myfile, fcntl.LOCK_UN)

print('Bye...')
- LOCK_UN : unlock
- LOCK_SH : acquire a shared lock
- LOCK_EX : acquire an exclusive lock
datetime / calendar / time
ftplib
Module permettant la connexion à un serveur FTP :
smtplib / email
Envoyer des emails en python :
poplib / imaplib
Lire ses emails avec python :
PAM
Fourni dans le paquet python-pam : /usr/share/doc/python-pam/examples/pamtest.py
- pam.py
#!/usr/bin/env python
"""Authenticate a user against the 'passwd' PAM service."""
import sys
from getpass import getpass

import PAM

# raw_input() only exists on Python 2; fall back to input() elsewhere.
try:
    read_line = raw_input
except NameError:
    read_line = input


def pam_conv(auth, query_list, userData):
    """Conversation callback: answer every PAM message of *query_list*.

    Each item of *query_list* is a ``(query, type)`` pair. Returns a list
    with one ``(answer, 0)`` tuple per query, or None as soon as a message
    type is not one of the four handled below.
    """
    resp = []
    for query, msg_type in query_list:
        if msg_type == PAM.PAM_PROMPT_ECHO_ON:
            # Visible prompt (e.g. a login name).
            resp.append((read_line(query), 0))
        elif msg_type == PAM.PAM_PROMPT_ECHO_OFF:
            # Hidden prompt (e.g. a password).
            resp.append((getpass(query), 0))
        elif msg_type in (PAM.PAM_PROMPT_ERROR_MSG, PAM.PAM_PROMPT_TEXT_INFO):
            # Informational message: display it and answer with an empty string.
            print(query)
            resp.append(('', 0))
        else:
            return None
    return resp


service = 'passwd'

# The user name is optional: it is set only when given as the sole argument.
user = sys.argv[1] if len(sys.argv) == 2 else None

auth = PAM.pam()
auth.start(service)
if user is not None:
    auth.set_item(PAM.PAM_USER, user)
auth.set_item(PAM.PAM_CONV, pam_conv)

try:
    auth.authenticate()
    auth.acct_mgmt()
except PAM.error as resp:
    print('Go away! (%s)' % resp)
except Exception:
    # Anything else is unexpected; Ctrl-C is no longer reported as an
    # internal error.
    print('Internal error')
else:
    print('Good to go!')
sqlalchemy
sqlalchemy est un ORM qui permet de se connecter aux BDD de manière objet, avec plusieurs connecteurs (postgres, mysql, sqlite… info connecteurs) :
- sqlalchemy (Tuto Officiel / Tuto 2)
itertools
- combinations :
>>> list(itertools.combinations(['A','B','C','D'],3)) [('A', 'B', 'C'), ('A', 'B', 'D'), ('A', 'C', 'D'), ('B', 'C', 'D')]
- imap, ifilter, ifilterfalse :
>>> from itertools import imap, ifilter, ifilterfalse >>> pair = lambda n: n % 2 == 0 >>> carre = lambda n: n ** 2 >>> imap(carre, xrange(10)) # Renvoie un générateur <itertools.imap object at 0x7f4ab174fc10> >>> for elem in imap(carre, xrange(10)): print elem ... 0 1 4 9 16 25 36 49 64 81 >>> list(ifilter(pair, xrange(10))) [0, 2, 4, 6, 8] >>> list(ifilterfalse(pair, xrange(10))) [1, 3, 5, 7, 9]
- chain :
>>> from itertools import chain >>> list(chain(xrange(11), xrange(15, 21), xrange(30, 46))) [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 15, 16, 17, 18, 19, 20, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45]
- takewhile, dropwhile :
>>> from itertools import takewhile, dropwhile >>> list(takewhile(lambda i: i < 10, xrange(20))) # On prend tant que i < 10 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] >>> list(dropwhile(lambda i: i < 10, xrange(20))) # On jette tant que i < 10 [10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
- groupby :
>>> from itertools import groupby >>> liste = list('aaaaabbbcddddee') >>> for element, iterateur in groupby(liste): ... # element est l'élément répété une ou plusieurs fois ... # iterateur est un itérateur sur la répétition de cet élément ; on l'utilise particulièrement pour savoir combien de fois il est répété ... print "%s est répété %d fois" % (element, len(list(iterateur))) ... a est répété 5 fois b est répété 3 fois c est répété 1 fois d est répété 4 fois e est répété 2 fois
- izip :
>>> for i in itertools.izip([1, 2, 3], ['a', 'b', 'c']): ... i ... (1, 'a') (2, 'b') (3, 'c')
>>> for i in itertools.izip(itertools.count(5), ['a', 'b', 'c']): i ... (5, 'a') (6, 'b') (7, 'c')
- cycle :
>>> for i in itertools.cycle(['a', 'b', 'c']):i 'a' 'b' 'c' 'a' 'b' 'c' 'a' 'b' 'c' 'a' 'b' 'c' ... ...
string
>>> string.ascii_letters 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ' >>> string.digits '0123456789' >>> string.printable '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~ \t\n\r\x0b\x0c' >>> string.punctuation '!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~' >>> string.whitespace '\t\n\x0b\x0c\r '
zip
>>> zip(['a', 'b', 'c'], ['d', 'e', 'f']) [('a', 'd'), ('b', 'e'), ('c', 'f')]
operator
Tri par rapport à un index :
>>> import operator >>> d = dict(a=1, b=2, c=1, d=2, e=1, f=2, g=3) >>> sorted(d.iteritems(), key=operator.itemgetter(1)) [('a', 1), ('c', 1), ('e', 1), ('b', 2), ('d', 2), ('f', 2), ('g', 3)]
dbus
Dbus permet d’interagir avec les autres programmes identifiés par DBUS comme knotify, le service de notifications KDE.
Pour voir la liste des services DBUS, lancer la commande qdbus :
gigi@debian:~$ qdbus| grep knotify org.kde.knotify
- knotify.py
#!/usr/bin/env python
"""Send a notification (title, text) to the org.kde.knotify D-Bus service."""
import sys

import dbus

# Validate the command line first, so that a usage error is reported without
# needing a session bus.
if len(sys.argv) < 3:
    print('Usage: knotify.py title text')
    sys.exit(1)
title, text = sys.argv[1:3]

knotify = dbus.SessionBus().get_object("org.kde.knotify", "/Notify")
knotify.event("warning", "kde", [], title, text, [], [], 0, 0,
              dbus_interface="org.kde.KNotify")