2 import os, tempfile, heapq, binascii, traceback, array, stat
3 from stat import S_IFREG, S_IFDIR
4 from time import time, strftime, localtime
6 from zope.interface import implements
7 from twisted.python import components
8 from twisted.application import service, strports
9 from twisted.conch.ssh import factory, keys, session
10 from twisted.conch.ssh.filetransfer import FileTransferServer, SFTPError, \
11 FX_NO_SUCH_FILE, FX_OP_UNSUPPORTED, FX_PERMISSION_DENIED, FX_EOF, \
12 FX_BAD_MESSAGE, FX_FAILURE
13 from twisted.conch.ssh.filetransfer import FXF_READ, FXF_WRITE, FXF_APPEND, \
14 FXF_CREAT, FXF_TRUNC, FXF_EXCL
15 from twisted.conch.interfaces import ISFTPServer, ISFTPFile, IConchUser, ISession
16 from twisted.conch.avatar import ConchUser
17 from twisted.conch.openssh_compat import primes
18 from twisted.cred import portal
20 from twisted.internet import defer
21 from twisted.internet.interfaces import IFinishableConsumer
22 from foolscap.api import eventually
23 from allmydata.util import deferredutil
25 from allmydata.util.consumer import download_to_data
26 from allmydata.interfaces import IFileNode, IDirectoryNode, ExistingChildError, \
28 from allmydata.mutable.common import NotWriteableError
29 from allmydata.immutable.upload import FileHandle
31 from pycryptopp.cipher.aes import AES
33 # twisted.conch.ssh.filetransfer generates this warning, but not when it is imported,
36 warnings.filterwarnings("ignore", category=DeprecationWarning,
37 message="BaseException.message has been deprecated as of Python 2.6",
38 module=".*filetransfer", append=True)
41 use_foolscap_logging = True
43 if use_foolscap_logging:
44 from allmydata.util.log import msg as logmsg, err as logerr, \
45 NOISY, OPERATIONAL, SCARY, PrefixingLogMixin
47 def logmsg(s, level=None):
49 def logerr(s, level=None):
54 class PrefixingLogMixin:
55 def __init__(self, facility=None):
57 def log(self, s, level=None):
61 def eventually_callback(d):
62 s = traceback.format_stack()
65 if noisy: logmsg("CALLBACK %r %r" % (d, res), level=NOISY)
67 except: # pragma: no cover
68 logerr("Failed to callback %r with %r\n"
69 "Original stack:\n!%s" %
70 (d, res, '!'.join(s)), level=SCARY)
73 return lambda res: eventually(_cb, res)
75 def eventually_errback(d):
76 s = traceback.format_stack()
79 if noisy: logmsg("ERRBACK %r %r" % (d, err), level=NOISY)
81 except: # pragma: no cover
82 logerr("Failed to errback %r with %r\n"
83 "Original stack:\n!%s" %
84 (d, err, '!'.join(s)), level=SCARY)
87 return lambda err: eventually(_eb, err)
89 def eventually_callback(d):
90 return lambda res: eventually(d.callback, res)
92 def eventually_errback(d):
93 return lambda err: eventually(d.errback, err)
96 def _raise_error(err):
99 if noisy: logmsg("RAISE %r" % (err,), level=NOISY)
100 #traceback.print_exc(err)
102 # The message argument to SFTPError must not reveal information that
103 # might compromise anonymity.
105 if err.check(SFTPError):
106 # original raiser of SFTPError has responsibility to ensure anonymity
108 if err.check(NoSuchChildError):
109 childname = err.value.args[0].encode('utf-8')
110 raise SFTPError(FX_NO_SUCH_FILE, childname)
111 if err.check(ExistingChildError) or err.check(NotWriteableError):
112 # later versions of SFTP define FX_FILE_ALREADY_EXISTS, but version 3 doesn't
113 msg = err.value.args[0].encode('utf-8')
114 raise SFTPError(FX_PERMISSION_DENIED, msg)
115 if err.check(NotImplementedError):
116 raise SFTPError(FX_OP_UNSUPPORTED, str(err.value))
117 if err.check(EOFError):
118 raise SFTPError(FX_EOF, "end of file reached")
119 if err.check(defer.FirstError):
120 _raise_error(err.value.subFailure)
122 # We assume that the type of error is not anonymity-sensitive.
123 raise SFTPError(FX_FAILURE, str(err.type))
def _repr_flags(flags):
    """Render an SFTP 'pflags' bitmask as a "|"-joined string of flag names.

    Only the SFTP version 3 open flags are rendered; any other bits are
    ignored. Returns "" when none of the known flags are set.
    """
    # The visible original built this with a truncated and/or-filtered list
    # expression that was never closed; this table-driven form is complete
    # and equivalent (unset flags simply contribute nothing).
    known_flags = [(FXF_READ,   "FXF_READ"),
                   (FXF_WRITE,  "FXF_WRITE"),
                   (FXF_APPEND, "FXF_APPEND"),
                   (FXF_CREAT,  "FXF_CREAT"),
                   (FXF_TRUNC,  "FXF_TRUNC"),
                   (FXF_EXCL,   "FXF_EXCL")]
    return "|".join([name for (flag, name) in known_flags if flags & flag])
