2 import os, tempfile, heapq, binascii, traceback, array, stat, struct
3 from stat import S_IFREG, S_IFDIR
4 from time import time, strftime, localtime
6 from zope.interface import implements
7 from twisted.python import components
8 from twisted.application import service, strports
9 from twisted.conch.ssh import factory, keys, session
10 from twisted.conch.ssh.filetransfer import FileTransferServer, SFTPError, \
11 FX_NO_SUCH_FILE, FX_OP_UNSUPPORTED, FX_PERMISSION_DENIED, FX_EOF, \
12 FX_BAD_MESSAGE, FX_FAILURE, FX_OK
13 from twisted.conch.ssh.filetransfer import FXF_READ, FXF_WRITE, FXF_APPEND, \
14 FXF_CREAT, FXF_TRUNC, FXF_EXCL
15 from twisted.conch.interfaces import ISFTPServer, ISFTPFile, IConchUser, ISession
16 from twisted.conch.avatar import ConchUser
17 from twisted.conch.openssh_compat import primes
18 from twisted.cred import portal
19 from twisted.internet.error import ProcessDone, ProcessTerminated
20 from twisted.python.failure import Failure
21 from twisted.internet.interfaces import ITransport
23 from twisted.internet import defer
24 from twisted.internet.interfaces import IFinishableConsumer
25 from foolscap.api import eventually
26 from allmydata.util import deferredutil
28 from allmydata.util.consumer import download_to_data
29 from allmydata.interfaces import IFileNode, IDirectoryNode, ExistingChildError, \
30 NoSuchChildError, ChildOfWrongTypeError
31 from allmydata.mutable.common import NotWriteableError
32 from allmydata.immutable.upload import FileHandle
34 from pycryptopp.cipher.aes import AES
# --- logging setup -------------------------------------------------------
# When use_foolscap_logging is True (the default), logmsg/logerr/
# PrefixingLogMixin come from allmydata.util.log; otherwise trivial
# print-based stand-ins are defined (coverage-exempt).
# NOTE(review): some lines are elided in this view, including the module
# level definition of 'noisy', which is referenced throughout this file.
use_foolscap_logging = True

from allmydata.util.log import NOISY, OPERATIONAL, WEIRD, \
     msg as _msg, err as _err, PrefixingLogMixin as _PrefixingLogMixin

if use_foolscap_logging:
    (logmsg, logerr, PrefixingLogMixin) = (_msg, _err, _PrefixingLogMixin)
else: # pragma: no cover
    def logmsg(s, level=None):
        # (body elided in this view)
    def logerr(s, level=None):
        # (body elided in this view)
    class PrefixingLogMixin:
        def __init__(self, facility=None, prefix=''):
            # (body elided in this view)
        def log(self, s, level=None):
            print "%r %s" % (self.prefix, s)
def eventually_callback(d):
    """Return a one-argument callable that fires d.callback(value) on a
    later turn of the event loop (via foolscap's eventually)."""
    def _fire(value):
        return eventually(d.callback, value)
    return _fire
def eventually_errback(d):
    """Return a one-argument callable that fires d.errback(failure) on a
    later turn of the event loop (via foolscap's eventually)."""
    def _fail(failure):
        return eventually(d.errback, failure)
    return _fail
# NOTE(review): the 'def _utf8(x):' header is elided in this view.
# Coerces a unicode object to UTF-8-encoded bytes; str passes through
# (Python 2 semantics).
    if isinstance(x, unicode):
        return x.encode('utf-8')
    if isinstance(x, str):
        # (return branch elided in this view)

# NOTE(review): the 'def _to_sftp_time(t):' header is elided in this view.
    """SFTP times are unsigned 32-bit integers representing UTC seconds
    (ignoring leap seconds) since the Unix epoch, January 1 1970 00:00 UTC.
    A Tahoe time is the corresponding float."""
    # Mask to the low 32 bits so out-of-range times wrap rather than error.
    return long(t) & 0xFFFFFFFFL
def _convert_error(res, request):
    """Log the outcome of 'request' and, if res is a Failure, translate the
    wrapped exception into the appropriate SFTPError (raised).  Non-Failure
    results are logged as SUCCESS.  Error messages must not reveal
    information that could compromise anonymity.
    NOTE(review): several lines are elided in this view."""
    if not isinstance(res, Failure):
        # (default logged_res binding elided in this view)
        if isinstance(res, str): logged_res = "<data of length %r>" % (len(res),)
        logmsg("SUCCESS %r %r" % (request, logged_res,), level=OPERATIONAL)
        # (return elided in this view)

    # (binding of 'err' elided in this view)
    logmsg("RAISE %r %r" % (request, err.value), level=OPERATIONAL)
    # ('try:' elided in this view)
    if noisy: logmsg(traceback.format_exc(err.value), level=NOISY)
    except: # pragma: no cover
        # (handler body elided in this view)

    # The message argument to SFTPError must not reveal information that
    # might compromise anonymity.

    if err.check(SFTPError):
        # original raiser of SFTPError has responsibility to ensure anonymity
        # (re-raise elided in this view)
    if err.check(NoSuchChildError):
        childname = _utf8(err.value.args[0])
        raise SFTPError(FX_NO_SUCH_FILE, childname)
    if err.check(NotWriteableError) or err.check(ChildOfWrongTypeError):
        msg = _utf8(err.value.args[0])
        raise SFTPError(FX_PERMISSION_DENIED, msg)
    if err.check(ExistingChildError):
        # Versions of SFTP after v3 (which is what twisted.conch implements)
        # define a specific error code for this case: FX_FILE_ALREADY_EXISTS.
        # However v3 doesn't; instead, other servers such as sshd return
        # FX_FAILURE. The gvfs SFTP backend, for example, depends on this
        # to translate the error to the equivalent of POSIX EEXIST, which is
        # necessary for some picky programs (such as gedit).
        msg = _utf8(err.value.args[0])
        raise SFTPError(FX_FAILURE, msg)
    if err.check(NotImplementedError):
        raise SFTPError(FX_OP_UNSUPPORTED, _utf8(err.value))
    if err.check(EOFError):
        raise SFTPError(FX_EOF, "end of file reached")
    if err.check(defer.FirstError):
        # unwrap errors from DeferredList-style composites recursively
        _convert_error(err.value.subFailure, request)

    # We assume that the error message is not anonymity-sensitive.
    raise SFTPError(FX_FAILURE, _utf8(err.value))
