Outils pour utilisateurs

Outils du site


python:programmes:twisted_ssl_server

Authentification par certificat SSL avec twisted et openssl

Serveur

serveur_ssl.py
#!/usr/bin/env python
# Author : Ghislain LE MEUR
# Import every dependency up front so that a missing or broken package is
# reported once, clearly, instead of failing later in the middle of a run.
try:
    import sys, os, socket, shutil, logging
    from OpenSSL import SSL, crypto
    from twisted.internet.protocol import Factory, Protocol
    from twisted.internet import ssl, reactor
    from twisted.python import log
except Exception as e:
    # The loggers are not configured yet, so write straight to stderr.
    # `sys` is re-imported here so the handler works whichever import failed.
    import sys
    sys.stderr.write('%s\n' % e)
    sys.exit(1)
 
# TCP port the server listens on.
port = 8000

# Everything TLS-related lives in an "ssl" directory located next to the
# script that was launched: the CA file, the server certificate and key,
# and the "trusted" / "rejected" sub-directories.
ssl_dir = os.path.join(os.path.dirname(sys.argv[0]), 'ssl')
(ssl_ca, ssl_crt, ssl_key, ssl_trusted, ssl_rejected) = (
    os.path.join(ssl_dir, leaf)
    for leaf in ('ca.crt', 'server.crt', 'server.key', 'trusted', 'rejected')
)
 