136 def _lsLine(name, attrs):
139 st_mtime = attrs.get("mtime", 0)
140 st_mode = attrs["permissions"]
141 # TODO: check that clients are okay with this being a "?".
142 # (They should be because the longname is intended for human
144 st_size = attrs.get("size", "?")
145 # We don't know how many links there really are to this object.
148 # From <http://twistedmatrix.com/trac/browser/trunk/twisted/conch/ls.py?rev=25412>.
149 # We can't call the version in Twisted because we might have a version earlier than
150 # <http://twistedmatrix.com/trac/changeset/25412> (released in Twisted 8.2).
153 perms = array.array('c', '-'*10)
154 ft = stat.S_IFMT(mode)
155 if stat.S_ISDIR(ft): perms[0] = 'd'
156 elif stat.S_ISCHR(ft): perms[0] = 'c'
157 elif stat.S_ISBLK(ft): perms[0] = 'b'
158 elif stat.S_ISREG(ft): perms[0] = '-'
159 elif stat.S_ISFIFO(ft): perms[0] = 'f'
160 elif stat.S_ISLNK(ft): perms[0] = 'l'
161 elif stat.S_ISSOCK(ft): perms[0] = 's'
164 if mode&stat.S_IRUSR:perms[1] = 'r'
165 if mode&stat.S_IWUSR:perms[2] = 'w'
166 if mode&stat.S_IXUSR:perms[3] = 'x'
168 if mode&stat.S_IRGRP:perms[4] = 'r'
169 if mode&stat.S_IWGRP:perms[5] = 'w'
170 if mode&stat.S_IXGRP:perms[6] = 'x'
172 if mode&stat.S_IROTH:perms[7] = 'r'
173 if mode&stat.S_IWOTH:perms[8] = 'w'
174 if mode&stat.S_IXOTH:perms[9] = 'x'
175 # suid/sgid never set
178 l += str(st_nlink).rjust(5) + ' '
186 sixmo = 60 * 60 * 24 * 7 * 26
187 if st_mtime + sixmo < time(): # last edited more than 6mo ago
188 l += strftime("%b %d %Y ", localtime(st_mtime))
190 l += strftime("%b %d %H:%M ", localtime(st_mtime))
194 def _populate_attrs(childnode, metadata, writeable, size=None):
197 # see webapi.txt for what these times mean
199 if "linkmotime" in metadata.get("tahoe", {}):
200 attrs["mtime"] = int(metadata["tahoe"]["linkmotime"])
201 elif "mtime" in metadata:
202 attrs["mtime"] = int(metadata["mtime"])
204 if "linkcrtime" in metadata.get("tahoe", {}):
205 attrs["createtime"] = int(metadata["tahoe"]["linkcrtime"])
207 if "ctime" in metadata:
208 attrs["ctime"] = int(metadata["ctime"])
210 # We would prefer to omit atime, but SFTP version 3 can only
211 # accept mtime if atime is also set.
212 attrs["atime"] = attrs["mtime"]
214 # The permissions must have the extra bits (040000 or 0100000),
215 # otherwise the client will not call openDirectory.
217 # Directories and unknown nodes have no size, and SFTP doesn't
218 # require us to make one up.
219 # childnode might be None, meaning that the file doesn't exist yet,
220 # but we're going to write it later.
222 if childnode and childnode.is_unknown():
224 elif childnode and IDirectoryNode.providedBy(childnode):
225 perms = S_IFDIR | 0770
227 # For files, omit the size if we don't immediately know it.
228 if childnode and size is None:
229 size = childnode.get_size()
231 assert isinstance(size, (int, long)), repr(size)
233 perms = S_IFREG | 0660
236 perms &= S_IFDIR | S_IFREG | 0555 # clear 'w' bits
238 attrs["permissions"] = perms
240 # We could set the SSH_FILEXFER_ATTR_FLAGS here:
241 # ENCRYPTED would always be true ("The file is stored on disk
242 # using file-system level transparent encryption.")
243 # SYSTEM, HIDDEN, ARCHIVE and SYNC would always be false.
244 # READONLY and IMMUTABLE would be set according to
245 # childnode.is_readonly() and childnode.is_immutable()
247 # However, twisted.conch.ssh.filetransfer only implements
248 # SFTP version 3, which doesn't include these flags.
252 class EncryptedTemporaryFile(PrefixingLogMixin):
253 # not implemented: next, readline, readlines, xreadlines, writelines
256 PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
257 self.file = tempfile.TemporaryFile()
258 self.key = os.urandom(16) # AES-128
260 def _crypt(self, offset, data):
261 # FIXME: use random-access AES (pycryptopp ticket #18)
262 offset_big = offset // 16
263 offset_small = offset % 16
264 iv = binascii.unhexlify("%032x" % offset_big)
265 cipher = AES(self.key, iv=iv)
266 cipher.process("\x00"*offset_small)
267 return cipher.process(data)
    def seek(self, offset, whence=os.SEEK_SET):
        """Reposition the file pointer of the underlying temporary file.

        Plaintext and ciphertext offsets coincide (the keystream is derived
        from the byte position in _crypt), so plain delegation is correct.
        """
        if noisy: self.log(".seek(%r, %r)" % (offset, whence), level=NOISY)
        self.file.seek(offset, whence)
280 offset = self.file.tell()
281 if noisy: self.log(".tell() = %r" % (offset,), level=NOISY)
284 def read(self, size=-1):
285 if noisy: self.log(".read(%r)" % (size,), level=NOISY)
286 index = self.file.tell()
287 ciphertext = self.file.read(size)
288 plaintext = self._crypt(index, ciphertext)
291 def write(self, plaintext):
292 if noisy: self.log(".write(%r)" % (plaintext,), level=NOISY)
293 index = self.file.tell()
294 ciphertext = self._crypt(index, plaintext)
295 self.file.write(ciphertext)
    def truncate(self, newsize):
        """Truncate the underlying temporary file to 'newsize' bytes."""
        if noisy: self.log(".truncate(%r)" % (newsize,), level=NOISY)
        self.file.truncate(newsize)
302 class OverwriteableFileConsumer(PrefixingLogMixin):
303 implements(IFinishableConsumer)
304 """I act both as a consumer for the download of the original file contents, and as a
305 wrapper for a temporary file that records the downloaded data and any overwrites.
306 I use a priority queue to keep track of which regions of the file have been overwritten
307 but not yet downloaded, so that the download does not clobber overwritten data.
308 I use another priority queue to record milestones at which to make callbacks
309 indicating that a given number of bytes have been downloaded.
311 The temporary file reflects the contents of the file that I represent, except that:
312 - regions that have neither been downloaded nor overwritten, if present,
314 - the temporary file may be shorter than the represented file (it is never longer).
315 The latter's current size is stored in self.current_size.
317 This abstraction is mostly independent of SFTP. Consider moving it, if it is found
318 useful for other frontends."""
320 def __init__(self, check_abort, download_size, tempfile_maker):
321 PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
322 self.check_abort = check_abort
323 self.download_size = download_size
324 self.current_size = download_size
325 self.f = tempfile_maker()
327 self.milestones = [] # empty heap of (offset, d)
328 self.overwrites = [] # empty heap of (start, end)
329 self.done = self.when_reached(download_size) # adds a milestone
    def get_current_size(self):
        """Return the current size in bytes of the file I represent (which
        may exceed the number of bytes downloaded so far)."""
        return self.current_size
