====== Authentification par certificat SSL avec twisted et openssl ======
===== Serveur =====
#!/usr/bin/env python
# Author : Ghislain LE MEUR
try:
import sys, os, socket, shutil, logging
from OpenSSL import SSL, crypto
from twisted.internet.protocol import Factory, Protocol
from twisted.internet import ssl, reactor
from twisted.python import log
except Exception as e:
print('%s', e)
sys.exit(1)
# TCP port the TLS server listens on.
port = 8000

# Every certificate-related file lives in an "ssl" folder located next to
# the script path given in sys.argv[0].
ssl_dir = os.path.join(os.path.dirname(sys.argv[0]), 'ssl')

# CA bundle used to verify peers, then the server's own certificate and key.
ssl_ca, ssl_crt, ssl_key = (
    os.path.join(ssl_dir, 'ca.crt'),
    os.path.join(ssl_dir, 'server.crt'),
    os.path.join(ssl_dir, 'server.key'),
)

# Sub-directories of ssl_dir; refused peer certificates are dumped in
# the "rejected" one.
ssl_trusted, ssl_rejected = (
    os.path.join(ssl_dir, 'trusted'),
    os.path.join(ssl_dir, 'rejected'),
)
def _write_pem(path, data, mode):
    """Write the bytes *data* to a freshly created *path* with permissions *mode*.

    The file is created with the restrictive mode from the start (instead of
    being chmod'ed after the fact), so a private key is never readable by
    other users, even briefly. A previous file at *path* is removed first
    because it may be read-only and therefore impossible to overwrite.
    """
    if os.path.lexists(path):
        os.remove(path)
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_EXCL, mode)
    with os.fdopen(fd, 'wb') as handle:
        handle.write(data)
    # The umask can only have cleared bits at creation; set the exact mode.
    os.chmod(path, mode)


def ssl_generate():
    """Generate a self-signed certificate and private key for the server.

    A 4096-bit RSA key is created and used to sign an X509 certificate whose
    subject (and issuer) carries the local host name as CN. The certificate
    is valid from now for 315360000 seconds (ten 365-day years). The PEM
    encoded certificate is written to ``ssl_crt`` (mode 0444) and the PEM
    encoded key to ``ssl_key`` (mode 0400).

    Any failure is logged as critical and terminates the process with
    exit status 1.
    """
    logServer.info('Cetificate crt/key is missing. Generate certificate for server...')
    try:
        key = crypto.PKey()
        key.generate_key(crypto.TYPE_RSA, 4096)

        cert = crypto.X509()
        subject = cert.get_subject()
        subject.C = 'FR'
        subject.ST = 'Yvelines'
        subject.L = 'xxx'
        subject.O = 'xxx'
        subject.OU = 'xxx'
        subject.CN = socket.gethostname()
        #cert.set_serial_number(1000)
        cert.gmtime_adj_notBefore(0)
        cert.gmtime_adj_notAfter(315360000)
        # Self-signed: the issuer is the subject itself.
        cert.set_issuer(subject)
        cert.set_pubkey(key)
        # SHA-1 signatures are rejected by current TLS stacks; use SHA-256.
        cert.sign(key, 'sha256')

        _write_pem(ssl_crt, crypto.dump_certificate(crypto.FILETYPE_PEM, cert), 0o444)
        _write_pem(ssl_key, crypto.dump_privatekey(crypto.FILETYPE_PEM, key), 0o400)
    except Exception as e:
        logServer.critical('Stopping server => %s', e)
        sys.exit(1)
def _ensure_mode(path, mode):
    """Chmod *path* to *mode* unless its permission bits already match."""
    if (os.stat(path).st_mode & 0o777) != mode:
        os.chmod(path, mode)