def ssl_generate():
    """Create a self-signed certificate and RSA private key for the server.

    A 4096-bit RSA key is generated together with an X509 certificate whose
    subject and issuer are identical (CN is the local host name) and whose
    validity starts now and lasts 315360000 seconds.  The certificate is
    written PEM-encoded to ``ssl_crt`` and the key to ``ssl_key``; the key
    file is only ever readable by its owner.

    Any failure is logged as critical and terminates the process with exit
    status 1.
    """
    logServer.info('Cetificate crt/key is missing. Generate certificate for server...')
    try:
        key = crypto.PKey()
        key.generate_key(crypto.TYPE_RSA, 4096)

        cert = crypto.X509()
        subject = cert.get_subject()
        subject.C = 'FR'
        subject.ST = 'Yvelines'
        subject.L = 'xxx'
        subject.O = 'xxx'
        subject.OU = 'xxx'
        subject.CN = socket.gethostname()

        cert.gmtime_adj_notBefore(0)
        cert.gmtime_adj_notAfter(315360000)
        cert.set_issuer(subject)
        cert.set_pubkey(key)
        # SHA-1 signatures are refused by current TLS stacks; sign with SHA-256.
        cert.sign(key, 'sha256')

        # This function runs when at least one of the two files is missing;
        # a surviving read-only leftover would make the writes below fail.
        for stale in (ssl_crt, ssl_key):
            if os.path.exists(stale):
                os.remove(stale)

        # dump_* return bytes, hence the binary mode.
        with open(ssl_crt, 'wb') as crt_file:
            crt_file.write(crypto.dump_certificate(crypto.FILETYPE_PEM, cert))

        # Create the key file with owner-only permissions from the start so
        # the private key is never readable by other users, not even briefly.
        key_fd = os.open(ssl_key, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(key_fd, 'wb') as key_file:
            key_file.write(crypto.dump_privatekey(crypto.FILETYPE_PEM, key))

        os.chmod(ssl_key, 0o400)
    except Exception as e:
        logServer.critical('Stopping server => %s' % e)
        sys.exit(1)
 
def _ensure_mode(path, mode):
    """Set the permission bits of *path* to *mode* unless they already match."""
    if (os.stat(path).st_mode & 0o777) != mode:
        os.chmod(path, mode)


def verif_ssl():
    """Prepare the TLS directory layout before the server starts.

    Steps, in order:

    * create ``ssl_dir`` if needed and keep it writable by its owner while
      the set-up runs;
    * call ``ssl_generate()`` when the certificate or the key is missing,
      then force mode 0444 on the certificate and 0400 on the private key;
    * seed ``ssl_ca`` with a copy of the server certificate when absent;
    * create the ``ssl_trusted`` and ``ssl_rejected`` directories;
    * finally make ``ssl_dir`` read-only (0555).

    Every failure is logged as critical and ends the process with exit
    status 1.
    """
    try:
        if not os.path.isdir(ssl_dir):
            os.mkdir(ssl_dir)
        # Files and sub-directories are created below, so the directory
        # must be writable until the very end of the set-up.
        _ensure_mode(ssl_dir, 0o700)
    except Exception as e:
        logServer.critical('Stopping server => %s' % e)
        sys.exit(1)

    if not (os.path.isfile(ssl_crt) and os.path.isfile(ssl_key)):
        ssl_generate()

    try:
        _ensure_mode(ssl_crt, 0o444)
        # The private key must stay readable by its owner only.
        _ensure_mode(ssl_key, 0o400)
    except Exception as e:
        logServer.critical('Stopping server => %s' % e)
        sys.exit(1)

    if not os.path.isfile(ssl_ca):
        try:
            shutil.copy(ssl_crt, ssl_ca)
        except Exception as e:
            logServer.critical('Stopping server => %s' % e)
            sys.exit(1)

    for directory in (ssl_trusted, ssl_rejected):
        if not os.path.isdir(directory):
            logServer.info("%s doesn't exist, create directory" % directory)
            try:
                os.mkdir(directory)
            except Exception as e:
                logServer.critical('%s', e)
                sys.exit(1)

    try:
        # Everything is in place: lock the top-level directory.
        _ensure_mode(ssl_dir, 0o555)
    except Exception as e:
        logServer.critical('Stopping server => %s' % e)
        sys.exit(1)
 
 
class ServerContextFactory(ssl.ContextFactory, Protocol):
    """Build the server-side TLS context and vet client certificates.

    Clients have to present a certificate (``VERIFY_PEER`` combined with
    ``VERIFY_FAIL_IF_NO_PEER_CERT``) which is verified against ``ssl_ca``.
    A self-signed certificate that fails verification is saved under
    ``ssl_rejected``.
    """

    # OpenSSL verification error X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT.
    _ERR_SELF_SIGNED = 18

    @staticmethod
    def _safe_name(common_name):
        """Turn a certificate CN into a harmless file-name component.

        The CN is chosen by the remote peer, so anything that is not a
        letter, a digit, ``.``, ``_`` or ``-`` is replaced and leading dots
        are dropped; this keeps the resulting path inside the target
        directory.  A missing or empty CN becomes ``'unknown'``.
        """
        if common_name is None:
            return 'unknown'
        if isinstance(common_name, bytes):
            common_name = common_name.decode('utf-8', 'replace')
        cleaned = ''.join(
            ch if (ch.isalnum() or ch in '._-') else '_' for ch in common_name)
        return cleaned.lstrip('.') or 'unknown'

    def _verify(self, connection, x509, errnum, errdepth, ok):
        """Verification callback handed to ``Context.set_verify``.

        Logs every certificate that is examined.  When OpenSSL's own check
        failed (``ok != 1``) a warning is logged and, for the self-signed
        error, the certificate is stored in ``ssl_rejected``.

        Returns ``ok`` unchanged: the callback never overrides OpenSSL's
        verdict.
        """
        # Peer address is looked up on a best-effort basis, for debugging.
        try:
            peer = connection.getpeername()
        except Exception as e:
            peer = 'unavailable (%s)' % e
        logServer.debug('Verifying certificate presented by %s', peer)

        logServer.info('subjetc=%s issuer=%s err=%s errdepth=%s ok=%s' % (x509.get_subject(), x509.get_issuer(), errnum, errdepth, ok))
        if ok != 1:
            CN = x509.get_subject().commonName
            if errnum == self._ERR_SELF_SIGNED:
                logServer.warning('Certificate with CN %s rejected, certificate not trusted !' % CN)
                target = os.path.join(ssl_rejected, '%s.crt' % self._safe_name(CN))
                try:
                    # dump_certificate returns bytes, hence the binary mode.
                    with open(target, 'wb') as crt_file:
                        crt_file.write(crypto.dump_certificate(crypto.FILETYPE_PEM, x509))
                except Exception as e:
                    logServer.error('Impossible to write crt file in %s directory => %s' % (ssl_rejected, e))
            else:
                logServer.warning('Unknown error for certificate %s' % CN)
        return ok

    def getContext(self):
        """Return an ``SSL.Context`` set up for mutual authentication.

        The context loads the server certificate and key, trusts ``ssl_ca``
        and requires a client certificate.  If it cannot be built, the error
        is logged as critical and the process exits with status 1.
        """
        try:
            ctx = SSL.Context(SSL.SSLv23_METHOD)
            # SSLv23_METHOD negotiates the protocol version with the peer;
            # explicitly refuse the obsolete SSLv2 and SSLv3.
            ctx.set_options(SSL.OP_NO_SSLv2 | SSL.OP_NO_SSLv3)
            ctx.use_certificate_file(ssl_crt)
            ctx.use_privatekey_file(ssl_key)
            ctx.load_verify_locations(ssl_ca)
            ctx.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT, self._verify)
            ctx.set_verify_depth(10)
        except Exception as e:
            logServer.critical('Impossible to initialize SSL context => %s' % e)
            # Stopping a reactor that is not running raises an error that
            # would hide the real failure.
            if reactor.running:
                reactor.stop()
            sys.exit(1)
        return ctx
 