338 def set_current_size(self, size):
339 if noisy: self.log(".set_current_size(%r), current_size = %r, downloaded = %r" %
340 (size, self.current_size, self.downloaded), level=NOISY)
341 if size < self.current_size or size < self.downloaded:
342 self.f.truncate(size)
343 self.current_size = size
344 if size < self.download_size:
345 self.download_size = size
346 if self.downloaded >= self.download_size:
349 def registerProducer(self, p, streaming):
352 # call resumeProducing once to start things off
358 def write(self, data):
359 if noisy: self.log(".write(%r)" % (data,), level=NOISY)
360 if self.check_abort():
364 if self.downloaded >= self.download_size:
367 next_downloaded = self.downloaded + len(data)
368 if next_downloaded > self.download_size:
369 data = data[:(self.download_size - self.downloaded)]
371 while len(self.overwrites) > 0:
372 (start, end) = self.overwrites[0]
373 if start >= next_downloaded:
374 # This and all remaining overwrites are after the data we just downloaded.
376 if start > self.downloaded:
377 # The data we just downloaded has been partially overwritten.
378 # Write the prefix of it that precedes the overwritten region.
379 self.f.seek(self.downloaded)
380 self.f.write(data[:(start - self.downloaded)])
382 # This merges consecutive overwrites if possible, which allows us to detect the
383 # case where the download can be stopped early because the remaining region
384 # to download has already been fully overwritten.
385 heapq.heappop(self.overwrites)
386 while len(self.overwrites) > 0:
387 (start1, end1) = self.overwrites[0]
391 heapq.heappop(self.overwrites)
393 if end >= next_downloaded:
394 # This overwrite extends past the downloaded data, so there is no
395 # more data to consider on this call.
396 heapq.heappush(self.overwrites, (next_downloaded, end))
397 self._update_downloaded(next_downloaded)
399 elif end >= self.downloaded:
400 data = data[(end - self.downloaded):]
401 self._update_downloaded(end)
403 self.f.seek(self.downloaded)
405 self._update_downloaded(next_downloaded)
407 def _update_downloaded(self, new_downloaded):
408 self.downloaded = new_downloaded
409 milestone = new_downloaded
410 if len(self.overwrites) > 0:
411 (start, end) = self.overwrites[0]
412 if start <= new_downloaded and end > milestone:
415 while len(self.milestones) > 0:
416 (next, d) = self.milestones[0]
419 if noisy: self.log("MILESTONE %r %r" % (next, d), level=NOISY)
420 heapq.heappop(self.milestones)
421 eventually_callback(d)(None)
423 if milestone >= self.download_size:
426 def overwrite(self, offset, data):
427 if noisy: self.log(".overwrite(%r, %r)" % (offset, data), level=NOISY)
428 if offset > self.download_size and offset > self.current_size:
429 # Normally writing at an offset beyond the current end-of-file
430 # would leave a hole that appears filled with zeroes. However, an
431 # EncryptedTemporaryFile doesn't behave like that (if there is a
432 # hole in the file on disk, the zeroes that are read back will be
433 # XORed with the keystream). So we must explicitly write zeroes in
434 # the gap between the current EOF and the offset.
436 self.f.seek(self.current_size)
437 self.f.write("\x00" * (offset - self.current_size))
441 end = offset + len(data)
442 self.current_size = max(self.current_size, end)
443 if end > self.downloaded:
444 heapq.heappush(self.overwrites, (offset, end))
446 def read(self, offset, length):
447 """When the data has been read, callback the Deferred that we return with this data.
448 Otherwise errback the Deferred that we return.
449 The caller must perform no more overwrites until the Deferred has fired."""
451 if noisy: self.log(".read(%r, %r), current_size = %r" % (offset, length, self.current_size), level=NOISY)
452 if offset >= self.current_size:
453 def _eof(): raise EOFError("read past end of file")
454 return defer.execute(_eof)
456 if offset + length > self.current_size:
457 length = self.current_size - offset
459 needed = min(offset + length, self.download_size)
460 d = self.when_reached(needed)
462 # It is not necessarily the case that self.downloaded >= needed, because
463 # the file might have been truncated (thus truncating the download) and
466 assert self.current_size >= offset + length, (self.current_size, offset, length)
467 if noisy: self.log("self.f = %r" % (self.f,), level=NOISY)
469 return self.f.read(length)
470 d.addCallback(_reached)
473 def when_reached(self, index):
474 if noisy: self.log(".when_reached(%r)" % (index,), level=NOISY)
475 if index <= self.downloaded: # already reached
476 if noisy: self.log("already reached %r" % (index,), level=NOISY)
477 return defer.succeed(None)
480 if noisy: self.log("reached %r" % (index,), level=NOISY)
482 d.addCallback(_reached)
483 heapq.heappush(self.milestones, (index, d))
490 while len(self.milestones) > 0:
491 (next, d) = self.milestones[0]
492 if noisy: self.log("MILESTONE FINISH %r %r" % (next, d), level=NOISY)
493 heapq.heappop(self.milestones)
494 # The callback means that the milestone has been reached if
495 # it is ever going to be. Note that the file may have been
496 # truncated to before the milestone.
497 eventually_callback(d)(None)
499 # FIXME: causes spurious failures
500 #self.unregisterProducer()
506 def unregisterProducer(self):
508 self.producer.stopProducing()
512 SIZE_THRESHOLD = 1000
def _make_sftp_file(check_abort, flags, convergence, parent=None, childname=None, filenode=None, metadata=None):
    """Choose and construct the ISFTPFile implementation for an open request.

    A known-immutable file no larger than SIZE_THRESHOLD, opened read-only
    (FXF_READ set, neither FXF_WRITE nor FXF_CREAT), is served by
    ShortReadOnlySFTPFile, which downloads it whole into memory; every other
    request gets a GeneralSFTPFile.
    """
    # fixed: the log message string was missing its closing parenthesis
    if noisy: logmsg("_make_sftp_file(%r, %r, %r, parent=%r, childname=%r, filenode=%r, metadata=%r)" %
                     (check_abort, flags, convergence, parent, childname, filenode, metadata), NOISY)

    if not (flags & (FXF_WRITE | FXF_CREAT)) and (flags & FXF_READ) and filenode and \
       not filenode.is_mutable() and filenode.get_size() <= SIZE_THRESHOLD:
        return ShortReadOnlySFTPFile(filenode, metadata)
    else:
        return GeneralSFTPFile(check_abort, flags, convergence,
                               parent=parent, childname=childname, filenode=filenode, metadata=metadata)
526 class ShortReadOnlySFTPFile(PrefixingLogMixin):
527 implements(ISFTPFile)
528 """I represent a file handle to a particular file on an SFTP connection.
529 I am used only for short immutable files opened in read-only mode.
530 The file contents are downloaded to memory when I am created."""
532 def __init__(self, filenode, metadata):
533 PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
534 if noisy: self.log(".__init__(%r, %r)" % (filenode, metadata), level=NOISY)
536 assert IFileNode.providedBy(filenode), filenode
537 self.filenode = filenode
538 self.metadata = metadata
539 self.async = download_to_data(filenode)
542 def readChunk(self, offset, length):
543 self.log(".readChunk(%r, %r)" % (offset, length), level=OPERATIONAL)
546 def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot read from a closed file handle")
547 return defer.execute(_closed)
551 if noisy: self.log("_read(%r) in readChunk(%r, %r)" % (data, offset, length), level=NOISY)
553 # "In response to this request, the server will read as many bytes as it
554 # can from the file (up to 'len'), and return them in a SSH_FXP_DATA
555 # message. If an error occurs or EOF is encountered before reading any
556 # data, the server will respond with SSH_FXP_STATUS. For normal disk
557 # files, it is guaranteed that this will read the specified number of
558 # bytes, or up to end of file."
560 # i.e. we respond with an EOF error iff offset is already at EOF.
562 if offset >= len(data):
563 eventually_errback(d)(SFTPError(FX_EOF, "read at or past end of file"))
565 eventually_callback(d)(data[offset:min(offset+length, len(data))])
567 self.async.addCallbacks(_read, eventually_errback(d))
    def writeChunk(self, offset, data):
        """Always refuse writes: this handle type is strictly read-only.

        Returns a Deferred that errbacks with FX_PERMISSION_DENIED.
        """
        self.log(".writeChunk(%r, <data of length %r>)" % (offset, len(data)), level=OPERATIONAL)

        def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for writing")
        return defer.execute(_denied)
577 self.log(".close()", level=OPERATIONAL)
580 return defer.succeed(None)
583 self.log(".getAttrs()", level=OPERATIONAL)
586 def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot get attributes for a closed file handle")
587 return defer.execute(_closed)
589 return defer.succeed(_populate_attrs(self.filenode, self.metadata, False))
    def setAttrs(self, attrs):
        """Always refuse attribute changes: this handle type is read-only.

        Returns a Deferred that errbacks with FX_PERMISSION_DENIED.
        """
        self.log(".setAttrs(%r)" % (attrs,), level=OPERATIONAL)
        def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for writing")
        return defer.execute(_denied)