def _repr_flags(flags):
    """Render an SFTP open-flags bitmask as a human-readable string such as
    'FXF_READ|FXF_WRITE', for logging only.
    NOTE(review): the closing of the list/filter is elided in this view."""
    return "|".join([f for f in
                     [(flags & FXF_READ) and "FXF_READ" or None,
                      (flags & FXF_WRITE) and "FXF_WRITE" or None,
                      (flags & FXF_APPEND) and "FXF_APPEND" or None,
                      (flags & FXF_CREAT) and "FXF_CREAT" or None,
                      (flags & FXF_TRUNC) and "FXF_TRUNC" or None,
                      (flags & FXF_EXCL) and "FXF_EXCL" or None,
def _lsLine(name, attrs):
    """Construct an 'ls -l'-style longname line for a directory entry with
    the given SFTP attrs dict.
    NOTE(review): many lines are elided in this view, including the
    assignments that bind 'mode', 'st_nlink', 'now', 'sixmo', 'day', the
    start of the accumulated string 'l', and the final return."""
    st_mtime = attrs.get("mtime", 0)
    st_mode = attrs["permissions"]
    # TODO: check that clients are okay with this being a "?".
    # (They should be because the longname is intended for human
    # consumption.)
    st_size = attrs.get("size", "?")
    # We don't know how many links there really are to this object.

    # Based on <http://twistedmatrix.com/trac/browser/trunk/twisted/conch/ls.py?rev=25412>.
    # We can't call the version in Twisted because we might have a version earlier than
    # <http://twistedmatrix.com/trac/changeset/25412> (released in Twisted 8.2).

    perms = array.array('c', '-'*10)
    ft = stat.S_IFMT(mode)
    if stat.S_ISDIR(ft): perms[0] = 'd'
    elif stat.S_ISREG(ft): perms[0] = '-'
    # (remaining file-type branch(es) elided in this view)
    # user permission bits
    if mode&stat.S_IRUSR: perms[1] = 'r'
    if mode&stat.S_IWUSR: perms[2] = 'w'
    if mode&stat.S_IXUSR: perms[3] = 'x'
    # group permission bits
    if mode&stat.S_IRGRP: perms[4] = 'r'
    if mode&stat.S_IWGRP: perms[5] = 'w'
    if mode&stat.S_IXGRP: perms[6] = 'x'
    # other permission bits
    if mode&stat.S_IROTH: perms[7] = 'r'
    if mode&stat.S_IWOTH: perms[8] = 'w'
    if mode&stat.S_IXOTH: perms[9] = 'x'
    # suid/sgid never set
    # (lines elided in this view)
    l += str(st_nlink).rjust(5) + ' '
    # (lines elided in this view)
    if st_mtime + sixmo < now or st_mtime > now + day:
        # mtime is more than 6 months ago, or more than one day in the future
        l += strftime("%b %d %Y ", localtime(st_mtime))
    # ('else:' elided in this view)
        l += strftime("%b %d %H:%M ", localtime(st_mtime))
def _is_readonly(parent_readonly, child):
    """Whether child should be listed as having read-only permissions in parent.
    NOTE(review): two lines are elided in this view (the unknown-node branch
    body and the 'else:' before the final return)."""
    if child.is_unknown():
        # (branch body elided in this view)
    elif child.is_mutable():
        return child.is_readonly()
    # ('else:' elided in this view)
        return parent_readonly
def _populate_attrs(childnode, metadata, size=None):
    """Build an SFTP attrs dict for childnode, which may be None when the
    file does not exist yet but is about to be written.
    NOTE(review): several lines are elided in this view, presumably
    including the initial dict construction and the final return."""
    # The permissions must have the S_IFDIR (040000) or S_IFREG (0100000)
    # bits, otherwise the client may refuse to open a directory.
    # Also, sshfs run as a non-root user requires files and directories
    # to be world-readable/writeable.

    # Directories and unknown nodes have no size, and SFTP doesn't
    # require us to make one up.

    # childnode might be None, meaning that the file doesn't exist yet,
    # but we're going to write it later.

    if childnode and childnode.is_unknown():
        # (branch body elided in this view)
    elif childnode and IDirectoryNode.providedBy(childnode):
        perms = S_IFDIR | 0777
    # ('else:' elided in this view)
        # For files, omit the size if we don't immediately know it.
        if childnode and size is None:
            size = childnode.get_size()
        # (line elided in this view)
        assert isinstance(size, (int, long)) and not isinstance(size, bool), repr(size)
        # (line elided in this view)
        perms = S_IFREG | 0666

    # (guard on metadata elided in this view)
        assert 'readonly' in metadata, metadata
        if metadata['readonly']:
            perms &= S_IFDIR | S_IFREG | 0555 # clear 'w' bits

        # see webapi.txt for what these times mean
        if 'linkmotime' in metadata.get('tahoe', {}):
            attrs['mtime'] = _to_sftp_time(metadata['tahoe']['linkmotime'])
        elif 'mtime' in metadata:
            # We would prefer to omit atime, but SFTP version 3 can only
            # accept mtime if atime is also set.
            attrs['mtime'] = _to_sftp_time(metadata['mtime'])
            attrs['atime'] = attrs['mtime']

        if 'linkcrtime' in metadata.get('tahoe', {}):
            attrs['createtime'] = _to_sftp_time(metadata['tahoe']['linkcrtime'])

        if 'ctime' in metadata:
            attrs['ctime'] = _to_sftp_time(metadata['ctime'])

    attrs['permissions'] = perms

    # twisted.conch.ssh.filetransfer only implements SFTP version 3,
    # which doesn't include SSH_FILEXFER_ATTR_FLAGS.
class EncryptedTemporaryFile(PrefixingLogMixin):
    """File-like wrapper around a TemporaryFile whose on-disk bytes are
    encrypted with a random per-instance AES-128 key, so plaintext never
    reaches disk.  NOTE(review): the '__init__' def line is elided in
    this view."""
    # not implemented: next, readline, readlines, xreadlines, writelines

        PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
        self.file = tempfile.TemporaryFile()
        self.key = os.urandom(16) # AES-128
267 def _crypt(self, offset, data):
268 # TODO: use random-access AES (pycryptopp ticket #18)
269 offset_big = offset // 16
270 offset_small = offset % 16
271 iv = binascii.unhexlify("%032x" % offset_big)
272 cipher = AES(self.key, iv=iv)
273 cipher.process("\x00"*offset_small)
274 return cipher.process(data)
    def seek(self, offset, whence=os.SEEK_SET):
        """Delegate seeking to the underlying temporary file."""
        if noisy: self.log(".seek(%r, %r)" % (offset, whence), level=NOISY)
        self.file.seek(offset, whence)
    # NOTE(review): the 'def tell(self):' header is elided in this view.
        offset = self.file.tell()
        if noisy: self.log(".tell() = %r" % (offset,), level=NOISY)
        # (return elided in this view)

    def read(self, size=-1):
        """Read up to 'size' ciphertext bytes from the current position and
        decrypt them via _crypt.
        NOTE(review): the final return is elided in this view."""
        if noisy: self.log(".read(%r)" % (size,), level=NOISY)
        index = self.file.tell()
        ciphertext = self.file.read(size)
        plaintext = self._crypt(index, ciphertext)
        # (return elided in this view)
298 def write(self, plaintext):
299 if noisy: self.log(".write(<data of length %r>)" % (len(plaintext),), level=NOISY)
300 index = self.file.tell()
301 ciphertext = self._crypt(index, plaintext)
302 self.file.write(ciphertext)
    def truncate(self, newsize):
        """Truncate the underlying temporary file to 'newsize' bytes."""
        if noisy: self.log(".truncate(%r)" % (newsize,), level=NOISY)
        self.file.truncate(newsize)
class OverwriteableFileConsumer(PrefixingLogMixin):
    implements(IFinishableConsumer)
    # NOTE(review): the string below follows the implements() call, so it is
    # an ordinary expression statement rather than the class docstring;
    # consider moving it above implements().  Part of it is elided in this
    # view.
    """I act both as a consumer for the download of the original file contents, and as a
    wrapper for a temporary file that records the downloaded data and any overwrites.
    I use a priority queue to keep track of which regions of the file have been overwritten
    but not yet downloaded, so that the download does not clobber overwritten data.
    I use another priority queue to record milestones at which to make callbacks
    indicating that a given number of bytes have been downloaded.
    The temporary file reflects the contents of the file that I represent, except that:
     - regions that have neither been downloaded nor overwritten, if present,
     - the temporary file may be shorter than the represented file (it is never longer).
    The latter's current size is stored in self.current_size.
    This abstraction is mostly independent of SFTP. Consider moving it, if it is found
    useful for other frontends."""
    def __init__(self, download_size, tempfile_maker):
        """Wrap a new temporary file (made by tempfile_maker()) that will
        receive a download of 'download_size' bytes, possibly interleaved
        with client overwrites.
        NOTE(review): a few lines are elided in this view."""
        PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
        if noisy: self.log(".__init__(%r, %r)" % (download_size, tempfile_maker), level=NOISY)
        self.download_size = download_size
        self.current_size = download_size
        self.f = tempfile_maker()
        # (line elided in this view)
        self.milestones = [] # empty heap of (offset, d)
        self.overwrites = [] # empty heap of (start, end)
        self.is_closed = False
        self.done = self.when_reached(download_size) # adds a milestone
        # (line elided in this view)
        def _signal_done(ign):
            if noisy: self.log("DONE", level=NOISY)
            # (line elided in this view)
        self.done.addCallback(_signal_done)
    def get_current_size(self):
        """Return the current logical size of the represented file."""
        return self.current_size
    def set_current_size(self, size):
        """Truncate or zero-extend the represented file to 'size' bytes,
        keeping the download bookkeeping consistent.
        NOTE(review): a few lines are elided in this view."""
        if noisy: self.log(".set_current_size(%r), current_size = %r, downloaded = %r" %
                           (size, self.current_size, self.downloaded), level=NOISY)
        if size < self.current_size or size < self.downloaded:
            self.f.truncate(size)
        if size > self.current_size:
            # extend with explicit zeroes (see the comment in overwrite())
            self.overwrite(self.current_size, "\x00" * (size - self.current_size))
        self.current_size = size
        # (line elided in this view)
        # invariant: self.download_size <= self.current_size
        if size < self.download_size:
            self.download_size = size
        if self.downloaded >= self.download_size:
            # (branch body elided in this view)
    def registerProducer(self, p, streaming):
        """IConsumer: record the producer feeding the download.
        NOTE(review): most of this method is elided in this view."""
        if noisy: self.log(".registerProducer(%r, streaming=%r)" % (p, streaming), level=NOISY)
        # (lines elided in this view)
        # call resumeProducing once to start things off
        # (lines elided in this view)
    def write(self, data):
        """IConsumer: accept the next chunk of downloaded data, writing only
        those portions that have not already been overwritten by the client
        (tracked in the self.overwrites heap).
        NOTE(review): several lines are elided in this view."""
        if noisy: self.log(".write(<data of length %r>)" % (len(data),), level=NOISY)
        # (lines elided in this view)
        if self.downloaded >= self.download_size:
            # (branch body elided in this view)

        next_downloaded = self.downloaded + len(data)
        if next_downloaded > self.download_size:
            # clip to the (possibly truncated) download size
            data = data[:(self.download_size - self.downloaded)]

        while len(self.overwrites) > 0:
            (start, end) = self.overwrites[0]
            if start >= next_downloaded:
                # This and all remaining overwrites are after the data we just downloaded.
                # (loop exit elided in this view)
            if start > self.downloaded:
                # The data we just downloaded has been partially overwritten.
                # Write the prefix of it that precedes the overwritten region.
                self.f.seek(self.downloaded)
                self.f.write(data[:(start - self.downloaded)])
            # (line elided in this view)
            # This merges consecutive overwrites if possible, which allows us to detect the
            # case where the download can be stopped early because the remaining region
            # to download has already been fully overwritten.
            heapq.heappop(self.overwrites)
            while len(self.overwrites) > 0:
                (start1, end1) = self.overwrites[0]
                # (merge condition elided in this view)
                heapq.heappop(self.overwrites)
            # (line elided in this view)
            if end >= next_downloaded:
                # This overwrite extends past the downloaded data, so there is no
                # more data to consider on this call.
                heapq.heappush(self.overwrites, (next_downloaded, end))
                self._update_downloaded(next_downloaded)
                # (return elided in this view)
            elif end >= self.downloaded:
                # skip past the overwritten portion of this chunk
                data = data[(end - self.downloaded):]
                self._update_downloaded(end)
        # (lines elided in this view)
        self.f.seek(self.downloaded)
        # (write of remaining data elided in this view)
        self._update_downloaded(next_downloaded)
    def _update_downloaded(self, new_downloaded):
        """Advance the downloaded-bytes high-water mark and fire any
        milestone Deferreds that have now been reached.
        NOTE(review): several lines are elided in this view."""
        self.downloaded = new_downloaded
        milestone = new_downloaded
        if len(self.overwrites) > 0:
            (start, end) = self.overwrites[0]
            if start <= new_downloaded and end > milestone:
                # (line elided in this view)

        while len(self.milestones) > 0:
            (next, d) = self.milestones[0]
            # (loop-exit guard elided in this view)
            if noisy: self.log("MILESTONE %r %r" % (next, d), level=NOISY)
            heapq.heappop(self.milestones)
            # (line elided in this view)
            eventually_callback(d)(None)

        if milestone >= self.download_size:
            # (branch body elided in this view)
    def overwrite(self, offset, data):
        """Client write: record 'data' at 'offset'; these bytes take
        precedence over any not-yet-downloaded bytes in the same range.
        NOTE(review): several lines are elided in this view."""
        if noisy: self.log(".overwrite(%r, <data of length %r>)" % (offset, len(data)), level=NOISY)
        if offset > self.current_size:
            # Normally writing at an offset beyond the current end-of-file
            # would leave a hole that appears filled with zeroes. However, an
            # EncryptedTemporaryFile doesn't behave like that (if there is a
            # hole in the file on disk, the zeroes that are read back will be
            # XORed with the keystream). So we must explicitly write zeroes in
            # the gap between the current EOF and the offset.
            # (line elided in this view)
            self.f.seek(self.current_size)
            self.f.write("\x00" * (offset - self.current_size))
            start = self.current_size
        # (else-branch and following lines elided in this view)

        end = offset + len(data)
        self.current_size = max(self.current_size, end)
        if end > self.downloaded:
            # remember this region so the download does not clobber it
            heapq.heappush(self.overwrites, (start, end))
    def read(self, offset, length):
        """When the data has been read, callback the Deferred that we return with this data.
        Otherwise errback the Deferred that we return.
        The caller must perform no more overwrites until the Deferred has fired.
        NOTE(review): several lines are elided in this view."""
        if noisy: self.log(".read(%r, %r), current_size = %r" % (offset, length, self.current_size), level=NOISY)
        if offset >= self.current_size:
            def _eof(): raise EOFError("read past end of file")
            return defer.execute(_eof)
        # (line elided in this view)
        if offset + length > self.current_size:
            length = self.current_size - offset
            if noisy: self.log("truncating read to %r bytes" % (length,), level=NOISY)

        # wait only for as much as the (possibly truncated) download provides
        needed = min(offset + length, self.download_size)
        d = self.when_reached(needed)
        # (callback definition header elided in this view)
        # It is not necessarily the case that self.downloaded >= needed, because
        # the file might have been truncated (thus truncating the download) and
        # (comment continuation elided in this view)
            assert self.current_size >= offset + length, (self.current_size, offset, length)
            if noisy: self.log("self.f = %r" % (self.f,), level=NOISY)
            # (seek elided in this view)
            return self.f.read(length)
        d.addCallback(_reached)
        # (return elided in this view)
    def when_reached(self, index):
        """Return a Deferred that fires once at least 'index' bytes have been
        downloaded (immediately if they already have been).
        NOTE(review): a few lines are elided in this view."""
        if noisy: self.log(".when_reached(%r)" % (index,), level=NOISY)
        if index <= self.downloaded: # already reached
            if noisy: self.log("already reached %r" % (index,), level=NOISY)
            return defer.succeed(None)
        # (Deferred construction and callback header elided in this view)
            if noisy: self.log("reached %r" % (index,), level=NOISY)
            # (line elided in this view)
        d.addCallback(_reached)
        heapq.heappush(self.milestones, (index, d))
        # (return elided in this view)
    # NOTE(review): the method headers for this region are elided in this
    # view (apparently finish() and close(), judging from the "MILESTONE
    # FINISH" log and the close-of-temporary-file message below); only
    # interior lines are shown.
        while len(self.milestones) > 0:
            (next, d) = self.milestones[0]
            if noisy: self.log("MILESTONE FINISH %r %r" % (next, d), level=NOISY)
            heapq.heappop(self.milestones)
            # The callback means that the milestone has been reached if
            # it is ever going to be. Note that the file may have been
            # truncated to before the milestone.
            eventually_callback(d)(None)

        # FIXME: causes spurious failures
        #self.unregisterProducer()

        self.is_closed = True
        # (lines elided in this view)
        if not self.is_closed:
            # (lines elided in this view)
        except BaseException as e:
            # best-effort close: log and suppress any error
            self.log("suppressed %r from close of temporary file %r" % (e, self.f), level=WEIRD)
    def unregisterProducer(self):
        """IConsumer: stop the registered producer.
        NOTE(review): a line is elided in this view — presumably a guard
        that a producer is actually registered; confirm against the full
        source."""
        # (line elided in this view)
            self.producer.stopProducing()
# Threshold (in bytes) below which a read-only file is served by
# ShortReadOnlySFTPFile (full in-memory download) — presumed from the class
# docstring below; the usage site is elided in this view.
SIZE_THRESHOLD = 1000
class ShortReadOnlySFTPFile(PrefixingLogMixin):
    implements(ISFTPFile)
    # NOTE(review): the string below follows the implements() call, so it is
    # an ordinary expression statement rather than the class docstring;
    # consider moving it above implements().
    """I represent a file handle to a particular file on an SFTP connection.
    I am used only for short immutable files opened in read-only mode.
    The file contents are downloaded to memory when I am created."""
    def __init__(self, userpath, filenode, metadata):
        """Start an in-memory download of filenode's full contents; the
        resulting Deferred is stored in self.async and readChunk chains
        onto it.  NOTE(review): trailing lines are elided in this view."""
        PrefixingLogMixin.__init__(self, facility="tahoe.sftp", prefix=userpath)
        if noisy: self.log(".__init__(%r, %r, %r)" % (userpath, filenode, metadata), level=NOISY)

        assert IFileNode.providedBy(filenode), filenode
        self.filenode = filenode
        self.metadata = metadata
        self.async = download_to_data(filenode)
        # (lines elided in this view)
    def readChunk(self, offset, length):
        """ISFTPFile: return a Deferred firing with bytes [offset,
        offset+length) of the downloaded contents, or errbacking with
        SFTPError(FX_EOF) iff offset is at or past end-of-file.
        NOTE(review): several lines are elided in this view."""
        request = ".readChunk(%r, %r)" % (offset, length)
        self.log(request, level=OPERATIONAL)
        # (closed-handle guard header elided in this view)
            def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot read from a closed file handle")
            return defer.execute(_closed)
        # (Deferred construction and callback header elided in this view)
            if noisy: self.log("_read(<data of length %r>) in readChunk(%r, %r)" % (len(data), offset, length), level=NOISY)

            # "In response to this request, the server will read as many bytes as it
            # can from the file (up to 'len'), and return them in a SSH_FXP_DATA
            # message. If an error occurs or EOF is encountered before reading any
            # data, the server will respond with SSH_FXP_STATUS. For normal disk
            # files, it is guaranteed that this will read the specified number of
            # bytes, or up to end of file."
            #
            # i.e. we respond with an EOF error iff offset is already at EOF.

            if offset >= len(data):
                eventually_errback(d)(SFTPError(FX_EOF, "read at or past end of file"))
            # ('else:' elided in this view)
                eventually_callback(d)(data[offset:min(offset+length, len(data))])
            # (return elided in this view)
        self.async.addCallbacks(_read, eventually_errback(d))
        d.addBoth(_convert_error, request)
        # (return elided in this view)
590 def writeChunk(self, offset, data):
591 self.log(".writeChunk(%r, <data of length %r>) denied" % (offset, len(data)), level=OPERATIONAL)
593 def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for writing")
594 return defer.execute(_denied)
    # NOTE(review): the 'def close(self):' header appears to be elided here
    # (inferred from the ".close()" log string).
        self.log(".close()", level=OPERATIONAL)
        # (lines elided in this view)
        return defer.succeed(None)

    # NOTE(review): the 'def getAttrs(self):' header appears to be elided
    # here (inferred from the ".getAttrs()" request string).
        request = ".getAttrs()"
        self.log(request, level=OPERATIONAL)
        # (closed-handle guard header elided in this view)
            def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot get attributes for a closed file handle")
            return defer.execute(_closed)
        # (line elided in this view)
        d = defer.execute(_populate_attrs, self.filenode, self.metadata)
        d.addBoth(_convert_error, request)
        # (return elided in this view)
614 def setAttrs(self, attrs):
615 self.log(".setAttrs(%r) denied" % (attrs,), level=OPERATIONAL)
616 def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for writing")
617 return defer.execute(_denied)
class GeneralSFTPFile(PrefixingLogMixin):
    implements(ISFTPFile)
    # NOTE(review): the string below follows the implements() call, so it is
    # an ordinary expression statement rather than the class docstring;
    # consider moving it above implements().
    """I represent a file handle to a particular file on an SFTP connection.
    I wrap an instance of OverwriteableFileConsumer, which is responsible for
    storing the file contents. In order to allow write requests to be satisfied
    immediately, there is effectively a FIFO queue between requests made to this
    file handle, and requests to my OverwriteableFileConsumer. This queue is
    implemented by the callback chain of self.async.
    When first constructed, I am in an 'unopened' state that causes most
    operations to be delayed until 'open' is called."""
    def __init__(self, userpath, flags, close_notify, convergence):
        """Create an unopened handle; most operations queue on self.async
        until open() fires it.
        NOTE(review): several attribute initializations are elided in this
        view (including, presumably, self.flags and self.parent, which are
        referenced by other methods)."""
        PrefixingLogMixin.__init__(self, facility="tahoe.sftp", prefix=userpath)
        if noisy: self.log(".__init__(%r, %r = %r, %r, <convergence censored>)" %
                           (userpath, flags, _repr_flags(flags), close_notify), level=NOISY)
        self.userpath = userpath
        # (line elided in this view)
        self.close_notify = close_notify
        self.convergence = convergence
        self.async = defer.Deferred()
        # Creating or truncating the file is a change, but if FXF_EXCL is set, a zero-length file has already been created.
        self.has_changed = (flags & (FXF_CREAT | FXF_TRUNC)) and not (flags & FXF_EXCL)
        # (line elided in this view)
        self.abandoned = False
        # (line elided in this view)
        self.childname = None
        # (lines elided in this view)
        # self.consumer should only be relied on in callbacks for self.async, since it might
        # not be set before then.
    def open(self, parent=None, childname=None, filenode=None, metadata=None):
        """Transition from 'unopened' to open: set up self.consumer (empty
        for create/truncate, otherwise loaded or streamed from filenode)
        and fire self.async so queued operations can proceed.
        NOTE(review): several lines are elided in this view."""
        self.log(".open(parent=%r, childname=%r, filenode=%r, metadata=%r)" %
                 (parent, childname, filenode, metadata), level=OPERATIONAL)

        # If the file has been renamed, the new (parent, childname) takes precedence.
        if self.parent is None:
            # (branch body elided in this view)
        if self.childname is None:
            self.childname = childname
        self.filenode = filenode
        self.metadata = metadata
        # (lines elided in this view)
        tempfile_maker = EncryptedTemporaryFile

        if (self.flags & FXF_TRUNC) or not filenode:
            # We're either truncating or creating the file, so we don't need the old contents.
            self.consumer = OverwriteableFileConsumer(0, tempfile_maker)
            self.consumer.finish()
        # (else-branch header elided in this view)
            assert IFileNode.providedBy(filenode), filenode

            # TODO: use download interface described in #993 when implemented.
            if filenode.is_mutable():
                self.async.addCallback(lambda ign: filenode.download_best_version())
                def _downloaded(data):
                    self.consumer = OverwriteableFileConsumer(len(data), tempfile_maker)
                    self.consumer.write(data)
                    self.consumer.finish()
                    # (return elided in this view)
                self.async.addCallback(_downloaded)
            # ('else:' elided in this view)
                download_size = filenode.get_size()
                assert download_size is not None, "download_size is None"
                self.consumer = OverwriteableFileConsumer(download_size, tempfile_maker)
                # (callback header elided in this view)
                    if noisy: self.log("_read immutable", level=NOISY)
                    filenode.read(self.consumer, 0, None)
                self.async.addCallback(_read)

        eventually_callback(self.async)(None)

        if noisy: self.log("open done", level=NOISY)
        # (return elided in this view)
700 def rename(self, new_userpath, new_parent, new_childname):
701 self.log(".rename(%r, %r, %r)" % (new_userpath, new_parent, new_childname), level=OPERATIONAL)
703 self.userpath = new_userpath
704 self.parent = new_parent
705 self.childname = new_childname
    # NOTE(review): the 'def abandon(self):' header appears to be elided
    # here (inferred from the ".abandon()" log string).
        self.log(".abandon()", level=OPERATIONAL)
        # (line elided in this view)
        self.abandoned = True

    # NOTE(review): the 'def sync(self):' header appears to be elided here
    # (inferred from the ".sync()" log string).
        self.log(".sync()", level=OPERATIONAL)
        # (lines elided in this view)
        self.async.addBoth(eventually_callback(d))
        # (callback header elided in this view)
            if noisy: self.log("_done(%r) in .sync()" % (res,), level=NOISY)
        # (lines elided in this view)
    def readChunk(self, offset, length):
        """ISFTPFile: queue a read of [offset, offset+length) behind all
        prior operations on self.async.
        NOTE(review): several lines are elided in this view."""
        request = ".readChunk(%r, %r)" % (offset, length)
        self.log(request, level=OPERATIONAL)

        if not (self.flags & FXF_READ):
            def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for reading")
            return defer.execute(_denied)
        # (closed-handle guard header elided in this view)
            def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot read from a closed file handle")
            return defer.execute(_closed)
        # (Deferred construction and callback header elided in this view)
            if noisy: self.log("_read in readChunk(%r, %r)" % (offset, length), level=NOISY)
            d2 = self.consumer.read(offset, length)
            d2.addCallbacks(eventually_callback(d), eventually_errback(d))
            # It is correct to drop d2 here.
        self.async.addCallbacks(_read, eventually_errback(d))
        d.addBoth(_convert_error, request)
        # (return elided in this view)
    def writeChunk(self, offset, data):
        """ISFTPFile: queue a write at 'offset' (or at the current EOF when
        FXF_APPEND is set) and return immediately; write errors surface via
        later reads or close.
        NOTE(review): a few lines are elided in this view."""
        self.log(".writeChunk(%r, <data of length %r>)" % (offset, len(data)), level=OPERATIONAL)

        if not (self.flags & FXF_WRITE):
            def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for writing")
            return defer.execute(_denied)
        # (closed-handle guard header elided in this view)
            def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot write to a closed file handle")
            return defer.execute(_closed)

        self.has_changed = True
        # (lines elided in this view)
        # Note that we return without waiting for the write to occur. Reads and
        # close wait for prior writes, and will fail if any prior operation failed.
        # This is ok because SFTP makes no guarantee that the write completes
        # before the request does. In fact it explicitly allows write errors to be
        # delayed until close:
        #   "One should note that on some server platforms even a close can fail.
        #    This can happen e.g. if the server operating system caches writes,
        #    and an error occurs while flushing cached writes during the close."
        # (callback header elided in this view)
            if noisy: self.log("_write in .writeChunk(%r, <data of length %r>), current_size = %r" %
                               (offset, len(data), self.consumer.get_current_size()), level=NOISY)
            # FXF_APPEND means that we should always write at the current end of file.
            write_offset = offset
            if self.flags & FXF_APPEND:
                write_offset = self.consumer.get_current_size()
            # (line elided in this view)
            self.consumer.overwrite(write_offset, data)
            if noisy: self.log("overwrite done", level=NOISY)
            # (return elided in this view)
        self.async.addCallback(_write)
        # don't addErrback to self.async, just allow subsequent async ops to fail.
        return defer.succeed(None)
    # NOTE(review): the 'def close(self):' header and the construction of
    # 'request' are elided in this view; only the interior is shown.
        self.log(request, level=OPERATIONAL)
        # (already-closed short-circuit partially elided in this view)
            return defer.succeed(None)

        # This means that close has been called, not that the close has succeeded.
        # (line elided in this view)
        if not (self.flags & (FXF_WRITE | FXF_CREAT)):
            # read-only handle: nothing to commit, just release the consumer
            def _readonly_close():
                # (line elided in this view)
                self.consumer.close()
            return defer.execute(_readonly_close)

        # We must capture the abandoned, parent, and childname variables synchronously
        # at the close call. This is needed by the correctness arguments in the comments
        # for _abandon_any_heisenfiles and _rename_heisenfiles.
        abandoned = self.abandoned
        # (line elided in this view)
        childname = self.childname

        # has_changed is set when writeChunk is called, not when the write occurs, so
        # it is correct to optimize out the commit if it is False at the close call.
        has_changed = self.has_changed
        # (callback header elided in this view)
            if noisy: self.log("_committed(%r)" % (res,), level=NOISY)
            # (line elided in this view)
            self.consumer.close()
            # (line elided in this view)
            # We must close_notify before re-firing self.async.
            if self.close_notify:
                self.close_notify(self.userpath, self.parent, self.childname, self)
        # (lines elided in this view)
            d2 = self.consumer.when_done()
            if self.filenode and self.filenode.is_mutable():
                # mutable file: publish the full new contents in place
                self.log("update mutable file %r childname=%r" % (self.filenode, self.childname,), level=OPERATIONAL)
                d2.addCallback(lambda ign: self.consumer.get_current_size())
                d2.addCallback(lambda size: self.consumer.read(0, size))
                d2.addCallback(lambda new_contents: self.filenode.overwrite(new_contents))
            # (else-branch header elided in this view)
                # immutable: upload the temp file and link it into the parent
                self.log("_add_file childname=%r" % (childname,), level=OPERATIONAL)
                u = FileHandle(self.consumer.get_file(), self.convergence)
                return parent.add_file(childname, u)
            d2.addCallback(_add_file)
            # (line elided in this view)
            d2.addBoth(_committed)
        # (lines elided in this view)
        # If the file has been abandoned, we don't want the close operation to get "stuck",
        # even if self.async fails to re-fire. Doing the close independently of self.async
        # in that case ensures that dropping an ssh connection is sufficient to abandon
        # any heisenfiles that were not explicitly closed in that connection.
        if abandoned or not has_changed:
            d.addCallback(_committed)
        # ('else:' elided in this view)
            self.async.addCallback(_close)

        self.async.addCallbacks(eventually_callback(d), eventually_errback(d))
        d.addBoth(_convert_error, request)
        # (return elided in this view)
    # NOTE(review): the 'def getAttrs(self):' header appears to be elided
    # here (inferred from the ".getAttrs()" request string).
        request = ".getAttrs()"
        self.log(request, level=OPERATIONAL)
        # (closed-handle guard header elided in this view)
            def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot get attributes for a closed file handle")
            return defer.execute(_closed)

        # Optimization for read-only handles, when we already know the metadata.
        if not(self.flags & (FXF_WRITE | FXF_CREAT)) and self.metadata and self.filenode and not self.filenode.is_mutable():
            return defer.succeed(_populate_attrs(self.filenode, self.metadata))
        # (Deferred construction and callback header elided in this view)
            # self.filenode might be None, but that's ok.
            attrs = _populate_attrs(self.filenode, self.metadata, size=self.consumer.get_current_size())
            eventually_callback(d)(attrs)
        self.async.addCallbacks(_get, eventually_errback(d))
        d.addBoth(_convert_error, request)
        # (return elided in this view)
    def setAttrs(self, attrs):
        """ISFTPFile: only the 'size' attribute is acted on (a truncate or
        extend queued behind prior operations); other attributes are
        ignored.  NOTE(review): a few lines are elided in this view."""
        request = ".setAttrs(attrs) %r" % (attrs,)
        self.log(request, level=OPERATIONAL)

        if not (self.flags & FXF_WRITE):
            def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for writing")
            return defer.execute(_denied)
        # (closed-handle guard header elided in this view)
            def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot set attributes for a closed file handle")
            return defer.execute(_closed)

        if not "size" in attrs:
            return defer.succeed(None)
        # (binding of 'size' elided in this view)
        if not isinstance(size, (int, long)) or size < 0:
            def _bad(): raise SFTPError(FX_BAD_MESSAGE, "new size is not a valid nonnegative integer")
            return defer.execute(_bad)
        # (Deferred construction and callback header elided in this view)
            self.consumer.set_current_size(size)
            eventually_callback(d)(None)
        self.async.addCallbacks(_resize, eventually_errback(d))
        d.addBoth(_convert_error, request)
        # (return elided in this view)
# NOTE(review): two small helper classes are almost entirely elided in this
# view; only their __init__ headers remain.
    def __init__(self, items):
        # (body elided in this view)

    def __init__(self, value):
        # (body elided in this view)

# A "heisenfile" is a file that has been opened with write flags
# (FXF_WRITE and/or FXF_CREAT) and not yet close-notified.
# 'all_heisenfiles' maps from a direntry string to
# (list_of_GeneralSFTPFile, open_time_utc).
# A direntry string is parent_write_uri + "/" + childname_utf8 for
# an immutable file, or file_write_uri for a mutable file.
# Updates to this dict are single-threaded.
# NOTE(review): the 'all_heisenfiles = {}' assignment itself is presumably
# on a line elided from this view; confirm against the full source.
class SFTPUserHandler(ConchUser, PrefixingLogMixin):
    """Per-user avatar implementing the ISFTPServer operations for one
    logged-in username against its Tahoe root dirnode."""
    implements(ISFTPServer)
    def __init__(self, client, rootnode, username):
        ConchUser.__init__(self)
        PrefixingLogMixin.__init__(self, facility="tahoe.sftp", prefix=username)
        if noisy: self.log(".__init__(%r, %r, %r)" % (client, rootnode, username), level=NOISY)

        # route the ssh "session" channel and "sftp" subsystem to Conch
        self.channelLookup["session"] = session.SSHSession
        self.subsystemLookup["sftp"] = FileTransferServer

        self._client = client
        self._root = rootnode
        self._username = username
        self._convergence = client.convergence

        # maps from UTF-8 paths for this user, to files written and still open
        self._heisenfiles = {}
    def gotVersion(self, otherVersion, extData):
        """ISFTPServer: return the extension dict advertised to the client.
        NOTE(review): the tail of the returned dict is elided in this
        view."""
        self.log(".gotVersion(%r, %r)" % (otherVersion, extData), level=OPERATIONAL)

        # advertise the same extensions as the OpenSSH SFTP server
        # <http://www.openbsd.org/cgi-bin/cvsweb/src/usr.bin/ssh/PROTOCOL?rev=1.15>
        return {'posix-rename@openssh.com': '1',
                'statvfs@openssh.com': '2',
                'fstatvfs@openssh.com': '2',
    # NOTE(review): the 'def logout(self):' header appears to be elided here
    # (inferred from the ".logout()" log string).
        self.log(".logout()", level=OPERATIONAL)
        # (line elided in this view)
        for files in self._heisenfiles.itervalues():
            # (loop body elided in this view)
    def _add_heisenfiles_by_path(self, userpath, files):
        """Record open-for-write files under this user's UTF-8 userpath,
        appending to any existing list.
        NOTE(review): the 'else:' between the two assignments is elided in
        this view."""
        if noisy: self.log("._add_heisenfiles_by_path(%r, %r)" % (userpath, files), level=NOISY)

        if userpath in self._heisenfiles:
            self._heisenfiles[userpath] += files
        # ('else:' elided in this view)
            self._heisenfiles[userpath] = files
    def _add_heisenfiles_by_direntry(self, direntry, files_to_add):
        """Record open-for-write files in the global all_heisenfiles map,
        preserving the original open time when the direntry already exists.
        NOTE(review): the 'else:' between the two branches is elided in
        this view."""
        if noisy: self.log("._add_heisenfiles_by_direntry(%r, %r)" % (direntry, files_to_add), level=NOISY)

        if direntry in all_heisenfiles:
            (old_files, opentime) = all_heisenfiles[direntry]
            all_heisenfiles[direntry] = (old_files + files_to_add, opentime)
        # ('else:' elided in this view)
            all_heisenfiles[direntry] = (files_to_add, time())
    def _abandon_any_heisenfiles(self, userpath, direntry):
        """Mark every heisenfile matching userpath or direntry as abandoned,
        remove them from both tracking dicts, then .sync() each one.  The
        returned Deferred fires with True iff any files were abandoned.
        NOTE(review): several lines are elided in this view."""
        if noisy: self.log("._abandon_any_heisenfiles(%r, %r)" % (userpath, direntry), level=NOISY)

        # First we synchronously mark all heisenfiles matching the userpath or direntry
        # as abandoned, and remove them from the two heisenfile dicts. Then we .sync()
        # each file that we abandoned.
        #
        # For each file, the call to .abandon() occurs:
        #   * before the file is closed, in which case it will never be committed
        #     (uploaded+linked or published); or
        #   * after it is closed but before it has been close_notified, in which case the
        #     .sync() ensures that it has been committed (successfully or not) before we
        #     (comment continuation elided in this view)
        #
        # This avoids a race that might otherwise cause the file to be committed after
        # the remove operation has completed.
        #
        # We return a Deferred that fires with True if any files were abandoned (this
        # does not mean that they were not committed; it is used to determine whether
        # a NoSuchChildError from the attempt to delete the file should be suppressed).
        # (lines elided in this view)
        if direntry in all_heisenfiles:
            (files, opentime) = all_heisenfiles[direntry]
            del all_heisenfiles[direntry]
        if userpath in self._heisenfiles:
            files += self._heisenfiles[userpath]
            del self._heisenfiles[userpath]
        # (lines elided in this view)
        d = defer.succeed(None)
        # (loop header elided in this view)
        def _sync(ign, current_f):
            return current_f.sync()
        # (line elided in this view)
        d.addBoth(lambda ign: len(files) > 0)
        # (return elided in this view)
    def _rename_heisenfiles(self, from_userpath, from_parent, from_childname,
                            to_userpath, to_parent, to_childname, overwrite=True):
        """Re-key any open heisenfiles from the 'from' userpath/direntry to the
        'to' userpath/direntry, then sync each renamed file.

        NOTE(review): some lines appear elided in this view (initialization of
        'from_files', the d.addCallback scheduling of _sync, and the final
        'return d'); the visible code is preserved unchanged.
        """
        if noisy: self.log("._rename_heisenfiles(%r, %r, %r, %r, %r, %r, overwrite=%r)" %
                           (from_userpath, from_parent, from_childname,
                            to_userpath, to_parent, to_childname, overwrite), level=NOISY)

        # First we synchronously rename all heisenfiles matching the userpath or direntry.
        # Then we .sync() each file that we renamed.
        #
        # For each file, the call to .rename occurs:
        #   * before the file is closed, in which case it will be committed at the
        #   * after it is closed but before it has been close_notified, in which case the
        #     .sync() ensures that it has been committed (successfully or not) before we
        # This avoids a race that might otherwise cause the file to be committed at the
        # old name after the rename operation has completed.
        #
        # Note that if overwrite is False, the caller should already have checked
        # whether a real direntry exists at the destination. It is possible that another
        # direntry (heisen or real) comes to exist at the destination after that check,
        # but in that case it is correct for the rename to succeed (and for the commit
        # of the heisenfile at the destination to possibly clobber the other entry, since
        # that can happen anyway when we have concurrent write handles to the same direntry).
        #
        # We return a Deferred that fires with True if any files were renamed (this
        # does not mean that they were not committed; it is used to determine whether
        # a NoSuchChildError from the rename attempt should be suppressed). If overwrite
        # is False and there were already heisenfiles at the destination userpath or
        # direntry, we return a Deferred that fails with SFTPError(FX_PERMISSION_DENIED).

        from_direntry = self._direntry_for(from_parent, from_childname)
        to_direntry = self._direntry_for(to_parent, to_childname)

        if not overwrite and (to_userpath in self._heisenfiles or to_direntry in all_heisenfiles):
            def _existing(): raise SFTPError(FX_PERMISSION_DENIED, "cannot rename to existing path " + to_userpath)
            return defer.execute(_existing)

        if from_direntry in all_heisenfiles:
            (from_files, opentime) = all_heisenfiles[from_direntry]
            del all_heisenfiles[from_direntry]
        if from_userpath in self._heisenfiles:
            from_files += self._heisenfiles[from_userpath]
            del self._heisenfiles[from_userpath]

        self._add_heisenfiles_by_direntry(to_direntry, from_files)
        self._add_heisenfiles_by_path(to_userpath, from_files)

        for f in from_files:
            f.rename(to_userpath, to_parent, to_childname)

        d = defer.succeed(None)
        for f in from_files:
            def _sync(ign, current_f):
                return current_f.sync()
        d.addBoth(lambda ign: len(from_files) > 0)
    def _sync_heisenfiles(self, userpath, direntry, ignore=None):
        """Sync every open heisenfile matching userpath/direntry, except the
        file given as 'ignore'.

        NOTE(review): some lines appear elided in this view (initialization of
        'files', the loop header over files, the d.addCallback scheduling, and
        the final return); the visible code is preserved unchanged.
        """
        request = "._sync_heisenfiles(%r, %r, ignore=%r)" % (userpath, direntry, ignore)
        self.log(request, level=OPERATIONAL)

        if direntry in all_heisenfiles:
            (files, opentime) = all_heisenfiles[direntry]
        if userpath in self._heisenfiles:
            files += self._heisenfiles[userpath]

        if noisy: self.log("files = %r in %r" % (files, request), level=NOISY)

        d = defer.succeed(None)
            if not (f is ignore):
                def _sync(ign, current_f):
                    if noisy: self.log("_sync %r in %r" % (current_f, request), level=NOISY)
                    return current_f.sync()
            self.log("done %r" % (request,), level=OPERATIONAL)
    def _remove_heisenfile(self, userpath, parent, childname, file_to_remove):
        """Drop one heisenfile from both tracking dicts, deleting an entry
        entirely once its file list becomes empty (used as close_notify).

        NOTE(review): the two 'else:' headers (before each 'del') appear elided
        in this view; the visible code is preserved unchanged.
        """
        if noisy: self.log("._remove_heisenfile(%r, %r, %r, %r)" % (userpath, parent, childname, file_to_remove), level=NOISY)

        direntry = self._direntry_for(parent, childname)
        if direntry in all_heisenfiles:
            (all_old_files, opentime) = all_heisenfiles[direntry]
            # keep the entry only if other open files remain for this direntry
            all_new_files = [f for f in all_old_files if f is not file_to_remove]
            if len(all_new_files) > 0:
                all_heisenfiles[direntry] = (all_new_files, opentime)
                del all_heisenfiles[direntry]

        if userpath in self._heisenfiles:
            old_files = self._heisenfiles[userpath]
            new_files = [f for f in old_files if f is not file_to_remove]
            if len(new_files) > 0:
                self._heisenfiles[userpath] = new_files
                del self._heisenfiles[userpath]
    def _direntry_for(self, filenode_or_parent, childname=None):
        """Return a user-independent key for a directory entry, derived from
        the node's write-URI (plus the UTF-8 child name when given).

        NOTE(review): the remaining branches of this method (e.g. when
        childname is None, or when there is no writecap) appear elided in this
        view; the visible code is preserved unchanged.
        """
        if filenode_or_parent:
            rw_uri = filenode_or_parent.get_write_uri()
            if rw_uri and childname:
                return rw_uri + "/" + childname.encode('utf-8')
    def _make_file(self, existing_file, userpath, flags, parent=None, childname=None, filenode=None, metadata=None):
        """Construct (or reuse) the file object for an open request: a
        ShortReadOnlySFTPFile for small immutable read-only opens, otherwise a
        GeneralSFTPFile.

        NOTE(review): several lines appear elided in this view (the closing of
        the log call, 'else:' headers, 'if writing:' guards, and the final
        'return d'); the visible code is preserved unchanged.
        """
        if noisy: self.log("._make_file(%r, %r, %r = %r, parent=%r, childname=%r, filenode=%r, metadata=%r)" %
                           (existing_file, userpath, flags, _repr_flags(flags), parent, childname, filenode, metadata),

        assert metadata is None or 'readonly' in metadata, metadata

        writing = (flags & (FXF_WRITE | FXF_CREAT)) != 0
        direntry = self._direntry_for(parent, childname)
        direntry = self._direntry_for(filenode)

        # make sure any already-open writers for this entry are committed first
        d = self._sync_heisenfiles(userpath, direntry, ignore=existing_file)

        if not writing and (flags & FXF_READ) and filenode and not filenode.is_mutable() and filenode.get_size() <= SIZE_THRESHOLD:
            d.addCallback(lambda ign: ShortReadOnlySFTPFile(userpath, filenode, metadata))
            close_notify = self._remove_heisenfile

            d.addCallback(lambda ign: existing_file or GeneralSFTPFile(userpath, flags, close_notify, self._convergence))
            def _got_file(file):
                self._add_heisenfiles_by_direntry(direntry, [file])
                return file.open(parent=parent, childname=childname, filenode=filenode, metadata=metadata)
            d.addCallback(_got_file)
    def openFile(self, pathstring, flags, attrs):
        """ISFTPServer hook: open a file for reading and/or writing.

        NOTE(review): a number of lines appear elided in this view (guard
        conditions such as 'if not path:' and 'if parent_readonly:', 'else:'
        headers, inner 'def' headers such as '_no_child', and several 'return'
        statements); the visible code is preserved unchanged.
        """
        request = ".openFile(%r, %r = %r, %r)" % (pathstring, flags, _repr_flags(flags), attrs)
        self.log(request, level=OPERATIONAL)

        # This is used for both reading and writing.
        # First exclude invalid combinations of flags, and empty paths.

        if not (flags & (FXF_READ | FXF_WRITE)):
            def _bad_readwrite():
                raise SFTPError(FX_BAD_MESSAGE, "invalid file open flags: at least one of FXF_READ and FXF_WRITE must be set")
            return defer.execute(_bad_readwrite)

        if (flags & FXF_EXCL) and not (flags & FXF_CREAT):
            def _bad_exclcreat():
                raise SFTPError(FX_BAD_MESSAGE, "invalid file open flags: FXF_EXCL cannot be set without FXF_CREAT")
            return defer.execute(_bad_exclcreat)

        path = self._path_from_string(pathstring)
            def _emptypath(): raise SFTPError(FX_NO_SUCH_FILE, "path cannot be empty")
            return defer.execute(_emptypath)

        # The combination of flags is potentially valid.

        # To work around clients that have race condition bugs, a getAttr, rename, or
        # remove request following an 'open' request with FXF_WRITE or FXF_CREAT flags,
        # should succeed even if the 'open' request has not yet completed. So we now
        # synchronously add a file object into the self._heisenfiles dict, indexed
        # by its UTF-8 userpath. (We can't yet add it to the all_heisenfiles dict,
        # because we don't yet have a user-independent path for the file.) The file
        # object does not know its filenode, parent, or childname at this point.

        userpath = self._path_to_utf8(path)

        if flags & (FXF_WRITE | FXF_CREAT):
            file = GeneralSFTPFile(userpath, flags, self._remove_heisenfile, self._convergence)
            self._add_heisenfiles_by_path(userpath, [file])
            # We haven't decided which file implementation to use yet.

        # Now there are two major cases:
        #
        #  1. The path is specified as /uri/FILECAP, with no parent directory.
        #     If the FILECAP is mutable and writeable, then we can open it in write-only
        #     or read/write mode (non-exclusively), otherwise we can only open it in
        #     read-only mode. The open should succeed immediately as long as FILECAP is
        #     a valid known filecap that grants the required permission.
        #
        #  2. The path is specified relative to a parent. We find the parent dirnode and
        #     get the child's URI and metadata if it exists. There are four subcases:
        #       a. the child does not exist: FXF_CREAT must be set, and we must be able
        #          to write to the parent directory.
        #       b. the child exists but is not a valid known filecap: fail
        #       c. the child is mutable: if we are trying to open it write-only or
        #          read/write, then we must be able to write to the file.
        #       d. the child is immutable: if we are trying to open it write-only or
        #          read/write, then we must be able to write to the parent directory.
        #
        # To reduce latency, open normally succeeds as soon as these conditions are
        # met, even though there might be a failure in downloading the existing file
        # or uploading a new one. However, there is an exception: if a file has been
        # written, then closed, and is now being reopened, then we have to delay the
        # open until the previous upload/publish has completed. This is necessary
        # because sshfs does not wait for the result of an FXF_CLOSE message before
        # reporting to the client that a file has been closed. It applies both to
        # mutable files, and to directory entries linked to an immutable file.
        #
        # Note that the permission checks below are for more precise error reporting on
        # the open call; later operations would fail even if we did not make these checks.

        d = self._get_root(path)
        def _got_root( (root, path) ):
            if root.is_unknown():
                raise SFTPError(FX_PERMISSION_DENIED,
                                "cannot open an unknown cap (or child of an unknown directory). "
                                "Upgrading the gateway to a later Tahoe-LAFS version may help")
                if noisy: self.log("case 1: root = %r, path[:-1] = %r" % (root, path[:-1]), level=NOISY)
                if not IFileNode.providedBy(root):
                    raise SFTPError(FX_PERMISSION_DENIED,
                                    "cannot open a directory cap")
                if (flags & FXF_WRITE) and root.is_readonly():
                    raise SFTPError(FX_PERMISSION_DENIED,
                                    "cannot write to a non-writeable filecap without a parent directory")
                if flags & FXF_EXCL:
                    raise SFTPError(FX_FAILURE,
                                    "cannot create a file exclusively when it already exists")

                # The file does not need to be added to all_heisenfiles, because it is not
                # associated with a directory entry that needs to be updated.

                return self._make_file(file, userpath, flags, filenode=root)
                childname = path[-1]
                if noisy: self.log("case 2: root = %r, childname = %r, path[:-1] = %r" %
                                   (root, childname, path[:-1]), level=NOISY)
                d2 = root.get_child_at_path(path[:-1])
                def _got_parent(parent):
                    if noisy: self.log("_got_parent(%r)" % (parent,), level=NOISY)
                    if parent.is_unknown():
                        raise SFTPError(FX_PERMISSION_DENIED,
                                        "cannot open an unknown cap (or child of an unknown directory). "
                                        "Upgrading the gateway to a later Tahoe-LAFS version may help")

                    parent_readonly = parent.is_readonly()
                    d3 = defer.succeed(None)
                    if flags & FXF_EXCL:
                        # FXF_EXCL means that the link to the file (not the file itself) must
                        # be created atomically wrt updates by this storage client.
                        # That is, we need to create the link before returning success to the
                        # SFTP open request (and not just on close, as would normally be the
                        # case). We make the link initially point to a zero-length LIT file,
                        # which is consistent with what might happen on a POSIX filesystem.

                            raise SFTPError(FX_FAILURE,
                                            "cannot create a file exclusively when the parent directory is read-only")

                        # 'overwrite=False' ensures failure if the link already exists.
                        # FIXME: should use a single call to set_uri and return (child, metadata) (#1035)

                        zero_length_lit = "URI:LIT:"
                        if noisy: self.log("%r.set_uri(%r, None, readcap=%r, overwrite=False)" %
                                           (parent, zero_length_lit, childname), level=NOISY)
                        d3.addCallback(lambda ign: parent.set_uri(childname, None, readcap=zero_length_lit, overwrite=False))
                        def _seturi_done(child):
                            if noisy: self.log("%r.get_metadata_for(%r)" % (parent, childname), level=NOISY)
                            d4 = parent.get_metadata_for(childname)
                            d4.addCallback(lambda metadata: (child, metadata))
                        d3.addCallback(_seturi_done)
                        if noisy: self.log("%r.get_child_and_metadata(%r)" % (parent, childname), level=NOISY)
                        d3.addCallback(lambda ign: parent.get_child_and_metadata(childname))

                    def _got_child( (filenode, metadata) ):
                        if noisy: self.log("_got_child( (%r, %r) )" % (filenode, metadata), level=NOISY)

                        if filenode.is_unknown():
                            raise SFTPError(FX_PERMISSION_DENIED,
                                            "cannot open an unknown cap. Upgrading the gateway "
                                            "to a later Tahoe-LAFS version may help")
                        if not IFileNode.providedBy(filenode):
                            raise SFTPError(FX_PERMISSION_DENIED,
                                            "cannot open a directory as if it were a file")
                        if (flags & FXF_WRITE) and filenode.is_mutable() and filenode.is_readonly():
                            raise SFTPError(FX_PERMISSION_DENIED,
                                            "cannot open a read-only mutable file for writing")
                        if (flags & FXF_WRITE) and parent_readonly:
                            raise SFTPError(FX_PERMISSION_DENIED,
                                            "cannot open a file for writing when the parent directory is read-only")

                        metadata['readonly'] = _is_readonly(parent_readonly, filenode)
                        return self._make_file(file, userpath, flags, parent=parent, childname=childname,
                                               filenode=filenode, metadata=metadata)
                        if noisy: self.log("_no_child(%r)" % (f,), level=NOISY)
                        f.trap(NoSuchChildError)

                        if not (flags & FXF_CREAT):
                            raise SFTPError(FX_NO_SUCH_FILE,
                                            "the file does not exist, and was not opened with the creation (CREAT) flag")
                            raise SFTPError(FX_PERMISSION_DENIED,
                                            "cannot create a file when the parent directory is read-only")

                        return self._make_file(file, userpath, flags, parent=parent, childname=childname)
                    d3.addCallbacks(_got_child, _no_child)
                d2.addCallback(_got_parent)
        d.addCallback(_got_root)
        def _remove_on_error(err):
            self._remove_heisenfile(userpath, None, None, file)
        d.addErrback(_remove_on_error)
        d.addBoth(_convert_error, request)
    def renameFile(self, from_pathstring, to_pathstring, overwrite=False):
        """ISFTPServer hook: rename an object; overwrite=True is used by the
        posix-rename@openssh.com extension.

        NOTE(review): several lines appear elided in this view ('if not
        overwrite:' guard, the 'def _move' and 'def _check' headers, early
        returns inside _check, and the final returns); the visible code is
        preserved unchanged.
        """
        request = ".renameFile(%r, %r)" % (from_pathstring, to_pathstring)
        self.log(request, level=OPERATIONAL)

        from_path = self._path_from_string(from_pathstring)
        to_path = self._path_from_string(to_pathstring)
        from_userpath = self._path_to_utf8(from_path)
        to_userpath = self._path_to_utf8(to_path)

        # the target directory must already exist
        d = deferredutil.gatherResults([self._get_parent_or_node(from_path),
                                        self._get_parent_or_node(to_path)])
        def _got( (from_pair, to_pair) ):
            if noisy: self.log("_got( (%r, %r) ) in .renameFile(%r, %r, overwrite=%r)" %
                               (from_pair, to_pair, from_pathstring, to_pathstring, overwrite), level=NOISY)
            (from_parent, from_childname) = from_pair
            (to_parent, to_childname) = to_pair

            if from_childname is None:
                raise SFTPError(FX_NO_SUCH_FILE, "cannot rename a source object specified by URI")
            if to_childname is None:
                raise SFTPError(FX_NO_SUCH_FILE, "cannot rename to a destination specified by URI")

            # <http://tools.ietf.org/html/draft-ietf-secsh-filexfer-02#section-6.5>
            # "It is an error if there already exists a file with the name specified
            # OpenSSH's SFTP server returns FX_PERMISSION_DENIED for this error.
            #
            # For the standard SSH_FXP_RENAME operation, overwrite=False.
            # We also support the posix-rename@openssh.com extension, which uses overwrite=True.

            d2 = defer.fail(NoSuchChildError())
                d2.addCallback(lambda ign: to_parent.get(to_childname))
                def _expect_fail(res):
                    if not isinstance(res, Failure):
                        raise SFTPError(FX_PERMISSION_DENIED, "cannot rename to existing path " + to_userpath)

                    # It is OK if we fail for errors other than NoSuchChildError, since that probably
                    # indicates some problem accessing the destination directory.
                    res.trap(NoSuchChildError)
                d2.addBoth(_expect_fail)

            # If there are heisenfiles to be written at the 'from' direntry, then ensure
            # they will now be written at the 'to' direntry instead.
            d2.addCallback(lambda ign:
                           self._rename_heisenfiles(from_userpath, from_parent, from_childname,
                                                    to_userpath, to_parent, to_childname, overwrite=overwrite))

                # FIXME: use move_child_to_path to avoid possible data loss due to #943
                #d3 = from_parent.move_child_to_path(from_childname, to_root, to_path, overwrite=overwrite)

                d3 = from_parent.move_child_to(from_childname, to_parent, to_childname, overwrite=overwrite)
                    if noisy: self.log("_check(%r) in .renameFile(%r, %r, overwrite=%r)" %
                                       (err, from_pathstring, to_pathstring, overwrite), level=NOISY)

                    if not isinstance(err, Failure) or (renamed and err.check(NoSuchChildError)):
                    if not overwrite and err.check(ExistingChildError):
                        raise SFTPError(FX_PERMISSION_DENIED, "cannot rename to existing path " + to_userpath)

            d2.addCallback(_move)
        d.addBoth(_convert_error, request)
    def makeDirectory(self, pathstring, attrs):
        """ISFTPServer hook: create a directory (and any missing ancestors).

        NOTE(review): a final 'return d' appears elided in this view.
        """
        request = ".makeDirectory(%r, %r)" % (pathstring, attrs)
        self.log(request, level=OPERATIONAL)

        path = self._path_from_string(pathstring)
        metadata = self._attrs_to_metadata(attrs)
        d = self._get_root(path)
        d.addCallback(lambda (root, path):
                      self._get_or_create_directories(root, path, metadata))
        d.addBoth(_convert_error, request)
    def _get_or_create_directories(self, node, path, metadata):
        """Recursively walk (creating as needed) each component of 'path'
        under 'node', returning a Deferred for the leaf directory node.

        NOTE(review): the base-case guard (apparently 'if not path:') before
        the 'return defer.succeed(node)' line, and the final 'return d',
        appear elided in this view; the visible code is preserved unchanged.
        """
        if not IDirectoryNode.providedBy(node):
            # TODO: provide the name of the blocking file in the error message.
            def _blocked(): raise SFTPError(FX_FAILURE, "cannot create directory because there "
                                                       "is a file in the way") # close enough
            return defer.execute(_blocked)
            return defer.succeed(node)
        d = node.get(path[0])
        def _maybe_create(f):
            # only a missing child is recoverable; other failures propagate
            f.trap(NoSuchChildError)
            return node.create_subdirectory(path[0])
        d.addErrback(_maybe_create)
        d.addCallback(self._get_or_create_directories, path[1:], metadata)
    def removeFile(self, pathstring):
        """ISFTPServer hook: remove the file at pathstring.

        NOTE(review): a final 'return d' appears elided in this view.
        """
        request = ".removeFile(%r)" % (pathstring,)
        self.log(request, level=OPERATIONAL)

        path = self._path_from_string(pathstring)
        d = self._remove_object(path, must_be_file=True)
        d.addBoth(_convert_error, request)
    def removeDirectory(self, pathstring):
        """ISFTPServer hook: remove the directory at pathstring.

        NOTE(review): a final 'return d' appears elided in this view.
        """
        request = ".removeDirectory(%r)" % (pathstring,)
        self.log(request, level=OPERATIONAL)

        path = self._path_from_string(pathstring)
        d = self._remove_object(path, must_be_directory=True)
        d.addBoth(_convert_error, request)
    def _remove_object(self, path, must_be_directory=False, must_be_file=False):
        """Shared implementation for removeFile/removeDirectory: abandon any
        open heisenfiles for the entry, then delete it from its parent.

        NOTE(review): the returns of the inner and outer Deferreds appear
        elided in this view; the visible code is preserved unchanged.
        """
        userpath = self._path_to_utf8(path)
        d = self._get_parent_or_node(path)
        def _got_parent( (parent, childname) ):
            if childname is None:
                raise SFTPError(FX_NO_SUCH_FILE, "cannot remove an object specified by URI")

            direntry = self._direntry_for(parent, childname)
            d2 = defer.succeed(False)
            if not must_be_directory:
                d2.addCallback(lambda ign: self._abandon_any_heisenfiles(userpath, direntry))

            # if we abandoned uncommitted writes, the entry may legitimately not exist
            d2.addCallback(lambda abandoned:
                           parent.delete(childname, must_exist=not abandoned,
                                         must_be_directory=must_be_directory, must_be_file=must_be_file))
        d.addCallback(_got_parent)
    def openDirectory(self, pathstring):
        """ISFTPServer hook: list a directory, returning ls-style entries.

        NOTE(review): several lines appear elided in this view (an 'else:'
        header, the 'def _list(dirnode):' header, the 'd2 = dirnode.list()'
        and 'results = []' initializers, and the final returns); the visible
        code is preserved unchanged.
        """
        request = ".openDirectory(%r)" % (pathstring,)
        self.log(request, level=OPERATIONAL)

        path = self._path_from_string(pathstring)
        d = self._get_parent_or_node(path)
        def _got_parent_or_node( (parent_or_node, childname) ):
            if noisy: self.log("_got_parent_or_node( (%r, %r) ) in openDirectory(%r)" %
                               (parent_or_node, childname, pathstring), level=NOISY)
            if childname is None:
                return parent_or_node
                return parent_or_node.get(childname)
        d.addCallback(_got_parent_or_node)
            if dirnode.is_unknown():
                raise SFTPError(FX_PERMISSION_DENIED,
                                "cannot list an unknown cap as a directory. Upgrading the gateway "
                                "to a later Tahoe-LAFS version may help")
            if not IDirectoryNode.providedBy(dirnode):
                raise SFTPError(FX_PERMISSION_DENIED,
                                "cannot list a file as if it were a directory")

            def _render(children):
                parent_readonly = dirnode.is_readonly()
                for filename, (child, metadata) in children.iteritems():
                    # The file size may be cached or absent.
                    metadata['readonly'] = _is_readonly(parent_readonly, child)
                    attrs = _populate_attrs(child, metadata)
                    filename_utf8 = filename.encode('utf-8')
                    longname = _lsLine(filename_utf8, attrs)
                    results.append( (filename_utf8, longname, attrs) )
                return StoppableList(results)
            d2.addCallback(_render)
        d.addCallback(_list)
        d.addBoth(_convert_error, request)
    def getAttrs(self, pathstring, followLinks):
        """ISFTPServer hook: report attributes for a path, consulting open
        heisenfiles to tolerate clients' stat-after-open races.

        NOTE(review): several lines appear elided in this view (an 'else:'
        header, the 'def _nosuch(err):' header, parts of the synthesized
        attrs dict, and the final returns); the visible code is preserved
        unchanged.
        """
        request = ".getAttrs(%r, followLinks=%r)" % (pathstring, followLinks)
        self.log(request, level=OPERATIONAL)

        # When asked about a specific file, report its current size.
        # TODO: the modification time for a mutable file should be
        # reported as the update time of the best version. But that
        # information isn't currently stored in mutable shares, I think.

        # Some clients will incorrectly try to get the attributes
        # of a file immediately after opening it, before it has been put
        # into the all_heisenfiles table. This is a race condition bug in
        # the client, but we probably need to handle it anyway.

        path = self._path_from_string(pathstring)
        userpath = self._path_to_utf8(path)
        d = self._get_parent_or_node(path)
        def _got_parent_or_node( (parent_or_node, childname) ):
            if noisy: self.log("_got_parent_or_node( (%r, %r) )" % (parent_or_node, childname), level=NOISY)

            # make sure any pending writes are committed before we stat
            direntry = self._direntry_for(parent_or_node, childname)
            d2 = self._sync_heisenfiles(userpath, direntry)

            if childname is None:
                node = parent_or_node
                d2.addCallback(lambda ign: node.get_current_size())
                d2.addCallback(lambda size:
                               _populate_attrs(node, {'readonly': node.is_unknown() or node.is_readonly()}, size=size))
                parent = parent_or_node
                d2.addCallback(lambda ign: parent.get_child_and_metadata_at_path([childname]))
                def _got( (child, metadata) ):
                    if noisy: self.log("_got( (%r, %r) )" % (child, metadata), level=NOISY)
                    assert IDirectoryNode.providedBy(parent), parent
                    metadata['readonly'] = _is_readonly(parent.is_readonly(), child)
                    d3 = child.get_current_size()
                    d3.addCallback(lambda size: _populate_attrs(child, metadata, size=size))
                    if noisy: self.log("_nosuch(%r)" % (err,), level=NOISY)
                    err.trap(NoSuchChildError)
                    direntry = self._direntry_for(parent, childname)
                    if noisy: self.log("checking open files:\nself._heisenfiles = %r\nall_heisenfiles = %r\ndirentry=%r" %
                                       (self._heisenfiles, all_heisenfiles, direntry), level=NOISY)
                    if direntry in all_heisenfiles:
                        (files, opentime) = all_heisenfiles[direntry]
                        sftptime = _to_sftp_time(opentime)
                        # A file that has been opened for writing necessarily has permissions rw-rw-rw-.
                        return {'permissions': S_IFREG | 0666,
                                'createtime': sftptime,
                d2.addCallbacks(_got, _nosuch)
        d.addCallback(_got_parent_or_node)
        d.addBoth(_convert_error, request)
    def setAttrs(self, pathstring, attrs):
        """ISFTPServer hook: size changes are unsupported; other attrs are
        silently accepted.

        NOTE(review): the guard line (apparently testing for a "size" key in
        attrs) is elided in this view. Also note the typo "wth" (for "with")
        in the error message string -- left unchanged here because it is
        runtime behavior.
        """
        self.log(".setAttrs(%r, %r)" % (pathstring, attrs), level=OPERATIONAL)

        # this would require us to download and re-upload the truncated/extended
            def _unsupported(): raise SFTPError(FX_OP_UNSUPPORTED, "setAttrs wth size attribute unsupported")
            return defer.execute(_unsupported)
        return defer.succeed(None)
1608 def readLink(self, pathstring):
1609 self.log(".readLink(%r)" % (pathstring,), level=OPERATIONAL)
1611 def _unsupported(): raise SFTPError(FX_OP_UNSUPPORTED, "readLink")
1612 return defer.execute(_unsupported)
1614 def makeLink(self, linkPathstring, targetPathstring):
1615 self.log(".makeLink(%r, %r)" % (linkPathstring, targetPathstring), level=OPERATIONAL)
1617 # If this is implemented, note the reversal of arguments described in point 7 of
1618 # <http://www.openbsd.org/cgi-bin/cvsweb/src/usr.bin/ssh/PROTOCOL?rev=1.15>.
1620 def _unsupported(): raise SFTPError(FX_OP_UNSUPPORTED, "makeLink")
1621 return defer.execute(_unsupported)
    def extendedRequest(self, extensionName, extensionData):
        """ISFTPServer hook: handle OpenSSH SFTP extensions
        (posix-rename@openssh.com, statvfs@openssh.com, fstatvfs@openssh.com).

        NOTE(review): a few lines appear elided in this view (the initial
        minimum-length check on extensionData, the 'return d' of the rename
        path, and the closing parentheses of the struct.pack call); the
        visible code is preserved unchanged.
        """
        self.log(".extendedRequest(%r, <data of length %r>)" % (extensionName, len(extensionData)), level=OPERATIONAL)

        # We implement the three main OpenSSH SFTP extensions; see
        # <http://www.openbsd.org/cgi-bin/cvsweb/src/usr.bin/ssh/PROTOCOL?rev=1.15>

        if extensionName == 'posix-rename@openssh.com':
            def _bad(): raise SFTPError(FX_BAD_MESSAGE, "could not parse posix-rename@openssh.com request")

            # wire format: uint32 length-prefixed from-path, then to-path
            (fromPathLen,) = struct.unpack('>L', extensionData[0:4])
            if 8 + fromPathLen > len(extensionData): return defer.execute(_bad)

            (toPathLen,) = struct.unpack('>L', extensionData[(4 + fromPathLen):(8 + fromPathLen)])
            if 8 + fromPathLen + toPathLen != len(extensionData): return defer.execute(_bad)

            fromPathstring = extensionData[4:(4 + fromPathLen)]
            toPathstring = extensionData[(8 + fromPathLen):]
            d = self.renameFile(fromPathstring, toPathstring, overwrite=True)

            # Twisted conch assumes that the response from an extended request is either
            # an error, or an FXP_EXTENDED_REPLY. But it happens to do the right thing
            # (respond with an FXP_STATUS message) if we return a Failure with code FX_OK.
            def _succeeded(ign):
                raise SFTPError(FX_OK, "request succeeded")
            d.addCallback(_succeeded)

        if extensionName == 'statvfs@openssh.com' or extensionName == 'fstatvfs@openssh.com':
            # synthetic but plausible filesystem statistics
            return defer.succeed(struct.pack('>11Q',
                1024,         # uint64  f_bsize     /* file system block size */
                1024,         # uint64  f_frsize    /* fundamental fs block size */
                628318530,    # uint64  f_blocks    /* number of blocks (unit f_frsize) */
                314159265,    # uint64  f_bfree     /* free blocks in file system */
                314159265,    # uint64  f_bavail    /* free blocks for non-root */
                200000000,    # uint64  f_files     /* total file inodes */
                100000000,    # uint64  f_ffree     /* free file inodes */
                100000000,    # uint64  f_favail    /* free file inodes for non-root */
                0x1AF5,       # uint64  f_fsid      /* file system id */
                2,            # uint64  f_flag      /* bit mask = ST_NOSUID; not ST_RDONLY */
                65535,        # uint64  f_namemax   /* maximum filename length */

        def _unsupported(): raise SFTPError(FX_OP_UNSUPPORTED, "unsupported %r request <data of length %r>" %
                                            (extensionName, len(extensionData)))
        return defer.execute(_unsupported)
1669 def realPath(self, pathstring):
1670 self.log(".realPath(%r)" % (pathstring,), level=OPERATIONAL)
1672 return self._path_to_utf8(self._path_from_string(pathstring))
1674 def _path_to_utf8(self, path):
1675 return (u"/" + u"/".join(path)).encode('utf-8')
    def _path_from_string(self, pathstring):
        """Split an SFTP path string into a list of unicode components,
        interpreting '.' and '..' per draft-ietf-secsh-filexfer-02 s6.2.

        NOTE(review): several lines appear elided in this view (the empty-path
        branch, the 'path = []' initializer, the '..'/'.' handling branches,
        the append of the decoded component, and the final 'return path');
        the visible code is preserved unchanged.
        """
        if noisy: self.log("CONVERT %r" % (pathstring,), level=NOISY)

        # The home directory is the root directory.
        pathstring = pathstring.strip("/")
        if pathstring == "" or pathstring == ".":
            path_utf8 = pathstring.split("/")

        # <http://tools.ietf.org/html/draft-ietf-secsh-filexfer-02#section-6.2>
        # "Servers SHOULD interpret a path name component ".." as referring to
        #  the parent directory, and "." as referring to the current directory."
        for p_utf8 in path_utf8:
                # ignore excess .. components at the root
                    p = p_utf8.decode('utf-8', 'strict')
                except UnicodeError:
                    raise SFTPError(FX_NO_SUCH_FILE, "path could not be decoded as UTF-8")

        if noisy: self.log(" PATH %r" % (path,), level=NOISY)
    def _get_root(self, path):
        """Resolve the root for a path: /uri/CAP paths get a node built from
        the cap; everything else is relative to this user's root directory.

        NOTE(review): an 'else:' header (before the last addCallback) and the
        final 'return d' appear elided in this view; the visible code is
        preserved unchanged.
        """
        # return Deferred (root, remaining_path)
        d = defer.succeed(None)
        if path and path[0] == u"uri":
            d.addCallback(lambda ign: self._client.create_node_from_uri(path[1].encode('utf-8')))
            d.addCallback(lambda root: (root, path[2:]))
        d.addCallback(lambda ign: (self._root, path))
    def _get_parent_or_node(self, path):
        """Resolve a path to either (parent_dirnode, childname) or, for a
        bare root/URI path, (node, None).

        NOTE(review): the branch returning (root, None) for an empty
        remaining_path, an 'else:' header, and the final returns appear
        elided in this view; the visible code is preserved unchanged.
        """
        # return Deferred (parent, childname) or (node, None)
        d = self._get_root(path)
        def _got_root( (root, remaining_path) ):
            if not remaining_path:
                d2 = root.get_child_at_path(remaining_path[:-1])
                d2.addCallback(lambda parent: (parent, remaining_path[-1]))
        d.addCallback(_got_root)
    def _attrs_to_metadata(self, attrs):
        """Translate SFTP attrs into Tahoe metadata, keeping only timestamps
        and ext_* extension keys.

        NOTE(review): the 'metadata = {}' initializer, the loop header over
        attrs, and the final 'return metadata' appear elided in this view;
        the visible code is preserved unchanged.
        """
            if key == "mtime" or key == "ctime" or key == "createtime":
                metadata[key] = long(attrs[key])
            elif key.startswith("ext_"):
                metadata[key] = str(attrs[key])
class SFTPUser(ConchUser, PrefixingLogMixin):
    """A Conch avatar that refuses all interactive session services (PTY,
    shell, exec); file transfer is served by the registered 'sftp' subsystem.

    NOTE(review): the 'def eofReceived(self):' and 'def closed(self):' headers
    appear elided at the end of this class in this view; the two trailing log
    calls are presumably their bodies.
    """
    implements(ISession)
    def __init__(self, check_abort, client, rootnode, username, convergence):
        ConchUser.__init__(self)
        PrefixingLogMixin.__init__(self, facility="tahoe.sftp")

        self.channelLookup["session"] = session.SSHSession
        self.subsystemLookup["sftp"] = FileTransferServer

        self.check_abort = check_abort
        self.client = client
        self.root = rootnode
        self.username = username
        self.convergence = convergence

    def getPty(self, terminal, windowSize, attrs):
        # PTY allocation refused: this server is SFTP-only.
        self.log(".getPty(%r, %r, %r)" % (terminal, windowSize, attrs), level=OPERATIONAL)
        raise NotImplementedError

    def openShell(self, protocol):
        # Interactive shells refused: this server is SFTP-only.
        self.log(".openShell(%r)" % (protocol,), level=OPERATIONAL)
        raise NotImplementedError

    def execCommand(self, protocol, cmd):
        # Remote command execution refused: this server is SFTP-only.
        self.log(".execCommand(%r, %r)" % (protocol, cmd), level=OPERATIONAL)
        raise NotImplementedError

    def windowChanged(self, newWindowSize):
        self.log(".windowChanged(%r)" % (newWindowSize,), level=OPERATIONAL)

        self.log(".eofReceived()", level=OPERATIONAL)

        self.log(".closed()", level=OPERATIONAL)
# if you have an SFTPUser, and you want something that provides ISFTPServer,
# then you get SFTPHandler(user)
# (SFTPHandler is defined elsewhere in this file.)
components.registerAdapter(SFTPHandler, SFTPUser, ISFTPServer)
1782 from auth import AccountURLChecker, AccountFileChecker, NeedRootcapLookupScheme
1785 implements(portal.IRealm)
1786 def __init__(self, client):
1787 self._client = client
1789 def requestAvatar(self, avatarID, mind, interface):
1790 assert interface == IConchUser, interface
1791 rootnode = self._client.create_node_from_uri(avatarID.rootcap)
1792 handler = SFTPUserHandler(self._client, rootnode, avatarID.username)
1793 return (interface, handler, handler.logout)
class SFTPServer(service.MultiService):
    """A MultiService that runs an SSH server exposing a Tahoe grid via SFTP.

    NOTE(review): several lines appear elided in this view (the
    'if accountfile:' / 'if accounturl:' guards around the checker
    registrations, the try/except around getPrimes, and the construction of
    the SSHFactory instance 'f' used by strports below); the visible code is
    preserved unchanged.
    """
    def __init__(self, client, accountfile, accounturl,
                 sftp_portstr, pubkey_file, privkey_file):
        service.MultiService.__init__(self)

        r = Dispatcher(client)
        p = portal.Portal(r)

        c = AccountFileChecker(self, accountfile)
        p.registerChecker(c)
        c = AccountURLChecker(self, accounturl)
        p.registerChecker(c)
        if not accountfile and not accounturl:
            # we could leave this anonymous, with just the /uri/CAP form
            raise NeedRootcapLookupScheme("must provide an account file or URL")

        pubkey = keys.Key.fromFile(pubkey_file)
        privkey = keys.Key.fromFile(privkey_file)
        class SSHFactory(factory.SSHFactory):
            publicKeys = {pubkey.sshType(): pubkey}
            privateKeys = {privkey.sshType(): privkey}
            def getPrimes(self):
                # if present, this enables diffie-hellman-group-exchange
                return primes.parseModuliFile("/etc/ssh/moduli")

        s = strports.service(sftp_portstr, f)
        s.setServiceParent(self)