Twisted SSH Server

ssh_server.py
#!/usr/bin/env python
# Ghislain LE MEUR
import twisted.cred as cred, twisted.cred.credentials, twisted.cred.checkers, twisted.cred.portal
import twisted.conch as conch, twisted.conch.recvline, twisted.conch.avatar, twisted.conch.checkers, twisted.conch.ssh, twisted.conch.ssh.factory, twisted.conch.ssh.session
import twisted.internet as internet, twisted.internet.defer
import twisted.application as application
from twisted.python.randbytes import secureRandom
from twisted.python import log
from zope.interface import implements
import os, sys, re, socket, PAM
 
 
class PamPasswordDatabase:
        """Credentials checker that authenticates username/password pairs via PAM.

        The PAM service consulted is named by the ``service`` class attribute
        ('passwd' by default).
        """
        credentialInterfaces = cred.credentials.IUsernamePassword,
        implements(cred.checkers.ICredentialsChecker)

        # Name of the PAM service used for authentication; override it in a
        # subclass or on an instance to consult a different service.
        service = 'passwd'

        def requestAvatarId(self, credentials):
                """Check ``credentials`` (username and password) against PAM.

                Returns a Deferred that fires with ``credentials.username`` when
                both ``authenticate()`` and ``acct_mgmt()`` succeed, and that
                fails with ``UnauthorizedLogin`` in every other case.
                """
                def conversation(*args):
                        # Answer every prompt with the password, one
                        # (response, return code) pair per prompt.
                        # NOTE(review): assumes PyPAM passes the prompt list as
                        # the second positional argument - confirm against the
                        # installed PAM binding.
                        prompts = args[1] if len(args) > 1 else [None]
                        return [(credentials.password, 0) for _ in prompts]

                try:
                        auth = PAM.pam()
                        auth.start(self.service)
                        auth.set_item(PAM.PAM_USER, credentials.username)
                        auth.set_item(PAM.PAM_CONV, conversation)
                        auth.authenticate()
                        auth.acct_mgmt()
                except PAM.error:
                        return internet.defer.fail(cred.error.UnauthorizedLogin("invalid password"))
                except Exception:
                        # Record the real cause; the client only sees a generic
                        # login failure.
                        log.err(None, "Unexpected error during PAM authentication")
                        return internet.defer.fail(cred.error.UnauthorizedLogin("internal error"))
                return internet.defer.succeed(credentials.username)
 
class mySSHProtocol(conch.recvline.HistoricRecvLine):
        """Interactive line protocol shown to a logged-in user.

        Understands the commands 'whoami' and 'exit'; any other non-empty
        line is answered with 'Unknown command !'.
        """

        def __init__(self, user):
                """Remember ``user`` and build the prompt from hostname and username."""
                hostname = socket.gethostname()
                self.user = user
                self.prompt = "%s (%s)> " % (hostname, user.username)

        # Disabled key handling, kept for reference:
        #def keystrokeReceived(self, keyID, modifier):
        #       if keyID == '\x04': # CTRL+d (eof)
        #               self.terminal.loseConnection()
        #       elif keyID == '\x0c': # CTRL+l
        #               self.terminal.eraseDisplay() #(clear screen)
        #       elif keyID == '\x03': # CTRL+c
        #               pass
        #               #self._call.stop()

        def _writeLine(self, text):
                """Write ``text`` to the terminal and move to the next line."""
                self.terminal.write(text)
                self.terminal.nextLine()

        def connectionMade(self):
                """Run the base-class setup, then print the banner and first prompt."""
                conch.recvline.HistoricRecvLine.connectionMade(self)
                self._writeLine("GigiX SSH Server V1.0")
                self.terminal.write(self.prompt)

        # Disabled alternative prompt helper, kept for reference:
        #def showPrompt(self, line=''):
        #       self.terminal.write("%s (%s)> " % (socket.gethostname(), self.user.username))

        def lineReceived(self, line):
                """Handle one complete input line typed by the user."""
                # 'exit' is the only command that is not followed by a new prompt.
                if line == 'exit':
                        self.terminal.loseConnection()
                        return

                if line == 'whoami':
                        self._writeLine(self.user.username)
                elif line != '':
                        self._writeLine('Unknown command !')

                # An empty line falls straight through to a fresh prompt.
                self.terminal.write(self.prompt)
 
class mySSHAvatar(conch.avatar.ConchUser):
        """Avatar for an authenticated user; also acts as its own session handler."""
        implements(conch.interfaces.ISession)

        def __init__(self, username):
                """Store ``username`` and accept 'session' channel requests."""
                conch.avatar.ConchUser.__init__(self)
                self.username = username
                self.channelLookup.update({'session':conch.ssh.session.SSHSession})

        def openShell(self, protocol):
                """Start the interactive shell on the session's ``protocol``.

                A ``ServerProtocol`` running ``mySSHProtocol`` for this avatar is
                created, and the two protocols are connected to each other.
                """
                serverProtocol = conch.insults.insults.ServerProtocol(mySSHProtocol, self)
                serverProtocol.makeConnection(protocol)
                protocol.makeConnection(conch.ssh.session.wrapProtocol(serverProtocol))

        def getPty(self, terminal, windowSize, attrs):
                """Record that a pty was requested; the parameters are not used."""
                self.ptyReq = True

        def windowChanged(self, newWindowSize):
                """Ignore terminal resize notifications.

                ISession includes this method; without it a client window resize
                would hit a missing attribute on the avatar.
                """
                pass

        def execCommand(self, protocol, cmd):
                """Direct command execution is not implemented; the request is ignored."""
                pass

        def eofReceived(self):
                """Nothing to do on end-of-file."""
                pass

        def closed(self):
                """Nothing to clean up when the session closes."""
                pass
 
 