597 class GeneralSFTPFile(PrefixingLogMixin):
598 implements(ISFTPFile)
599 """I represent a file handle to a particular file on an SFTP connection.
600 I wrap an instance of OverwriteableFileConsumer, which is responsible for
601 storing the file contents. In order to allow write requests to be satisfied
602 immediately, there is effectively a FIFO queue between requests made to this
603 file handle, and requests to my OverwriteableFileConsumer. This queue is
604 implemented by the callback chain of self.async."""
606 def __init__(self, check_abort, flags, convergence, parent=None, childname=None, filenode=None, metadata=None):
607 PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
608 if noisy: self.log(".__init__(%r, %r, %r, parent=%r, childname=%r, filenode=%r, metadata=%r)" %
609 (check_abort, flags, convergence, parent, childname, filenode, metadata), level=NOISY)
611 self.check_abort = check_abort
613 self.convergence = convergence
615 self.childname = childname
616 self.filenode = filenode
617 self.metadata = metadata
618 self.async = defer.succeed(None)
621 # self.consumer should only be relied on in callbacks for self.async, since it might
622 # not be set before then.
624 tempfile_maker = EncryptedTemporaryFile
626 if (flags & FXF_TRUNC) or not filenode:
627 # We're either truncating or creating the file, so we don't need the old contents.
628 assert flags & FXF_CREAT, flags
629 self.consumer = OverwriteableFileConsumer(self.check_abort, 0, tempfile_maker)
630 self.consumer.finish()
632 assert IFileNode.providedBy(filenode), filenode
634 # TODO: use download interface described in #993 when implemented.
635 if filenode.is_mutable():
636 self.async.addCallback(lambda ign: filenode.download_best_version())
637 def _downloaded(data):
638 self.consumer = OverwriteableFileConsumer(self.check_abort, len(data), tempfile_maker)
639 self.consumer.write(data)
640 self.consumer.finish()
642 self.async.addCallback(_downloaded)
644 download_size = filenode.get_size()
645 assert download_size is not None
646 self.consumer = OverwriteableFileConsumer(self.check_abort, download_size, tempfile_maker)
647 self.async.addCallback(lambda ign: filenode.read(self.consumer, 0, None))
649 def readChunk(self, offset, length):
650 self.log(".readChunk(%r, %r)" % (offset, length), level=OPERATIONAL)
652 if not (self.flags & FXF_READ):
653 def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for reading")
654 return defer.execute(_denied)
657 def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot read from a closed file handle")
658 return defer.execute(_closed)
662 if noisy: self.log("_read in readChunk(%r, %r)" % (offset, length), level=NOISY)
663 d2 = self.consumer.read(offset, length)
664 d2.addErrback(_raise_error)
665 d2.addCallbacks(eventually_callback(d), eventually_errback(d))
666 # It is correct to drop d2 here.
668 self.async.addCallbacks(_read, eventually_errback(d))
671 def writeChunk(self, offset, data):
672 self.log(".writeChunk(%r, <data of length %r>)" % (offset, len(data)), level=OPERATIONAL)
674 if not (self.flags & FXF_WRITE):
675 def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for writing")
676 return defer.execute(_denied)
679 def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot write to a closed file handle")
680 return defer.execute(_closed)
682 # Note that we return without waiting for the write to occur. Reads and
683 # close wait for prior writes, and will fail if any prior operation failed.
684 # This is ok because SFTP makes no guarantee that the request completes
685 # before the write. In fact it explicitly allows write errors to be delayed
687 # "One should note that on some server platforms even a close can fail.
688 # This can happen e.g. if the server operating system caches writes,
689 # and an error occurs while flushing cached writes during the close."
692 # FXF_APPEND means that we should always write at the current end of file.
693 write_offset = offset
694 if self.flags & FXF_APPEND:
695 write_offset = self.consumer.get_current_size()
697 self.consumer.overwrite(write_offset, data)
699 self.async.addCallback(_write)
700 # don't addErrback to self.async, just allow subsequent async ops to fail.
701 return defer.succeed(None)
704 self.log(".close()", level=OPERATIONAL)
707 return defer.succeed(None)
709 # This means that close has been called, not that the close has succeeded.
712 if not (self.flags & (FXF_WRITE | FXF_CREAT)):
713 return defer.execute(self.consumer.close)
716 d2 = self.consumer.when_done()
717 if self.filenode and self.filenode.is_mutable():
718 d2.addCallback(lambda ign: self.consumer.get_current_size())
719 d2.addCallback(lambda size: self.consumer.read(0, size))
720 d2.addCallback(lambda new_contents: self.filenode.overwrite(new_contents))
721 #elif (self.flags & FXF_EXCL) and self.consumer.get_current_size() == 0:
722 # # The file will already have been written by the open call, so we can
723 # # optimize out the extra directory write (useful for zero-length lockfiles).
727 self.log("_add_file childname=%r" % (self.childname,), level=OPERATIONAL)
728 u = FileHandle(self.consumer.get_file(), self.convergence)
729 return self.parent.add_file(self.childname, u)
730 d2.addCallback(_add_file)
732 d2.addCallback(lambda ign: self.consumer.close())
734 self.async.addCallback(_close)
737 self.async.addCallbacks(eventually_callback(d), eventually_errback(d))
741 self.log(".getAttrs()", level=OPERATIONAL)
744 def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot get attributes for a closed file handle")
745 return defer.execute(_closed)
747 # Optimization for read-only handles, when we already know the metadata.
748 if not(self.flags & (FXF_WRITE | FXF_CREAT)) and self.metadata and self.filenode and not self.filenode.is_mutable():
749 return defer.succeed(_populate_attrs(self.filenode, self.metadata, False))
753 # FIXME: pass correct value for writeable
754 # self.filenode might be None, but that's ok.
755 attrs = _populate_attrs(self.filenode, self.metadata, False,
756 size=self.consumer.get_current_size())
757 eventually_callback(d)(attrs)
759 self.async.addCallbacks(_get, eventually_errback(d))
762 def setAttrs(self, attrs):
763 self.log(".setAttrs(attrs) %r" % (attrs,), level=OPERATIONAL)
765 if not (self.flags & FXF_WRITE):
766 def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for writing")
767 return defer.execute(_denied)
770 def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot set attributes for a closed file handle")
771 return defer.execute(_closed)
773 if not "size" in attrs:
774 return defer.succeed(None)
777 if not isinstance(size, (int, long)) or size < 0:
778 def _bad(): raise SFTPError(FX_BAD_MESSAGE, "new size is not a valid nonnegative integer")
779 return defer.execute(_bad)
783 self.consumer.set_current_size(size)
784 eventually_callback(d)(None)
786 self.async.addCallbacks(_resize, eventually_errback(d))
790 def __init__(self, items):
799 class SFTPHandler(PrefixingLogMixin):
800 implements(ISFTPServer)
    def __init__(self, user):
        """Bind this SFTP session handler to an authenticated 'user' avatar.

        The avatar supplies the objects shared by every operation of the
        session: the abort-check callable, the Tahoe client, the user's root
        dirnode, the username, and the convergence secret passed to uploads.
        """
        PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
        if noisy: self.log(".__init__(%r)" % (user,), level=NOISY)

        self.check_abort = user.check_abort
        self.client = user.client
        self.root = user.root
        self.username = user.username
        self.convergence = user.convergence
