if hasattr(protocol, 'transport') and protocol.transport is None:
protocol.transport = FakeTransport() # work around Twisted bug
- d = defer.succeed(None)
- d.addCallback(lambda ign: protocol.write("This server supports only SFTP, not shell sessions.\n"))
- d.addCallback(lambda ign: protocol.processEnded(Reason(ProcessTerminated(exitCode=1))))
- return d
+ return self._unsupported(protocol)
def execCommand(self, protocol, cmd):
self.log(".execCommand(%r, %r)" % (protocol, cmd), level=OPERATIONAL)
d = defer.succeed(None)
if cmd == "df -P -k /":
d.addCallback(lambda ign: protocol.write(
- "Filesystem 1024-blocks Used Available Capacity Mounted on\n"
- "tahoe 628318530 314159265 314159265 50% /\n"))
+ "Filesystem 1024-blocks Used Available Capacity Mounted on\r\n"
+ "tahoe 628318530 314159265 314159265 50% /\r\n"))
d.addCallback(lambda ign: protocol.processEnded(Reason(ProcessDone(None))))
else:
- d.addCallback(lambda ign: protocol.processEnded(Reason(ProcessTerminated(exitCode=1))))
+ d.addCallback(lambda ign: self._unsupported(protocol))
+ return d
+
+ def _unsupported(self, protocol):
+ d = defer.succeed(None)
+ d.addCallback(lambda ign: protocol.errReceived(
+ "This server supports only the SFTP protocol. It does not support SCP,\r\n"
+ "interactive shell sessions, or commands other than one needed by sshfs.\r\n"))
+ d.addCallback(lambda ign: protocol.processEnded(Reason(ProcessTerminated(exitCode=1))))
return d
def windowChanged(self, newWindowSize):
return d
def test_execCommand_and_openShell(self):
- class FakeProtocol:
+ class MockProtocol:
def __init__(self):
self.output = ""
+ self.error = ""
self.reason = None
+
def write(self, data):
+ return self.outReceived(data)
+
+ def outReceived(self, data):
self.output += data
return defer.succeed(None)
+
+ def errReceived(self, data):
+ self.error += data
+ return defer.succeed(None)
+
def processEnded(self, reason):
self.reason = reason
return defer.succeed(None)
+ def _lines_end_in_crlf(s):
+ return s.replace('\r\n', '').find('\n') == -1 and s.endswith('\r\n')
+
d = self._set_up("execCommand_and_openShell")
d.addCallback(lambda ign: conch_interfaces.ISession(self.handler))
def _exec_df(session):
- protocol = FakeProtocol()
+ protocol = MockProtocol()
d2 = session.execCommand(protocol, "df -P -k /")
d2.addCallback(lambda ign: self.failUnlessIn("1024-blocks", protocol.output))
+ d2.addCallback(lambda ign: self.failUnless(_lines_end_in_crlf(protocol.output), protocol.output))
+ d2.addCallback(lambda ign: self.failUnlessEqual(protocol.error, ""))
d2.addCallback(lambda ign: self.failUnless(isinstance(protocol.reason.value, ProcessDone)))
d2.addCallback(lambda ign: session.eofReceived())
d2.addCallback(lambda ign: session.closed())
return d2
d.addCallback(_exec_df)
+ def _check_unsupported(protocol):
+ d2 = defer.succeed(None)
+ d2.addCallback(lambda ign: self.failUnlessEqual(protocol.output, ""))
+ d2.addCallback(lambda ign: self.failUnlessIn("only the SFTP protocol", protocol.error))
+ d2.addCallback(lambda ign: self.failUnless(_lines_end_in_crlf(protocol.error), protocol.error))
+ d2.addCallback(lambda ign: self.failUnless(isinstance(protocol.reason.value, ProcessTerminated)))
+ d2.addCallback(lambda ign: self.failUnlessEqual(protocol.reason.value.exitCode, 1))
+ return d2
+
d.addCallback(lambda ign: conch_interfaces.ISession(self.handler))
def _exec_error(session):
- protocol = FakeProtocol()
+ protocol = MockProtocol()
d2 = session.execCommand(protocol, "error")
d2.addCallback(lambda ign: session.windowChanged(None))
- d2.addCallback(lambda ign: self.failUnlessEqual("", protocol.output))
- d2.addCallback(lambda ign: self.failUnless(isinstance(protocol.reason.value, ProcessTerminated)))
- d2.addCallback(lambda ign: self.failUnlessEqual(protocol.reason.value.exitCode, 1))
+ d2.addCallback(lambda ign: _check_unsupported(protocol))
d2.addCallback(lambda ign: session.closed())
return d2
d.addCallback(_exec_error)
d.addCallback(lambda ign: conch_interfaces.ISession(self.handler))
def _openShell(session):
- protocol = FakeProtocol()
+ protocol = MockProtocol()
d2 = session.openShell(protocol)
- d2.addCallback(lambda ign: self.failUnlessIn("only SFTP", protocol.output))
- d2.addCallback(lambda ign: self.failUnless(isinstance(protocol.reason.value, ProcessTerminated)))
- d2.addCallback(lambda ign: self.failUnlessEqual(protocol.reason.value.exitCode, 1))
+ d2.addCallback(lambda ign: _check_unsupported(protocol))
d2.addCallback(lambda ign: session.closed())
return d2
d.addCallback(_openShell)