def verif_ssl():
    """Make sure the SSL directory layout and certificate files are usable.

    Steps, in order:

    1. create ``ssl_dir`` if needed and give it mode 0755 (the owner must be
       able to create the certificates and sub-directories inside it);
    2. if both ``ssl_crt`` and ``ssl_key`` exist, enforce modes 0444 and 0400
       on them, otherwise call ``ssl_generate()``;
    3. if ``ssl_ca`` is missing, seed it with a copy of ``ssl_crt``;
    4. create the ``ssl_trusted`` and ``ssl_rejected`` directories if needed.

    Any filesystem error is logged as critical and terminates the process
    with exit status 1.
    """
    try:
        if not os.path.isdir(ssl_dir):
            os.mkdir(ssl_dir)
        _ensure_mode(ssl_dir, 0o755)
    except EnvironmentError as e:
        logServer.critical('Stopping server => %s', e)
        sys.exit(1)

    if os.path.isfile(ssl_crt) and os.path.isfile(ssl_key):
        try:
            _ensure_mode(ssl_crt, 0o444)
            # The private key must stay readable by its owner only.
            _ensure_mode(ssl_key, 0o400)
        except EnvironmentError as e:
            logServer.critical('Stopping server => %s', e)
            sys.exit(1)
    else:
        ssl_generate()

    if not os.path.isfile(ssl_ca):
        try:
            shutil.copy(ssl_crt, ssl_ca)
        except EnvironmentError as e:
            # Without a CA file the SSL context cannot be built, so stop now.
            logServer.critical('Stopping server => %s', e)
            sys.exit(1)

    for directory in (ssl_trusted, ssl_rejected):
        if os.path.isdir(directory):
            continue
        logServer.info("%s doesn't exist, create directory" % directory)
        try:
            os.mkdir(directory)
        except EnvironmentError as e:
            logServer.critical('%s', e)
            sys.exit(1)
class ServerContextFactory(ssl.ContextFactory, Protocol):
    """Server-side SSL context factory that requires a client certificate.

    The context presents ``ssl_crt``/``ssl_key`` to clients and verifies the
    client certificate against ``ssl_ca``. Self-signed client certificates
    that fail verification are saved in ``ssl_rejected``.
    """

    # OpenSSL verification error X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT.
    _SELF_SIGNED_CERT = 18

    # Context built by the first getContext() call and reused afterwards.
    _context = None

    @staticmethod
    def _common_name(x509):
        """Return the subject CN of *x509* as text ('unknown' when absent)."""
        name = x509.get_subject().commonName
        if name is None:
            return 'unknown'
        if isinstance(name, bytes):
            name = name.decode('utf-8', 'replace')
        return name

    @staticmethod
    def _safe_filename(name):
        """Turn the peer-controlled *name* into a harmless file name.

        Every character other than alphanumerics, '-', '_' and '.' becomes
        '_', and leading dots are stripped, so the result can neither
        contain a path separator nor be '.' or '..'.
        """
        cleaned = ''.join(
            ch if (ch.isalnum() or ch in '-_.') else '_' for ch in name
        ).lstrip('.')
        return cleaned or 'unknown'

    def _verify(self, connection, x509, errnum, errdepth, ok):
        """OpenSSL verification callback; logs the result and returns *ok*.

        When verification failed with the self-signed error code, the
        offending certificate is written as PEM to ``ssl_rejected`` under a
        sanitised version of its CN. The verdict of OpenSSL (*ok*) is
        returned unchanged.
        """
        logServer.info('subjetc=%s issuer=%s err=%s errdepth=%s ok=%s' % (
            x509.get_subject(), x509.get_issuer(), errnum, errdepth, ok))
        if ok != 1:
            CN = self._common_name(x509)
            if errnum == self._SELF_SIGNED_CERT:
                logServer.warning('Certificate with CN %s rejected, certificate not trusted !' % CN)
                target = os.path.join(ssl_rejected, '%s.crt' % self._safe_filename(CN))
                try:
                    with open(target, 'wb') as crt_file:
                        crt_file.write(crypto.dump_certificate(crypto.FILETYPE_PEM, x509))
                except Exception as e:
                    logServer.error('Impossible to write crt file in %s directory => %s' % (ssl_rejected, e))
            else:
                logServer.warning('Unknown error for certificate %s' % CN)
        return ok

    def getContext(self):
        """Return the SSL context, building it on first use.

        On any initialisation error the failure is logged as critical, the
        reactor is stopped if it is running and the process exits with
        status 1.
        """
        if self._context is not None:
            return self._context
        try:
            #ctx.set_passwd_cb(self.passphraseCB)
            ctx = SSL.Context(SSL.SSLv23_METHOD)
            # SSLv23_METHOD negotiates the protocol version; refuse the
            # broken SSLv2/SSLv3 ones.
            ctx.set_options(SSL.OP_NO_SSLv2 | SSL.OP_NO_SSLv3)
            ctx.use_certificate_file(ssl_crt)
            ctx.use_privatekey_file(ssl_key)
            #ctx.load_client_ca('ca/all-cas.cert')
            ctx.load_verify_locations(ssl_ca)
            ctx.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT, self._verify)
            ctx.set_verify_depth(10)
        except Exception as e:
            logServer.critical('Impossible to initialize SSL context => %s' % e)
            # reactor.stop() raises when the reactor is not running yet.
            if reactor.running:
                reactor.stop()
            sys.exit(1)
        self._context = ctx
        return ctx