811 def gotVersion(self, otherVersion, extData):
812 self.log(".gotVersion(%r, %r)" % (otherVersion, extData), level=OPERATIONAL)
815 def openFile(self, pathstring, flags, attrs):
816 self.log(".openFile(%r, %r = %r, %r)" % (pathstring, flags, _repr_flags(flags), attrs), level=OPERATIONAL)
818 # This is used for both reading and writing.
819 # First exclude invalid combinations of flags.
821 # /usr/bin/sftp 'get' gives us FXF_READ, while 'put' on a new file
822 # gives FXF_WRITE | FXF_CREAT | FXF_TRUNC. I'm guessing that 'put' on an
823 # existing file gives the same.
825 if not (flags & (FXF_READ | FXF_WRITE)):
826 raise SFTPError(FX_BAD_MESSAGE,
827 "invalid file open flags: at least one of FXF_READ and FXF_WRITE must be set")
829 if not (flags & FXF_CREAT):
830 if flags & FXF_TRUNC:
831 raise SFTPError(FX_BAD_MESSAGE,
832 "invalid file open flags: FXF_TRUNC cannot be set without FXF_CREAT")
834 raise SFTPError(FX_BAD_MESSAGE,
835 "invalid file open flags: FXF_EXCL cannot be set without FXF_CREAT")
837 path = self._path_from_string(pathstring)
839 raise SFTPError(FX_NO_SUCH_FILE, "path cannot be empty")
841 # The combination of flags is potentially valid. Now there are two major cases:
843 # 1. The path is specified as /uri/FILECAP, with no parent directory.
844 # If the FILECAP is mutable and writeable, then we can open it in write-only
845 # or read/write mode (non-exclusively), otherwise we can only open it in
846 # read-only mode. The open should succeed immediately as long as FILECAP is
847 # a valid known filecap that grants the required permission.
849 # 2. The path is specified relative to a parent. We find the parent dirnode and
850 # get the child's URI and metadata if it exists. There are four subcases:
851 # a. the child does not exist: FXF_CREAT must be set, and we must be able
852 # to write to the parent directory.
853 # b. the child exists but is not a valid known filecap: fail
854 # c. the child is mutable: if we are trying to open it write-only or
855 # read/write, then we must be able to write to the file.
856 # d. the child is immutable: if we are trying to open it write-only or
857 # read/write, then we must be able to write to the parent directory.
859 # To reduce latency, open succeeds as soon as these conditions are met, even
860 # though there might be a failure in downloading the existing file or uploading
863 # Note that the permission checks below are for more precise error reporting on
864 # the open call; later operations would fail even if we did not make these checks.
866 stash = {'parent': None}
867 d = self._get_root(path)
868 def _got_root((root, path)):
869 if root.is_unknown():
870 raise SFTPError(FX_PERMISSION_DENIED,
871 "cannot open an unknown cap (or child of an unknown directory). "
872 "Upgrading the gateway to a later Tahoe-LAFS version may help")
875 if noisy: self.log("case 1: root = %r, path[:-1] = %r" % (root, path[:-1]), level=NOISY)
876 if not IFileNode.providedBy(root):
877 raise SFTPError(FX_PERMISSION_DENIED,
878 "cannot open a directory cap")
879 if (flags & FXF_WRITE) and root.is_readonly():
880 raise SFTPError(FX_PERMISSION_DENIED,
881 "cannot write to a non-writeable filecap without a parent directory")
883 raise SFTPError(FX_PERMISSION_DENIED,
884 "cannot create a file exclusively when it already exists")
886 return _make_sftp_file(self.check_abort, flags, self.convergence, filenode=root)
890 if noisy: self.log("case 2: root = %r, childname = %r, path[:-1] = %r" %
891 (root, childname, path[:-1]), level=NOISY)
892 d2 = root.get_child_at_path(path[:-1])
893 def _got_parent(parent):
894 if noisy: self.log("_got_parent(%r)" % (parent,), level=NOISY)
895 stash['parent'] = parent
898 # FXF_EXCL means that the link to the file (not the file itself) must
899 # be created atomically wrt updates by this storage client.
900 # That is, we need to create the link before returning success to the
901 # SFTP open request (and not just on close, as would normally be the
902 # case). We make the link initially point to a zero-length LIT file,
903 # which is consistent with what might happen on a POSIX filesystem.
905 if parent.is_readonly():
906 raise SFTPError(FX_PERMISSION_DENIED,
907 "cannot create a file exclusively when the parent directory is read-only")
909 # 'overwrite=False' ensures failure if the link already exists.
910 # FIXME: should use a single call to set_uri and return (child, metadata) (#1035)
911 zero_length_lit = "URI:LIT:"
912 d3 = parent.set_uri(childname, None, zero_length_lit, overwrite=False)
913 def _seturi_done(child):
914 stash['child'] = child
915 return parent.get_metadata_for(childname)
916 d3.addCallback(_seturi_done)
917 d3.addCallback(lambda metadata: (stash['child'], metadata))
920 if noisy: self.log("get_child_and_metadata(%r)" % (childname,), level=NOISY)
921 return parent.get_child_and_metadata(childname)
922 d2.addCallback(_got_parent)
924 def _got_child( (filenode, metadata) ):
925 if noisy: self.log("_got_child( (%r, %r) )" % (filenode, metadata), level=NOISY)
926 parent = stash['parent']
927 if filenode.is_unknown():
928 raise SFTPError(FX_PERMISSION_DENIED,
929 "cannot open an unknown cap. Upgrading the gateway "
930 "to a later Tahoe-LAFS version may help")
931 if not IFileNode.providedBy(filenode):
932 raise SFTPError(FX_PERMISSION_DENIED,
933 "cannot open a directory as if it were a file")
934 if (flags & FXF_WRITE) and filenode.is_mutable() and filenode.is_readonly():
935 raise SFTPError(FX_PERMISSION_DENIED,
936 "cannot open a read-only mutable file for writing")
937 if (flags & FXF_WRITE) and parent.is_readonly():
938 raise SFTPError(FX_PERMISSION_DENIED,
939 "cannot open a file for writing when the parent directory is read-only")
941 return _make_sftp_file(self.check_abort, flags, self.convergence, parent=parent,
942 childname=childname, filenode=filenode, metadata=metadata)
944 if noisy: self.log("_no_child(%r)" % (f,), level=NOISY)
945 f.trap(NoSuchChildError)
946 parent = stash['parent']
949 if not (flags & FXF_CREAT):
950 raise SFTPError(FX_NO_SUCH_FILE,
951 "the file does not exist, and was not opened with the creation (CREAT) flag")
952 if parent.is_readonly():
953 raise SFTPError(FX_PERMISSION_DENIED,
954 "cannot create a file when the parent directory is read-only")
956 return _make_sftp_file(self.check_abort, flags, self.convergence, parent=parent,
958 d2.addCallbacks(_got_child, _no_child)
960 d.addCallback(_got_root)
961 d.addErrback(_raise_error)
def removeFile(self, pathstring):
    """
    Handle an SFTP 'remove' request: delete the link named by pathstring.

    Delegates to _remove_object with must_be_file=True, so that a
    directory at this path is rejected instead of being deleted.
    """
    self.log(".removeFile(%r)" % (pathstring,), level=OPERATIONAL)
    return self._remove_object(self._path_from_string(pathstring), must_be_file=True)
# Handle an SFTP 'rename' request: relink the child from
# (fromParent, fromChildname) to (toParent, toChildname).  Both parent
# lookups run concurrently via gatherResults; the target's parent
# directory must already exist.
# NOTE(review): this chunk is a garbled paste -- the original file's line
# numbers are embedded in each line and some lines are missing (the tail
# of the quoted spec sentence after 986, the callback wiring between 991
# and 994, and the trailing 'return d').  Code left byte-identical;
# confirm against upstream before relying on it.
970 def renameFile(self, oldpathstring, newpathstring):
971 self.log(".renameFile(%r, %r)" % (oldpathstring, newpathstring), level=OPERATIONAL)
973 fromPath = self._path_from_string(oldpathstring)
974 toPath = self._path_from_string(newpathstring)
976 # the target directory must already exist
# gatherResults fails fast if either path's parent cannot be resolved.
977 d = deferredutil.gatherResults([self._get_parent(fromPath),
978 self._get_parent(toPath)])
# Python 2 tuple-parameter syntax: _got receives the two (parent, childname)
# pairs produced by the gatherResults above.
979 def _got( (fromPair, toPair) ):
980 if noisy: self.log("_got( (%r, %r) ) in .renameFile(%r, %r)" %
981 (fromPair, toPair, oldpathstring, newpathstring), level=NOISY)
982 (fromParent, fromChildname) = fromPair
983 (toParent, toChildname) = toPair
985 # <http://tools.ietf.org/html/draft-ietf-secsh-filexfer-02#section-6.5>
986 # "It is an error if there already exists a file with the name specified
# overwrite=False enforces the filexfer-02 rule quoted above: renaming
# onto an existing name must fail.
988 # FIXME: use move_child_to_path to avoid possible data loss due to #943
989 d = fromParent.move_child_to(fromChildname, toParent, toChildname, overwrite=False)
990 #d = parent.move_child_to_path(fromChildname, toRoot, toPath[:-1],
991 # toPath[-1], overwrite=False)
# _raise_error (defined elsewhere in this file) maps Tahoe exceptions to SFTPErrors.
994 d.addErrback(_raise_error)
# Handle an SFTP 'mkdir' request: create the directory at pathstring,
# creating any missing intermediate directories along the way (see
# _get_or_create_directories below).  attrs are converted to Tahoe
# metadata via _attrs_to_metadata.
# NOTE(review): garbled paste -- the trailing 'return d' (original line
# 1006) is missing from this view; code left byte-identical.
997 def makeDirectory(self, pathstring, attrs):
998 self.log(".makeDirectory(%r, %r)" % (pathstring, attrs), level=OPERATIONAL)
1000 path = self._path_from_string(pathstring)
1001 metadata = self._attrs_to_metadata(attrs)
1002 d = self._get_root(path)
# Python 2 tuple-parameter lambda: unpacks the (root, remaining_path)
# pair fired by _get_root.
1003 d.addCallback(lambda (root, path):
1004 self._get_or_create_directories(root, path, metadata))
1005 d.addErrback(_raise_error)
# Recursively walk/create the directory chain named by 'path' beneath
# 'node', creating each missing component with create_subdirectory.
# A non-directory node anywhere along the chain is a hard error.
# NOTE(review): garbled paste -- the base-case guard (presumably
# 'if not path:' before the succeed() at 1016) and the trailing
# 'return d' are missing from this view; code left byte-identical.
1008 def _get_or_create_directories(self, node, path, metadata):
1009 if not IDirectoryNode.providedBy(node):
1010 # unfortunately it is too late to provide the name of the
1011 # blocking file in the error message.
1012 raise SFTPError(FX_PERMISSION_DENIED,
1013 "cannot create directory because there "
1014 "is a file in the way") # close enough
# Base case: nothing left to create; fire with the directory node itself.
1016 return defer.succeed(node)
1017 d = node.get(path[0])
# Only NoSuchChildError triggers creation; other failures propagate.
1018 def _maybe_create(f):
1019 f.trap(NoSuchChildError)
1020 return node.create_subdirectory(path[0])
1021 d.addErrback(_maybe_create)
# Recurse on the remainder of the path with the child (found or created).
1022 d.addCallback(self._get_or_create_directories, path[1:], metadata)
1023 d.addErrback(_raise_error)
def removeDirectory(self, pathstring):
    """
    Handle an SFTP 'rmdir' request: delete the directory link at pathstring.

    Delegates to _remove_object with must_be_directory=True, so that a
    regular file at this path is rejected instead of being deleted.
    """
    self.log(".removeDirectory(%r)" % (pathstring,), level=OPERATIONAL)
    return self._remove_object(self._path_from_string(pathstring), must_be_directory=True)
# Shared implementation for removeFile/removeDirectory: look up the
# parent of 'path', type-check the child against the must_be_* flags,
# then delete the link from the parent.
# NOTE(review): garbled paste -- the 'return d2' / 'return d' lines
# (original 1044 and 1047) are missing from this view; code left
# byte-identical.
1032 def _remove_object(self, path, must_be_directory=False, must_be_file=False):
# maybeDeferred converts the synchronous SFTPError raised by _get_parent
# (for an empty path) into a Deferred failure.
1033 d = defer.maybeDeferred(self._get_parent, path)
1034 def _got_parent( (parent, childname) ):
1035 d2 = parent.get(childname)
1036 def _got_child(child):
1037 # Unknown children can be removed by either removeFile or removeDirectory.
1038 if must_be_directory and IFileNode.providedBy(child):
1039 raise SFTPError(FX_PERMISSION_DENIED, "rmdir called on a file")
1040 if must_be_file and IDirectoryNode.providedBy(child):
1041 raise SFTPError(FX_PERMISSION_DENIED, "rmfile called on a directory")
# Only the link is deleted; the file/dir itself may remain reachable
# through other caps.
1042 return parent.delete(childname)
1043 d2.addCallback(_got_child)
1045 d.addCallback(_got_parent)
1046 d.addErrback(_raise_error)
# Handle an SFTP 'opendir'/'readdir' request: resolve pathstring to a
# directory node, list its children, and render each child as a
# (utf8-name, ls-style longname, attrs) triple wrapped in a StoppableList.
# NOTE(review): garbled paste -- the 'd2 = dirnode.list()' and
# 'results = []' lines (original 1062, 1065) and the trailing returns
# (1076, 1079) are missing from this view; code left byte-identical.
1049 def openDirectory(self, pathstring):
1050 self.log(".openDirectory(%r)" % (pathstring,), level=OPERATIONAL)
1052 path = self._path_from_string(pathstring)
1053 d = self._get_node_and_metadata_for_path(path)
1054 def _list( (dirnode, metadata) ):
# An "unknown" cap cannot be interpreted by this gateway version.
1055 if dirnode.is_unknown():
1056 raise SFTPError(FX_PERMISSION_DENIED,
1057 "cannot list an unknown cap as a directory. Upgrading the gateway "
1058 "to a later Tahoe-LAFS version may help")
1059 if not IDirectoryNode.providedBy(dirnode):
1060 raise SFTPError(FX_PERMISSION_DENIED,
1061 "cannot list a file as if it were a directory")
1063 def _render(children):
1064 parent_writeable = not dirnode.is_readonly()
1066 for filename, (node, metadata) in children.iteritems():
1067 # The file size may be cached or absent.
# A child is reported writeable only if the parent is writeable and the
# child is not a read-only mutable node (unknown nodes get the benefit
# of the doubt).
1068 writeable = parent_writeable and (node.is_unknown() or
1069 not (node.is_mutable() and node.is_readonly()))
1070 attrs = _populate_attrs(node, metadata, writeable)
1071 filename_utf8 = filename.encode('utf-8')
1072 longname = _lsLine(filename_utf8, attrs)
1073 results.append( (filename_utf8, longname, attrs) )
1074 return StoppableList(results)
1075 d2.addCallback(_render)
1077 d.addCallback(_list)
1078 d.addErrback(_raise_error)
# Handle an SFTP 'stat'/'lstat' request: resolve the path, then build an
# attrs dict via _populate_attrs using the node's current size.
# followLinks is ignored (this gateway has no symlinks).
# NOTE(review): garbled paste -- 'return attrs' (1094), 'return d2'
# (1096) and 'return d' (1099) are missing from this view; code left
# byte-identical.
1081 def getAttrs(self, pathstring, followLinks):
1082 self.log(".getAttrs(%r, followLinks=%r)" % (pathstring, followLinks), level=OPERATIONAL)
1084 d = self._get_node_and_metadata_for_path(self._path_from_string(pathstring))
1085 def _render( (node, metadata) ):
1086 # When asked about a specific file, report its current size.
1087 # TODO: the modification time for a mutable file should be
1088 # reported as the update time of the best version. But that
1089 # information isn't currently stored in mutable shares, I think.
1090 d2 = node.get_current_size()
1091 def _got_size(size):
1092 # FIXME: pass correct value for writeable
# writeable is hard-coded False here (see FIXME above).
1093 attrs = _populate_attrs(node, metadata, False, size=size)
1095 d2.addCallback(_got_size)
1097 d.addCallback(_render)
1098 d.addErrback(_raise_error)
def setAttrs(self, pathstring, attrs):
    """
    Handle an SFTP 'setstat' request.

    A request that includes a 'size' attribute cannot be honoured,
    because truncating or extending a Tahoe file would require
    downloading and re-uploading its contents; it fails with
    FX_OP_UNSUPPORTED.  Requests without a 'size' attribute succeed
    as a no-op.

    NOTE(review): this chunk was a garbled paste; the 'size' guard and
    the trailing success return were reconstructed from the surviving
    comment -- confirm against upstream.  Also fixed a typo in the
    client-visible error message ("wth" -> "with").
    """
    self.log(".setAttrs(%r, %r)" % (pathstring, attrs), level=OPERATIONAL)
    if "size" in attrs:
        # this would require us to download and re-upload the truncated/extended
        # file contents
        raise SFTPError(FX_OP_UNSUPPORTED, "setAttrs with size attribute")
    return defer.succeed(None)
def readLink(self, pathstring):
    """
    Handle an SFTP 'readlink' request.

    Symbolic links do not exist in Tahoe, so this always fails with
    FX_OP_UNSUPPORTED.
    """
    self.log(".readLink(%r)" % (pathstring,), level=OPERATIONAL)
    raise SFTPError(FX_OP_UNSUPPORTED, "readLink")
def makeLink(self, linkPathstring, targetPathstring):
    """
    Handle an SFTP 'symlink' request.

    Symbolic links cannot be created in Tahoe, so this always fails
    with FX_OP_UNSUPPORTED.
    """
    self.log(".makeLink(%r, %r)" % (linkPathstring, targetPathstring), level=OPERATIONAL)
    raise SFTPError(FX_OP_UNSUPPORTED, "makeLink")
def extendedRequest(self, extendedName, extendedData):
    """
    Handle an SFTP extended request; none are supported.

    Always fails with FX_OP_UNSUPPORTED, naming the requested extension.
    """
    self.log(".extendedRequest(%r, %r)" % (extendedName, extendedData), level=OPERATIONAL)
    # A client 'df' command requires the 'statvfs@openssh.com' extension,
    # but there's little point to implementing that since we would only
    # have faked values to report.
    raise SFTPError(FX_OP_UNSUPPORTED, "extendedRequest %r" % extendedName)
def realPath(self, pathstring):
    """
    Handle an SFTP 'realpath' request: canonicalize pathstring into an
    absolute UTF-8 path rooted at "/".
    """
    self.log(".realPath(%r)" % (pathstring,), level=OPERATIONAL)
    components = [component.encode('utf-8')
                  for component in self._path_from_string(pathstring)]
    return "/" + "/".join(components)
# Convert a client-supplied path string into a list of unicode path
# components, stripping leading/trailing slashes and decoding each
# component as strict UTF-8.  Per filexfer-02 section 6.2 (quoted
# below), ".." and "." components get special handling.
# NOTE(review): garbled paste -- several lines are missing from this
# view (the empty/"." early return around 1140, the try: opening the
# decode at ~1154, the ".."/"." handling inside the loop at 1149-1153,
# and the final 'return path').  Code left byte-identical; confirm
# against upstream.
1134 def _path_from_string(self, pathstring):
1135 if noisy: self.log("CONVERT %r" % (pathstring,), level=NOISY)
1137 # The home directory is the root directory.
1138 pathstring = pathstring.strip("/")
1139 if pathstring == "" or pathstring == ".":
1142 path_utf8 = pathstring.split("/")
1144 # <http://tools.ietf.org/html/draft-ietf-secsh-filexfer-02#section-6.2>
1145 # "Servers SHOULD interpret a path name component ".." as referring to
1146 # the parent directory, and "." as referring to the current directory."
1148 for p_utf8 in path_utf8:
1150 # ignore excess .. components at the root
# Strict decoding: any invalid UTF-8 in a component is a client error.
1155 p = p_utf8.decode('utf-8', 'strict')
1156 except UnicodeError:
1157 raise SFTPError(FX_NO_SUCH_FILE, "path could not be decoded as UTF-8")
1160 if noisy: self.log(" PATH %r" % (path,), level=NOISY)
# Resolve a component-list path to a (node, metadata) pair, starting
# from whatever root _get_root selects (account root or /uri/CAP).
# NOTE(review): garbled paste -- the branch around 1168 (presumably
# handling an empty remaining path) and the trailing 'return d' are
# missing from this view; code left byte-identical.
1163 def _get_node_and_metadata_for_path(self, path):
1164 d = self._get_root(path)
# Python 2 tuple-parameter syntax: receives (root, remaining_path)
# from _get_root.
1165 def _got_root( (root, path) ):
1166 if noisy: self.log("_got_root( (%r, %r) )" % (root, path), level=NOISY)
1168 return root.get_child_and_metadata_at_path(path)
1171 d.addCallback(_got_root)
# Select the root node for a path: a leading "uri" component means the
# next component is a cap to open directly (so any cap is reachable via
# /uri/CAP/...); otherwise the user's account rootnode is used.
# NOTE(review): garbled paste -- the 'else:' joining the two branches
# and the trailing 'return d' are missing from this view; code left
# byte-identical.
1174 def _get_root(self, path):
1175 # return (root, remaining_path)
1176 if path and path[0] == u"uri":
# The cap (path[1]) is consumed here; the remainder is resolved
# relative to the node it designates.
1177 d = defer.maybeDeferred(self.client.create_node_from_uri, path[1].encode('utf-8'))
1178 d.addCallback(lambda root: (root, path[2:]))
1180 d = defer.succeed((self.root, path))
# Resolve a path to its parent directory node plus the final component,
# firing with (parentnode, childname).  An empty path has no parent and
# fails with FX_NO_SUCH_FILE.
# NOTE(review): garbled paste -- the 'if not path:' guards (before 1186
# and before 1194) and the trailing 'return d' are missing from this
# view; code left byte-identical.
1183 def _get_parent(self, path):
1184 # fire with (parentnode, childname)
# defer.execute turns the synchronous raise into a Deferred failure.
1186 def _nosuch(): raise SFTPError(FX_NO_SUCH_FILE, "path does not exist")
1187 return defer.execute(_nosuch)
1189 childname = path[-1]
1190 assert isinstance(childname, unicode), repr(childname)
1191 d = self._get_root(path)
1192 def _got_root( (root, path) ):
1194 raise SFTPError(FX_NO_SUCH_FILE, "path does not exist")
# Walk every component except the last; that child is the parent dir.
1195 return root.get_child_at_path(path[:-1])
1196 d.addCallback(_got_root)
1197 def _got_parent(parent):
1198 return (parent, childname)
1199 d.addCallback(_got_parent)
# Convert an SFTP attrs dict into Tahoe child metadata: time-related
# keys are coerced to long integers, "ext_*" extension keys are kept as
# strings, and everything else is dropped.
# NOTE(review): garbled paste -- the 'metadata = {}' initialization,
# the 'for key in attrs:' loop header, and the trailing
# 'return metadata' are missing from this view; code left byte-identical.
1202 def _attrs_to_metadata(self, attrs):
1206 if key == "mtime" or key == "ctime" or key == "createtime":
1207 metadata[key] = long(attrs[key])
1208 elif key.startswith("ext_"):
1209 metadata[key] = str(attrs[key])
# A Conch avatar for one authenticated SFTP account.  It wires the
# "session" channel and "sftp" subsystem into Twisted Conch, carries the
# per-account state (client, root dirnode, username, convergence secret,
# check_abort callable), and refuses every interactive-session feature
# (pty, shell, exec) since this server only speaks SFTP.
# NOTE(review): garbled paste -- the 'def eofReceived(self):' and
# 'def closed(self):' headers (original lines 1244 and 1247) are missing
# from this view; only their log-call bodies (1245, 1248) survive.
1214 class SFTPUser(ConchUser, PrefixingLogMixin):
1215 implements(ISession)
1216 def __init__(self, check_abort, client, rootnode, username, convergence):
1217 ConchUser.__init__(self)
1218 PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
# Register the SSH session channel and the SFTP subsystem handler.
1220 self.channelLookup["session"] = session.SSHSession
1221 self.subsystemLookup["sftp"] = FileTransferServer
1223 self.check_abort = check_abort
1224 self.client = client
1225 self.root = rootnode
1226 self.username = username
1227 self.convergence = convergence
# ISession methods: log the attempt, then refuse -- no interactive use.
1229 def getPty(self, terminal, windowSize, attrs):
1230 self.log(".getPty(%r, %r, %r)" % (terminal, windowSize, attrs), level=OPERATIONAL)
1231 raise NotImplementedError
1233 def openShell(self, protocol):
1234 self.log(".openShell(%r)" % (protocol,), level=OPERATIONAL)
1235 raise NotImplementedError
1237 def execCommand(self, protocol, cmd):
1238 self.log(".execCommand(%r, %r)" % (protocol, cmd), level=OPERATIONAL)
1239 raise NotImplementedError
# Window changes and session teardown are logged but otherwise ignored.
1241 def windowChanged(self, newWindowSize):
1242 self.log(".windowChanged(%r)" % (newWindowSize,), level=OPERATIONAL)
1245 self.log(".eofReceived()", level=OPERATIONAL)
1248 self.log(".closed()", level=OPERATIONAL)
1251 # if you have an SFTPUser, and you want something that provides ISFTPServer,
1252 # then you get SFTPHandler(user)
# This lets Twisted Conch adapt the avatar to the SFTP protocol handler
# automatically when the "sftp" subsystem is opened.
1253 components.registerAdapter(SFTPHandler, SFTPUser, ISFTPServer)
1255 from auth import AccountURLChecker, AccountFileChecker, NeedRootcapLookupScheme
# Realm head: declares IRealm and stores the Tahoe client for use by
# requestAvatar below.
# NOTE(review): garbled paste -- the 'class Dispatcher:' header line
# (original 1257) is missing from this view.
1258 implements(portal.IRealm)
1259 def __init__(self, client):
1260 self.client = client
# cred IRealm hook: build an SFTPUser avatar for an authenticated
# account.  avatarID carries the account's rootcap and username; the
# rootcap is opened into a dirnode and a logged_out flag (closed over by
# check_abort/logout) lets in-flight operations notice logout.
# NOTE(review): garbled paste -- the 'def check_abort():' and
# 'def logout():' headers (original lines 1267 and 1269) are missing
# from this view; only their bodies (1268, 1270) survive.
1262 def requestAvatar(self, avatarID, mind, interface):
1263 assert interface == IConchUser
1264 rootnode = self.client.create_node_from_uri(avatarID.rootcap)
1265 convergence = self.client.convergence
# Mutable cell shared by the two closures below.
1266 logged_out = {'flag': False}
1268 return logged_out['flag']
1270 logged_out['flag'] = True
1271 s = SFTPUser(check_abort, self.client, rootnode, avatarID.username, convergence)
# (interface, avatar, logout-callable) is the IRealm return contract.
1272 return (interface, s, logout)
1274 class SFTPServer(service.MultiService):
1275 def __init__(self, client, accountfile, accounturl,
1276 sftp_portstr, pubkey_file, privkey_file):
1277 service.MultiService.__init__(self)
1279 r = Dispatcher(client)
1280 p = portal.Portal(r)
1283 c = AccountFileChecker(self, accountfile)
1284 p.registerChecker(c)
1286 c = AccountURLChecker(self, accounturl)
1287 p.registerChecker(c)
1288 if not accountfile and not accounturl:
1289 # we could leave this anonymous, with just the /uri/CAP form
1290 raise NeedRootcapLookupScheme("must provide some translation")
1292 pubkey = keys.Key.fromFile(pubkey_file)
1293 privkey = keys.Key.fromFile(privkey_file)
1294 class SSHFactory(factory.SSHFactory):
1295 publicKeys = {pubkey.sshType(): pubkey}
1296 privateKeys = {privkey.sshType(): privkey}
1297 def getPrimes(self):
1299 # if present, this enables diffie-hellman-group-exchange
1300 return primes.parseModuliFile("/etc/ssh/moduli")
1307 s = strports.service(sftp_portstr, f)
1308 s.setServiceParent(self)