Ceci est une ancienne révision du document !
Table des matières
Modules
optparse
Doc officielle : http://docs.python.org/2/library/optparse.html
pyinotify
http://pyinotify.sourceforge.net/
- notify.py
#!/usr/bin/env python
"""Print one line for every entry created or deleted under /home/gigi."""
import os

import pyinotify

# Only creation and deletion events are of interest.
mask = pyinotify.IN_DELETE | pyinotify.IN_CREATE
wm = pyinotify.WatchManager()


class PTmp(pyinotify.ProcessEvent):
    """Event handler that reports the full path of the affected entry."""

    def process_IN_CREATE(self, event):
        created = os.path.join(event.path, event.name)
        print("Create: %s " % created)

    def process_IN_DELETE(self, event):
        deleted = os.path.join(event.path, event.name)
        print("Delete: %s " % deleted)


notifier = pyinotify.Notifier(wm, PTmp())
# rec=True: per the pyinotify API, sub-directories are watched as well.
wdd = wm.add_watch('/home/gigi', mask, rec=True)

# Pump events until the user interrupts with Ctrl-C.
running = True
while running:
    try:
        notifier.process_events()
        if notifier.check_events():
            notifier.read_events()
    except KeyboardInterrupt:
        notifier.stop()
        running = False
- tail -f avec inotify :
- tail.py
#!/usr/bin/env python
### Use inotify to tail logfile (argv[1]), mimicking tail -F (follow if rotated).
###
### Usage: ./tail-F_inotify.py /path/to/file
###
### @author: "Charlie Schluting" <charlie@schluting.com>
### Copyright: the only free-as-in-freedom one: MIT license
###
import os
import sys
from optparse import OptionParser

import pyinotify

parser = OptionParser()
parser.add_option("--debug", help="print debug messages", action="store_true", dest="debug")
(options, args) = parser.parse_args()

# Fail with a usage message instead of an IndexError when no file is given.
if not args:
    parser.error("missing path of the file to follow")
myfile = args[0]

if options.debug:
    print("I am totally opening " + myfile)

wm = pyinotify.WatchManager()

# watched events on the directory, and parse $path for file_of_interest:
dirmask = pyinotify.IN_MODIFY | pyinotify.IN_DELETE | pyinotify.IN_MOVE_SELF | pyinotify.IN_CREATE

# open file, skip to end..
fh = open(myfile, 'r')
fh.seek(0, 2)


def _is_my_file(event):
    """Return True when *event* concerns exactly the followed file.

    Absolute paths are compared for equality: a substring test would also
    match rotated siblings such as "app.log.1".
    """
    event_path = os.path.abspath(os.path.join(event.path, event.name))
    return event_path == os.path.abspath(myfile)


# the event handlers:
class PTmp(pyinotify.ProcessEvent):
    """Print the lines appended to ``myfile`` and re-open it after rotation."""

    def process_IN_MODIFY(self, event):
        if not _is_my_file(event):
            return
        # Drain everything that was appended: a single event may cover
        # several new lines.
        for line in fh.readlines():
            print(line.rstrip())

    def process_IN_MOVE_SELF(self, event):
        if options.debug:
            print("The file moved! Continuing to read from that, until a new one is created..")

    def process_IN_CREATE(self, event):
        global fh
        if not _is_my_file(event):
            return
        # yay, I exist, umm.. again!
        fh.close()  # actually call close(); the bare attribute leaked the old handle
        fh = open(myfile, 'r')
        # catch up, in case lines were written during the time we were re-opening:
        if options.debug:
            print("My file was created! I'm now catching up with lines in the newly created file.")
        for line in fh.readlines():
            print(line.rstrip())
        # then skip to the end, and wait for more IN_MODIFY events
        fh.seek(0, 2)


notifier = pyinotify.Notifier(wm, PTmp())

# watch the directory, so we can get IN_CREATE events and re-open the file when logrotate comes along.
# if you just watch the file, pyinotify errors when it moves, saying "can't track, can't trust it.. watch
# the directory".
# os.path.dirname handles "/file" correctly, and a bare filename falls back
# to the current directory.
watch_dir = os.path.dirname(myfile) or '.'
wm.add_watch(watch_dir, dirmask)