class MyProtocol(Protocol):
    """Server protocol: greets new connections and echoes received data."""

    def connectionMade(self):
        """Send the greeting line as soon as the connection is established."""
        greeting = 'Server : Connection OK\n'
        return self.transport.write(greeting)

    def dataReceived(self, data):
        """Print the peer certificate and the data, then echo the data back."""
        peer_certificate = self.transport.getPeerCertificate()
        print('dataReceived %s' % (peer_certificate,))
        print(data)
        reply = 'Server : %s\n' % data
        return self.transport.write(reply)
if __name__ == '__main__':
    # Script entry point: set up logging, check the SSL files, then serve.
    logLevel = logging.INFO
    logServer = logging.getLogger('server')
    logClient = logging.getLogger('client')

    # The DEBUG format additionally shows where each record was emitted.
    logging.basicConfig(
        format=(
            "%(asctime)s - %(name)s - %(pathname)s - %(funcName)s (%(lineno)d) - %(levelname)s : %(message)s"
            if logLevel == logging.DEBUG
            else "%(asctime)s %(name)s %(levelname)s : %(message)s"
        ),
        datefmt="%d/%m/%Y %H:%M:%S",
        level=logLevel,
    )

    # Forward Twisted's own log events to the standard logging module.
    observer = log.PythonLoggingObserver(loggerName='twisted')
    observer.logger.setLevel(logging.DEBUG)
    observer.start()

    verif_ssl()

    from twisted.internet.protocol import ServerFactory
    factory = ServerFactory()
    factory.protocol = MyProtocol
    try:
        reactor.listenSSL(port, factory, ServerContextFactory())
        reactor.run()
    except Exception as e:
        logServer.critical('%s', e)
        sys.exit(1)
===== Client =====
#!/usr/bin/env python
# Author : Ghislain LE MEUR
try:
import sys, os, logging
from OpenSSL import SSL
from twisted.internet.protocol import ClientFactory
from twisted.protocols.basic import LineReceiver
from twisted.internet import ssl, reactor
from twisted.python import log
except Exception as e:
print('%s', e)
sys.exit(1)
# TCP port of the TLS server to connect to.
port = 8000

# Every certificate-related file lives in an "ssl" folder located next to
# the script path given in sys.argv[0].
ssl_dir = os.path.join(os.path.dirname(sys.argv[0]), 'ssl')

# CA bundle used to verify the server, then the client's own certificate
# and private key.
ssl_ca, ssl_crt, ssl_key = (
    os.path.join(ssl_dir, 'ca.crt'),
    os.path.join(ssl_dir, 'client.crt'),
    os.path.join(ssl_dir, 'client.key'),
)
def _write_pem(path, data, mode):
    """Write the bytes *data* to a freshly created *path* with permissions *mode*.

    The file is created with the restrictive mode from the start (instead of
    being chmod'ed after the fact), so a private key is never readable by
    other users, even briefly. A previous file at *path* is removed first
    because it may be read-only and therefore impossible to overwrite.
    """
    if os.path.lexists(path):
        os.remove(path)
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_EXCL, mode)
    with os.fdopen(fd, 'wb') as handle:
        handle.write(data)
    # The umask can only have cleared bits at creation; set the exact mode.
    os.chmod(path, mode)


