2 import os, tempfile, heapq, binascii, traceback, array, stat, struct
3 from stat import S_IFREG, S_IFDIR
4 from time import time, strftime, localtime
6 from zope.interface import implements
7 from twisted.python import components
8 from twisted.application import service, strports
9 from twisted.conch.ssh import factory, keys, session
10 from twisted.conch.ssh.filetransfer import FileTransferServer, SFTPError, \
11 FX_NO_SUCH_FILE, FX_OP_UNSUPPORTED, FX_PERMISSION_DENIED, FX_EOF, \
12 FX_BAD_MESSAGE, FX_FAILURE, FX_OK
13 from twisted.conch.ssh.filetransfer import FXF_READ, FXF_WRITE, FXF_APPEND, \
14 FXF_CREAT, FXF_TRUNC, FXF_EXCL
15 from twisted.conch.interfaces import ISFTPServer, ISFTPFile, IConchUser, ISession
16 from twisted.conch.avatar import ConchUser
17 from twisted.conch.openssh_compat import primes
18 from twisted.cred import portal
19 from twisted.internet.error import ProcessDone, ProcessTerminated
20 from twisted.python.failure import Failure
21 from twisted.internet.interfaces import ITransport
23 from twisted.internet import defer
24 from twisted.internet.interfaces import IFinishableConsumer
25 from foolscap.api import eventually
26 from allmydata.util import deferredutil
28 from allmydata.util.consumer import download_to_data
29 from allmydata.interfaces import IFileNode, IDirectoryNode, ExistingChildError, \
30 NoSuchChildError, ChildOfWrongTypeError
31 from allmydata.mutable.common import NotWriteableError
32 from allmydata.immutable.upload import FileHandle
34 from pycryptopp.cipher.aes import AES
37 use_foolscap_logging = True
39 from allmydata.util.log import NOISY, OPERATIONAL, WEIRD, \
40 msg as _msg, err as _err, PrefixingLogMixin as _PrefixingLogMixin
42 if use_foolscap_logging:
43 (logmsg, logerr, PrefixingLogMixin) = (_msg, _err, _PrefixingLogMixin)
44 else: # pragma: no cover
45 def logmsg(s, level=None):
47 def logerr(s, level=None):
49 class PrefixingLogMixin:
50 def __init__(self, facility=None, prefix=''):
52 def log(self, s, level=None):
53 print "%r %s" % (self.prefix, s)
def eventually_callback(d):
    """Return a one-argument function that fires d.callback(res) via
    foolscap's eventual-send queue, i.e. on a later reactor turn rather
    than synchronously."""
    def _fire(res):
        return eventually(d.callback, res)
    return _fire
def eventually_errback(d):
    """Return a one-argument function that fires d.errback(err) via
    foolscap's eventual-send queue, i.e. on a later reactor turn rather
    than synchronously."""
    def _fire(err):
        return eventually(d.errback, err)
    return _fire
