====== Authentification par certificat SSL avec twisted et openssl ====== ===== Serveur ===== #!/usr/bin/env python # Author : Ghislain LE MEUR try: import sys, os, socket, shutil, logging from OpenSSL import SSL, crypto from twisted.internet.protocol import Factory, Protocol from twisted.internet import ssl, reactor from twisted.python import log except Exception as e: print('%s', e) sys.exit(1) port = 8000 ssl_dir = os.path.join(os.path.dirname(sys.argv[0]), 'ssl') ssl_ca = os.path.join(ssl_dir, 'ca.crt') ssl_crt = os.path.join(ssl_dir, 'server.crt') ssl_key = os.path.join(ssl_dir, 'server.key') ssl_trusted = os.path.join(ssl_dir, 'trusted') ssl_rejected = os.path.join(ssl_dir, 'rejected') def ssl_generate(): logServer.info('Cetificate crt/key is missing. Generate certificate for server...') try: k = crypto.PKey() k.generate_key(crypto.TYPE_RSA, 4096) cert = crypto.X509() cert.get_subject().C = 'FR' cert.get_subject().ST = 'Yvelines' cert.get_subject().L = 'xxx' cert.get_subject().O = 'xxx' cert.get_subject().OU = 'xxx' cert.get_subject().CN = socket.gethostname() #cert.set_serial_number(1000) cert.gmtime_adj_notBefore(0) cert.gmtime_adj_notAfter(315360000) cert.set_issuer(cert.get_subject()) cert.set_pubkey(k) cert.sign(k, 'sha1') open(ssl_crt, 'w').write(crypto.dump_certificate(crypto.FILETYPE_PEM, cert)) open(ssl_key, 'w').write(crypto.dump_privatekey(crypto.FILETYPE_PEM, k)) os.chmod(ssl_key, 0400) except Exception as e: print(dir(e)) logServer.critical('Stopping server => %s' % e) sys.exit(1) def verif_ssl(): if os.path.isdir(ssl_dir): if not oct(os.stat(ssl_dir).st_mode & 0777) != 0555: try: os.chmod(ssl_dir, 0555) except Exception as e: logServer.critical('Stopping server => %s' % e) sys.exit(1) else: try: os.mkdir(ssl_dir) os.chmod(ssl_dir, 0555) except Exception as e: logServer.critical('Stopping server => %s' % e) sys.exit(1) if os.path.isfile(ssl_crt) and os.path.isfile(ssl_key): if oct(os.stat(ssl_crt).st_mode & 0777) != 0444: try: os.chmod(ssl_crt, 0444) except Exception as e: logServer.critical('Stopping server => %s' % e) sys.exit(1) if oct(os.stat(ssl_key).st_mode & 0777) != 0400: try: os.chmod(ssl_key, 0444) except Exception as e: logServer.critical('Stopping server => %s' % e) sys.exit(1) else: ssl_generate() if not os.path.isfile(ssl_ca): try: shutil.copy(ssl_crt, ssl_ca) except Exception as e: logServer.critical('Stopping server => %s' % e) #sys.exit(sys.errno) if not os.path.isdir(ssl_trusted): logServer.info("%s doesn't exist, create directory" % ssl_trusted) try: os.mkdir(ssl_trusted) except Exception as e: logServer.critical('%s', e) sys.exit(1) if not os.path.isdir(ssl_rejected): logServer.info("%s doesn't exist, create directory" % ssl_rejected) try: os.mkdir(ssl_rejected) except Exception as e: logServer.critical('%s', e) sys.exit(1) class ServerContextFactory(ssl.ContextFactory, Protocol): def _verify(self, connection, x509, errnum, errdepth, ok): try: print self.transport.getPeer() except Exception as e: print "Error : %s" % e try: print connection.getpeername() except Exception as e: print "Error : %s" % e logServer.info('subjetc=%s issuer=%s err=%s errdepth=%s ok=%s' %(x509.get_subject(), x509.get_issuer(), errnum, errdepth, ok)) if ok != 1: CN = x509.get_subject().commonName.decode() if errnum == 18: logServer.warn('Certificate with CN %s rejected, certificate not trusted !' % CN) try: open(os.path.join(ssl_rejected, '%s.crt' % CN), 'w').write(crypto.dump_certificate(crypto.FILETYPE_PEM, x509)) except Exception as e: logServer.error('Impossible to write crt file in %s directory => %s' % (ssl_rejected, e)) else: logServer.warn('Unknown error for certificate %s' % CN) return ok def getContext(self): try: #ctx.set_passwd_cb(self.passphraseCB) ctx = SSL.Context(SSL.SSLv23_METHOD) ctx.use_certificate_file(ssl_crt) ctx.use_privatekey_file(ssl_key) #ctx.load_client_ca('ca/all-cas.cert') ctx.load_verify_locations(ssl_ca) ctx.set_verify(SSL.VERIFY_PEER|SSL.VERIFY_FAIL_IF_NO_PEER_CERT,self._verify) ctx.set_verify_depth(10) except Exception as e: logServer.critical('Impossible to initialize SSL context => %s' % e) reactor.stop() sys.exit(1) return ctx class MyProtocol(Protocol): def connectionMade(self): #print 'connectionMade', self.transport.getPeerCertificate() #log.msg(self.transport.getHost()) #logServer.info(self.transport.getPeer()) return self.transport.write('Server : Connection OK\n') def dataReceived(self, data): print 'dataReceived', self.transport.getPeerCertificate() print data return self.transport.write('Server : %s\n' % data) if __name__ == '__main__': logLevel = logging.INFO logServer = logging.getLogger('server') logClient = logging.getLogger('client') if logLevel == logging.DEBUG: logging.basicConfig(format = "%(asctime)s - %(name)s - %(pathname)s - %(funcName)s (%(lineno)d) - %(levelname)s : %(message)s", datefmt= "%d/%m/%Y %H:%M:%S", level=logging.DEBUG) else: logging.basicConfig(format = "%(asctime)s %(name)s %(levelname)s : %(message)s", datefmt= "%d/%m/%Y %H:%M:%S", level = logLevel) observer = log.PythonLoggingObserver(loggerName='twisted') observer.logger.setLevel(logging.DEBUG) observer.start() verif_ssl() from twisted.internet.protocol import ServerFactory factory = ServerFactory() factory.protocol = MyProtocol try: reactor.listenSSL(port, factory, ServerContextFactory()) reactor.run() except Exception as e: logServer.critical('%s', e) sys.exit(1) ===== Client ===== #!/usr/bin/env python # Author : Ghislain LE MEUR try: import sys, os, logging from OpenSSL import SSL from twisted.internet.protocol import ClientFactory from twisted.protocols.basic import LineReceiver from twisted.internet import ssl, reactor from twisted.python import log except Exception as e: print('%s', e) sys.exit(1) port = 8000 ssl_dir = os.path.join(os.path.dirname(sys.argv[0]), 'ssl') ssl_ca = os.path.join(ssl_dir, 'ca.crt') ssl_crt = os.path.join(ssl_dir, 'client.crt') ssl_key = os.path.join(ssl_dir, 'client.key') def ssl_generate(): logClient.info('Cetificate crt/key is missing. Generate certificate for server...') try: k = crypto.PKey() k.generate_key(crypto.TYPE_RSA, 4096) cert = crypto.X509() cert.get_subject().C = 'FR' cert.get_subject().ST = 'Yvelines' cert.get_subject().L = 'xxx' cert.get_subject().O = 'xxx' cert.get_subject().OU = 'xxx' cert.get_subject().CN = socket.gethostname() #cert.set_serial_number(1000) cert.gmtime_adj_notBefore(0) cert.gmtime_adj_notAfter(315360000) cert.set_issuer(cert.get_subject()) cert.set_pubkey(k) cert.sign(k, 'sha1') open(ssl_crt, 'w').write(crypto.dump_certificate(crypto.FILETYPE_PEM, cert)) open(ssl_key, 'w').write(crypto.dump_privatekey(crypto.FILETYPE_PEM, k)) os.chmod(ssl_key, 0400) except Exception as e: logClient.critical('Stopping server => %s' % e) sys.exit(1) def verif_ssl(): if os.path.isdir(ssl_dir): if not oct(os.stat(ssl_dir).st_mode & 0777) != 0555: try: os.chmod(ssl_dir, 0555) except Exception as e: logClient.critical('Stopping server => %s' % e) sys.exit(1) else: try: os.mkdir(ssl_dir) os.chmod(ssl_dir, 0555) except Exception as e: logClient.critical('Stopping server => %s' % e) sys.exit(1) if os.path.isfile(ssl_crt) and os.path.isfile(ssl_key): if oct(os.stat(ssl_crt).st_mode & 0777) != 0444: try: os.chmod(ssl_crt, 0444) except Exception as e: logClient.critical('Stopping server => %s' % e) sys.exit(1) if oct(os.stat(ssl_key).st_mode & 0777) != 0400: try: os.chmod(ssl_key, 0444) except Exception as e: logClient.critical('Stopping server => %s' % e) sys.exit(1) else: ssl_generate() if not os.path.isfile(ssl_ca): try: shutil.copy(ssl_crt, ssl_ca) except Exception as e: logClient.critical('Stopping server => %s' % e) sys.exit(1) class ClientContextFactory(ssl.ClientContextFactory): def _verify(self, connection, x509, errnum, errdepth, ok): logClient.info('subjetc=%s issuer=%s err=%s errdepth=%s ok=%s' %(x509.get_subject(), x509.get_issuer(), errnum, errdepth, ok)) if ok != 1: CN = x509.get_subject().commonName.decode() if errnum == 18: logClient.warn('Certificate with CN %s rejected, certificate not trusted !' % CN) try: open(os.path.join(ssl_rejected, '%s.crt' % CN), 'w').write(crypto.dump_certificate(crypto.FILETYPE_PEM, x509)) except Exception as e: logClient.error('Impossible to write crt file in %s directory => %s' % (ssl_rejected, e)) else: logClient.warn('Unknown error for certificate %s' % CN) return ok def getContext(self): try: ctx = ssl.ClientContextFactory.getContext(self) ctx.use_certificate_file(ssl_crt) ctx.use_privatekey_file(ssl_key) ctx.load_verify_locations(ssl_ca) ctx.set_verify(SSL.VERIFY_PEER|SSL.VERIFY_FAIL_IF_NO_PEER_CERT, self._verify) except Exception as e: logClient.critical('Impossible to initialize SSL context => %s' % e) reactor.stop() sys.exit(1) return ctx class MyProtocol(LineReceiver): def connectionMade(self): #return self.transport.write('Hello, world By Client !' + self.delimiter) return self.sendLine("Hello, world By Client !") #self.transport.loseConnection() # def connectionLost(self, reason): # print 'connection lost : %s' % reason.getErrorMessage() # return self.transport.loseConnection() # return LineReceiver.connectionLost(reason) # return LineReceiver.connectionLost(self, reason) def lineReceived(self, line): #x509 = self.transport.getPeerCertificate() #print x509 print "Receive : %s" % line # def sendLine(self, line): # print('%s line was sent' % line) # return self.transport.write(line + self.delimiter) class MyFactory(ClientFactory): protocol = MyProtocol def clientConnectionFailed(self, connector, reason): logClient.critical('Connection failed => %s', reason.getErrorMessage()) reactor.stop() def clientConnectionLost(self, connector, reason): logClient.critical('Connection lost => %s', reason.getErrorMessage()) reactor.stop() if __name__ == '__main__': logLevel = logging.INFO logServer = logging.getLogger('server') logClient = logging.getLogger('client') if logLevel == logging.DEBUG: logging.basicConfig(format = "%(asctime)s - %(name)s - %(pathname)s - %(funcName)s (%(lineno)d) - %(levelname)s : %(message)s", datefmt= "%d/%m/%Y %H:%M:%S", level=logging.DEBUG) else: logging.basicConfig(format = "%(asctime)s %(name)s %(levelname)s : %(message)s", datefmt= "%d/%m/%Y %H:%M:%S", level = logLevel) observer = log.PythonLoggingObserver(loggerName='twisted') observer.logger.setLevel(logging.DEBUG) observer.start() verif_ssl() if len(sys.argv) > 1: host = sys.argv[1] else: host = 'localhost' factory = MyFactory() try: reactor.connectSSL(host, port, factory, ClientContextFactory()) reactor.run() except Exception as e: logClient.critical('%s', e)