def ssl_generate():
    """Generate a self-signed certificate and private key for the client.

    A 4096-bit RSA key is created and used to sign an X509 certificate whose
    subject (and issuer) carries the local host name as CN. The certificate
    is valid from now for 315360000 seconds (ten 365-day years). The PEM
    encoded certificate is written to ``ssl_crt`` (mode 0444) and the PEM
    encoded key to ``ssl_key`` (mode 0400).

    Any failure is logged as critical and terminates the process with
    exit status 1.
    """
    # The client script's import header provides neither of these names,
    # so they are imported here.
    import socket
    from OpenSSL import crypto

    logClient.info('Cetificate crt/key is missing. Generate certificate for client...')
    try:
        key = crypto.PKey()
        key.generate_key(crypto.TYPE_RSA, 4096)

        cert = crypto.X509()
        subject = cert.get_subject()
        subject.C = 'FR'
        subject.ST = 'Yvelines'
        subject.L = 'xxx'
        subject.O = 'xxx'
        subject.OU = 'xxx'
        subject.CN = socket.gethostname()
        #cert.set_serial_number(1000)
        cert.gmtime_adj_notBefore(0)
        cert.gmtime_adj_notAfter(315360000)
        # Self-signed: the issuer is the subject itself.
        cert.set_issuer(subject)
        cert.set_pubkey(key)
        # SHA-1 signatures are rejected by current TLS stacks; use SHA-256.
        cert.sign(key, 'sha256')

        _write_pem(ssl_crt, crypto.dump_certificate(crypto.FILETYPE_PEM, cert), 0o444)
        _write_pem(ssl_key, crypto.dump_privatekey(crypto.FILETYPE_PEM, key), 0o400)
    except Exception as e:
        logClient.critical('Stopping client => %s', e)
        sys.exit(1)
def _ensure_mode(path, mode):
    """Chmod *path* to *mode* unless its permission bits already match."""
    if (os.stat(path).st_mode & 0o777) != mode:
        os.chmod(path, mode)


def verif_ssl():
    """Make sure the SSL directory and certificate files are usable.

    Steps, in order:

    1. create ``ssl_dir`` if needed and give it mode 0755 (the owner must be
       able to create the certificate files inside it);
    2. if both ``ssl_crt`` and ``ssl_key`` exist, enforce modes 0444 and 0400
       on them, otherwise call ``ssl_generate()``;
    3. if ``ssl_ca`` is missing, seed it with a copy of ``ssl_crt``.

    Any filesystem error is logged as critical and terminates the process
    with exit status 1.
    """
    # shutil is not part of the client script's import header.
    import shutil

    try:
        if not os.path.isdir(ssl_dir):
            os.mkdir(ssl_dir)
        _ensure_mode(ssl_dir, 0o755)
    except EnvironmentError as e:
        logClient.critical('Stopping client => %s', e)
        sys.exit(1)

    if os.path.isfile(ssl_crt) and os.path.isfile(ssl_key):
        try:
            _ensure_mode(ssl_crt, 0o444)
            # The private key must stay readable by its owner only.
            _ensure_mode(ssl_key, 0o400)
        except EnvironmentError as e:
            logClient.critical('Stopping client => %s', e)
            sys.exit(1)
    else:
        ssl_generate()

    if not os.path.isfile(ssl_ca):
        try:
            shutil.copy(ssl_crt, ssl_ca)
        except EnvironmentError as e:
            logClient.critical('Stopping client => %s', e)
            sys.exit(1)