while True:
    try:
        notifier.process_events()
        if notifier.check_events():
            notifier.read_events()
    except KeyboardInterrupt:
        break

# cleanup: stop the inotify, and close the file handle:
notifier.stop()
fh.close()
sys.exit(0)
Scapy
- Tuto Scapy 4 (le plus complet)
- scapy.py
"""Plot a histogram of the TCP window sizes seen to/from one host in a capture."""
from scapy.all import IP, TCP, rdpcap
from pylab import *

# Import the capture file
a = rdpcap('trace.pcap')

# Keep only the IP packets whose source or destination is the host of interest
b = []
for pkt in a:
    if IP not in pkt:
        continue
    if pkt[IP].src == '10.7.0.12' or pkt[IP].dst == '10.7.0.12':
        b.append(pkt)

# Collect the TCP window size of every TCP packet that passed the filter
wins = []
for win in b:
    if TCP in win:
        wins.append(int(win[TCP].window))

# Build the histogram, label the axes and display it
hist(wins, bins=100)
ylabel('Frequency')
xlabel('TCP Window Size')
show()
Création d'une requête DNS :
>>> p = sr1(IP(dst="8.8.8.8")/UDP(sport=RandShort(),dport=53)/DNS(rd=1,qd=DNSQR(qname="velannes.com", qtype="A")), timeout=0.5)
Ou avec l'entête ethernet (noter que le nom de la fonction change) :
>>> p = srp1(Ether()/IP(dst="8.8.8.8")/UDP(sport=RandShort(),dport=53)/DNS(rd=1,qd=DNSQR(qname="velannes.com", qtype="A"))) Begin emission: .Finished to send 1 packets. * Received 2 packets, got 1 answers, remaining 0 packets >>> p <Ether dst=00:21:5d:5f:05:f8 src=00:12:17:16:0d:e7 type=0x800 |<IP version=4L ihl=5L tos=0x0 len=74 id=20959 flags= frag=0L ttl=47 proto=udp chksum=0x67a4 src=8.8.8.8 dst=192.168.1.104 options=[] |<UDP sport=domain dport=33487 len=54 chksum=0x4c65 |<DNS id=0 qr=1L opcode=QUERY aa=0L tc=0L rd=1L ra=1L z=0L rcode=ok qdcount=1 ancount=1 nscount=0 arcount=0 qd=<DNSQR qname='velannes.com.' qtype=A qclass=IN |> an=<DNSRR rrname='velannes.com.' type=A rclass=IN ttl=25286 rdata='94.23.218.89' |> ns=None ar=None |>>>>
>>> p.summary() 'IP / UDP / DNS Ans "94.23.218.89" '
Fabric
- Le fichier doit s'appeler fabfile.py.
- fab -l pour voir les fonctions disponibles.
- fabfile.py
#!/usr/bin/env python
"""Example fabfile: show system information on remote hosts and locally."""
from fabric.api import *
from fabric.colors import *

env.hosts = ['host1', 'toto@host2']
env.user = 'root'
env.password = 'xxx'
#env.gateway = 'mygateway'
#env.passwords = { 'user@host':'pass', 'gwuser@gwhost':'gwpass' }
#env.shell = "/bin/sh -c"
#env.keepalive = 300
#env.roledefs = { 'web': ['www1', 'www2', 'www3'], 'dns': ['ns1', 'ns2'] }
env.disable_known_hosts = True
# The option is spelled "parallel"; the former "parralel" attribute was
# silently ignored by Fabric.
env.parallel = True
env.pool_size = 0  # Sets the number of concurrent processes to use when executing tasks in parallel.
env.timeout = 10  # Network ssh connection timeout, in seconds.
env.command_timeout = 10
#env.warn_only = True

# NOTE(review): hide() returns a context manager; called bare like this it
# has no effect. Use "with hide(...)" (as in remote_info) to silence output.
hide('warnings', 'running', 'stdout', 'stderr')


def remote_info():
    """Run ``uname -a`` on the current host and print its output."""
    with hide('running', 'stdout'):
        print(yellow("Executing on %(host)s as %(user)s" % env, bold=True))
        a = run('uname -a')
        fastprint(a, show_prefix=True)


def local_info():
    """Run ``uname -a`` on the local machine."""
    local('uname -a')