class mySSHRealm:
        """Realm that hands out a ``mySSHAvatar`` for every authenticated avatar id."""
        implements(cred.portal.IRealm)

        def requestAvatar(self, avatarId, mind, *interfaces):
                """Return an ``(interface, avatar, logout)`` triple for ``avatarId``.

                Only ``IConchUser`` is supported. The logout callable does nothing.
                Raises ``Exception`` when ``IConchUser`` was not requested.
                """
                if conch.interfaces.IConchUser in interfaces:
                        # Report the interface actually provided, wherever it
                        # appears in the requested list.
                        return (conch.interfaces.IConchUser, mySSHAvatar(avatarId), lambda: None)
                # Raise an exception instance: raising a tuple loses the message.
                raise Exception("No supported interfaces found.")
 
def _writeKeyFile(path, data, mode):
        """Create ``path`` with permission bits ``mode`` and write ``data`` to it."""
        # Remove any stale copy first: an existing read-only key file cannot be
        # reopened for writing, and O_EXCL requires that the file not exist.
        if os.path.exists(path):
                os.remove(path)
        # Apply the permissions at creation time so the content is never on
        # disk with looser access than intended.
        fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_EXCL, mode)
        with os.fdopen(fd, 'w') as handle:
                handle.write(data)


def _readKeyFile(path):
        """Return the full content of ``path``, closing the file afterwards."""
        with open(path, 'r') as handle:
                return handle.read()


def getRSAKeys(key_size=4096):
        """Load the server's RSA key pair, generating it on first use.

        The keys live in an 'ssh_keys' directory next to the running script
        (``sys.argv[0]``) as 'public.key' and 'private.key', in OpenSSH format.
        If either file is missing, a new pair of ``key_size`` bits is generated
        and both files are rewritten.

        Returns a ``(publicKeyString, privateKeyString)`` tuple. Logs an error
        and exits with status 1 when the keys cannot be created or read.
        """
        from Crypto.PublicKey import RSA
        dir_keys = os.path.join(os.path.dirname(sys.argv[0]), 'ssh_keys')
        publicKey = os.path.join(dir_keys, 'public.key')
        privateKey = os.path.join(dir_keys, 'private.key')

        try:
                if not os.path.isdir(dir_keys):
                        os.mkdir(dir_keys)

                if not os.path.isfile(publicKey) or not os.path.isfile(privateKey):
                        log.msg('Generating %i bits RSA keypair...' % key_size)
                        cle = conch.ssh.keys.Key(RSA.generate(key_size, secureRandom))
                        # 0o666 leaves the public key's final mode to the umask.
                        _writeKeyFile(publicKey, cle.public().toString('openssh'), 0o666)
                        _writeKeyFile(privateKey, cle.toString('openssh'), 0o400)
                        # Force the exact final mode whatever the umask is.
                        os.chmod(privateKey, 0o400)
        except Exception as e:
                log.err('Unable to create SSH keys, please verify write access on %s : %s' % (dir_keys, e))
                sys.exit(1)

        try:
                publicKeyString = _readKeyFile(publicKey)
                privateKeyString = _readKeyFile(privateKey)
        except Exception as e:
                log.err('Unable to read SSH keys, please verify read access on %s : %s' % (dir_keys, e))
                sys.exit(1)

        return publicKeyString, privateKeyString
 
if __name__ == "__main__":
        # Import these explicitly instead of relying on other Twisted modules
        # having loaded them as a side effect.
        from twisted.internet import reactor
        from twisted.conch.ssh import keys

        log.startLogging(sys.stdout)

        # The portal ties the realm (avatar factory) to the credential checkers.
        sshFactory = conch.ssh.factory.SSHFactory()
        sshFactory.portal = cred.portal.Portal(mySSHRealm())
        # Alternative in-memory user database, disabled:
        #users = {'admin': 'aaa', 'guest': 'bbb'}
        #sshFactory.portal.registerChecker(cred.checkers.InMemoryUsernamePasswordDatabaseDontUse(**users))
        sshFactory.portal.registerChecker(PamPasswordDatabase())   # Supports PAM
        sshFactory.portal.registerChecker(conch.checkers.SSHPublicKeyDatabase())  # Supports PKI

        # Host keys presented to connecting clients.
        pubKeyString, privKeyString = getRSAKeys()
        sshFactory.publicKeys = {'ssh-rsa': keys.Key.fromString(data = pubKeyString)}
        sshFactory.privateKeys = {'ssh-rsa': keys.Key.fromString(data = privKeyString)}

        reactor.listenTCP(2222, sshFactory)
        reactor.run()