class MyProtocol(Protocol):
    """Greet each new connection and echo back whatever it sends."""

    def connectionMade(self):
        """Send the greeting line once the connection is established."""
        greeting = 'Server : Connection OK\n'
        return self.transport.write(greeting)

    def dataReceived(self, data):
        """Print the peer certificate and the payload, then echo the payload."""
        peer_cert = self.transport.getPeerCertificate()
        print('dataReceived %s' % (peer_cert,))
        print('%s' % (data,))
        reply = 'Server : %s\n' % data
        return self.transport.write(reply)
 
if __name__ == '__main__':
    # Module-level loggers used by the functions and classes of this script.
    logLevel = logging.INFO
    logServer = logging.getLogger('server')
    logClient = logging.getLogger('client')

    # Debug runs get a more detailed record layout (path, function, line).
    record_format = "%(asctime)s %(name)s %(levelname)s : %(message)s"
    if logLevel == logging.DEBUG:
        record_format = "%(asctime)s - %(name)s - %(pathname)s - %(funcName)s (%(lineno)d) - %(levelname)s : %(message)s"
    logging.basicConfig(format=record_format, datefmt="%d/%m/%Y %H:%M:%S", level=logLevel)

    # Route Twisted's own log events into the standard logging module.
    observer = log.PythonLoggingObserver(loggerName='twisted')
    observer.logger.setLevel(logging.DEBUG)
    observer.start()

    # Certificates and directories must be in place before listening.
    verif_ssl()

    from twisted.internet.protocol import ServerFactory
    factory = ServerFactory()
    factory.protocol = MyProtocol
    try:
        reactor.listenSSL(port, factory, ServerContextFactory())
        reactor.run()
    except Exception as e:
        logServer.critical('%s', e)
        sys.exit(1)

Client

ssl_client.py
#!/usr/bin/env python
# Author : Ghislain LE MEUR
try:
        import sys, os, socket, shutil, logging
        from OpenSSL import SSL, crypto
        from twisted.internet.protocol import ClientFactory
        from twisted.protocols.basic import LineReceiver
        from twisted.internet import ssl, reactor
        from twisted.python import log
except Exception as e:
        print('%s', e)
        sys.exit(1)
 
# TCP port of the server to connect to.
port = 8000

# The client keeps its CA file, certificate and key in an "ssl" directory
# located next to the script that was launched.
ssl_dir = os.path.join(os.path.dirname(sys.argv[0]), 'ssl')
(ssl_ca, ssl_crt, ssl_key) = (
    os.path.join(ssl_dir, leaf)
    for leaf in ('ca.crt', 'client.crt', 'client.key')
)
 