Twisted
- Site : http://twistedmatrix.com
- Exemples : http://twistedmatrix.com/documents/current/
Cette bibliothèque est orientée vers le réseau. Elle supporte de nombreux protocoles de communication réseau : TCP, UDP, SMTP, HTTP, PROXY, SSH…
Shutil
Psutils
Ctypes
Ce module permet de charger une librairie C et de faire appel aux fonctions de cette librairie.
Aide : http://docs.python.org/release/2.7.2/library/ctypes.html
Exemple :
- ctypes.py
#!/usr/bin/env python
"""Call C library functions (printf, geteuid, getgid) through ctypes."""
import ctypes
import ctypes.util

# Keep the handle returned by LoadLibrary: without this assignment the name
# "libc" used below is undefined.
libc = ctypes.cdll.LoadLibrary(ctypes.util.find_library('c'))

# printf expects a C "char *": pass a byte string so the format is not
# converted to a wide-character string.
libc.printf(b"uid = %d - gid = %d\n", libc.geteuid(), libc.getgid())
Si on veut faire appel à un syscall en langage C, il faut en connaître l'id. Ci-dessous les id des appels système (syscalls) pour 32 et 64 bits :
ConfigParser
Permet de charger / sauvegarder une configuration
- Sauvegarde
- ConfigParser.ConfigParser permet de déclarer notre objet
- cp.add_section(section) permet de rajouter une section à notre structure
- cp.set(section, option, value) permet de rajouter la clé option à la section existante section avec la valeur value
- cp.write(fileobject) permet de sauvegarder la structure
- ConfigParser.py
import ConfigParser

# Build the configuration in memory: one section per value type.
config = ConfigParser.ConfigParser()

config.add_section('Section String')
config.set('Section String', 'str1', 'MonString')

config.add_section('Section Integer')
# int1 must live in 'Section Integer': that is where the loading example
# reads it back with getint(). The value is stored as a string so that it
# can also be read from the in-memory parser.
config.set('Section Integer', 'int1', '100')

# Write the file through a context manager so it is flushed and closed.
with open('conf.cfg', 'w') as cfg_file:
    config.write(cfg_file)
- Chargement
On peut utiliser 4 méthodes dépendantes du type de la clé:
- cp.get( section, option) pour les clés de type string
- cp.getint( section, option) pour les clés de type integer
- cp.getfloat( section, option) pour les clés de type float
- cp.getboolean( section, option) pour les clés de type boolean
# Parse conf.cfg into the existing parser object.
config.read('conf.cfg')

# Typed accessors: getint() converts the stored text to an int,
# get() returns the raw string.
monint = config.getint('Section Integer', 'int1')
monstring = config.get('Section String', 'str1')
fcntl
Locker un fichier :
- fcntl.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Take an exclusive lock on /tmp/gigix, hold it for 30 seconds, release it."""
import fcntl
import time

# The context manager closes the file on every exit path.
with open('/tmp/gigix', 'r') as myfile:
    try:
        # First try without blocking, so the user can be told about the wait.
        fcntl.flock(myfile, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except IOError:
        print("Can't immediately write-lock the file, waiting...")
        fcntl.flock(myfile, fcntl.LOCK_EX)

    # Reached only once the lock is really held; the unlock is the sole
    # statement that must run whatever happens while holding it.
    try:
        print('Lock acquire, sleep 30 secs...')
        time.sleep(30)
    finally:
        fcntl.flock(myfile, fcntl.LOCK_UN)

print('Bye...')
- LOCK_UN : unlock
- LOCK_SH : acquire a shared lock
- LOCK_EX : acquire an exclusive lock
datetime / calendar / time
ftplib
Module permettant la connexion à un serveur FTP :
smtplib / email
Envoyer des emails en python :
poplib / imaplib
Lire ses emails avec python :
PAM
Fourni dans le paquet python-pam : /usr/share/doc/python-pam/examples/pamtest.py
- pam.py
#!/usr/bin/env python
"""Authenticate a user against the 'passwd' PAM service.

