#!/usr/bin/env python # Ghislain LE MEUR import twisted.cred as cred, twisted.cred.credentials, twisted.cred.checkers, twisted.cred.portal import twisted.conch as conch, twisted.conch.recvline, twisted.conch.avatar, twisted.conch.checkers, twisted.conch.ssh, twisted.conch.ssh.factory, twisted.conch.ssh.session import twisted.internet as internet, twisted.internet.defer import twisted.application as application from twisted.python.randbytes import secureRandom from twisted.python import log from zope.interface import implements import os, sys, re, socket, PAM class PamPasswordDatabase: """Authentication/authorization backend using the 'login' PAM service""" credentialInterfaces = cred.credentials.IUsernamePassword, implements(cred.checkers.ICredentialsChecker) def requestAvatarId(self, credentials): service = 'passwd' auth = PAM.pam() auth.start(service) auth.set_item(PAM.PAM_USER, credentials.username) auth.set_item(PAM.PAM_CONV, lambda *args: [(credentials.password, 0)]) try: auth.authenticate() auth.acct_mgmt() except(PAM.error): return internet.defer.fail(cred.error.UnauthorizedLogin("invalid password")) except: return internet.defer.fail(cred.error.UnauthorizedLogin("internal error")) else: return internet.defer.succeed(credentials.username) class mySSHProtocol(conch.recvline.HistoricRecvLine): def __init__(self, user): self.user = user self.prompt = "%s (%s)> " % (socket.gethostname(), self.user.username) #def keystrokeReceived(self, keyID, modifier): # if keyID == '\x04': # CTRL+d (eof) # self.terminal.loseConnection() # elif keyID == '\x0c': # CTRL+l # self.terminal.eraseDisplay() #(clear screen) # elif keyID == '\x03': # CTRL+c # pass # #self._call.stop() def connectionMade(self): conch.recvline.HistoricRecvLine.connectionMade(self) self.terminal.write("GigiX SSH Server V1.0") self.terminal.nextLine() self.terminal.write(self.prompt) #def showPrompt(self, line=''): # self.terminal.write("%s (%s)> " % (socket.gethostname(), self.user.username)) def lineReceived(self, line): if line == 'whoami': self.terminal.write(self.user.username) self.terminal.nextLine() self.terminal.write(self.prompt) elif line == 'exit': self.terminal.loseConnection() elif line == '': self.terminal.write(self.prompt) else: self.terminal.write('Unknown command !') self.terminal.nextLine() self.terminal.write(self.prompt) class mySSHAvatar(conch.avatar.ConchUser): implements(conch.interfaces.ISession) def __init__(self, username): conch.avatar.ConchUser.__init__(self) self.username = username self.channelLookup.update({'session':conch.ssh.session.SSHSession}) def openShell(self, protocol): serverProtocol = conch.insults.insults.ServerProtocol(mySSHProtocol, self) serverProtocol.makeConnection(protocol) protocol.makeConnection(conch.ssh.session.wrapProtocol(serverProtocol)) def getPty(self, terminal, windowSize, attrs): self.ptyReq = True def execCommand(self, protocol, cmd): pass def eofReceived(self): pass def closed(self): pass class mySSHRealm: implements(cred.portal.IRealm) def requestAvatar(self, avatarId, mind, *interfaces): if conch.interfaces.IConchUser in interfaces: return(interfaces[0], mySSHAvatar(avatarId), lambda: None) else: raise(Exception, "No supported interfaces found.") def getRSAKeys(): from Crypto.PublicKey import RSA dir_keys = os.path.join(os.path.dirname(sys.argv[0]), 'ssh_keys') publicKey = os.path.join(dir_keys, 'public.key') privateKey = os.path.join(dir_keys, 'private.key') try: if not os.path.isdir(dir_keys): os.mkdir(dir_keys) if not os.path.isfile(publicKey) or not os.path.isfile(privateKey): key_size = 4096 log.msg('Generating %i bits RSA keypair...' % key_size) cle = RSA.generate(key_size, secureRandom) file(publicKey, 'w').write(conch.ssh.keys.Key(cle).public().toString('openssh')) file(privateKey, 'w').write(conch.ssh.keys.Key(cle).toString('openssh')) os.chmod(privateKey, 0400) except Exception as e: log.err('Unable to create SSH keys, please verify write access on %s : %s' % (dir_keys, e)) sys.exit(1) try: publicKeyString = file(publicKey, 'r').read() privateKeyString = file(privateKey, 'r').read() except Exception as e: log.err('Unable to read SSH keys, please verify read access on %s : %s' % (dir_keys, e)) sys.exit(1) return publicKeyString, privateKeyString if __name__ == "__main__": log.startLogging(sys.stdout) sshFactory = conch.ssh.factory.SSHFactory() sshFactory.portal = cred.portal.Portal(mySSHRealm()) #users = {'admin': 'aaa', 'guest': 'bbb'} #sshFactory.portal.registerChecker(cred.checkers.InMemoryUsernamePasswordDatabaseDontUse(**users)) sshFactory.portal.registerChecker(PamPasswordDatabase()) # Supports PAM sshFactory.portal.registerChecker(conch.checkers.SSHPublicKeyDatabase()) # Supports PKI pubKeyString, privKeyString = getRSAKeys() sshFactory.publicKeys = {'ssh-rsa': conch.ssh.keys.Key.fromString(data = pubKeyString)} sshFactory.privateKeys = {'ssh-rsa': conch.ssh.keys.Key.fromString(data = privKeyString)} internet.reactor.listenTCP(2222, sshFactory) internet.reactor.run()