def ssl_generate():
    """Create a self-signed certificate and RSA private key for the client.

    A 4096-bit RSA key is generated together with an X509 certificate whose
    subject and issuer are identical (CN is the local host name) and whose
    validity starts now and lasts 315360000 seconds.  The certificate is
    written PEM-encoded to ``ssl_crt`` and the key to ``ssl_key``; the key
    file is only ever readable by its owner.

    Any failure is logged as critical and terminates the process with exit
    status 1.
    """
    logClient.info('Cetificate crt/key is missing. Generate certificate for client...')
    try:
        key = crypto.PKey()
        key.generate_key(crypto.TYPE_RSA, 4096)

        cert = crypto.X509()
        subject = cert.get_subject()
        subject.C = 'FR'
        subject.ST = 'Yvelines'
        subject.L = 'xxx'
        subject.O = 'xxx'
        subject.OU = 'xxx'
        subject.CN = socket.gethostname()

        cert.gmtime_adj_notBefore(0)
        cert.gmtime_adj_notAfter(315360000)
        cert.set_issuer(subject)
        cert.set_pubkey(key)
        # SHA-1 signatures are refused by current TLS stacks; sign with SHA-256.
        cert.sign(key, 'sha256')

        # This function runs when at least one of the two files is missing;
        # a surviving read-only leftover would make the writes below fail.
        for stale in (ssl_crt, ssl_key):
            if os.path.exists(stale):
                os.remove(stale)

        # dump_* return bytes, hence the binary mode.
        with open(ssl_crt, 'wb') as crt_file:
            crt_file.write(crypto.dump_certificate(crypto.FILETYPE_PEM, cert))

        # Create the key file with owner-only permissions from the start so
        # the private key is never readable by other users, not even briefly.
        key_fd = os.open(ssl_key, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(key_fd, 'wb') as key_file:
            key_file.write(crypto.dump_privatekey(crypto.FILETYPE_PEM, key))

        os.chmod(ssl_key, 0o400)
    except Exception as e:
        logClient.critical('Stopping client => %s' % e)
        sys.exit(1)
 
def _ensure_mode(path, mode):
    """Set the permission bits of *path* to *mode* unless they already match."""
    if (os.stat(path).st_mode & 0o777) != mode:
        os.chmod(path, mode)


def verif_ssl():
    """Prepare the client's TLS files before connecting.

    Steps, in order:

    * create ``ssl_dir`` if needed and keep it writable by its owner while
      the set-up runs;
    * call ``ssl_generate()`` when the certificate or the key is missing,
      then force mode 0444 on the certificate and 0400 on the private key;
    * seed ``ssl_ca`` with a copy of the client certificate when absent;
    * finally make ``ssl_dir`` read-only (0555).

    Every failure is logged as critical and ends the process with exit
    status 1.
    """
    try:
        if not os.path.isdir(ssl_dir):
            os.mkdir(ssl_dir)
        # Files are created below, so the directory must be writable until
        # the very end of the set-up.
        _ensure_mode(ssl_dir, 0o700)
    except Exception as e:
        logClient.critical('Stopping client => %s' % e)
        sys.exit(1)

    if not (os.path.isfile(ssl_crt) and os.path.isfile(ssl_key)):
        ssl_generate()

    try:
        _ensure_mode(ssl_crt, 0o444)
        # The private key must stay readable by its owner only.
        _ensure_mode(ssl_key, 0o400)
    except Exception as e:
        logClient.critical('Stopping client => %s' % e)
        sys.exit(1)

    if not os.path.isfile(ssl_ca):
        try:
            shutil.copy(ssl_crt, ssl_ca)
        except Exception as e:
            logClient.critical('Stopping client => %s' % e)
            sys.exit(1)

    try:
        # Everything is in place: lock the directory.
        _ensure_mode(ssl_dir, 0o555)
    except Exception as e:
        logClient.critical('Stopping client => %s' % e)
        sys.exit(1)
 
class ClientContextFactory(ssl.ClientContextFactory):
    """Build the client-side TLS context and vet the server certificate.

    The client presents its own certificate and key, and verifies the
    server certificate against ``ssl_ca``.  A self-signed server
    certificate that fails verification is saved in a ``rejected``
    directory under ``ssl_dir``.
    """

    # OpenSSL verification error X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT.
    _ERR_SELF_SIGNED = 18

    @staticmethod
    def _safe_name(common_name):
        """Turn a certificate CN into a harmless file-name component.

        The CN is chosen by the remote peer, so anything that is not a
        letter, a digit, ``.``, ``_`` or ``-`` is replaced and leading dots
        are dropped; this keeps the resulting path inside the target
        directory.  A missing or empty CN becomes ``'unknown'``.
        """
        if common_name is None:
            return 'unknown'
        if isinstance(common_name, bytes):
            common_name = common_name.decode('utf-8', 'replace')
        cleaned = ''.join(
            ch if (ch.isalnum() or ch in '._-') else '_' for ch in common_name)
        return cleaned.lstrip('.') or 'unknown'

    def _verify(self, connection, x509, errnum, errdepth, ok):
        """Verification callback handed to ``Context.set_verify``.

        Logs every certificate that is examined.  When OpenSSL's own check
        failed (``ok != 1``) a warning is logged and, for the self-signed
        error, the certificate is stored in the ``rejected`` directory.

        Returns ``ok`` unchanged: the callback never overrides OpenSSL's
        verdict.
        """
        logClient.info('subjetc=%s issuer=%s err=%s errdepth=%s ok=%s' % (x509.get_subject(), x509.get_issuer(), errnum, errdepth, ok))
        if ok != 1:
            CN = x509.get_subject().commonName
            if errnum == self._ERR_SELF_SIGNED:
                logClient.warning('Certificate with CN %s rejected, certificate not trusted !' % CN)
                # This script defines no module-level "rejected" path, so
                # derive it from ssl_dir and create it on demand.
                rejected_dir = os.path.join(ssl_dir, 'rejected')
                try:
                    if not os.path.isdir(rejected_dir):
                        os.makedirs(rejected_dir)
                    target = os.path.join(rejected_dir, '%s.crt' % self._safe_name(CN))
                    # dump_certificate returns bytes, hence the binary mode.
                    with open(target, 'wb') as crt_file:
                        crt_file.write(crypto.dump_certificate(crypto.FILETYPE_PEM, x509))
                except Exception as e:
                    logClient.error('Impossible to write crt file in %s directory => %s' % (rejected_dir, e))
            else:
                logClient.warning('Unknown error for certificate %s' % CN)
        return ok

    def getContext(self):
        """Return the parent's context extended with client authentication.

        The context loads the client certificate and key, trusts ``ssl_ca``
        and requires a valid server certificate.  If it cannot be built, the
        error is logged as critical and the process exits with status 1.
        """
        try:
            ctx = ssl.ClientContextFactory.getContext(self)
            ctx.use_certificate_file(ssl_crt)
            ctx.use_privatekey_file(ssl_key)
            ctx.load_verify_locations(ssl_ca)
            ctx.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT, self._verify)
        except Exception as e:
            logClient.critical('Impossible to initialize SSL context => %s' % e)
            # Stopping a reactor that is not running raises an error that
            # would hide the real failure.
            if reactor.running:
                reactor.stop()
            sys.exit(1)
        return ctx
 
