====== Modules ======
Liste des modules inclus :
* [[http://docs.python.org/2/library/|Python 2]]
* [[http://docs.python.org/3/library/|Python 3]]
===== Chemin des modules =====
* Dans l'environnement SHELL:
export PYTHONPATH=/home/user/python/lib/mylib
* Dans le code python :
#!/usr/bin/python
import os, sys
sys.path.append(os.path.join(os.path.dirname(__file__), "lib"))
#sys.path.insert(0, os.path.join(os.path.dirname(__file__), "lib"))
===== optparse =====
Doc officielle : http://docs.python.org/2/library/optparse.html
===== pynotify =====
http://pyinotify.sourceforge.net/
#!/usr/bin/env python
import os
import pyinotify
wm = pyinotify.WatchManager()
mask = pyinotify.IN_DELETE | pyinotify.IN_CREATE
class PTmp(pyinotify.ProcessEvent):
def process_IN_CREATE(self, event):
print "Create: %s " % os.path.join(event.path, event.name)
def process_IN_DELETE(self, event):
print "Delete: %s " % os.path.join(event.path, event.name)
notifier = pyinotify.Notifier(wm, PTmp())
wdd = wm.add_watch('/home/gigi', mask, rec=True)
while True:
try:
notifier.process_events()
if notifier.check_events():
notifier.read_events()
except KeyboardInterrupt:
notifier.stop()
break
* tail -f avec inotify :
#!/usr/bin/env python
### Use inotify to tail logfile (argv[1]), mimicking tail -F (follow if rotated).
###
### Usage: ./tail-F_inotify.py /path/to/file
###
### @author: "Charlie Schluting"
### Copyright: the only free-as-in-freedom one: MIT license
###
import sys, os, pyinotify
from optparse import OptionParser
parser = OptionParser()
parser.add_option("--debug", help="print debug messages", action="store_true", dest="debug")
(options, args) = parser.parse_args()
myfile = args[0]
if options.debug:
print "I am totally opening " + myfile
wm = pyinotify.WatchManager()
# watched events on the directory, and parse $path for file_of_interest:
dirmask = pyinotify.IN_MODIFY | pyinotify.IN_DELETE | pyinotify.IN_MOVE_SELF | pyinotify.IN_CREATE
# open file, skip to end..
global fh
fh = open(myfile, 'r')
fh.seek(0,2)
# the event handlers:
class PTmp(pyinotify.ProcessEvent):
def process_IN_MODIFY(self, event):
if myfile not in os.path.join(event.path, event.name):
return
else:
print fh.readline().rstrip()
def process_IN_MOVE_SELF(self, event):
if options.debug:
print "The file moved! Continuing to read from that, until a new one is created.."
def process_IN_CREATE(self, event):
if myfile in os.path.join(event.path, event.name):
# yay, I exist, umm.. again!
global fh
fh.close
fh = open(myfile, 'r')
# catch up, in case lines were written during the time we were re-opening:
if options.debug:
print "My file was created! I'm now catching up with lines in the newly created file."
for line in fh.readlines():
print line.rstrip()
# then skip to the end, and wait for more IN_MODIFY events
fh.seek(0,2)
return
notifier = pyinotify.Notifier(wm, PTmp())
# watch the directory, so we can get IN_CREATE events and re-open the file when logrotate comes along.
# if you just watch the file, pyinotify errors when it moves, saying "can't track, can't trust it.. watch
# the directory".
index = myfile.rfind('/')
wm.add_watch(myfile[:index], dirmask)
while True:
try:
notifier.process_events()
if notifier.check_events():
notifier.read_events()
except KeyboardInterrupt:
break
# cleanup: stop the inotify, and close the file handle:
notifier.stop()
fh.close()
sys.exit(0)
===== Scapy =====
* [[http://wiki.spiritofhack.net/index.php/Scapy-usage|Tuto Scapy 1]]
* [[http://repo.zenk-security.com/Protocoles_reseaux_securisation/Les%20Fourberies%20de%20Scapy.pdf|Tuto scapy 2]]
* [[http://www.siteduzero.com/tutoriel-3-568537-manipulez-les-paquets-reseau-avec-scapy.html|Tuto scapy 3]]
* [[http://libpfb.so/uploads/media/Cours_scapy.pdf|Tuto Scapy 4]] (le plus complet)
source : http://www.kbrandt.com/2011/02/creating-a-histogram-of-tcp-window-sizes-from-a-packet-capture-using-python.html
from scapy.all import IP,TCP,rdpcap
from pylab import *
#Import the Capture File
a = rdpcap('trace.pcap')
#Filter the Capture File
b = [ pkt for pkt in a if IP in pkt and
(pkt[IP].src == '10.7.0.12' or pkt[IP].dst == '10.7.0.12') ]
#Create an array of TCP Window sizes from the capture
wins = [ int(win[TCP].window) for win in b if TCP in win ]
#Create the Histogram
hist(wins, bins=100)
#Display it
ylabel('Frequency')
xlabel('TCP Window Size')
show()
Création d'une requête DNS :
>>> p = sr1(IP(dst="8.8.8.8")/UDP(sport=RandShort(),dport=53)/DNS(rd=1,qd=DNSQR(qname="velannes.com", qtype="A")), timeout=0.5)
Ou avec l'entête ethernet (noter que le nom de la fonction change) :
>>> p = srp1(Ether()/IP(dst="8.8.8.8")/UDP(sport=RandShort(),dport=53)/DNS(rd=1,qd=DNSQR(qname="velannes.com", qtype="A")))
Begin emission:
.Finished to send 1 packets.
*
Received 2 packets, got 1 answers, remaining 0 packets
>>> p
an= ns=None ar=None |>>>>
>>> p.summary()
'IP / UDP / DNS Ans "94.23.218.89" '
===== Fabric =====
* [[http://docs.fabfile.org/en/latest/index.html|Fabric - Pour coder des scripts de déploiement]]
* [[http://docs.fabfile.org/en/latest/tutorial.html|Fabric - Tuto]]
* [[http://www.unixgarden.com/index.php/gnu-linux-magazine/fabric-administration-distante-dun-parc-dynamique-de-serveursmachines/3|Exemple]]
* Le fichier doit s'appeler **fabfile.py**.
* **fab -l** pour voir les fonctions disponibles.
#!/usr/bin/env python
import sys, os, getpass
#import sys, os, getpass, multiprocessing
try:
from fabric.api import *
from fabric.colors import *
except:
print('Fabric module must be installed !')
sys.exit(1)
sys.tracebacklimit = 0
output['running'] = False
output['stdout'] = False
output['aborts'] = False
output['status'] = False
output['warnings'] = False
#env.user = 'root'
#env.password = None
#env.passwords = {
# 'user@host':'pass',
# 'gwuser@gwhost':'gwpass'
#}
#env.shell = "/bin/sh -c"
#env.gateway = None
env.disable_known_hosts = True
#env.parralel = True
#env.pool_size = multiprocessing.cpu_count() #Sets the number of concurrent processes to use when executing tasks in parallel.
#env.keepalive = 300
#env.timeout = 10 #Network ssh connection timeout, in seconds.
#env.command_timeout = 60
env.linewise = True
env.warn_only = True
env.role_default = 'adm'
env.roledefs = {
'eth': ['svr1', 'svr2', 'svr3', 'svr4', 'svr5', 'svr6', 'svr7', 'svr8', 'svr9', 'svr10', 'svr11', 'svr12', 'svr13', 'svr14', 'svr15', 'svr16'],
'adm': ['svr1-adm', 'svr2-adm', 'svr3-adm', 'svr4-adm', 'svr5-adm', 'svr6-adm', 'svr7-adm', 'svr8-adm', 'svr9-adm', 'svr10-adm', 'svr11-adm', 'svr12-adm', 'svr13-adm', 'svr14-adm', 'svr15-adm', 'svr16-adm'],
'ib': ['svr1-ib', 'svr2-ib', 'svr3-ib', 'svr4-ib', 'svr5-ib', 'svr6-ib', 'svr7-ib', 'svr8-ib', 'svr9-ib', 'svr10-ib', 'svr11-ib', 'svr12-ib', 'svr13-ib', 'svr14-ib', 'svr15-ib', 'svr16-ib'],
'imm': ['svr1-imm', 'svr2-imm', 'svr3-imm', 'svr4-imm', 'svr5-imm', 'svr6-imm', 'svr7-imm', 'svr8-imm', 'svr9-imm', 'svr10-imm', 'svr11-imm', 'svr12-imm', 'svr13-imm', 'svr14-imm', 'svr15-imm', 'svr16-imm'],
'web_adm': ['sw-eth', 'sw-ib'],
}
env.roledefs['web_adm'] += env.roledefs['imm']
if env.hosts == [] and env.roles == []:
env.hosts = env.roledefs[env.role_default]
#list_all = []
#for server in env.roledefs.values():
# list_all += server
#env.roledefs['all'] = list(set(list_all))
def VERIF_IPMI_PASSWORD():
if 'IPMI_PASSWORD' in os.environ:
globals()['IPMI_PASSWORD'] = os.environ['IPMI_PASSWORD']
if 'IPMI_PASSWORD' not in globals():
IPMI_PASSWORD = getpass.getpass(prompt='IMM password : ')
globals()['IPMI_PASSWORD'] = IPMI_PASSWORD
@task
@serial
def lr():
""" : Liste les roles utilisables avec l'option '-R'"""
for role in env.roledefs:
if role == env.role_default:
print("%s (default) : %s" % (role, ' ,'.join(env.roledefs[role])))
else:
print("%s : %s" % (role, ' ,'.join(env.roledefs[role])))
sys.exit()
#@parallel(pool_size = multiprocessing.cpu_count() + 1)
@task
def cmd(cmd, show = None):
""" : Run command on a host (show='details' for details, show='status' for see if command is successful or not)"""
#with show('stdout','running','warnings'):
try:
out = run(cmd)
except Exception as err:
sys.stdout.write(yellow("%(host)s (%(user)s) : " % env, bold = True))
print(red(err))
return
if show == None:
for line in out.splitlines():
sys.stdout.write(yellow("%(host)s (%(user)s) : " % env, bold = True))
print(line)
if out.failed:
sys.stdout.write(yellow("%(host)s (%(user)s) : " % env, bold = True))
print(red(' KO', bold = True)) + " (return %s)" % out.return_code
elif show == 'details' or show == 'status':
sys.stdout.write(yellow("Executing on %(host)s as %(user)s : " % env, bold = True) + blue(cmd, bold = True))
if out.failed:
print(red(' KO', bold = True))
else:
print(green(' OK', bold = True))
if show == 'details':
print(yellow('OUT : (return code = %s)' % out.return_code))
print('%s\n' % out)
else:
print("Option %s unknow" % show)
sys.exit(1)
@task
def send_file(src, dst = None, mode = None):
""" : Send file on a host (dst=, mode=)"""
if dst == None:
dst = src
if mode == None:
put(src, dst)
else:
try:
int(mode)
except:
print("'mode' value must be an integer !")
sys.exit(1)
put(src, dst, mode=mode)
@task
def reboot(wait = 0):
""" : Reboot a host (wait=)"""
if os.getlogin() != 'root':
print('Only root must be reboot a host')
sys.exit(1)
try:
int(wait)
except:
print("'wait' value must be an integer !")
sys.exit(1)
reboot(wait = wait)
@serial
@task
def power(state):
""" : IMM power manager (state=)"""
if not env.host in env.roledefs['imm']:
print('Use "-R imm" or "-H -imm"')
print('%s not in this table : %s' % (env.host, ','.join(env.roledefs['imm'])))
sys.exit(1)
if state not in ['status','on','off','cycle','reset','diag','soft']:
print("%s state is not supported. Please use state !" % state)
sys.exit(1)
VERIF_IPMI_PASSWORD()
out = local("ipmitool -I lanplus -C 2 -U system -P '%s' -E -H %s chassis power %s" % (globals()['IPMI_PASSWORD'], env.host, state), capture=True)
if out.failed:
sys.stdout.write(yellow("%(host)s : " % env, bold = True))
print(red('Failed ! (return code : %s)' % out.return_code))
print(out.stderr)
sys.exit(1)
else:
for line in out.splitlines():
sys.stdout.write(yellow("%(host)s : " % env, bold = True))
print(line)
@serial
@task
def console(action='activate'):
""" : IMM SOL (action=)"""
if len(env.hosts) != 1:
print('Only on one host. Use option -H to precise the host !')
sys.exit(1)
if not env.host in env.roledefs['imm']:
print('Use "-H -imm"')
print('%s not in this table : %s' % (env.host, ','.join(env.roledefs['imm'])))
sys.exit(1)
VERIF_IPMI_PASSWORD()
os.system("ipmitool -I lanplus -C 2 -U system -P '%s' -E -H %s sol %s" % (globals()['IPMI_PASSWORD'], env.host, action))
@serial
@task
def web_adm():
""" : Open Firefox to administrate (use -H option)"""
if len(env.hosts) != 1:
print('Only on one host. Use option -H to precise the host (%s)!' % ', '.join(env.roledefs['web_adm']))
sys.exit(1)
if os.getenv('DISPLAY') == None:
print('You must have a valid display !')
sys.exit(1)
if env.host not in env.roledefs['web_adm']:
print("%s is not a valid host ! Valid host are %s" % (env.host, ', '.join(env.roledefs['web_adm'])))
sys.exit(1)
os.system('firefox https://%s/' % env.host)
===== readline =====
import readline, signal
prompt = 'GigiX > '
def signal_handler(signal, frame):
print
input(prompt)
return
signal.signal(signal.SIGINT, signal_handler)
readline.parse_and_bind("tab: complete")
class completer:
def __init__(self, tree):
self.tree = tree
def _parser(self, line, search):
if (search is None) or (callable(search) == True):
return []
elif len(line) == 0:
return [x for x in search]
elif len(line) == 1 and readline.get_line_buffer()[-1] != ' ':
return ['%s ' % x for x in search if x.startswith(line[0])]
else:
return self._parser(line[1:], search[line[0]])
return []
def complete(self, text, state):
line = readline.get_line_buffer().split()
results = self._parser(line, self.tree)
return results[state]
def linux_sles_12():
print('Je suis Linux SLES 12')
tree = {
'unix': {
'solaris': {
'8':None,
'9':None,
'10':None,
'11':None,
}
},
'linux': {
'debian': {
'buzz':None,
'rex':None,
'bo':None,
'hamm':None,
'slink':None,
'patato':None,
'woody':None,
'sarge':None,
'etch':None,
'lenny':None,
'squeeze':None,
'wheezy':None,
'jessie':None,
},
'RHEL': {
'3':None,
'4':None,
'5':None,
'6':None,
'7':None,
},
'SLES': {
'9':None,
'10':None,
'11':None,
'12':linux_sles_12,
}
},
'windows': {
'xp':None,
'vista':None,
'seven':None,
'8':None,
'10':None,
}
}
readline.set_completer(completer(tree).complete)
l = ''
while l != ('quit' or 'exit'):
try:
l = input(prompt)
eval("tree['"+"']['".join(l.split())+"']()")
except EOFError:
print('Bye')
break
except:
continue
===== Twisted =====
* Site : http://twistedmatrix.com
* Doc : http://twistedmatrix.com/documents/current/api/moduleIndex.html
* Exemples : http://twistedmatrix.com/documents/current/
Cette bibliothèque est orientée vers le réseau. Elle supporte de nombreux protocoles de communication réseau : TCP, UDP, SMTP, HTTP, PROXY, SSH...
===== Shutil=====
* [[http://python.developpez.com/cours/docs.python.org/2.6/library/shutil.php|Copie/Suppression de fichiers/dossiers (récurssif) ]]
===== Psutils =====
* [[http://code.google.com/p/psutil/|Gestion des processus / mémoire / disque / réseau]]
===== Ctypes =====
Ce module permet de charger une librairie C et de faire appelles aux fonctions de la librairies.
Aide : http://docs.python.org/release/2.7.2/library/ctypes.html
Exemple :
#!/usr/bin/env python
import ctypes, ctypes.util
libc=ctypes.cdll.LoadLibrary(ctypes.util.find_library('c'))
libc.printf("uid = %d - gid = %d\n", libc.geteuid(), libc.getgid())
Si on veut faire appel à un syscall en langage C il faut connaître l'id. Ci dessous les id des systems call pour 32 et 64 bits :
* [[http://www.acsu.buffalo.edu/~charngda/linux_syscalls_32bit.html|Syscalls 32 bits]]
* [[http://www.acsu.buffalo.edu/~charngda/linux_syscalls_64bit.html|Syscalls 64 bits]]
===== ConfigParser =====
Permet de charger / sauvegarder une configuration
* __**Sauvegarde**__
* ConfigParser.ConfigParser permet de déclarer notre objet
* cp.add_section(section) permet de rajouter une section à notre structure
* cp.set(section, option, value) permet de rajouter la clé option à la section existante section avec la valeur value
* cp.write(fileobject) permet de sauvegarder la structure
import ConfigParser
config = ConfigParser.ConfigParser()
config.add_section('Section String')
config.set('Section String', 'str1', 'MonString')
config.add_section('Section Integer')
config.set('Section String', 'int1', 100)
config.write(open('conf.cfg','w'))
* __**Chargement**__
On peut utiliser 4 méthodes dépendantes du type de la clé:
* cp.get( section, option) pour les clés de type string
*cp.getint( section, option) pour les clés de type integer
*cp.getfloat( section, option) pour les clés de type float
* cp.getboolean( section, option) pour les clés de type boolean
config.read('conf.cfg')
monint = config.getint('Section Integer', 'int1')
monstring = config.get('Section String', 'str1')
===== fcntl =====
Locker un fichier :
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import fcntl, time
myfile = open('/tmp/gigix', 'r')
try:
fcntl.flock(myfile, fcntl.LOCK_EX|fcntl.LOCK_NB)
except IOError:
print("Can't immediately write-lock the file, waiting...")
fcntl.flock(myfile, fcntl.LOCK_EX)
finally:
print('Lock acquire, sleep 30 secs...')
time.sleep(30)
fcntl.flock(myfile, fcntl.LOCK_UN)
myfile.close()
print('Bye...')
* **LOCK_UN** : unlock
* **LOCK_SH** : acquire a shared lock
* **LOCK_EX** : acquire an exclusive lock
===== datetime / calendar / time =====
Modules qui permettent de manipuler les dates :
* [[http://docs.python.org/2/library/datetime.html|datetime]]
* [[http://docs.python.org/2/library/calendar.html#module-calendar|calendar]]
* [[http://docs.python.org/2/library/time.html#module-time|time]]
===== ftplib =====
Module permettant la connexion à un serveur FTP :
* [[http://docs.python.org/2/library/ftplib.html|Site Officiel]]
* [[http://www.siteduzero.com/informatique/tutoriels/utiliser-le-module-ftp-de-python/toutes-les-fonctions-une-a-une|Autre Tuto]]
===== smtplib / email =====
Envoyer des emails en python :
* [[http://docs.python.org/2/library/email-examples.html|Exemples]]
===== poplib / imaplib =====
Lire ses emails avec python :
* [[http://docs.python.org/2/library/poplib.html|Protocol pop]]
* [[http://docs.python.org/2/library/imaplib.html|Protocole imap]]
===== PAM =====
Fourni dans le paquage **python-pam** : /usr/share/doc/python-pam/examples/pamtest.py
#!/usr/bin/env python
import sys
import PAM
from getpass import getpass
def pam_conv(auth, query_list, userData):
resp = []
for i in range(len(query_list)):
query, type = query_list[i]
if type == PAM.PAM_PROMPT_ECHO_ON:
val = raw_input(query)
resp.append((val, 0))
elif type == PAM.PAM_PROMPT_ECHO_OFF:
val = getpass(query)
resp.append((val, 0))
elif type == PAM.PAM_PROMPT_ERROR_MSG or type == PAM.PAM_PROMPT_TEXT_INFO:
print query
resp.append(('', 0))
else:
return None
return resp
service = 'passwd'
if len(sys.argv) == 2:
user = sys.argv[1]
else:
user = None
auth = PAM.pam()
auth.start(service)
if user != None:
auth.set_item(PAM.PAM_USER, user)
auth.set_item(PAM.PAM_CONV, pam_conv)
try:
auth.authenticate()
auth.acct_mgmt()
except PAM.error, resp:
print('Go away! (%s)' % resp)
except:
print('Internal error')
else:
print('Good to go!')
===== sqlalchemy =====
sqlalchemy est un ORM permet de se connecter aux BDD de manière objet plusieurs connecteurs (postgres, mysql, sqlite...[[http://docs.sqlalchemy.org/en/rel_0_8/core/engines.html|info connecteurs]]) :
* sqlalchemy ([[http://docs.sqlalchemy.org/en/latest/orm/|Tuto Officiel]] / [[http://www.rmunn.com/sqlalchemy-tutorial/tutorial.html|Tuto 2]])
===== itertools=====
* Tuto : http://www.siteduzero.com/informatique/tutoriels/pratiques-avancees-et-meconnues-en-python/le-module-itertools
* combinations :
>>> list(itertools.combinations(['A','B','C','D'],3))
[('A', 'B', 'C'), ('A', 'B', 'D'), ('A', 'C', 'D'), ('B', 'C', 'D')]
* imap, ifilter, ifilterfalse :
>>> from itertools import imap, ifilter, ifilterfalse
>>> pair = lambda n: n % 2 == 0
>>> carre = lambda n: n ** 2
>>> imap(carre, xrange(10)) # Renvoie un générateur
>>> for elem in imap(carre, xrange(10)): print elem
...
0
1
4
9
16
25
36
49
64
81
>>> list(ifilter(pair, xrange(10)))
[0, 2, 4, 6, 8]
>>> list(ifilterfalse(pair, xrange(10)))
[1, 3, 5, 7, 9]
* chain :
>>> from itertools import chain
>>> list(chain(xrange(11), xrange(15, 21), xrange(30, 46)))
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 15, 16, 17, 18, 19, 20, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45]
* takewhile, dropwhile :
>>> from itertools import takewhile, dropwhile
>>> list(takewhile(lambda i: i < 10, xrange(20))) # On prend tant que i < 10
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> list(dropwhile(lambda i: i < 10, xrange(20))) # On jette tant que i < 10
[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
* groupby :
>>> from itertools import groupby
>>> liste = list('aaaaabbbcddddee')
>>> for element, iterateur in groupby(liste):
... # element est l'élément répété une ou plusieurs fois
... # iterateur est un itérateur sur la répétition de cet élément ; on l'utilise particulièrement pour savoir combien de fois il est répété
... print "%s est répété %d fois" % (element, len(list(iterateur)))
...
a est répété 5 fois
b est répété 3 fois
c est répété 1 fois
d est répété 4 fois
e est répété 2 fois
* izip :
>>> for i in itertools.izip([1, 2, 3], ['a', 'b', 'c']):
... i
...
(1, 'a')
(2, 'b')
(3, 'c')
>>> for i in itertools.izip(itertools.count(5), ['a', 'b', 'c']): i
...
(5, 'a')
(6, 'b')
(7, 'c')
* cycle :
>>> for i in itertools.cycle(['a', 'b', 'c']):i
'a'
'b'
'c'
'a'
'b'
'c'
'a'
'b'
'c'
'a'
'b'
'c'
...
...
===== string =====
>>> string.ascii_letters
'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'
>>> string.digits
'0123456789'
>>> string.printable
'0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~ \t\n\r\x0b\x0c'
>>> string.punctuation
'!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~'
>>> string.whitespace
'\t\n\x0b\x0c\r '
===== zip =====
>>> zip(['a', 'b', 'c'], ['d', 'e', 'f'])
[('a', 'd'), ('b', 'e'), ('c', 'f')]
===== operator =====
Trie par rapport a un index :
>>> import operator
>>> d = dict(a=1, b=2, c=1, d=2, e=1, f=2, g=3)
>>> sorted(d.iteritems(), key=operator.itemgetter(1))
[('a', 1), ('c', 1), ('e', 1), ('b', 2), ('d', 2), ('f', 2), ('g', 3)]
===== dbus =====
Dbus permet d’interagir avec les autres programmes identifiés par DBUS comme knotify, le service de notifications KDE.
Pour voir la liste des services DBUS, lancer la commande qdbus :
gigi@debian:~$ qdbus| grep knotify
org.kde.knotify
#!/usr/bin/env python
import sys, dbus
knotify = dbus.SessionBus().get_object("org.kde.knotify", "/Notify")
try:
title, text = sys.argv[1:3]
except:
print 'Usage: knotify.py title text'
sys.exit(1)
knotify.event("warning", "kde", [], title, text, [], [], 0, 0,dbus_interface="org.kde.KNotify")