class ClientContextFactory(ssl.ClientContextFactory):
    """Client-side SSL context factory presenting a certificate to the server.

    The context presents ``ssl_crt``/``ssl_key`` and verifies the server
    certificate against ``ssl_ca``. Self-signed server certificates that fail
    verification are saved in a "rejected" sub-directory of ``ssl_dir``.
    """

    # OpenSSL verification error X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT.
    _SELF_SIGNED_CERT = 18

    @staticmethod
    def _common_name(x509):
        """Return the subject CN of *x509* as text ('unknown' when absent)."""
        name = x509.get_subject().commonName
        if name is None:
            return 'unknown'
        if isinstance(name, bytes):
            name = name.decode('utf-8', 'replace')
        return name

    @staticmethod
    def _safe_filename(name):
        """Turn the peer-controlled *name* into a harmless file name.

        Every character other than alphanumerics, '-', '_' and '.' becomes
        '_', and leading dots are stripped, so the result can neither
        contain a path separator nor be '.' or '..'.
        """
        cleaned = ''.join(
            ch if (ch.isalnum() or ch in '-_.') else '_' for ch in name
        ).lstrip('.')
        return cleaned or 'unknown'

    def _verify(self, connection, x509, errnum, errdepth, ok):
        """OpenSSL verification callback; logs the result and returns *ok*.

        When verification failed with the self-signed error code, the
        offending certificate is written as PEM to the "rejected" directory
        under a sanitised version of its CN. The verdict of OpenSSL (*ok*)
        is returned unchanged.
        """
        logClient.info('subjetc=%s issuer=%s err=%s errdepth=%s ok=%s' % (
            x509.get_subject(), x509.get_issuer(), errnum, errdepth, ok))
        if ok != 1:
            CN = self._common_name(x509)
            if errnum == self._SELF_SIGNED_CERT:
                logClient.warning('Certificate with CN %s rejected, certificate not trusted !' % CN)
                # The client script defines no ssl_rejected constant, so the
                # directory is derived from ssl_dir and created on demand.
                rejected_dir = os.path.join(ssl_dir, 'rejected')
                try:
                    # crypto is not part of the client script's import header.
                    from OpenSSL import crypto
                    if not os.path.isdir(rejected_dir):
                        os.mkdir(rejected_dir)
                    target = os.path.join(rejected_dir, '%s.crt' % self._safe_filename(CN))
                    with open(target, 'wb') as crt_file:
                        crt_file.write(crypto.dump_certificate(crypto.FILETYPE_PEM, x509))
                except Exception as e:
                    logClient.error('Impossible to write crt file in %s directory => %s' % (rejected_dir, e))
            else:
                logClient.warning('Unknown error for certificate %s' % CN)
        return ok

    def getContext(self):
        """Build and return the client SSL context.

        On any initialisation error the failure is logged as critical, the
        reactor is stopped if it is running and the process exits with
        status 1.
        """
        try:
            ctx = ssl.ClientContextFactory.getContext(self)
            # Refuse the broken SSLv2/SSLv3 protocol versions.
            ctx.set_options(SSL.OP_NO_SSLv2 | SSL.OP_NO_SSLv3)
            ctx.use_certificate_file(ssl_crt)
            ctx.use_privatekey_file(ssl_key)
            ctx.load_verify_locations(ssl_ca)
            ctx.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT, self._verify)
        except Exception as e:
            logClient.critical('Impossible to initialize SSL context => %s' % e)
            # reactor.stop() raises when the reactor is not running yet.
            if reactor.running:
                reactor.stop()
            sys.exit(1)
        return ctx
class MyProtocol(LineReceiver):
    """Client protocol: sends one greeting line and prints every line received."""

    def connectionMade(self):
        """Send the greeting line once the connection is established."""
        greeting = "Hello, world By Client !"
        return self.sendLine(greeting)

    def lineReceived(self, line):
        """Print each complete line received from the server."""
        message = "Receive : %s" % line
        print(message)
class MyFactory(ClientFactory):
    """Client factory that stops the reactor when the connection fails or ends."""

    protocol = MyProtocol

    def _log_and_stop(self, template, reason):
        """Log the error message of *reason* as critical and stop the reactor."""
        logClient.critical(template, reason.getErrorMessage())
        reactor.stop()

    def clientConnectionFailed(self, connector, reason):
        """Called when the connection attempt did not succeed."""
        self._log_and_stop('Connection failed => %s', reason)

    def clientConnectionLost(self, connector, reason):
        """Called when an established connection is closed."""
        self._log_and_stop('Connection lost => %s', reason)
if __name__ == '__main__':
    # Script entry point: set up logging, check the SSL files, then connect.
    logLevel = logging.INFO
    logServer = logging.getLogger('server')
    logClient = logging.getLogger('client')

    # The DEBUG format additionally shows where each record was emitted.
    logging.basicConfig(
        format=(
            "%(asctime)s - %(name)s - %(pathname)s - %(funcName)s (%(lineno)d) - %(levelname)s : %(message)s"
            if logLevel == logging.DEBUG
            else "%(asctime)s %(name)s %(levelname)s : %(message)s"
        ),
        datefmt="%d/%m/%Y %H:%M:%S",
        level=logLevel,
    )

    # Forward Twisted's own log events to the standard logging module.
    observer = log.PythonLoggingObserver(loggerName='twisted')
    observer.logger.setLevel(logging.DEBUG)
    observer.start()

    verif_ssl()

    # The optional first command-line argument is the server host name.
    host = sys.argv[1] if len(sys.argv) > 1 else 'localhost'

    factory = MyFactory()
    try:
        reactor.connectSSL(host, port, factory, ClientContextFactory())
        reactor.run()
    except Exception as e:
        logClient.critical('%s', e)