class MyProtocol(LineReceiver):
    """Line-based client protocol: say hello, then print every reply line."""

    def connectionMade(self):
        """Send the greeting line once the connection is established."""
        greeting = "Hello, world By Client !"
        return self.sendLine(greeting)

    def lineReceived(self, line):
        """Print each complete line received from the server."""
        print("Receive : %s" % line)
 
#    def sendLine(self, line):
#       print('%s line was sent' % line)
#       return self.transport.write(line + self.delimiter)
 
class MyFactory(ClientFactory):
    """Client factory that stops the reactor when the connection fails or ends."""

    protocol = MyProtocol

    def _report_and_stop(self, template, reason):
        """Log the failure message at critical level, then stop the reactor."""
        logClient.critical(template, reason.getErrorMessage())
        reactor.stop()

    def clientConnectionFailed(self, connector, reason):
        """Called when the connection attempt did not succeed."""
        self._report_and_stop('Connection failed => %s', reason)

    def clientConnectionLost(self, connector, reason):
        """Called when an established connection is closed."""
        self._report_and_stop('Connection lost => %s', reason)
 
if __name__ == '__main__':
    # Module-level loggers used by the functions and classes of this script.
    logLevel = logging.INFO
    logServer = logging.getLogger('server')
    logClient = logging.getLogger('client')

    # Debug runs get a more detailed record layout (path, function, line).
    if logLevel == logging.DEBUG:
        logging.basicConfig(format="%(asctime)s - %(name)s - %(pathname)s - %(funcName)s (%(lineno)d) - %(levelname)s : %(message)s", datefmt="%d/%m/%Y %H:%M:%S", level=logging.DEBUG)
    else:
        logging.basicConfig(format="%(asctime)s %(name)s %(levelname)s : %(message)s", datefmt="%d/%m/%Y %H:%M:%S", level=logLevel)

    # Route Twisted's own log events into the standard logging module.
    observer = log.PythonLoggingObserver(loggerName='twisted')
    observer.logger.setLevel(logging.DEBUG)
    observer.start()

    # Certificate, key and CA file must be in place before connecting.
    verif_ssl()

    # The server host is the optional first command-line argument.
    host = sys.argv[1] if len(sys.argv) > 1 else 'localhost'

    factory = MyFactory()
    try:
        reactor.connectSSL(host, port, factory, ClientContextFactory())
        reactor.run()
    except Exception as e:
        logClient.critical('%s', e)
        # Report the failure to the caller through a non-zero exit status.
        sys.exit(1)
python/programmes/twisted_ssl_server.txt · Dernière modification : 2014/02/20 23:05 de root