64 if isinstance(x, unicode):
65 return x.encode('utf-8')
66 if isinstance(x, str):
72 """SFTP times are unsigned 32-bit integers representing UTC seconds
73 (ignoring leap seconds) since the Unix epoch, January 1 1970 00:00 UTC.
74 A Tahoe time is the corresponding float."""
75 return long(t) & 0xFFFFFFFFL
78 def _convert_error(res, request):
79 if not isinstance(res, Failure):
81 if isinstance(res, str): logged_res = "<data of length %r>" % (len(res),)
82 logmsg("SUCCESS %r %r" % (request, logged_res,), level=OPERATIONAL)
86 logmsg("RAISE %r %r" % (request, err.value), level=OPERATIONAL)
88 if noisy: logmsg(traceback.format_exc(err.value), level=NOISY)
89 except: # pragma: no cover
92 # The message argument to SFTPError must not reveal information that
93 # might compromise anonymity.
95 if err.check(SFTPError):
96 # original raiser of SFTPError has responsibility to ensure anonymity
98 if err.check(NoSuchChildError):
99 childname = _utf8(err.value.args[0])
100 raise SFTPError(FX_NO_SUCH_FILE, childname)
101 if err.check(NotWriteableError) or err.check(ChildOfWrongTypeError):
102 msg = _utf8(err.value.args[0])
103 raise SFTPError(FX_PERMISSION_DENIED, msg)
104 if err.check(ExistingChildError):
105 # Versions of SFTP after v3 (which is what twisted.conch implements)
106 # define a specific error code for this case: FX_FILE_ALREADY_EXISTS.
107 # However v3 doesn't; instead, other servers such as sshd return
108 # FX_FAILURE. The gvfs SFTP backend, for example, depends on this
109 # to translate the error to the equivalent of POSIX EEXIST, which is
110 # necessary for some picky programs (such as gedit).
111 msg = _utf8(err.value.args[0])
112 raise SFTPError(FX_FAILURE, msg)
113 if err.check(NotImplementedError):
114 raise SFTPError(FX_OP_UNSUPPORTED, _utf8(err.value))
115 if err.check(EOFError):
116 raise SFTPError(FX_EOF, "end of file reached")
117 if err.check(defer.FirstError):
118 _convert_error(err.value.subFailure, request)
120 # We assume that the error message is not anonymity-sensitive.
121 raise SFTPError(FX_FAILURE, _utf8(err.value))
124 def _repr_flags(flags):
125 return "|".join([f for f in
126 [(flags & FXF_READ) and "FXF_READ" or None,
127 (flags & FXF_WRITE) and "FXF_WRITE" or None,
128 (flags & FXF_APPEND) and "FXF_APPEND" or None,
129 (flags & FXF_CREAT) and "FXF_CREAT" or None,
130 (flags & FXF_TRUNC) and "FXF_TRUNC" or None,
131 (flags & FXF_EXCL) and "FXF_EXCL" or None,
136 def _lsLine(name, attrs):
139 st_mtime = attrs.get("mtime", 0)
140 st_mode = attrs["permissions"]
141 # TODO: check that clients are okay with this being a "?".
142 # (They should be because the longname is intended for human
144 st_size = attrs.get("size", "?")
145 # We don't know how many links there really are to this object.
148 # Based on <http://twistedmatrix.com/trac/browser/trunk/twisted/conch/ls.py?rev=25412>.
149 # We can't call the version in Twisted because we might have a version earlier than
150 # <http://twistedmatrix.com/trac/changeset/25412> (released in Twisted 8.2).
153 perms = array.array('c', '-'*10)
154 ft = stat.S_IFMT(mode)
155 if stat.S_ISDIR(ft): perms[0] = 'd'
156 elif stat.S_ISREG(ft): perms[0] = '-'
159 if mode&stat.S_IRUSR: perms[1] = 'r'
160 if mode&stat.S_IWUSR: perms[2] = 'w'
161 if mode&stat.S_IXUSR: perms[3] = 'x'
163 if mode&stat.S_IRGRP: perms[4] = 'r'
164 if mode&stat.S_IWGRP: perms[5] = 'w'
165 if mode&stat.S_IXGRP: perms[6] = 'x'
167 if mode&stat.S_IROTH: perms[7] = 'r'
168 if mode&stat.S_IWOTH: perms[8] = 'w'
169 if mode&stat.S_IXOTH: perms[9] = 'x'
170 # suid/sgid never set
173 l += str(st_nlink).rjust(5) + ' '
184 if st_mtime + sixmo < now or st_mtime > now + day:
185 # mtime is more than 6 months ago, or more than one day in the future
186 l += strftime("%b %d %Y ", localtime(st_mtime))
188 l += strftime("%b %d %H:%M ", localtime(st_mtime))
193 def _is_readonly(parent_readonly, child):
194 """Whether child should be listed as having read-only permissions in parent."""
196 if child.is_unknown():
198 elif child.is_mutable():
199 return child.is_readonly()
201 return parent_readonly
204 def _populate_attrs(childnode, metadata, size=None):
207 # The permissions must have the S_IFDIR (040000) or S_IFREG (0100000)
208 # bits, otherwise the client may refuse to open a directory.
209 # Also, sshfs run as a non-root user requires files and directories
210 # to be world-readable/writeable.
212 # Directories and unknown nodes have no size, and SFTP doesn't
213 # require us to make one up.
215 # childnode might be None, meaning that the file doesn't exist yet,
216 # but we're going to write it later.
218 if childnode and childnode.is_unknown():
220 elif childnode and IDirectoryNode.providedBy(childnode):
221 perms = S_IFDIR | 0777
223 # For files, omit the size if we don't immediately know it.
224 if childnode and size is None:
225 size = childnode.get_size()
227 assert isinstance(size, (int, long)) and not isinstance(size, bool), repr(size)
229 perms = S_IFREG | 0666
232 assert 'readonly' in metadata, metadata
233 if metadata['readonly']:
234 perms &= S_IFDIR | S_IFREG | 0555 # clear 'w' bits
236 # see webapi.txt for what these times mean
237 if 'linkmotime' in metadata.get('tahoe', {}):
238 attrs['mtime'] = _to_sftp_time(metadata['tahoe']['linkmotime'])
239 elif 'mtime' in metadata:
240 # We would prefer to omit atime, but SFTP version 3 can only
241 # accept mtime if atime is also set.
242 attrs['mtime'] = _to_sftp_time(metadata['mtime'])
243 attrs['atime'] = attrs['mtime']
245 if 'linkcrtime' in metadata.get('tahoe', {}):
246 attrs['createtime'] = _to_sftp_time(metadata['tahoe']['linkcrtime'])
248 if 'ctime' in metadata:
249 attrs['ctime'] = _to_sftp_time(metadata['ctime'])
251 attrs['permissions'] = perms
253 # twisted.conch.ssh.filetransfer only implements SFTP version 3,
254 # which doesn't include SSH_FILEXFER_ATTR_FLAGS.
259 class EncryptedTemporaryFile(PrefixingLogMixin):
260 # not implemented: next, readline, readlines, xreadlines, writelines
263 PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
264 self.file = tempfile.TemporaryFile()
265 self.key = os.urandom(16) # AES-128
def _crypt(self, offset, data):
    """Transform 'data' as it would appear starting at byte position 'offset'
    of the file, using an AES keystream derived from self.key.

    NOTE(review): this relies on pycryptopp's AES being a stream (CTR-style)
    cipher, so the same call both encrypts and decrypts — confirm against
    pycryptopp's documented cipher mode.
    """
    # TODO: use random-access AES (pycryptopp ticket #18)
    # Seek the keystream to the 16-byte AES block containing 'offset':
    # the block index becomes the 128-bit big-endian IV, so each block's
    # keystream depends only on its position in the file.
    offset_big = offset // 16
    offset_small = offset % 16
    iv = binascii.unhexlify("%032x" % offset_big)
    cipher = AES(self.key, iv=iv)
    # Discard 'offset_small' keystream bytes to align mid-block before
    # processing the actual data.
    cipher.process("\x00"*offset_small)
    return cipher.process(data)
def seek(self, offset, whence=os.SEEK_SET):
    """Reposition the underlying temporary file; same semantics as file.seek()."""
    if noisy:
        self.log(".seek(%r, %r)" % (offset, whence), level=NOISY)
    self.file.seek(offset, whence)
287 offset = self.file.tell()
288 if noisy: self.log(".tell() = %r" % (offset,), level=NOISY)
291 def read(self, size=-1):
292 if noisy: self.log(".read(%r)" % (size,), level=NOISY)
293 index = self.file.tell()
294 ciphertext = self.file.read(size)
295 plaintext = self._crypt(index, ciphertext)
def write(self, plaintext):
    """Encrypt 'plaintext' for the current file position and write the
    resulting ciphertext to the underlying temporary file."""
    if noisy:
        self.log(".write(<data of length %r>)" % (len(plaintext),), level=NOISY)
    position = self.file.tell()
    self.file.write(self._crypt(position, plaintext))
def truncate(self, newsize):
    """Truncate the underlying temporary file to 'newsize' bytes."""
    if noisy:
        self.log(".truncate(%r)" % (newsize,), level=NOISY)
    self.file.truncate(newsize)
309 class OverwriteableFileConsumer(PrefixingLogMixin):
310 implements(IFinishableConsumer)
311 """I act both as a consumer for the download of the original file contents, and as a
312 wrapper for a temporary file that records the downloaded data and any overwrites.
313 I use a priority queue to keep track of which regions of the file have been overwritten
314 but not yet downloaded, so that the download does not clobber overwritten data.
315 I use another priority queue to record milestones at which to make callbacks
316 indicating that a given number of bytes have been downloaded.
318 The temporary file reflects the contents of the file that I represent, except that:
319 - regions that have neither been downloaded nor overwritten, if present,
321 - the temporary file may be shorter than the represented file (it is never longer).
322 The latter's current size is stored in self.current_size.
324 This abstraction is mostly independent of SFTP. Consider moving it, if it is found
325 useful for other frontends."""
327 def __init__(self, download_size, tempfile_maker):
328 PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
329 if noisy: self.log(".__init__(%r, %r)" % (download_size, tempfile_maker), level=NOISY)
330 self.download_size = download_size
331 self.current_size = download_size
332 self.f = tempfile_maker()
334 self.milestones = [] # empty heap of (offset, d)
335 self.overwrites = [] # empty heap of (start, end)
336 self.is_closed = False
337 self.done = self.when_reached(download_size) # adds a milestone
339 def _signal_done(ign):
340 if noisy: self.log("DONE", level=NOISY)
342 self.done.addCallback(_signal_done)
def get_current_size(self):
    """Return the current size in bytes of the file this consumer represents.

    This reflects overwrites and truncations, so it may differ from the
    number of bytes downloaded so far."""
    return self.current_size
351 def set_current_size(self, size):
352 if noisy: self.log(".set_current_size(%r), current_size = %r, downloaded = %r" %
353 (size, self.current_size, self.downloaded), level=NOISY)
354 if size < self.current_size or size < self.downloaded:
355 self.f.truncate(size)
356 if size > self.current_size:
357 self.overwrite(self.current_size, "\x00" * (size - self.current_size))
358 self.current_size = size
360 # invariant: self.download_size <= self.current_size
361 if size < self.download_size:
362 self.download_size = size
363 if self.downloaded >= self.download_size:
366 def registerProducer(self, p, streaming):
367 if noisy: self.log(".registerProducer(%r, streaming=%r)" % (p, streaming), level=NOISY)
370 # call resumeProducing once to start things off
379 def write(self, data):
380 if noisy: self.log(".write(<data of length %r>)" % (len(data),), level=NOISY)
384 if self.downloaded >= self.download_size:
387 next_downloaded = self.downloaded + len(data)
388 if next_downloaded > self.download_size:
389 data = data[:(self.download_size - self.downloaded)]
391 while len(self.overwrites) > 0:
392 (start, end) = self.overwrites[0]
393 if start >= next_downloaded:
394 # This and all remaining overwrites are after the data we just downloaded.
396 if start > self.downloaded:
397 # The data we just downloaded has been partially overwritten.
398 # Write the prefix of it that precedes the overwritten region.
399 self.f.seek(self.downloaded)
400 self.f.write(data[:(start - self.downloaded)])
402 # This merges consecutive overwrites if possible, which allows us to detect the
403 # case where the download can be stopped early because the remaining region
404 # to download has already been fully overwritten.
405 heapq.heappop(self.overwrites)
406 while len(self.overwrites) > 0:
407 (start1, end1) = self.overwrites[0]
411 heapq.heappop(self.overwrites)
413 if end >= next_downloaded:
414 # This overwrite extends past the downloaded data, so there is no
415 # more data to consider on this call.
416 heapq.heappush(self.overwrites, (next_downloaded, end))
417 self._update_downloaded(next_downloaded)
419 elif end >= self.downloaded:
420 data = data[(end - self.downloaded):]
421 self._update_downloaded(end)
423 self.f.seek(self.downloaded)
425 self._update_downloaded(next_downloaded)
427 def _update_downloaded(self, new_downloaded):
428 self.downloaded = new_downloaded
429 milestone = new_downloaded
430 if len(self.overwrites) > 0:
431 (start, end) = self.overwrites[0]
432 if start <= new_downloaded and end > milestone:
435 while len(self.milestones) > 0:
436 (next, d) = self.milestones[0]
439 if noisy: self.log("MILESTONE %r %r" % (next, d), level=NOISY)
440 heapq.heappop(self.milestones)
441 eventually_callback(d)(None)
443 if milestone >= self.download_size:
446 def overwrite(self, offset, data):
447 if noisy: self.log(".overwrite(%r, <data of length %r>)" % (offset, len(data)), level=NOISY)
448 if offset > self.current_size:
449 # Normally writing at an offset beyond the current end-of-file
450 # would leave a hole that appears filled with zeroes. However, an
451 # EncryptedTemporaryFile doesn't behave like that (if there is a
452 # hole in the file on disk, the zeroes that are read back will be
453 # XORed with the keystream). So we must explicitly write zeroes in
454 # the gap between the current EOF and the offset.
456 self.f.seek(self.current_size)
457 self.f.write("\x00" * (offset - self.current_size))
458 start = self.current_size
464 end = offset + len(data)
465 self.current_size = max(self.current_size, end)
466 if end > self.downloaded:
467 heapq.heappush(self.overwrites, (start, end))
469 def read(self, offset, length):
470 """When the data has been read, callback the Deferred that we return with this data.
471 Otherwise errback the Deferred that we return.
472 The caller must perform no more overwrites until the Deferred has fired."""
474 if noisy: self.log(".read(%r, %r), current_size = %r" % (offset, length, self.current_size), level=NOISY)
475 if offset >= self.current_size:
476 def _eof(): raise EOFError("read past end of file")
477 return defer.execute(_eof)
479 if offset + length > self.current_size:
480 length = self.current_size - offset
481 if noisy: self.log("truncating read to %r bytes" % (length,), level=NOISY)
483 needed = min(offset + length, self.download_size)
484 d = self.when_reached(needed)
486 # It is not necessarily the case that self.downloaded >= needed, because
487 # the file might have been truncated (thus truncating the download) and
490 assert self.current_size >= offset + length, (self.current_size, offset, length)
491 if noisy: self.log("self.f = %r" % (self.f,), level=NOISY)
493 return self.f.read(length)
494 d.addCallback(_reached)
497 def when_reached(self, index):
498 if noisy: self.log(".when_reached(%r)" % (index,), level=NOISY)
499 if index <= self.downloaded: # already reached
500 if noisy: self.log("already reached %r" % (index,), level=NOISY)
501 return defer.succeed(None)
504 if noisy: self.log("reached %r" % (index,), level=NOISY)
506 d.addCallback(_reached)
507 heapq.heappush(self.milestones, (index, d))
514 while len(self.milestones) > 0:
515 (next, d) = self.milestones[0]
516 if noisy: self.log("MILESTONE FINISH %r %r" % (next, d), level=NOISY)
517 heapq.heappop(self.milestones)
518 # The callback means that the milestone has been reached if
519 # it is ever going to be. Note that the file may have been
520 # truncated to before the milestone.
521 eventually_callback(d)(None)
523 # FIXME: causes spurious failures
524 #self.unregisterProducer()
527 self.is_closed = True
529 if not self.is_closed:
532 except BaseException as e:
533 self.log("suppressed %r from close of temporary file %r" % (e, self.f), level=WEIRD)
535 def unregisterProducer(self):
537 self.producer.stopProducing()
541 SIZE_THRESHOLD = 1000
544 class ShortReadOnlySFTPFile(PrefixingLogMixin):
545 implements(ISFTPFile)
546 """I represent a file handle to a particular file on an SFTP connection.
547 I am used only for short immutable files opened in read-only mode.
548 The file contents are downloaded to memory when I am created."""
550 def __init__(self, userpath, filenode, metadata):
551 PrefixingLogMixin.__init__(self, facility="tahoe.sftp", prefix=userpath)
552 if noisy: self.log(".__init__(%r, %r, %r)" % (userpath, filenode, metadata), level=NOISY)
554 assert IFileNode.providedBy(filenode), filenode
555 self.filenode = filenode
556 self.metadata = metadata
557 self.async = download_to_data(filenode)
560 def readChunk(self, offset, length):
561 request = ".readChunk(%r, %r)" % (offset, length)
562 self.log(request, level=OPERATIONAL)
565 def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot read from a closed file handle")
566 return defer.execute(_closed)
570 if noisy: self.log("_read(<data of length %r>) in readChunk(%r, %r)" % (len(data), offset, length), level=NOISY)
572 # "In response to this request, the server will read as many bytes as it
573 # can from the file (up to 'len'), and return them in a SSH_FXP_DATA
574 # message. If an error occurs or EOF is encountered before reading any
575 # data, the server will respond with SSH_FXP_STATUS. For normal disk
576 # files, it is guaranteed that this will read the specified number of
577 # bytes, or up to end of file."
579 # i.e. we respond with an EOF error iff offset is already at EOF.
581 if offset >= len(data):
582 eventually_errback(d)(SFTPError(FX_EOF, "read at or past end of file"))
584 eventually_callback(d)(data[offset:min(offset+length, len(data))])
586 self.async.addCallbacks(_read, eventually_errback(d))
587 d.addBoth(_convert_error, request)
def writeChunk(self, offset, data):
    """Always deny writes: this handle wraps a short immutable file opened read-only."""
    self.log(".writeChunk(%r, <data of length %r>) denied" % (offset, len(data)), level=OPERATIONAL)

    def _denied():
        raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for writing")
    return defer.execute(_denied)
597 self.log(".close()", level=OPERATIONAL)
600 return defer.succeed(None)
603 request = ".getAttrs()"
604 self.log(request, level=OPERATIONAL)
607 def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot get attributes for a closed file handle")
608 return defer.execute(_closed)
610 d = defer.execute(_populate_attrs, self.filenode, self.metadata)
611 d.addBoth(_convert_error, request)
def setAttrs(self, attrs):
    """Always deny attribute changes: this handle wraps a short immutable file opened read-only."""
    self.log(".setAttrs(%r) denied" % (attrs,), level=OPERATIONAL)

    def _denied():
        raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for writing")
    return defer.execute(_denied)
620 class GeneralSFTPFile(PrefixingLogMixin):
621 implements(ISFTPFile)
622 """I represent a file handle to a particular file on an SFTP connection.
623 I wrap an instance of OverwriteableFileConsumer, which is responsible for
624 storing the file contents. In order to allow write requests to be satisfied
625 immediately, there is effectively a FIFO queue between requests made to this
626 file handle, and requests to my OverwriteableFileConsumer. This queue is
627 implemented by the callback chain of self.async.
629 When first constructed, I am in an 'unopened' state that causes most
630 operations to be delayed until 'open' is called."""
632 def __init__(self, userpath, flags, close_notify, convergence):
633 PrefixingLogMixin.__init__(self, facility="tahoe.sftp", prefix=userpath)
634 if noisy: self.log(".__init__(%r, %r = %r, %r, <convergence censored>)" %
635 (userpath, flags, _repr_flags(flags), close_notify), level=NOISY)
637 self.userpath = userpath
639 self.close_notify = close_notify
640 self.convergence = convergence
641 self.async = defer.Deferred()
642 # Creating or truncating the file is a change, but if FXF_EXCL is set, a zero-length file has already been created.
643 self.has_changed = (flags & (FXF_CREAT | FXF_TRUNC)) and not (flags & FXF_EXCL)
645 self.abandoned = False
647 self.childname = None
651 # self.consumer should only be relied on in callbacks for self.async, since it might
652 # not be set before then.
655 def open(self, parent=None, childname=None, filenode=None, metadata=None):
656 self.log(".open(parent=%r, childname=%r, filenode=%r, metadata=%r)" %
657 (parent, childname, filenode, metadata), level=OPERATIONAL)
659 # If the file has been renamed, the new (parent, childname) takes precedence.
660 if self.parent is None:
662 if self.childname is None:
663 self.childname = childname
664 self.filenode = filenode
665 self.metadata = metadata
668 tempfile_maker = EncryptedTemporaryFile
670 if (self.flags & FXF_TRUNC) or not filenode:
671 # We're either truncating or creating the file, so we don't need the old contents.
672 self.consumer = OverwriteableFileConsumer(0, tempfile_maker)
673 self.consumer.finish()
675 assert IFileNode.providedBy(filenode), filenode
677 # TODO: use download interface described in #993 when implemented.
678 if filenode.is_mutable():
679 self.async.addCallback(lambda ign: filenode.download_best_version())
680 def _downloaded(data):
681 self.consumer = OverwriteableFileConsumer(len(data), tempfile_maker)
682 self.consumer.write(data)
683 self.consumer.finish()
685 self.async.addCallback(_downloaded)
687 download_size = filenode.get_size()
688 assert download_size is not None, "download_size is None"
689 self.consumer = OverwriteableFileConsumer(download_size, tempfile_maker)
691 if noisy: self.log("_read immutable", level=NOISY)
692 filenode.read(self.consumer, 0, None)
693 self.async.addCallback(_read)
695 eventually_callback(self.async)(None)
697 if noisy: self.log("open done", level=NOISY)
def rename(self, new_userpath, new_parent, new_childname):
    """Update this handle's recorded userpath, parent, and childname after a rename."""
    self.log(".rename(%r, %r, %r)" % (new_userpath, new_parent, new_childname), level=OPERATIONAL)

    self.userpath, self.parent, self.childname = (
        new_userpath, new_parent, new_childname)
708 self.log(".abandon()", level=OPERATIONAL)
710 self.abandoned = True
713 self.log(".sync()", level=OPERATIONAL)
716 self.async.addBoth(eventually_callback(d))
718 if noisy: self.log("_done(%r) in .sync()" % (res,), level=NOISY)
723 def readChunk(self, offset, length):
724 request = ".readChunk(%r, %r)" % (offset, length)
725 self.log(request, level=OPERATIONAL)
727 if not (self.flags & FXF_READ):
728 def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for reading")
729 return defer.execute(_denied)
732 def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot read from a closed file handle")
733 return defer.execute(_closed)
737 if noisy: self.log("_read in readChunk(%r, %r)" % (offset, length), level=NOISY)
738 d2 = self.consumer.read(offset, length)
739 d2.addCallbacks(eventually_callback(d), eventually_errback(d))
740 # It is correct to drop d2 here.
742 self.async.addCallbacks(_read, eventually_errback(d))
743 d.addBoth(_convert_error, request)
746 def writeChunk(self, offset, data):
747 self.log(".writeChunk(%r, <data of length %r>)" % (offset, len(data)), level=OPERATIONAL)
749 if not (self.flags & FXF_WRITE):
750 def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for writing")
751 return defer.execute(_denied)
754 def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot write to a closed file handle")
755 return defer.execute(_closed)
757 self.has_changed = True
759 # Note that we return without waiting for the write to occur. Reads and
760 # close wait for prior writes, and will fail if any prior operation failed.
761 # This is ok because SFTP makes no guarantee that the write completes
762 # before the request does. In fact it explicitly allows write errors to be
763 # delayed until close:
764 # "One should note that on some server platforms even a close can fail.
765 # This can happen e.g. if the server operating system caches writes,
766 # and an error occurs while flushing cached writes during the close."
769 if noisy: self.log("_write in .writeChunk(%r, <data of length %r>), current_size = %r" %
770 (offset, len(data), self.consumer.get_current_size()), level=NOISY)
771 # FXF_APPEND means that we should always write at the current end of file.
772 write_offset = offset
773 if self.flags & FXF_APPEND:
774 write_offset = self.consumer.get_current_size()
776 self.consumer.overwrite(write_offset, data)
777 if noisy: self.log("overwrite done", level=NOISY)
779 self.async.addCallback(_write)
780 # don't addErrback to self.async, just allow subsequent async ops to fail.
781 return defer.succeed(None)
785 self.log(request, level=OPERATIONAL)
788 return defer.succeed(None)
790 # This means that close has been called, not that the close has succeeded.
793 if not (self.flags & (FXF_WRITE | FXF_CREAT)):
794 def _readonly_close():
796 self.consumer.close()
797 return defer.execute(_readonly_close)
799 # We must capture the abandoned, parent, and childname variables synchronously
800 # at the close call. This is needed by the correctness arguments in the comments
801 # for _abandon_any_heisenfiles and _rename_heisenfiles.
802 abandoned = self.abandoned
804 childname = self.childname
806 # has_changed is set when writeChunk is called, not when the write occurs, so
807 # it is correct to optimize out the commit if it is False at the close call.
808 has_changed = self.has_changed
811 if noisy: self.log("_committed(%r)" % (res,), level=NOISY)
813 self.consumer.close()
815 # We must close_notify before re-firing self.async.
816 if self.close_notify:
817 self.close_notify(self.userpath, self.parent, self.childname, self)
821 d2 = self.consumer.when_done()
822 if self.filenode and self.filenode.is_mutable():
823 self.log("update mutable file %r childname=%r" % (self.filenode, self.childname,), level=OPERATIONAL)
824 d2.addCallback(lambda ign: self.consumer.get_current_size())
825 d2.addCallback(lambda size: self.consumer.read(0, size))
826 d2.addCallback(lambda new_contents: self.filenode.overwrite(new_contents))
829 self.log("_add_file childname=%r" % (childname,), level=OPERATIONAL)
830 u = FileHandle(self.consumer.get_file(), self.convergence)
831 return parent.add_file(childname, u)
832 d2.addCallback(_add_file)
834 d2.addBoth(_committed)
839 # If the file has been abandoned, we don't want the close operation to get "stuck",
840 # even if self.async fails to re-fire. Doing the close independently of self.async
841 # in that case ensures that dropping an ssh connection is sufficient to abandon
842 # any heisenfiles that were not explicitly closed in that connection.
843 if abandoned or not has_changed:
844 d.addCallback(_committed)
846 self.async.addCallback(_close)
848 self.async.addCallbacks(eventually_callback(d), eventually_errback(d))
849 d.addBoth(_convert_error, request)
853 request = ".getAttrs()"
854 self.log(request, level=OPERATIONAL)
857 def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot get attributes for a closed file handle")
858 return defer.execute(_closed)
860 # Optimization for read-only handles, when we already know the metadata.
861 if not(self.flags & (FXF_WRITE | FXF_CREAT)) and self.metadata and self.filenode and not self.filenode.is_mutable():
862 return defer.succeed(_populate_attrs(self.filenode, self.metadata))
866 # self.filenode might be None, but that's ok.
867 attrs = _populate_attrs(self.filenode, self.metadata, size=self.consumer.get_current_size())
868 eventually_callback(d)(attrs)
870 self.async.addCallbacks(_get, eventually_errback(d))
871 d.addBoth(_convert_error, request)
874 def setAttrs(self, attrs):
875 request = ".setAttrs(attrs) %r" % (attrs,)
876 self.log(request, level=OPERATIONAL)
878 if not (self.flags & FXF_WRITE):
879 def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for writing")
880 return defer.execute(_denied)
883 def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot set attributes for a closed file handle")
884 return defer.execute(_closed)
886 if not "size" in attrs:
887 return defer.succeed(None)
890 if not isinstance(size, (int, long)) or size < 0:
891 def _bad(): raise SFTPError(FX_BAD_MESSAGE, "new size is not a valid nonnegative integer")
892 return defer.execute(_bad)
896 self.consumer.set_current_size(size)
897 eventually_callback(d)(None)
899 self.async.addCallbacks(_resize, eventually_errback(d))
900 d.addBoth(_convert_error, request)
905 def __init__(self, items):
915 def __init__(self, value):
919 # A "heisenfile" is a file that has been opened with write flags
920 # (FXF_WRITE and/or FXF_CREAT) and not yet close-notified.
921 # 'all_heisenfiles' maps from a direntry string to
922 # (list_of_GeneralSFTPFile, open_time_utc).
923 # A direntry string is parent_write_uri + "/" + childname_utf8 for
924 # an immutable file, or file_write_uri for a mutable file.
925 # Updates to this dict are single-threaded.
930 class SFTPUserHandler(ConchUser, PrefixingLogMixin):
931 implements(ISFTPServer)
932 def __init__(self, client, rootnode, username):
933 ConchUser.__init__(self)
934 PrefixingLogMixin.__init__(self, facility="tahoe.sftp", prefix=username)
935 if noisy: self.log(".__init__(%r, %r, %r)" % (client, rootnode, username), level=NOISY)
937 self.channelLookup["session"] = session.SSHSession
938 self.subsystemLookup["sftp"] = FileTransferServer
940 self._client = client
941 self._root = rootnode
942 self._username = username
943 self._convergence = client.convergence
945 # maps from UTF-8 paths for this user, to files written and still open
946 self._heisenfiles = {}
948 def gotVersion(self, otherVersion, extData):
949 self.log(".gotVersion(%r, %r)" % (otherVersion, extData), level=OPERATIONAL)
951 # advertise the same extensions as the OpenSSH SFTP server
952 # <http://www.openbsd.org/cgi-bin/cvsweb/src/usr.bin/ssh/PROTOCOL?rev=1.15>
953 return {'posix-rename@openssh.com': '1',
954 'statvfs@openssh.com': '2',
955 'fstatvfs@openssh.com': '2',
959 self.log(".logout()", level=OPERATIONAL)
961 for files in self._heisenfiles.itervalues():
965 def _add_heisenfiles_by_path(self, userpath, files):
966 if noisy: self.log("._add_heisenfiles_by_path(%r, %r)" % (userpath, files), level=NOISY)
968 if userpath in self._heisenfiles:
969 self._heisenfiles[userpath] += files
971 self._heisenfiles[userpath] = files
973 def _add_heisenfiles_by_direntry(self, direntry, files_to_add):
974 if noisy: self.log("._add_heisenfiles_by_direntry(%r, %r)" % (direntry, files_to_add), level=NOISY)
977 if direntry in all_heisenfiles:
978 (old_files, opentime) = all_heisenfiles[direntry]
979 all_heisenfiles[direntry] = (old_files + files_to_add, opentime)
981 all_heisenfiles[direntry] = (files_to_add, time())
983 def _abandon_any_heisenfiles(self, userpath, direntry):
# Abandon (and then sync) every open heisenfile matching this userpath or
# direntry, removing them from both tracking dicts.
984 if noisy: self.log("._abandon_any_heisenfiles(%r, %r)" % (userpath, direntry), level=NOISY)
986 # First we synchronously mark all heisenfiles matching the userpath or direntry
987 # as abandoned, and remove them from the two heisenfile dicts. Then we .sync()
988 # each file that we abandoned.
990 # For each file, the call to .abandon() occurs:
991 # * before the file is closed, in which case it will never be committed
992 # (uploaded+linked or published); or
993 # * after it is closed but before it has been close_notified, in which case the
994 # .sync() ensures that it has been committed (successfully or not) before we
997 # This avoids a race that might otherwise cause the file to be committed after
998 # the remove operation has completed.
1000 # We return a Deferred that fires with True if any files were abandoned (this
1001 # does not mean that they were not committed; it is used to determine whether
1002 # a NoSuchChildError from the attempt to delete the file should be suppressed).
1005 if direntry in all_heisenfiles:
1006 (files, opentime) = all_heisenfiles[direntry]
1007 del all_heisenfiles[direntry]
1008 if userpath in self._heisenfiles:
1009 files += self._heisenfiles[userpath]
1010 del self._heisenfiles[userpath]
1015 d = defer.succeed(None)
# NOTE(review): the 'for f in files:' header (and the per-file .abandon()
# calls) are elided in this extract. As shown, the lambda below closes over
# the loop variable 'f' late — if the full source does not bind it (e.g.
# 'lambda ign, f=f: ...'), every callback would sync the last file. Confirm.
1017 d.addBoth(lambda ign: f.sync())
1019 d.addBoth(lambda ign: len(files) > 0)
1022 def _rename_heisenfiles(self, from_userpath, from_parent, from_childname,
1023 to_userpath, to_parent, to_childname, overwrite=True):
# Re-key all open heisenfiles from the 'from' userpath/direntry to the 'to'
# userpath/direntry, then sync each renamed file.
1024 if noisy: self.log("._rename_heisenfiles(%r, %r, %r, %r, %r, %r, overwrite=%r)" %
1025 (from_userpath, from_parent, from_childname,
1026 to_userpath, to_parent, to_childname, overwrite), level=NOISY)
1028 # First we synchronously rename all heisenfiles matching the userpath or direntry.
1029 # Then we .sync() each file that we renamed.
1031 # For each file, the call to .rename occurs:
1032 # * before the file is closed, in which case it will be committed at the
1034 # * after it is closed but before it has been close_notified, in which case the
1035 # .sync() ensures that it has been committed (successfully or not) before we
1038 # This avoids a race that might otherwise cause the file to be committed at the
1039 # old name after the rename operation has completed.
1041 # Note that if overwrite is False, the caller should already have checked
1042 # whether a real direntry exists at the destination. It is possible that another
1043 # direntry (heisen or real) comes to exist at the destination after that check,
1044 # but in that case it is correct for the rename to succeed (and for the commit
1045 # of the heisenfile at the destination to possibly clobber the other entry, since
1046 # that can happen anyway when we have concurrent write handles to the same direntry).
1048 # We return a Deferred that fires with True if any files were renamed (this
1049 # does not mean that they were not committed; it is used to determine whether
1050 # a NoSuchChildError from the rename attempt should be suppressed). If overwrite
1051 # is False and there were already heisenfiles at the destination userpath or
1052 # direntry, we return a Deferred that fails with SFTPError(FX_PERMISSION_DENIED).
1054 from_direntry = self._direntry_for(from_parent, from_childname)
1055 to_direntry = self._direntry_for(to_parent, to_childname)
1057 if not overwrite and (to_userpath in self._heisenfiles or to_direntry in all_heisenfiles):
1058 def _existing(): raise SFTPError(FX_PERMISSION_DENIED, "cannot rename to existing path " + to_userpath)
1059 return defer.execute(_existing)
# Collect every file registered under the source direntry and userpath,
# removing the source entries as we go.
1062 if from_direntry in all_heisenfiles:
1063 (from_files, opentime) = all_heisenfiles[from_direntry]
1064 del all_heisenfiles[from_direntry]
1065 if from_userpath in self._heisenfiles:
1066 from_files += self._heisenfiles[from_userpath]
1067 del self._heisenfiles[from_userpath]
1069 self._add_heisenfiles_by_direntry(to_direntry, from_files)
1070 self._add_heisenfiles_by_path(to_userpath, from_files)
1072 for f in from_files:
1073 f.rename(to_userpath, to_parent, to_childname)
1075 d = defer.succeed(None)
1076 for f in from_files:
# NOTE(review): this lambda closes over the loop variable 'f' late; since the
# callbacks run after the loop completes, each one would sync the *last* file
# when from_files has more than one element. Likely should be
# 'lambda ign, f=f: f.sync()' — confirm against the full source / tests.
1077 d.addBoth(lambda ign: f.sync())
1079 d.addBoth(lambda ign: len(from_files) > 0)
1082 def _sync_heisenfiles(self, userpath, direntry, ignore=None):
# Wait for all open heisenfiles matching this userpath or direntry (except
# 'ignore') to finish any in-progress commit, by chaining their .sync()
# Deferreds. Returns that chained Deferred.
1083 request = "._sync_heisenfiles(%r, %r, ignore=%r)" % (userpath, direntry, ignore)
1084 self.log(request, level=OPERATIONAL)
# NOTE(review): lines elided here — including, apparently, the 'else' that
# initializes 'files' when direntry is not registered.
1087 if direntry in all_heisenfiles:
1088 (files, opentime) = all_heisenfiles[direntry]
1089 if userpath in self._heisenfiles:
1090 files += self._heisenfiles[userpath]
1092 if noisy: self.log("files = %r in %r" % (files, request), level=NOISY)
1094 d = defer.succeed(None)
# NOTE(review): the 'for f in files:' header is elided. The _sync closure
# correctly binds the current file via its 'current_f' parameter (no
# late-binding problem here).
1096 if not (f is ignore):
1097 def _sync(ign, current_f):
1098 if noisy: self.log("_sync %r in %r" % (current_f, request), level=NOISY)
1099 return current_f.sync()
1103 self.log("done %r" % (request,), level=OPERATIONAL)
1108 def _remove_heisenfile(self, userpath, parent, childname, file_to_remove):
# Remove one specific file object from both heisenfile tables (by identity),
# deleting a table entry entirely when its file list becomes empty.
# Used as the close_notify callback for GeneralSFTPFile.
# NOTE(review): the 'else:' headers before the two 'del' statements are
# elided in this extract.
1109 if noisy: self.log("._remove_heisenfile(%r, %r, %r, %r)" % (userpath, parent, childname, file_to_remove), level=NOISY)
1111 direntry = self._direntry_for(parent, childname)
1112 if direntry in all_heisenfiles:
1113 (all_old_files, opentime) = all_heisenfiles[direntry]
1114 all_new_files = [f for f in all_old_files if f is not file_to_remove]
1115 if len(all_new_files) > 0:
1116 all_heisenfiles[direntry] = (all_new_files, opentime)
1118 del all_heisenfiles[direntry]
1120 if userpath in self._heisenfiles:
1121 old_files = self._heisenfiles[userpath]
1122 new_files = [f for f in old_files if f is not file_to_remove]
1123 if len(new_files) > 0:
1124 self._heisenfiles[userpath] = new_files
1126 del self._heisenfiles[userpath]
1128 def _direntry_for(self, filenode_or_parent, childname=None):
# Compute a user-independent key for a directory entry: the parent's write
# URI plus "/" plus the UTF-8 child name.
# NOTE(review): the fallback branches (no write URI, no childname, or a
# falsy node) are elided from this extract — presumably returning a
# node-based key or None; confirm against the full source.
1129 if filenode_or_parent:
1130 rw_uri = filenode_or_parent.get_write_uri()
1131 if rw_uri and childname:
1132 return rw_uri + "/" + childname.encode('utf-8')
1138 def _make_file(self, existing_file, userpath, flags, parent=None, childname=None, filenode=None, metadata=None):
# Choose and open the file implementation for an openFile request: a
# lightweight ShortReadOnlySFTPFile for small immutable read-only opens,
# otherwise a GeneralSFTPFile (reusing 'existing_file' if one was already
# created synchronously). Syncs any matching heisenfiles first.
# NOTE(review): several lines are elided (e.g. the 'else' introducing the
# general case, and the direntry branch selection around line 1148).
1139 if noisy: self.log("._make_file(%r, %r, %r = %r, parent=%r, childname=%r, filenode=%r, metadata=%r)" %
1140 (existing_file, userpath, flags, _repr_flags(flags), parent, childname, filenode, metadata),
1143 assert metadata is None or 'readonly' in metadata, metadata
1145 writing = (flags & (FXF_WRITE | FXF_CREAT)) != 0
1147 direntry = self._direntry_for(parent, childname)
1149 direntry = self._direntry_for(filenode)
1151 d = self._sync_heisenfiles(userpath, direntry, ignore=existing_file)
1153 if not writing and (flags & FXF_READ) and filenode and not filenode.is_mutable() and filenode.get_size() <= SIZE_THRESHOLD:
1154 d.addCallback(lambda ign: ShortReadOnlySFTPFile(userpath, filenode, metadata))
1158 close_notify = self._remove_heisenfile
1160 d.addCallback(lambda ign: existing_file or GeneralSFTPFile(userpath, flags, close_notify, self._convergence))
1161 def _got_file(file):
1163 self._add_heisenfiles_by_direntry(direntry, [file])
1164 return file.open(parent=parent, childname=childname, filenode=filenode, metadata=metadata)
1165 d.addCallback(_got_file)
1168 def openFile(self, pathstring, flags, attrs):
# ISFTPServer entry point for SSH_FXP_OPEN: validate flags, resolve the
# path, enforce permissions, and return (a Deferred for) an ISFTPFile.
# NOTE(review): this extract has many elided lines (guards, 'else' branches,
# 'return d2'/'return d3' lines, and the '_no_child' def header); the code
# below is annotated as-is, not reconstructed.
1169 request = ".openFile(%r, %r = %r, %r)" % (pathstring, flags, _repr_flags(flags), attrs)
1170 self.log(request, level=OPERATIONAL)
1172 # This is used for both reading and writing.
1173 # First exclude invalid combinations of flags, and empty paths.
1175 if not (flags & (FXF_READ | FXF_WRITE)):
1176 def _bad_readwrite():
1177 raise SFTPError(FX_BAD_MESSAGE, "invalid file open flags: at least one of FXF_READ and FXF_WRITE must be set")
1178 return defer.execute(_bad_readwrite)
1180 if (flags & FXF_EXCL) and not (flags & FXF_CREAT):
1181 def _bad_exclcreat():
1182 raise SFTPError(FX_BAD_MESSAGE, "invalid file open flags: FXF_EXCL cannot be set without FXF_CREAT")
1183 return defer.execute(_bad_exclcreat)
1185 path = self._path_from_string(pathstring)
# NOTE(review): the 'if not path:' guard line for the following is elided.
1187 def _emptypath(): raise SFTPError(FX_NO_SUCH_FILE, "path cannot be empty")
1188 return defer.execute(_emptypath)
1190 # The combination of flags is potentially valid.
1192 # To work around clients that have race condition bugs, a getAttr, rename, or
1193 # remove request following an 'open' request with FXF_WRITE or FXF_CREAT flags,
1194 # should succeed even if the 'open' request has not yet completed. So we now
1195 # synchronously add a file object into the self._heisenfiles dict, indexed
1196 # by its UTF-8 userpath. (We can't yet add it to the all_heisenfiles dict,
1197 # because we don't yet have a user-independent path for the file.) The file
1198 # object does not know its filenode, parent, or childname at this point.
1200 userpath = self._path_to_utf8(path)
1202 if flags & (FXF_WRITE | FXF_CREAT):
1203 file = GeneralSFTPFile(userpath, flags, self._remove_heisenfile, self._convergence)
1204 self._add_heisenfiles_by_path(userpath, [file])
# NOTE(review): the 'else:' (read-only open: file = None) is elided here.
1206 # We haven't decided which file implementation to use yet.
1209 # Now there are two major cases:
1211 # 1. The path is specified as /uri/FILECAP, with no parent directory.
1212 # If the FILECAP is mutable and writeable, then we can open it in write-only
1213 # or read/write mode (non-exclusively), otherwise we can only open it in
1214 # read-only mode. The open should succeed immediately as long as FILECAP is
1215 # a valid known filecap that grants the required permission.
1217 # 2. The path is specified relative to a parent. We find the parent dirnode and
1218 # get the child's URI and metadata if it exists. There are four subcases:
1219 # a. the child does not exist: FXF_CREAT must be set, and we must be able
1220 # to write to the parent directory.
1221 # b. the child exists but is not a valid known filecap: fail
1222 # c. the child is mutable: if we are trying to open it write-only or
1223 # read/write, then we must be able to write to the file.
1224 # d. the child is immutable: if we are trying to open it write-only or
1225 # read/write, then we must be able to write to the parent directory.
1227 # To reduce latency, open normally succeeds as soon as these conditions are
1228 # met, even though there might be a failure in downloading the existing file
1229 # or uploading a new one. However, there is an exception: if a file has been
1230 # written, then closed, and is now being reopened, then we have to delay the
1231 # open until the previous upload/publish has completed. This is necessary
1232 # because sshfs does not wait for the result of an FXF_CLOSE message before
1233 # reporting to the client that a file has been closed. It applies both to
1234 # mutable files, and to directory entries linked to an immutable file.
1236 # Note that the permission checks below are for more precise error reporting on
1237 # the open call; later operations would fail even if we did not make these checks.
1239 d = self._get_root(path)
1240 def _got_root( (root, path) ):
1241 if root.is_unknown():
1242 raise SFTPError(FX_PERMISSION_DENIED,
1243 "cannot open an unknown cap (or child of an unknown directory). "
1244 "Upgrading the gateway to a later Tahoe-LAFS version may help")
# NOTE(review): the 'if not path:' test selecting case 1 is elided here.
1247 if noisy: self.log("case 1: root = %r, path[:-1] = %r" % (root, path[:-1]), level=NOISY)
1248 if not IFileNode.providedBy(root):
1249 raise SFTPError(FX_PERMISSION_DENIED,
1250 "cannot open a directory cap")
1251 if (flags & FXF_WRITE) and root.is_readonly():
1252 raise SFTPError(FX_PERMISSION_DENIED,
1253 "cannot write to a non-writeable filecap without a parent directory")
1254 if flags & FXF_EXCL:
1255 raise SFTPError(FX_FAILURE,
1256 "cannot create a file exclusively when it already exists")
1258 # The file does not need to be added to all_heisenfiles, because it is not
1259 # associated with a directory entry that needs to be updated.
1261 return self._make_file(file, userpath, flags, filenode=root)
# Case 2: path relative to a parent directory.
1264 childname = path[-1]
1265 if noisy: self.log("case 2: root = %r, childname = %r, path[:-1] = %r" %
1266 (root, childname, path[:-1]), level=NOISY)
1267 d2 = root.get_child_at_path(path[:-1])
1268 def _got_parent(parent):
1269 if noisy: self.log("_got_parent(%r)" % (parent,), level=NOISY)
1270 if parent.is_unknown():
1271 raise SFTPError(FX_PERMISSION_DENIED,
1272 "cannot open an unknown cap (or child of an unknown directory). "
1273 "Upgrading the gateway to a later Tahoe-LAFS version may help")
1275 parent_readonly = parent.is_readonly()
1276 d3 = defer.succeed(None)
1277 if flags & FXF_EXCL:
1278 # FXF_EXCL means that the link to the file (not the file itself) must
1279 # be created atomically wrt updates by this storage client.
1280 # That is, we need to create the link before returning success to the
1281 # SFTP open request (and not just on close, as would normally be the
1282 # case). We make the link initially point to a zero-length LIT file,
1283 # which is consistent with what might happen on a POSIX filesystem.
# NOTE(review): the 'if parent_readonly:' guard for the following raise is
# elided in this extract.
1286 raise SFTPError(FX_FAILURE,
1287 "cannot create a file exclusively when the parent directory is read-only")
1289 # 'overwrite=False' ensures failure if the link already exists.
1290 # FIXME: should use a single call to set_uri and return (child, metadata) (#1035)
1292 zero_length_lit = "URI:LIT:"
1293 if noisy: self.log("%r.set_uri(%r, None, readcap=%r, overwrite=False)" %
1294 (parent, zero_length_lit, childname), level=NOISY)
1295 d3.addCallback(lambda ign: parent.set_uri(childname, None, readcap=zero_length_lit, overwrite=False))
1296 def _seturi_done(child):
1297 if noisy: self.log("%r.get_metadata_for(%r)" % (parent, childname), level=NOISY)
1298 d4 = parent.get_metadata_for(childname)
1299 d4.addCallback(lambda metadata: (child, metadata))
# NOTE(review): the 'return d4' line and the 'else:' for the non-EXCL branch
# are elided around here.
1301 d3.addCallback(_seturi_done)
1303 if noisy: self.log("%r.get_child_and_metadata(%r)" % (parent, childname), level=NOISY)
1304 d3.addCallback(lambda ign: parent.get_child_and_metadata(childname))
1306 def _got_child( (filenode, metadata) ):
1307 if noisy: self.log("_got_child( (%r, %r) )" % (filenode, metadata), level=NOISY)
1309 if filenode.is_unknown():
1310 raise SFTPError(FX_PERMISSION_DENIED,
1311 "cannot open an unknown cap. Upgrading the gateway "
1312 "to a later Tahoe-LAFS version may help")
1313 if not IFileNode.providedBy(filenode):
1314 raise SFTPError(FX_PERMISSION_DENIED,
1315 "cannot open a directory as if it were a file")
1316 if (flags & FXF_WRITE) and filenode.is_mutable() and filenode.is_readonly():
1317 raise SFTPError(FX_PERMISSION_DENIED,
1318 "cannot open a read-only mutable file for writing")
1319 if (flags & FXF_WRITE) and parent_readonly:
1320 raise SFTPError(FX_PERMISSION_DENIED,
1321 "cannot open a file for writing when the parent directory is read-only")
1323 metadata['readonly'] = _is_readonly(parent_readonly, filenode)
1324 return self._make_file(file, userpath, flags, parent=parent, childname=childname,
1325 filenode=filenode, metadata=metadata)
# NOTE(review): the 'def _no_child(f):' header is elided before this line.
1327 if noisy: self.log("_no_child(%r)" % (f,), level=NOISY)
1328 f.trap(NoSuchChildError)
1330 if not (flags & FXF_CREAT):
1331 raise SFTPError(FX_NO_SUCH_FILE,
1332 "the file does not exist, and was not opened with the creation (CREAT) flag")
# NOTE(review): the 'if parent_readonly:' guard for this raise is elided.
1334 raise SFTPError(FX_PERMISSION_DENIED,
1335 "cannot create a file when the parent directory is read-only")
1337 return self._make_file(file, userpath, flags, parent=parent, childname=childname)
1338 d3.addCallbacks(_got_child, _no_child)
1341 d2.addCallback(_got_parent)
1344 d.addCallback(_got_root)
1345 def _remove_on_error(err):
# If anything failed, make sure the speculatively-registered heisenfile is
# removed again. NOTE(review): the 'if file:' guard appears to be elided.
1347 self._remove_heisenfile(userpath, None, None, file)
1349 d.addErrback(_remove_on_error)
1350 d.addBoth(_convert_error, request)
1353 def renameFile(self, from_pathstring, to_pathstring, overwrite=False):
# ISFTPServer entry point for SSH_FXP_RENAME (overwrite=False) and the
# posix-rename@openssh.com extension (overwrite=True). Renames any open
# heisenfiles first, then moves the real directory entry.
# NOTE(review): several lines are elided (e.g. the 'if not overwrite:' guard
# around the existence probe, the 'def _check(err):'/'def _move(ign):'
# headers, and the final 'return d').
1354 request = ".renameFile(%r, %r)" % (from_pathstring, to_pathstring)
1355 self.log(request, level=OPERATIONAL)
1357 from_path = self._path_from_string(from_pathstring)
1358 to_path = self._path_from_string(to_pathstring)
1359 from_userpath = self._path_to_utf8(from_path)
1360 to_userpath = self._path_to_utf8(to_path)
1362 # the target directory must already exist
1363 d = deferredutil.gatherResults([self._get_parent_or_node(from_path),
1364 self._get_parent_or_node(to_path)])
1365 def _got( (from_pair, to_pair) ):
1366 if noisy: self.log("_got( (%r, %r) ) in .renameFile(%r, %r, overwrite=%r)" %
1367 (from_pair, to_pair, from_pathstring, to_pathstring, overwrite), level=NOISY)
1368 (from_parent, from_childname) = from_pair
1369 (to_parent, to_childname) = to_pair
1371 if from_childname is None:
1372 raise SFTPError(FX_NO_SUCH_FILE, "cannot rename a source object specified by URI")
1373 if to_childname is None:
1374 raise SFTPError(FX_NO_SUCH_FILE, "cannot rename to a destination specified by URI")
1376 # <http://tools.ietf.org/html/draft-ietf-secsh-filexfer-02#section-6.5>
1377 # "It is an error if there already exists a file with the name specified
1379 # OpenSSH's SFTP server returns FX_PERMISSION_DENIED for this error.
1381 # For the standard SSH_FXP_RENAME operation, overwrite=False.
1382 # We also support the posix-rename@openssh.com extension, which uses overwrite=True.
1384 d2 = defer.fail(NoSuchChildError())
# Probe the destination: success means it exists (an error for non-overwrite
# rename); NoSuchChildError means the destination is free.
1386 d2.addCallback(lambda ign: to_parent.get(to_childname))
1387 def _expect_fail(res):
1388 if not isinstance(res, Failure):
1389 raise SFTPError(FX_PERMISSION_DENIED, "cannot rename to existing path " + to_userpath)
1391 # It is OK if we fail for errors other than NoSuchChildError, since that probably
1392 # indicates some problem accessing the destination directory.
1393 res.trap(NoSuchChildError)
1394 d2.addBoth(_expect_fail)
1396 # If there are heisenfiles to be written at the 'from' direntry, then ensure
1397 # they will now be written at the 'to' direntry instead.
1398 d2.addCallback(lambda ign:
1399 self._rename_heisenfiles(from_userpath, from_parent, from_childname,
1400 to_userpath, to_parent, to_childname, overwrite=overwrite))
1403 # FIXME: use move_child_to_path to avoid possible data loss due to #943
1404 #d3 = from_parent.move_child_to_path(from_childname, to_root, to_path, overwrite=overwrite)
1406 d3 = from_parent.move_child_to(from_childname, to_parent, to_childname, overwrite=overwrite)
# NOTE(review): the 'def _check(err, renamed):'-style header is elided before
# this line; 'renamed' comes from _rename_heisenfiles' result.
1408 if noisy: self.log("_check(%r) in .renameFile(%r, %r, overwrite=%r)" %
1409 (err, from_pathstring, to_pathstring, overwrite), level=NOISY)
1411 if not isinstance(err, Failure) or (renamed and err.check(NoSuchChildError)):
1413 if not overwrite and err.check(ExistingChildError):
1414 raise SFTPError(FX_PERMISSION_DENIED, "cannot rename to existing path " + to_userpath)
1419 d2.addCallback(_move)
1422 d.addBoth(_convert_error, request)
1425 def makeDirectory(self, pathstring, attrs):
# ISFTPServer entry point for SSH_FXP_MKDIR: create the directory (and any
# missing intermediate directories) under the resolved root.
# NOTE(review): the trailing 'return d' is elided from this extract.
1426 request = ".makeDirectory(%r, %r)" % (pathstring, attrs)
1427 self.log(request, level=OPERATIONAL)
1429 path = self._path_from_string(pathstring)
1430 metadata = self._attrs_to_metadata(attrs)
1431 d = self._get_root(path)
1432 d.addCallback(lambda (root, path):
1433 self._get_or_create_directories(root, path, metadata))
1434 d.addBoth(_convert_error, request)
1437 def _get_or_create_directories(self, node, path, metadata):
# Walk 'path' from 'node', creating each missing intermediate directory;
# recurses with path[1:] until the path is exhausted.
# NOTE(review): the base-case guard ('if not path:') before the
# 'return defer.succeed(node)' line and the final 'return d' are elided.
1438 if not IDirectoryNode.providedBy(node):
1439 # TODO: provide the name of the blocking file in the error message.
1440 def _blocked(): raise SFTPError(FX_FAILURE, "cannot create directory because there "
1441 "is a file in the way") # close enough
1442 return defer.execute(_blocked)
1445 return defer.succeed(node)
1446 d = node.get(path[0])
1447 def _maybe_create(f):
# EAFP: only create the subdirectory when the child genuinely doesn't exist.
1448 f.trap(NoSuchChildError)
1449 return node.create_subdirectory(path[0])
1450 d.addErrback(_maybe_create)
1451 d.addCallback(self._get_or_create_directories, path[1:], metadata)
1454 def removeFile(self, pathstring):
# ISFTPServer entry point for SSH_FXP_REMOVE: delete a file (not a
# directory). NOTE(review): the trailing 'return d' is elided.
1455 request = ".removeFile(%r)" % (pathstring,)
1456 self.log(request, level=OPERATIONAL)
1458 path = self._path_from_string(pathstring)
1459 d = self._remove_object(path, must_be_file=True)
1460 d.addBoth(_convert_error, request)
1463 def removeDirectory(self, pathstring):
# ISFTPServer entry point for SSH_FXP_RMDIR: delete a directory (not a
# file). NOTE(review): the trailing 'return d' is elided.
1464 request = ".removeDirectory(%r)" % (pathstring,)
1465 self.log(request, level=OPERATIONAL)
1467 path = self._path_from_string(pathstring)
1468 d = self._remove_object(path, must_be_directory=True)
1469 d.addBoth(_convert_error, request)
1472 def _remove_object(self, path, must_be_directory=False, must_be_file=False):
# Shared implementation for removeFile/removeDirectory: abandon any open
# heisenfiles for the target (files only), then delete the directory entry.
# 'must_exist=not abandoned' suppresses NoSuchChildError when the only
# thing being "removed" was an uncommitted heisenfile.
# NOTE(review): 'return d2' inside _got_parent and the final 'return d' are
# elided from this extract.
1473 userpath = self._path_to_utf8(path)
1474 d = self._get_parent_or_node(path)
1475 def _got_parent( (parent, childname) ):
1476 if childname is None:
1477 raise SFTPError(FX_NO_SUCH_FILE, "cannot remove an object specified by URI")
1479 direntry = self._direntry_for(parent, childname)
1480 d2 = defer.succeed(False)
1481 if not must_be_directory:
1482 d2.addCallback(lambda ign: self._abandon_any_heisenfiles(userpath, direntry))
1484 d2.addCallback(lambda abandoned:
1485 parent.delete(childname, must_exist=not abandoned,
1486 must_be_directory=must_be_directory, must_be_file=must_be_file))
1488 d.addCallback(_got_parent)
1491 def openDirectory(self, pathstring):
# ISFTPServer entry point for SSH_FXP_OPENDIR: resolve the path to a
# dirnode, list its children, and return a StoppableList of
# (utf8 name, ls-style longname, attrs) tuples.
# NOTE(review): elided here: the 'else:' in _got_parent_or_node, the
# 'def _list(dirnode):' header, the d2 = dirnode.list() line, the
# 'results = []' initializer, 'return d2', and the final 'return d'.
1492 request = ".openDirectory(%r)" % (pathstring,)
1493 self.log(request, level=OPERATIONAL)
1495 path = self._path_from_string(pathstring)
1496 d = self._get_parent_or_node(path)
1497 def _got_parent_or_node( (parent_or_node, childname) ):
1498 if noisy: self.log("_got_parent_or_node( (%r, %r) ) in openDirectory(%r)" %
1499 (parent_or_node, childname, pathstring), level=NOISY)
1500 if childname is None:
1501 return parent_or_node
1503 return parent_or_node.get(childname)
1504 d.addCallback(_got_parent_or_node)
1506 if dirnode.is_unknown():
1507 raise SFTPError(FX_PERMISSION_DENIED,
1508 "cannot list an unknown cap as a directory. Upgrading the gateway "
1509 "to a later Tahoe-LAFS version may help")
1510 if not IDirectoryNode.providedBy(dirnode):
1511 raise SFTPError(FX_PERMISSION_DENIED,
1512 "cannot list a file as if it were a directory")
1515 def _render(children):
1516 parent_readonly = dirnode.is_readonly()
1518 for filename, (child, metadata) in children.iteritems():
1519 # The file size may be cached or absent.
1520 metadata['readonly'] = _is_readonly(parent_readonly, child)
1521 attrs = _populate_attrs(child, metadata)
1522 filename_utf8 = filename.encode('utf-8')
1523 longname = _lsLine(filename_utf8, attrs)
1524 results.append( (filename_utf8, longname, attrs) )
1525 return StoppableList(results)
1526 d2.addCallback(_render)
1528 d.addCallback(_list)
1529 d.addBoth(_convert_error, request)
1532 def getAttrs(self, pathstring, followLinks):
# ISFTPServer entry point for SSH_FXP_STAT/LSTAT: sync any open
# heisenfiles for the target first (so sizes are current), then report
# attributes; a just-opened-but-not-yet-linked file is reported from the
# heisenfile tables instead of the filesystem.
# NOTE(review): elided here: the 'else:' selecting the child case, 'return
# d3', additional attrs dict entries after 'createtime', 'return d2', and
# the final 'return d'. The 'def _nosuch(err):' header before line 1571 is
# also elided.
1533 request = ".getAttrs(%r, followLinks=%r)" % (pathstring, followLinks)
1534 self.log(request, level=OPERATIONAL)
1536 # When asked about a specific file, report its current size.
1537 # TODO: the modification time for a mutable file should be
1538 # reported as the update time of the best version. But that
1539 # information isn't currently stored in mutable shares, I think.
1541 # Some clients will incorrectly try to get the attributes
1542 # of a file immediately after opening it, before it has been put
1543 # into the all_heisenfiles table. This is a race condition bug in
1544 # the client, but we probably need to handle it anyway.
1546 path = self._path_from_string(pathstring)
1547 userpath = self._path_to_utf8(path)
1548 d = self._get_parent_or_node(path)
1549 def _got_parent_or_node( (parent_or_node, childname) ):
1550 if noisy: self.log("_got_parent_or_node( (%r, %r) )" % (parent_or_node, childname), level=NOISY)
1552 direntry = self._direntry_for(parent_or_node, childname)
1553 d2 = self._sync_heisenfiles(userpath, direntry)
1555 if childname is None:
1556 node = parent_or_node
1557 d2.addCallback(lambda ign: node.get_current_size())
1558 d2.addCallback(lambda size:
1559 _populate_attrs(node, {'readonly': node.is_unknown() or node.is_readonly()}, size=size))
1561 parent = parent_or_node
1562 d2.addCallback(lambda ign: parent.get_child_and_metadata_at_path([childname]))
1563 def _got( (child, metadata) ):
1564 if noisy: self.log("_got( (%r, %r) )" % (child, metadata), level=NOISY)
1565 assert IDirectoryNode.providedBy(parent), parent
1566 metadata['readonly'] = _is_readonly(parent.is_readonly(), child)
1567 d3 = child.get_current_size()
1568 d3.addCallback(lambda size: _populate_attrs(child, metadata, size=size))
1571 if noisy: self.log("_nosuch(%r)" % (err,), level=NOISY)
1572 err.trap(NoSuchChildError)
1573 direntry = self._direntry_for(parent, childname)
1574 if noisy: self.log("checking open files:\nself._heisenfiles = %r\nall_heisenfiles = %r\ndirentry=%r" %
1575 (self._heisenfiles, all_heisenfiles, direntry), level=NOISY)
1576 if direntry in all_heisenfiles:
1577 (files, opentime) = all_heisenfiles[direntry]
1578 sftptime = _to_sftp_time(opentime)
1579 # A file that has been opened for writing necessarily has permissions rw-rw-rw-.
1580 return {'permissions': S_IFREG | 0666,
1582 'createtime': sftptime,
1588 d2.addCallbacks(_got, _nosuch)
1590 d.addCallback(_got_parent_or_node)
1591 d.addBoth(_convert_error, request)
1594 def setAttrs(self, pathstring, attrs):
# ISFTPServer entry point for SSH_FXP_SETSTAT: size changes (truncate/
# extend) are unsupported; all other attribute changes are silently
# accepted as a no-op.
# NOTE(review): the 'if "size" in attrs:' guard (and part of the comment)
# before the _unsupported branch is elided. Also note the typo "wth" in the
# error message below — left as-is since it is a runtime string.
1595 self.log(".setAttrs(%r, %r)" % (pathstring, attrs), level=OPERATIONAL)
1598 # this would require us to download and re-upload the truncated/extended
1600 def _unsupported(): raise SFTPError(FX_OP_UNSUPPORTED, "setAttrs wth size attribute unsupported")
1601 return defer.execute(_unsupported)
1602 return defer.succeed(None)
def readLink(self, pathstring):
    """Symbolic links are not supported; always fail with FX_OP_UNSUPPORTED."""
    self.log(".readLink(%r)" % (pathstring,), level=OPERATIONAL)

    def _raise_unsupported():
        raise SFTPError(FX_OP_UNSUPPORTED, "readLink")
    return defer.execute(_raise_unsupported)
def makeLink(self, linkPathstring, targetPathstring):
    """Creating symbolic links is not supported; always fail with FX_OP_UNSUPPORTED.

    If this is ever implemented, note the reversal of arguments described in
    point 7 of
    <http://www.openbsd.org/cgi-bin/cvsweb/src/usr.bin/ssh/PROTOCOL?rev=1.15>.
    """
    self.log(".makeLink(%r, %r)" % (linkPathstring, targetPathstring), level=OPERATIONAL)

    def _raise_unsupported():
        raise SFTPError(FX_OP_UNSUPPORTED, "makeLink")
    return defer.execute(_raise_unsupported)
1619 def extendedRequest(self, extensionName, extensionData):
# ISFTPServer entry point for SSH_FXP_EXTENDED: implements
# posix-rename@openssh.com (atomic overwrite rename) and
# statvfs/fstatvfs@openssh.com (fixed fake filesystem statistics).
# NOTE(review): elided here: the initial length check before the first
# struct.unpack, the intermediate length check before the second unpack,
# the 'return d' after _succeeded, and the closing paren of struct.pack.
1620 self.log(".extendedRequest(%r, <data of length %r>)" % (extensionName, len(extensionData)), level=OPERATIONAL)
1622 # We implement the three main OpenSSH SFTP extensions; see
1623 # <http://www.openbsd.org/cgi-bin/cvsweb/src/usr.bin/ssh/PROTOCOL?rev=1.15>
1625 if extensionName == 'posix-rename@openssh.com':
1626 def _bad(): raise SFTPError(FX_BAD_MESSAGE, "could not parse posix-rename@openssh.com request")
# Wire format: uint32 fromPathLen, fromPath, uint32 toPathLen, toPath.
1628 (fromPathLen,) = struct.unpack('>L', extensionData[0:4])
1629 if 8 + fromPathLen > len(extensionData): return defer.execute(_bad)
1631 (toPathLen,) = struct.unpack('>L', extensionData[(4 + fromPathLen):(8 + fromPathLen)])
1632 if 8 + fromPathLen + toPathLen != len(extensionData): return defer.execute(_bad)
1634 fromPathstring = extensionData[4:(4 + fromPathLen)]
1635 toPathstring = extensionData[(8 + fromPathLen):]
1636 d = self.renameFile(fromPathstring, toPathstring, overwrite=True)
1638 # Twisted conch assumes that the response from an extended request is either
1639 # an error, or an FXP_EXTENDED_REPLY. But it happens to do the right thing
1640 # (respond with an FXP_STATUS message) if we return a Failure with code FX_OK.
1641 def _succeeded(ign):
1642 raise SFTPError(FX_OK, "request succeeded")
1643 d.addCallback(_succeeded)
1646 if extensionName == 'statvfs@openssh.com' or extensionName == 'fstatvfs@openssh.com':
# Report generous constant values; Tahoe has no real local filesystem here.
1647 return defer.succeed(struct.pack('>11Q',
1648 1024, # uint64 f_bsize /* file system block size */
1649 1024, # uint64 f_frsize /* fundamental fs block size */
1650 628318530, # uint64 f_blocks /* number of blocks (unit f_frsize) */
1651 314159265, # uint64 f_bfree /* free blocks in file system */
1652 314159265, # uint64 f_bavail /* free blocks for non-root */
1653 200000000, # uint64 f_files /* total file inodes */
1654 100000000, # uint64 f_ffree /* free file inodes */
1655 100000000, # uint64 f_favail /* free file inodes for non-root */
1656 0x1AF5, # uint64 f_fsid /* file system id */
1657 2, # uint64 f_flag /* bit mask = ST_NOSUID; not ST_RDONLY */
1658 65535, # uint64 f_namemax /* maximum filename length */
1661 def _unsupported(): raise SFTPError(FX_OP_UNSUPPORTED, "unsupported %r request <data of length %r>" %
1662 (extensionName, len(extensionData)))
1663 return defer.execute(_unsupported)
def realPath(self, pathstring):
    """Canonicalize a client-supplied path to an absolute UTF-8 userpath."""
    self.log(".realPath(%r)" % (pathstring,), level=OPERATIONAL)
    parsed = self._path_from_string(pathstring)
    return self._path_to_utf8(parsed)
1670 def _path_to_utf8(self, path):
1671 return (u"/" + u"/".join(path)).encode('utf-8')
1673 def _path_from_string(self, pathstring):
# Parse a client-supplied byte path into a list of unicode components,
# treating the home directory as the root, resolving "." and ".."
# components, and rejecting non-UTF-8 bytes with FX_NO_SUCH_FILE.
# NOTE(review): elided here: the early return for ""/"." (after line 1678),
# the 'path = []' initializer, the ".."/"."-handling branches inside the
# loop, the 'try:' header before the decode, the path.append, and the final
# 'return path'.
1674 if noisy: self.log("CONVERT %r" % (pathstring,), level=NOISY)
1676 # The home directory is the root directory.
1677 pathstring = pathstring.strip("/")
1678 if pathstring == "" or pathstring == ".":
1681 path_utf8 = pathstring.split("/")
1683 # <http://tools.ietf.org/html/draft-ietf-secsh-filexfer-02#section-6.2>
1684 # "Servers SHOULD interpret a path name component ".." as referring to
1685 # the parent directory, and "." as referring to the current directory."
1687 for p_utf8 in path_utf8:
1689 # ignore excess .. components at the root
1694 p = p_utf8.decode('utf-8', 'strict')
1695 except UnicodeError:
1696 raise SFTPError(FX_NO_SUCH_FILE, "path could not be decoded as UTF-8")
1699 if noisy: self.log(" PATH %r" % (path,), level=NOISY)
1702 def _get_root(self, path):
# Resolve the root for a parsed path. A path beginning with u"uri" names an
# arbitrary cap (/uri/FILECAP or /uri/DIRCAP/...); otherwise the user's own
# root dirnode is used with the full path.
1703 # return Deferred (root, remaining_path)
# NOTE(review): the 'else:' before the self._root branch and the final
# 'return d' are elided from this extract.
1704 d = defer.succeed(None)
1705 if path and path[0] == u"uri":
1706 d.addCallback(lambda ign: self._client.create_node_from_uri(path[1].encode('utf-8')))
1707 d.addCallback(lambda root: (root, path[2:]))
1709 d.addCallback(lambda ign: (self._root, path))
1712 def _get_parent_or_node(self, path):
# Resolve a path to either (parent dirnode, childname) for a non-empty
# remaining path, or (node, None) when the path names the root/cap itself.
1713 # return Deferred (parent, childname) or (node, None)
# NOTE(review): elided here: the '(root, None)' return for the empty
# remaining path, the 'else:', 'return d2', and the final 'return d'.
1714 d = self._get_root(path)
1715 def _got_root( (root, remaining_path) ):
1716 if not remaining_path:
1719 d2 = root.get_child_at_path(remaining_path[:-1])
1720 d2.addCallback(lambda parent: (parent, remaining_path[-1]))
1722 d.addCallback(_got_root)
1725 def _attrs_to_metadata(self, attrs):
# Convert SFTP attrs into Tahoe metadata: time fields become longs and
# "ext_*" extension fields become strings; everything else is dropped.
# NOTE(review): elided here: the metadata-dict initializer, the
# 'for key in attrs:' header, and the final 'return metadata'.
1729 if key == "mtime" or key == "ctime" or key == "createtime":
1730 metadata[key] = long(attrs[key])
1731 elif key.startswith("ext_"):
1732 metadata[key] = str(attrs[key])
# Conch avatar for an authenticated SFTP user: wires the "session" channel
# and "sftp" subsystem, and rejects all interactive-shell ISession methods.
# NOTE(review): several 'def' headers are elided from this extract (e.g.
# those for the eofReceived and closed bodies at the end).
1737 class SFTPUser(ConchUser, PrefixingLogMixin):
1738 implements(ISession)
1739 def __init__(self, check_abort, client, rootnode, username, convergence):
1740 ConchUser.__init__(self)
1741 PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
1743 self.channelLookup["session"] = session.SSHSession
1744 self.subsystemLookup["sftp"] = FileTransferServer
1746 self.check_abort = check_abort
1747 self.client = client
1748 self.root = rootnode
1749 self.username = username
1750 self.convergence = convergence
# Interactive sessions (pty, shell, exec) are deliberately unsupported.
1752 def getPty(self, terminal, windowSize, attrs):
1753 self.log(".getPty(%r, %r, %r)" % (terminal, windowSize, attrs), level=OPERATIONAL)
1754 raise NotImplementedError
1756 def openShell(self, protocol):
1757 self.log(".openShell(%r)" % (protocol,), level=OPERATIONAL)
1758 raise NotImplementedError
1760 def execCommand(self, protocol, cmd):
1761 self.log(".execCommand(%r, %r)" % (protocol, cmd), level=OPERATIONAL)
1762 raise NotImplementedError
1764 def windowChanged(self, newWindowSize):
1765 self.log(".windowChanged(%r)" % (newWindowSize,), level=OPERATIONAL)
# NOTE(review): the 'def eofReceived(self):' header is elided before this.
1768 self.log(".eofReceived()", level=OPERATIONAL)
# NOTE(review): the 'def closed(self):' header is elided before this.
1771 self.log(".closed()", level=OPERATIONAL)
1774 # if you have an SFTPUser, and you want something that provides ISFTPServer,
1775 # then you get SFTPHandler(user)
# (Twisted component registration: adapting an SFTPUser avatar to the
# ISFTPServer interface yields an SFTPHandler wrapping it.)
1776 components.registerAdapter(SFTPHandler, SFTPUser, ISFTPServer)
1778 from auth import AccountURLChecker, AccountFileChecker, NeedRootcapLookupScheme
# NOTE(review): the 'class Dispatcher:' header is elided before this line.
# This realm maps authenticated avatar IDs to SFTPUserHandler avatars.
1781 implements(portal.IRealm)
1782 def __init__(self, client):
# Keep a reference to the Tahoe client used to build root nodes from caps.
1783 self._client = client
1785 def requestAvatar(self, avatarID, mind, interface):
# IRealm hook: build the user's root dirnode from the rootcap carried on
# the avatarID and hand back an SFTPUserHandler avatar plus its logout
# callable. Only IConchUser is supported.
# NOTE(review): 'assert' is stripped under -O; an explicit check might be
# preferable, but is left unchanged here.
1786 assert interface == IConchUser, interface
1787 rootnode = self._client.create_node_from_uri(avatarID.rootcap)
1788 handler = SFTPUserHandler(self._client, rootnode, avatarID.username)
1789 return (interface, handler, handler.logout)
# Twisted service that runs the SFTP frontend: builds a cred Portal with
# account-file and/or account-URL checkers, loads the SSH host keypair, and
# listens on sftp_portstr.
# NOTE(review): elided from this extract: the 'if accountfile:' /
# 'if accounturl:' guards around the two checker registrations, the
# 'f = SSHFactory()' / 'f.portal = p' wiring before strports.service, and
# any lines between getPrimes and the service creation.
1792 class SFTPServer(service.MultiService):
1793 def __init__(self, client, accountfile, accounturl,
1794 sftp_portstr, pubkey_file, privkey_file):
1795 service.MultiService.__init__(self)
1797 r = Dispatcher(client)
1798 p = portal.Portal(r)
1801 c = AccountFileChecker(self, accountfile)
1802 p.registerChecker(c)
1804 c = AccountURLChecker(self, accounturl)
1805 p.registerChecker(c)
1806 if not accountfile and not accounturl:
1807 # we could leave this anonymous, with just the /uri/CAP form
1808 raise NeedRootcapLookupScheme("must provide an account file or URL")
1810 pubkey = keys.Key.fromFile(pubkey_file)
1811 privkey = keys.Key.fromFile(privkey_file)
1812 class SSHFactory(factory.SSHFactory):
1813 publicKeys = {pubkey.sshType(): pubkey}
1814 privateKeys = {privkey.sshType(): privkey}
1815 def getPrimes(self):
1817 # if present, this enables diffie-hellman-group-exchange
1818 return primes.parseModuliFile("/etc/ssh/moduli")
1825 s = strports.service(sftp_portstr, f)
1826 s.setServiceParent(self)