Usage: pam.py [user]
"""
import sys
from getpass import getpass

import PAM


def pam_conv(auth, query_list, userData):
    """Answer the prompts PAM sends during the conversation.

    query_list is iterated as (query, query_type) pairs. Echoed prompts are
    read with raw_input(), hidden prompts with getpass(), and error/info
    messages are printed and answered with an empty string.

    Returns a list of (answer, 0) tuples, one per query, or None as soon as
    an unknown query type is met.
    """
    resp = []
    for query, query_type in query_list:
        if query_type == PAM.PAM_PROMPT_ECHO_ON:
            resp.append((raw_input(query), 0))
        elif query_type == PAM.PAM_PROMPT_ECHO_OFF:
            resp.append((getpass(query), 0))
        elif query_type in (PAM.PAM_PROMPT_ERROR_MSG, PAM.PAM_PROMPT_TEXT_INFO):
            print(query)
            resp.append(('', 0))
        else:
            return None
    return resp


service = 'passwd'

# The user name is optional; it is only set when given on the command line.
if len(sys.argv) == 2:
    user = sys.argv[1]
else:
    user = None

auth = PAM.pam()
auth.start(service)
if user is not None:
    auth.set_item(PAM.PAM_USER, user)
auth.set_item(PAM.PAM_CONV, pam_conv)

try:
    auth.authenticate()
    auth.acct_mgmt()
except PAM.error as resp:
    print('Go away! (%s)' % resp)
except Exception:
    # Not a bare "except:", so Ctrl-C and SystemExit still propagate.
    print('Internal error')
else:
    print('Good to go!')
sqlalchemy
sqlalchemy est un ORM qui permet de se connecter aux BDD de manière objet, avec plusieurs connecteurs (postgres, mysql, sqlite… info connecteurs) :
- sqlalchemy (Tuto Officiel / Tuto 2)
itertools
- combinations :
>>> list(itertools.combinations(['A','B','C','D'],3)) [('A', 'B', 'C'), ('A', 'B', 'D'), ('A', 'C', 'D'), ('B', 'C', 'D')]
- imap, ifilter, ifilterfalse :
>>> from itertools import imap, ifilter, ifilterfalse >>> pair = lambda n: n % 2 == 0 >>> carre = lambda n: n ** 2 >>> imap(carre, xrange(10)) # Renvoie un générateur <itertools.imap object at 0x7f4ab174fc10> >>> for elem in imap(carre, xrange(10)): print elem ... 0 1 4 9 16 25 36 49 64 81 >>> list(ifilter(pair, xrange(10))) [0, 2, 4, 6, 8] >>> list(ifilterfalse(pair, xrange(10))) [1, 3, 5, 7, 9]
- chain :
>>> from itertools import chain >>> list(chain(xrange(11), xrange(15, 21), xrange(30, 46))) [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 15, 16, 17, 18, 19, 20, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45]
- takewhile, dropwhile :
>>> from itertools import takewhile, dropwhile >>> list(takewhile(lambda i: i < 10, xrange(20))) # On prend tant que i < 10 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] >>> list(dropwhile(lambda i: i < 10, xrange(20))) # On jette tant que i < 10 [10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
- groupby :
>>> from itertools import groupby >>> liste = list('aaaaabbbcddddee') >>> for element, iterateur in groupby(liste): ... # element est l'élément répété une ou plusieurs fois ... # iterateur est un itérateur sur la répétition de cet élément ; on l'utilise particulièrement pour savoir combien de fois il est répété ... print "%s est répété %d fois" % (element, len(list(iterateur))) ... a est répété 5 fois b est répété 3 fois c est répété 1 fois d est répété 4 fois e est répété 2 fois
- izip :
>>> for i in itertools.izip([1, 2, 3], ['a', 'b', 'c']): ... i ... (1, 'a') (2, 'b') (3, 'c')
>>> for i in itertools.izip(itertools.count(5), ['a', 'b', 'c']): i ... (5, 'a') (6, 'b') (7, 'c')
- cycle :
>>> for i in itertools.cycle(['a', 'b', 'c']):i 'a' 'b' 'c' 'a' 'b' 'c' 'a' 'b' 'c' 'a' 'b' 'c' ... ...
string
>>> string.ascii_letters 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ' >>> string.digits '0123456789' >>> string.printable '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~ \t\n\r\x0b\x0c' >>> string.punctuation '!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~' >>> string.whitespace '\t\n\x0b\x0c\r '
zip
>>> zip(['a', 'b', 'c'], ['d', 'e', 'f']) [('a', 'd'), ('b', 'e'), ('c', 'f')]